import string import unittest from galaxy import model from galaxy.tools import ToolOutput from galaxy.tools.actions import DefaultToolAction from galaxy.tools.actions import on_text_for_names from galaxy.tools.actions import determine_output_format from xml.etree.ElementTree import XML import tools_support TEST_HANDLER_NAME = "test_handler_1" # I cannot think of a saner way to test if data is being wrapped than use a # data param in the output label - though you would probably never want to do # this. DATA_IN_LABEL_TOOL_CONTENTS = ''' echo "$param1" < $out1 ''' # Tool with two outputs - used to verify all datasets within same job get same # object store id. TWO_OUTPUTS = ''' echo "$param1" < $out1 ''' def test_on_text_for_names(): def assert_on_text_is( expected, *names ): on_text = on_text_for_names( names ) assert on_text == expected, "Wrong on text value %s, expected %s" % ( on_text, expected ) assert_on_text_is( "data 1", "data 1" ) assert_on_text_is( "data 1 and data 2", "data 1", "data 2" ) assert_on_text_is( "data 1, data 2, and data 3", "data 1", "data 2", "data 3" ) assert_on_text_is( "data 1, data 2, and others", "data 1", "data 2", "data 3", "data 4" ) assert_on_text_is( "data 1 and data 2", "data 1", "data 1", "data 2" ) class DefaultToolActionTestCase( unittest.TestCase, tools_support.UsesApp, tools_support.UsesTools ): def setUp( self ): self.setup_app( mock_model=False ) history = model.History() self.history = history self.trans = MockTrans( self.app, self.history ) self.app.model.context.add( history ) self.app.model.context.flush() self.action = DefaultToolAction() self.app.config.len_file_path = "moocow" self.app.job_config[ "get_handler" ] = lambda h: TEST_HANDLER_NAME self.app.object_store = MockObjectStore() def test_output_created( self ): _, output = self._simple_execute() assert len( output ) == 1 assert "out1" in output def test_output_label( self ): _, output = self._simple_execute() self.assertEquals( output[ "out1" ].name, "Output (moo)" ) def test_output_label_data( self ): hda1 = self.__add_dataset() hda2 = self.__add_dataset() incoming = { "param1": hda1, "repeat1": [ {"param2": hda2}, ] } job, output = self._simple_execute( tools_support.SIMPLE_CAT_TOOL_CONTENTS, incoming, ) self.assertEquals( output[ "out1" ].name, "Test Tool on data 2 and data 1" ) def test_object_store_ids( self ): _, output = self._simple_execute( contents=TWO_OUTPUTS ) self.assertEquals( output[ "out1" ].name, "Output (moo)" ) self.assertEquals( output[ "out2" ].name, "Output 2 (moo)" ) def test_params_wrapped( self ): hda1 = self.__add_dataset() _, output = self._simple_execute( contents=DATA_IN_LABEL_TOOL_CONTENTS, incoming=dict( repeat1=[ dict( param1=hda1 ) ] ), ) # Again this is a stupid way to ensure data parameters are wrapped. self.assertEquals( output[ "out1" ].name, "Output (%s)" % hda1.dataset.get_file_name() ) def test_handler_set( self ): job, _ = self._simple_execute() assert job.handler == TEST_HANDLER_NAME def __add_dataset( self, state='ok' ): hda = model.HistoryDatasetAssociation() hda.dataset = model.Dataset() hda.dataset.state = 'ok' hda.dataset.external_filename = "/tmp/datasets/dataset_001.dat" self.history.add_dataset( hda ) self.app.model.context.flush() return hda def _simple_execute( self, contents=None, incoming=None ): if contents is None: contents = tools_support.SIMPLE_TOOL_CONTENTS if incoming is None: incoming = dict(param1="moo") self._init_tool( contents ) return self.action.execute( tool=self.tool, trans=self.trans, history=self.history, incoming=incoming, ) def test_determine_output_format(): # Test simple case of explicitly defined output with no changes. direct_output = quick_output("txt") __assert_output_format_is("txt", direct_output) # Test if format is "input" (which just uses the last input on the form.) input_based_output = quick_output("input") __assert_output_format_is("fastq", input_based_output, [("i1", "fasta"), ("i2", "fastq")]) # Test using format_source (testing a couple different positions) input_based_output = quick_output("txt", format_source="i1") __assert_output_format_is("fasta", input_based_output, [("i1", "fasta"), ("i2", "fastq")]) input_based_output = quick_output("txt", format_source="i2") __assert_output_format_is("fastq", input_based_output, [("i1", "fasta"), ("i2", "fastq")]) change_format_xml = """ """ change_format_output = quick_output("fastq", change_format_xml=change_format_xml) # Test maching a change_format when. __assert_output_format_is("fastqillumina", change_format_output, param_context={"options_type": {"output_type": "illumina"}} ) # Test change_format but no match __assert_output_format_is("fastq", change_format_output, param_context={"options_type": {"output_type": "sanger"}} ) change_on_metadata_xml_template = string.Template(""" """) change_on_metadata_illumina = change_on_metadata_xml_template.safe_substitute({'input': "i2"}) change_on_metadata_output = quick_output("fastq", change_format_xml=change_on_metadata_illumina) __assert_output_format_is("fastqillumina", change_on_metadata_output, [("i1", "txt"), ("i2", "txt")] ) change_on_metadata_solexa = change_on_metadata_xml_template.safe_substitute({'input': "i1"}) print change_on_metadata_solexa change_on_metadata_output = quick_output("fastq", change_format_xml=change_on_metadata_solexa) __assert_output_format_is("fastqsolexa", change_on_metadata_output, [("i1", "txt"), ("i2", "txt")] ) def __assert_output_format_is( expected, output, input_extensions=[], param_context=[] ): inputs = {} last_ext = "data" i = 1 for name, ext in input_extensions: hda = model.HistoryDatasetAssociation(extension=ext) hda.metadata.random_field = str(i) # Populate a random metadata field for testing inputs[ name ] = hda last_ext = ext i += 1 actual_format = determine_output_format( output, param_context, inputs, last_ext ) assert actual_format == expected, "Actual format %s, does not match expected %s" % (actual_format, expected) def quick_output(format, format_source=None, change_format_xml=None): test_output = ToolOutput( "test_output" ) test_output.format = format test_output.format_source = format_source if change_format_xml: test_output.change_format = XML(change_format_xml) else: test_output.change_format = None return test_output class MockTrans( object ): def __init__( self, app, history, user=None ): self.app = app self.history = history self.user = user self.sa_session = self.app.model.context self.model = app.model def db_dataset_for( self, input_db_key ): return None def get_galaxy_session( self ): return model.GalaxySession() def get_current_user_roles( self ): return [] def log_event( self, *args, **kwargs ): pass class MockObjectStore( object ): def __init__( self ): self.created_datasets = [] self.first_create = True self.object_store_id = "mycoolid" def create( self, dataset ): self.created_datasets.append( dataset ) if self.first_create: self.first_create = False assert dataset.object_store_id is None dataset.object_store_id = self.object_store_id else: assert dataset.object_store_id == self.object_store_id