""" Test Tool execution and state handling logic. """ from unittest import TestCase import galaxy.model from galaxy.tools import DefaultToolState from galaxy.tools.parameters import params_to_incoming from galaxy.util.bunch import Bunch from galaxy.util import string_to_object from galaxy.util import object_to_string from galaxy.util.odict import odict import tools_support from galaxy import eggs eggs.require( "Paste" ) from paste import httpexceptions BASE_REPEAT_TOOL_CONTENTS = ''' echo "$param1" #for $r in $repeat# "$r.param2" #end for# < $out1 %s ''' # Tool with a repeat parameter, to test state update. REPEAT_TOOL_CONTENTS = BASE_REPEAT_TOOL_CONTENTS % '''''' REPEAT_COLLECTION_PARAM_CONTENTS = BASE_REPEAT_TOOL_CONTENTS % '''''' class ToolExecutionTestCase( TestCase, tools_support.UsesApp, tools_support.UsesTools ): def setUp(self): self.setup_app() self.history = galaxy.model.History() self.trans = MockTrans( self.app, self.history ) self.app.dataset_collections_service = MockCollectionService() self.tool_action = MockAction( self.trans ) def tearDown(self): self.tear_down_app() def test_state_new( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) template, template_vars = self.__handle_with_incoming( param1="moo", # no runtool_btn, just rerenders the form mako with tool # state populated. ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert state.inputs[ "param1" ] == "moo" def test_execute( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) template, template_vars = self.__handle_with_incoming( param1="moo", runtool_btn="dummy", ) self.__assert_exeuted( template, template_vars ) # Didn't specify a rerun_remap_id so this should be None assert self.tool_action.execution_call_args[ 0 ][ "rerun_remap_job_id" ] is None def test_execute_exception( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) self.tool_action.raise_exception( ) template, template_vars = self.__handle_with_incoming( param1="moo", runtool_btn="dummy", ) assert template == "message.mako" assert template_vars[ "status" ] == "error" assert "Error executing tool" in template_vars[ "message" ] def test_execute_errors( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) self.tool_action.return_error( ) template, template_vars = self.__handle_with_incoming( param1="moo", runtool_btn="dummy", ) assert template == "message.mako" assert template_vars[ "status" ] == "error" assert "Test Error Message" in template_vars[ "message" ], template_vars def test_redirect( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) self.tool_action.expect_redirect = True redirect_raised = False try: template, template_vars = self.__handle_with_incoming( param1="moo", runtool_btn="dummy", ) except httpexceptions.HTTPFound: redirect_raised = True assert redirect_raised def test_remap_job( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) template, template_vars = self.__handle_with_incoming( param1="moo", rerun_remap_job_id=self.app.security.encode_id(123), runtool_btn="dummy", ) self.__assert_exeuted( template, template_vars ) assert self.tool_action.execution_call_args[ 0 ][ "rerun_remap_job_id" ] == 123 def test_invalid_remap_job( self ): self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS ) template, template_vars = self.__handle_with_incoming( param1="moo", rerun_remap_job_id='123', # Not encoded runtool_btn="dummy", ) assert template == "message.mako" assert template_vars[ "status" ] == "error" assert "invalid job" in template_vars[ "message" ] def test_repeat_state_updates( self ): self._init_tool( REPEAT_TOOL_CONTENTS ) # Fresh state contains no repeat elements template, template_vars = self.__handle_with_incoming() state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 0 # Hitting add button adds repeat element template, template_vars = self.__handle_with_incoming( repeat1_add="dummy", ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 1 # Hitting add button again adds another repeat element template, template_vars = self.__handle_with_incoming( state, **{ "repeat1_add": "dummy", "repeat1_0|param2": "moo2", } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 2 assert state.inputs[ "repeat1" ][ 0 ][ "param2" ] == "moo2" # Hitting remove drops a repeat element template, template_vars = self.__handle_with_incoming( state, repeat1_1_remove="dummy" ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 1 def test_data_param_execute( self ): self._init_tool( tools_support.SIMPLE_CAT_TOOL_CONTENTS ) hda = self.__add_dataset(1) # Execute tool action template, template_vars = self.__handle_with_incoming( param1=1, runtool_btn="dummy", ) self.__assert_exeuted( template, template_vars ) # Tool 'executed' once, with hda as param1 assert len( self.tool_action.execution_call_args ) == 1 assert self.tool_action.execution_call_args[ 0 ][ "incoming" ][ "param1" ] == hda def test_data_param_state_update( self ): self._init_tool( tools_support.SIMPLE_CAT_TOOL_CONTENTS ) hda = self.__add_dataset( 1 ) # Update state template, template_vars = self.__handle_with_incoming( param1=1, ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert hda == state.inputs[ "param1" ] def test_simple_multirun_state_update( self ): hda1, hda2 = self.__setup_multirun_job() template, template_vars = self.__handle_with_incoming( **{ "param1|__multirun__": [ 1, 2 ], } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) self.__assert_state_serializable( state ) self.assertEquals( state.inputs[ "param1|__multirun__" ], [ 1, 2 ] ) def test_simple_collection_multirun_state_update( self ): hdca = self.__setup_collection_multirun_job() encoded_id = self.app.security.encode_id(hdca.id) template, template_vars = self.__handle_with_incoming( **{ "param1|__collection_multirun__": encoded_id, } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) self.__assert_state_serializable( state ) self.assertEquals( state.inputs[ "param1|__collection_multirun__" ], encoded_id ) def test_repeat_multirun_state_updates( self ): self._init_tool( REPEAT_TOOL_CONTENTS ) # Fresh state contains no repeat elements self.__handle_with_incoming() # Hitting add button adds repeat element template, template_vars = self.__handle_with_incoming(**{ "param1|__multirun__": [ 1, 2 ], "repeat1_add": "dummy", }) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) self.assertEquals( state.inputs[ "param1|__multirun__" ], [ 1, 2 ] ) assert len( state.inputs[ "repeat1" ] ) == 1 # Hitting add button again adds another repeat element template, template_vars = self.__handle_with_incoming( state, **{ "repeat1_0|param2|__multirun__": [ 1, 2 ], "repeat1_add": "dummy", } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) self.assertEquals( state.inputs[ "param1|__multirun__" ], [ 1, 2 ] ) self.assertEquals( state.inputs[ "repeat1" ][0][ "param2|__multirun__" ], [ 1, 2 ] ) def test_simple_multirun_execution( self ): hda1, hda2 = self.__setup_multirun_job() template, template_vars = self.__handle_with_incoming( **{ "param1|__multirun__": [ 1, 2 ], "runtool_btn": "dummy", } ) self.__assert_exeuted( template, template_vars ) # Tool 'executed' twice, with param1 as hda1 and hda2 respectively. assert len( self.tool_action.execution_call_args ) == 2 self.assertEquals( self.tool_action.execution_call_args[ 0 ][ "incoming" ][ "param1" ], hda1 ) self.assertEquals( self.tool_action.execution_call_args[ 1 ][ "incoming" ][ "param1" ], hda2 ) self.assertEquals( len( template_vars[ "jobs" ] ), 2 ) def test_cannot_multirun_and_remap( self ): hda1, hda2 = self.__setup_multirun_job() template, template_vars = self.__handle_with_incoming( **{ "param1|__multirun__": [ 1, 2 ], "rerun_remap_job_id": self.app.security.encode_id(123), # Not encoded "runtool_btn": "dummy", } ) self.assertEquals( template, "message.mako" ) assert template_vars[ "status" ] == "error" assert "multiple jobs" in template_vars[ "message" ] def test_multirun_with_state_updates( self ): hda1, hda2 = self.__setup_multirun_job() # Fresh state contains no repeat elements template, template_vars = self.__handle_with_incoming() state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 0 self.__assert_state_serializable( state ) # Hitting add button adds repeat element template, template_vars = self.__handle_with_incoming( **{ "param1|__multirun__": [ 1, 2 ], "repeat1_add": "dummy", } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 1 self.assertEquals( state.inputs[ "param1|__multirun__" ], [ 1, 2 ] ) self.__assert_state_serializable( state ) # Hitting add button again adds another repeat element template, template_vars = self.__handle_with_incoming( state, **{ "repeat1_add": "dummy", "repeat1_0|param2": 1, } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) self.assertEquals( state.inputs[ "param1|__multirun__" ], [ 1, 2 ] ) assert len( state.inputs[ "repeat1" ] ) == 2 assert state.inputs[ "repeat1" ][ 0 ][ "param2" ] == hda1 self.__assert_state_serializable( state ) # Hitting remove drops a repeat element template, template_vars = self.__handle_with_incoming( state, repeat1_1_remove="dummy" ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 1 self.__assert_state_serializable( state ) def test_collection_multirun_with_state_updates( self ): hda1, hda2 = self.__setup_multirun_job() collection = self.__history_dataset_collection_for( [ hda1, hda2 ] ) collection_id = self.app.security.encode_id( collection.id ) self.app.dataset_collections_service = Bunch( match_collections=lambda collections: None ) template, template_vars = self.__handle_with_incoming( **{ "param1|__collection_multirun__": collection_id, "runtool_btn": "dummy", } ) self.__assert_exeuted( template, template_vars ) def test_subcollection_multirun_with_state_updates( self ): self._init_tool( REPEAT_COLLECTION_PARAM_CONTENTS ) hda1, hda2 = self.__add_dataset( 1 ), self.__add_dataset( 2 ) collection = self.__history_dataset_collection_for( [ hda1, hda2 ], collection_type="list:paired" ) collection_id = self.app.security.encode_id( collection.id ) self.app.dataset_collections_service = Bunch( match_collections=lambda collections: None ) template, template_vars = self.__handle_with_incoming( repeat1_add="dummy", ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert len( state.inputs[ "repeat1" ] ) == 1 template, template_vars = self.__handle_with_incoming( state, **{ "repeat1_0|param2|__collection_multirun__": "%s|paired" % collection_id, "repeat1_add": "dummy", } ) state = self.__assert_rerenders_tool_without_errors( template, template_vars ) assert state.inputs[ "repeat1" ][ 0 ][ "param2|__collection_multirun__" ] == "%s|paired" % collection_id def __history_dataset_collection_for( self, hdas, collection_type="list", id=1234 ): collection = galaxy.model.DatasetCollection( collection_type=collection_type, ) to_element = lambda hda: galaxy.model.DatasetCollectionElement( collection=collection, element=hda, ) elements = map(to_element, hdas) if collection_type == "list:paired": paired_collection = galaxy.model.DatasetCollection( collection_type="paired", ) paired_collection.elements = elements list_dce = galaxy.model.DatasetCollectionElement( collection=collection, element=paired_collection, ) elements = [ list_dce ] collection.elements = elements history_dataset_collection_association = galaxy.model.HistoryDatasetCollectionAssociation( id=id, collection=collection, ) hdcas = self.trans.sa_session.model_objects[ galaxy.model.HistoryDatasetCollectionAssociation ] hdcas[ id ] = history_dataset_collection_association return history_dataset_collection_association def __assert_state_serializable( self, state ): self.__state_to_string( state ) # Will thrown exception if there is a problem... def __setup_multirun_job( self ): self._init_tool( tools_support.SIMPLE_CAT_TOOL_CONTENTS ) hda1, hda2 = self.__add_dataset( 1 ), self.__add_dataset( 2 ) return hda1, hda2 def __setup_collection_multirun_job( self ): self._init_tool( tools_support.SIMPLE_CAT_TOOL_CONTENTS ) hdca = self.__add_collection_dataset( 1 ) return hdca def __handle_with_incoming( self, previous_state=None, **kwds ): """ Execute tool.handle_input with incoming specified by kwds (optionally extending a previous state). """ if previous_state: incoming = self.__to_incoming( previous_state, **kwds) else: incoming = kwds return self.tool.handle_input( trans=self.trans, incoming=incoming, ) def __to_incoming( self, state, **kwds ): new_incoming = {} params_to_incoming( new_incoming, self.tool.inputs, state.inputs, self.app ) # Copy meta parameters over lost by params_to_incoming... for key, value in state.inputs.iteritems(): if key.endswith( "|__multirun__" ): new_incoming[ key ] = value new_incoming[ "tool_state" ] = self.__state_to_string( state ) new_incoming.update( kwds ) return new_incoming def __add_dataset( self, id, state='ok' ): hda = galaxy.model.HistoryDatasetAssociation() hda.id = id hda.dataset = galaxy.model.Dataset() hda.dataset.state = 'ok' self.trans.sa_session.model_objects[ galaxy.model.HistoryDatasetAssociation ][ id ] = hda self.history.datasets.append( hda ) return hda def __add_collection_dataset( self, id, collection_type="paired", *hdas ): hdca = galaxy.model.HistoryDatasetCollectionAssociation() hdca.id = id collection = galaxy.model.DatasetCollection() hdca.collection = collection collection.elements = [ galaxy.model.DatasetCollectionElement(element=self.__add_dataset( 1 )) ] collection.type = collection_type self.trans.sa_session.model_objects[ galaxy.model.HistoryDatasetCollectionAssociation ][ id ] = hdca self.history.dataset_collections.append( hdca ) return hdca def __assert_rerenders_tool_without_errors( self, template, template_vars ): assert template == "tool_form.mako" self.__assert_no_errors( template_vars ) state = template_vars[ "tool_state" ] return state def __assert_exeuted( self, template, template_vars ): if template == "tool_form.mako": self.__assert_no_errors( template_vars ) self.assertEquals( template, "tool_executed.mako", "Expected tools_execute template - got template %s with vars %s" % ( template, template_vars) ) def __assert_no_errors( self, template_vars ): assert "errors" in template_vars, "tool_form.mako rendered without errors defintion." errors = template_vars[ "errors" ] assert not errors, "Template rendered unexpected errors - %s" % errors def __string_to_state( self, state_string ): encoded_state = string_to_object( state_string ) state = DefaultToolState() state.decode( encoded_state, self.tool, self.app ) return state def __inputs_to_state( self, inputs ): tool_state = DefaultToolState() tool_state.inputs = inputs return tool_state def __state_to_string( self, tool_state ): return object_to_string( tool_state.encode( self.tool, self.app ) ) def __inputs_to_state_string( self, inputs ): tool_state = self.__inputs_to_state( inputs ) return self.__state_to_string( tool_state ) class MockAction( object ): def __init__( self, expected_trans ): self.expected_trans = expected_trans self.execution_call_args = [] self.expect_redirect = False self.exception_after_exection = None self.error_message_after_excution = None def execute( self, tool, trans, **kwds ): assert self.expected_trans == trans self.execution_call_args.append( kwds ) num_calls = len( self.execution_call_args ) if self.expect_redirect: raise httpexceptions.HTTPFound( "http://google.com" ) if self.exception_after_exection is not None: if num_calls > self.exception_after_exection: raise Exception( "Test Exception" ) if self.error_message_after_excution is not None: if num_calls > self.error_message_after_excution: return None, "Test Error Message" return galaxy.model.Job(), odict(dict(out1="1")) def raise_exception( self, after_execution=0 ): self.exception_after_exection = after_execution def return_error( self, after_execution=0 ): self.error_message_after_excution = after_execution class MockTrans( object ): def __init__( self, app, history ): self.app = app self.history = history self.history._active_datasets_children_and_roles = filter( lambda hda: hda.active and hda.history == history, self.app.model.context.model_objects[ galaxy.model.HistoryDatasetAssociation ] ) self.workflow_building_mode = False self.webapp = Bunch( name="galaxy" ) self.sa_session = self.app.model.context def get_history( self ): return self.history class MockCollectionService( object ): def __init__( self ): self.collection_info = object() def match_collections( self, collections_to_match ): return self.collection_info