import galaxy.model from galaxy.model.orm import * from base.twilltestcase import * from base.test_db_util import * class TestFormsAndSampleTracking( TwillTestCase ): # ====== Setup Users, Groups & Roles required for this test suite ========= def test_0000_initiate_users( self ): """Ensuring all required user accounts exist""" self.logout() self.login( email='test1@bx.psu.edu', username='regular-user1' ) global regular_user1 regular_user1 = get_user( 'test1@bx.psu.edu' ) assert regular_user1 is not None, 'Problem retrieving user with email "test1@bx.psu.edu" from the database' global regular_user1_private_role regular_user1_private_role = get_private_role( regular_user1 ) self.logout() self.login( email='test2@bx.psu.edu', username='regular-user2' ) global regular_user2 regular_user2 = get_user( 'test2@bx.psu.edu' ) assert regular_user2 is not None, 'Problem retrieving user with email "test2@bx.psu.edu" from the database' global regular_user2_private_role regular_user2_private_role = get_private_role( regular_user2 ) self.logout() self.login( email='test3@bx.psu.edu', username='regular-user3' ) global regular_user3 regular_user3 = get_user( 'test3@bx.psu.edu' ) assert regular_user3 is not None, 'Problem retrieving user with email "test3@bx.psu.edu" from the database' global regular_user3_private_role regular_user3_private_role = get_private_role( regular_user3 ) self.logout() self.login( email='test@bx.psu.edu', username='admin-user' ) global admin_user admin_user = get_user( 'test@bx.psu.edu' ) assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database' global admin_user_private_role admin_user_private_role = get_private_role( admin_user ) def test_0005_create_required_groups_and_roles( self ): """Testing creating all required groups and roles for this script""" # Logged in as admin_user # Create role1 name = 'Role1' description = "This is Role1's description" user_ids = [ str( admin_user.id ), str( regular_user1.id ), str( regular_user3.id ) ] self.create_role( name=name, description=description, in_user_ids=user_ids, in_group_ids=[], create_group_for_role='no', private_role=admin_user.email ) # Get the role object for later tests global role1 role1 = get_role_by_name( name ) # Create group1 name = 'Group1' self.create_group( name=name, in_user_ids=[ str( regular_user1.id ) ], in_role_ids=[ str( role1.id ) ] ) # Get the group object for later tests global group1 group1 = get_group_by_name( name ) assert group1 is not None, 'Problem retrieving group named "Group1" from the database' # NOTE: To get this to work with twill, all select lists on the ~/admin/role page must contain at least # 1 option value or twill throws an exception, which is: ParseError: OPTION outside of SELECT # Due to this bug in twill, we create the role, we bypass the page and visit the URL in the # associate_users_and_groups_with_role() method. # #create role2 name = 'Role2' description = 'This is Role2' user_ids = [ str( admin_user.id ) ] group_ids = [ str( group1.id ) ] private_role = admin_user.email self.create_role( name=name, description=description, in_user_ids=user_ids, in_group_ids=group_ids, private_role=private_role ) # Get the role object for later tests global role2 role2 = get_role_by_name( name ) assert role2 is not None, 'Problem retrieving role named "Role2" from the database' def test_0010_create_library( self ): """Testing creating the target data library and folder""" # Logged in as admin_user for index in range( 0, 2 ): name = 'library%s' % str( index + 1 ) description = '%s description' % name synopsis = '%s synopsis' % name self.create_library( name=name, description=description, synopsis=synopsis ) # Get the libraries for later use global library1 library1 = get_library( 'library1', 'library1 description', 'library1 synopsis' ) assert library1 is not None, 'Problem retrieving library (library1) from the database' global library2 library2 = get_library( 'library2', 'library2 description', 'library2 synopsis' ) assert library2 is not None, 'Problem retrieving library (library2) from the database' # setup add_library_item permission to regular_user1 # Set permissions on the library, sort for later testing. permissions_in = [ 'LIBRARY_ACCESS' ] permissions_out = [] # Role1 members are: admin_user, regular_user1, regular_user3. # Each of these users will be permitted for LIBRARY_ACCESS, LIBRARY_ADD on # library1 and library2. for library in [ library1, library2 ]: self.library_permissions( self.security.encode_id( library.id ), library.name, str( role1.id ), permissions_in, permissions_out ) def test_0015_create_folders( self ): # adding a folder for library in [ library1, library2 ]: name = "%s_folder1" % library.name description = "%s description" % name self.add_folder( 'library_admin', self.security.encode_id( library.id ), self.security.encode_id( library.root_folder.id ), name=name, description=description ) global library1_folder1 library1_folder1 = get_folder( library1.root_folder.id, 'library1_folder1', 'library1_folder1 description' ) assert library1_folder1 is not None, 'Problem retrieving library folder named "library1_folder1" from the database' global library2_folder1 library2_folder1 = get_folder( library2.root_folder.id, 'library2_folder1', 'library2_folder1 description' ) assert library2_folder1 is not None, 'Problem retrieving library folder named "library2_folder1" from the database' # add folders 4 levels deep to library1_folder1 # level 2 name = "%s_folder2" % library2.name description = "%s description" % name self.add_folder( 'library_admin', self.security.encode_id( library2.id ), self.security.encode_id( library2_folder1.id ), name=name, description=description ) global library2_folder2 library2_folder2 = get_folder( library2_folder1.id, name, description ) assert library2_folder2 is not None, 'Problem retrieving library folder named "%s" from the database' % name # level 3 name = "%s_folder3" % library2.name description = "%s description" % name self.add_folder( 'library_admin', self.security.encode_id( library2.id ), self.security.encode_id( library2_folder2.id ), name=name, description=description ) global library2_folder3 library2_folder3 = get_folder( library2_folder2.id, name, description ) assert library2_folder3 is not None, 'Problem retrieving library folder named "%s" from the database' % name # level 4 name = "%s_folder4" % library2.name description = "%s description" % name self.add_folder( 'library_admin', self.security.encode_id( library2.id ), self.security.encode_id( library2_folder3.id ), name=name, description=description ) global library2_folder4 library2_folder4 = get_folder( library2_folder3.id, name, description ) assert library2_folder4 is not None, 'Problem retrieving library folder named "%s" from the database' % name # # ====== Form definition test methods ================================================ # def test_0020_create_request_form_definition( self ): """Testing creating a sequencing request form definition, editing the name and description and adding fields""" # Logged in as admin_user # Create a form definition tmp_name = "Temp form" tmp_desc = "Temp form description" form_type = galaxy.model.FormDefinition.types.REQUEST self.create_form( name=tmp_name, description=tmp_desc, form_type=form_type, num_fields=0, field_name='1_field_name', strings_displayed=[ 'Create a new form definition' ], strings_displayed_after_submit=[ tmp_name, tmp_desc, form_type ] ) def test_0025_edit_request_form_fields( self ): # field names tmp_name = "Temp form" tmp_form = get_form( tmp_name ) # Edit the name and description of the form definition, and add 3 fields. new_name = "Request Form" new_desc = "Request Form description" form_type = galaxy.model.FormDefinition.types.REQUEST # labels global request_field_label1 request_field_label1 = 'Request form field1' global request_field_label2 request_field_label2 = 'Request form field2' global request_field_label3 request_field_label3 = 'Request form field3' global request_form_field_name1 request_form_field_name1 = 'request_form_field1' global request_form_field_name2 request_form_field_name2 = 'request_form_field2' global request_form_field_name3 request_form_field_name3 = 'request_form_field3' field_dicts = [ dict( label=request_field_label1, desc='Description of '+request_field_label1, type='SelectField', required='optional', selectlist=[ 'option1', 'option2' ], name=request_form_field_name1 ), dict( label=request_field_label2, desc='Description of '+request_field_label2, type='AddressField', required='optional', name=request_form_field_name2 ), dict( label=request_field_label3, desc='Description of '+request_field_label3, type='TextField', required='required', name=request_form_field_name3 ) ] self.edit_form( id=self.security.encode_id( tmp_form.current.id ), new_form_name=new_name, new_form_desc=new_desc, field_dicts=field_dicts, field_index=len( tmp_form.fields ), strings_displayed=[ 'Edit form definition "%s"' % tmp_name ], strings_displayed_after_submit=[ "The form '%s' has been updated with the changes." % new_name ] ) # Get the form_definition object for later tests global request_form_definition1 request_form_definition1 = get_form( new_name ) assert request_form_definition1 is not None, 'Problem retrieving form named "%s" from the database' % new_name assert len( request_form_definition1.fields ) == len( tmp_form.fields ) + len( field_dicts ) # check form view self.view_form( id=self.security.encode_id( request_form_definition1.current.id ), form_name=new_name, form_desc=new_desc, form_type=form_type, field_dicts=field_dicts ) def test_0030_create_sample_form_definition( self ): """Testing creating sequencing sample form definition and adding fields""" name = "Sample Form" desc = "This is Sample Form's description" form_type = galaxy.model.FormDefinition.types.SAMPLE global sample_form_layout_grid_name sample_form_layout_grid_name = 'Layout Grid1' self.create_form( name=name, description=desc, form_type=form_type, form_layout_name=sample_form_layout_grid_name, num_fields=0, field_name='1_field_name', strings_displayed=[ 'Create a new form definition' ], strings_displayed_after_submit=[ "The form '%s' has been updated with the changes." % name ] ) def test_0035_add_fields_to_sample_form( self ): # now add fields to the sample form definition name = "Sample Form" desc = "This is Sample Form's description" form_type = galaxy.model.FormDefinition.types.SAMPLE tmp_form = get_form( name ) global sample_field_label1 sample_field_label1 = 'Sample form field1' global sample_field_label2 sample_field_label2 = 'Sample form field2' global sample_field_label3 sample_field_label3 = 'Sample form field3' field_dicts = [ dict( label=sample_field_label1, desc='Description of '+sample_field_label1, type='SelectField', required='optional', selectlist=[ 'option1', 'option2' ], name='sample_form_field1' ), dict( label=sample_field_label2, desc='Description of '+sample_field_label2, type='TextField', required='optional', name='sample_form_field2' ), dict( label=sample_field_label3, desc='Description of '+sample_field_label3, type='TextField', required='required', name='sample_form_field3' ) ] self.edit_form( id=self.security.encode_id( tmp_form.current.id ), new_form_name=name, new_form_desc=desc, field_dicts=field_dicts, field_index=len( tmp_form.fields ), strings_displayed=[ 'Edit form definition "%s"' % name ], strings_displayed_after_submit=[ "The form '%s' has been updated with the changes." % name ] ) global sample_form_definition1 sample_form_definition1 = get_form( name ) assert sample_form_definition1 is not None, "Error retrieving form %s from db" % name assert len( sample_form_definition1.fields ) == len( field_dicts ) # check form view self.view_form( id=self.security.encode_id( sample_form_definition1.current.id ), form_name=name, form_desc=desc, form_type=form_type, form_layout_name=sample_form_layout_grid_name, field_dicts=field_dicts ) def test_0040_create_request_type( self ): """Testing creating a request_type""" name = 'Request type1' sample_states = [ ( 'New', 'Sample entered into the system' ), ( 'Received', 'Sample tube received' ), ( 'Library Started', 'Sample library preparation' ), ( 'Run Started', 'Sequence run in progress' ), ( 'Done', 'Sequence run complete' ) ] self.create_request_type( name, name+" description", self.security.encode_id( request_form_definition1.id ), self.security.encode_id( sample_form_definition1.id ), sample_states, strings_displayed=[ 'Create a new request type' ], strings_displayed_after_submit=[ "The request type has been created." ] ) global request_type1 request_type1 = get_request_type_by_name( name ) assert request_type1 is not None, 'Problem retrieving request type named "%s" from the database' % name # check view self.view_request_type( self.security.encode_id( request_type1.id ), request_type1.name, strings_displayed=[ request_form_definition1.name, sample_form_definition1.name ], sample_states=sample_states) def test_0045_set_request_type_permissions( self ): # Role1 members are: admin_user, regular_user1, regular_user3. Each of these users will be permitted for # REQUEST_TYPE_ACCESS on this request_type permissions_in = [ k for k, v in galaxy.model.RequestType.permitted_actions.items() ] permissions_out = [] self.request_type_permissions( self.security.encode_id( request_type1.id ), request_type1.name, str( role1.id ), permissions_in, permissions_out ) # Make sure the request_type1 is not accessible by regular_user2 since regular_user2 does not have Role1. self.logout() self.login( email=regular_user2.email ) self.visit_url( '%s/requests_common/create_request?cntrller=requests&request_type=True' % self.url ) try: self.check_page_for_string( 'There are no request types created for a new request.' ) raise AssertionError, 'The request_type %s is accessible by %s when it should be restricted' % ( request_type1.name, regular_user2.email ) except: pass self.logout() self.login( email=admin_user.email ) # ====== Sequencing request test methods - regular user perspective ================ def test_0050_create_request( self ): """Testing creating a sequencing request as a regular user""" # logged in as admin_user # Create a user_address self.logout() self.login( email=regular_user1.email ) # add new address for this user address_dict = dict( short_desc="Office", name="James Bond", institution="MI6" , address="MI6 Headquarters", city="London", state="London", postal_code="007", country="United Kingdom", phone="007-007-0007" ) self.add_user_address( self.security.encode_id( regular_user1.id ), address_dict ) global user_address1 user_address1 = get_user_address( regular_user1, address_dict[ 'short_desc' ] ) # Set field values - the tuples in the field_values list include the field_value, and True if refresh_on_change # is required for that field. field_value_tuples = [ ( request_form_field_name1, 'option1', False ), ( request_form_field_name2, ( str( user_address1.id ), str( user_address1.id ) ), True ), ( request_form_field_name3, 'field3 value', False ) ] # Create the request name = 'Request1' desc = 'Request1 Description' self.create_request( cntrller='requests', request_type_id=self.security.encode_id( request_type1.id ), name=name, desc=desc, field_value_tuples=field_value_tuples, strings_displayed=[ 'Create a new sequencing request', request_field_label1, request_field_label2, request_field_label3 ], strings_displayed_after_submit=[ 'The sequencing request has been created.', name, desc ] ) global request1 request1 = get_request_by_name( name ) def test_0055_verify_request_details( self ): # Make sure the request's state is now set to NEW assert request1.state is not request1.states.NEW, "The state of the request '%s' should be set to '%s'" \ % ( request1.name, request1.states.NEW ) # check request page self.view_request( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=[ 'Sequencing request "%s"' % request1.name, 'There are no samples.' ], strings_not_displayed=[ request1.states.SUBMITTED, request1.states.COMPLETE, request1.states.REJECTED, 'Submit request' ] ) # this button should NOT show up as there are no samples yet # check if the request is showing in the 'new' filter self.check_request_grid( cntrller='requests', state=request1.states.NEW, strings_displayed=[ request1.name ] ) self.view_request_history( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=[ 'History of sequencing request "%s"' % request1.name, request1.states.NEW, 'Sequencing request created' ], strings_not_displayed=[ request1.states.SUBMITTED, request1.states.COMPLETE, request1.states.REJECTED ] ) def test_0060_edit_basic_request_info( self ): """Testing editing the basic information and email settings of a sequencing request""" # logged in as regular_user1 fields = [ ( request_form_field_name1, 'option2' ), ( request_form_field_name2, str( user_address1.id ) ), ( request_form_field_name3, 'field3 value (edited)' ) ] new_name=request1.name + ' (Renamed)' new_desc=request1.desc + ' (Re-described)' self.edit_basic_request_info( request_id=self.security.encode_id( request1.id ), cntrller='requests', name=request1.name, new_name=new_name, new_desc=new_desc, new_fields=fields, strings_displayed=[ 'Edit sequencing request "%s"' % request1.name ], strings_displayed_after_submit=[ new_name, new_desc ] ) refresh( request1 ) # define the sample states when we want an email notification global email_notification_sample_states email_notification_sample_states = [ request1.type.states[2], request1.type.states[4] ] # check email notification settings check_sample_states = [] for state in email_notification_sample_states: check_sample_states.append( ( state.name, state.id, True ) ) strings_displayed = [ 'Edit sequencing request "%s"' % request1.name, 'Email notification settings' ] additional_emails = [ 'test@.bx.psu.edu', 'test2@.bx.psu.edu' ] strings_displayed_after_submit = [ "The changes made to the email notification settings have been saved", '\r\n'.join( additional_emails ) ] self.edit_request_email_settings( cntrller='requests', request_id=self.security.encode_id( request1.id ), check_request_owner=True, additional_emails='\r\n'.join( additional_emails ), check_sample_states=check_sample_states, strings_displayed=strings_displayed, strings_displayed_after_submit=strings_displayed_after_submit ) # lastly check the details in the request page strings_displayed = [ 'Sequencing request "%s"' % new_name, new_desc ] for field in fields: strings_displayed.append( field[1] ) for state_name, id, is_checked in check_sample_states: strings_displayed.append( state_name ) for email in additional_emails: strings_displayed.append( email ) self.view_request( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=strings_displayed, strings_not_displayed=[] ) def test_0065_add_samples_to_request( self ): """Testing adding samples to request""" # logged in as regular_user1 # Sample fields - the tuple represents a sample name and a list of sample form field values target_library_info = dict(library=self.security.encode_id(library2.id), folder=self.security.encode_id(library2_folder1.id) ) sample_value_tuples = \ [ ( 'Sample1', target_library_info, [ 'option1', 'sample1 field2 value', 'sample1 field3 value' ] ), ( 'Sample2', target_library_info, [ 'option2', 'sample2 field2 value', 'sample2 field3 value' ] ), ( 'Sample3', target_library_info, [ 'option1', 'sample3 field2 value', 'sample3 field3 value' ] ) ] strings_displayed_after_submit = [ 'Unsubmitted' ] for sample_name, lib_info, field_values in sample_value_tuples: strings_displayed_after_submit.append( sample_name ) strings_displayed_after_submit.append( library2.name ) strings_displayed_after_submit.append( library2_folder1.name ) # add the sample values too for values in field_values: strings_displayed_after_submit.append( values ) # list folders that populates folder selectfield when a data library is selected folder_options = [ library2_folder1.name, library2_folder2.name, library2_folder3.name, library2_folder4.name ] # Add samples to the request self.add_samples( cntrller='requests', request_id=self.security.encode_id( request1.id ), sample_value_tuples=sample_value_tuples, folder_options=folder_options, strings_displayed=[ 'Add samples to sequencing request "%s"' % request1.name, '' ], # sample name input field strings_displayed_after_submit=strings_displayed_after_submit ) # check the new sample field values on the request page strings_displayed = [ 'Sequencing request "%s"' % request1.name, 'Submit request' ] # this button should appear now strings_displayed.extend( strings_displayed_after_submit ) strings_displayed_count = [] strings_displayed_count.append( ( library2.name, len( sample_value_tuples ) ) ) strings_displayed_count.append( ( library2_folder1.name, len( sample_value_tuples ) ) ) self.view_request( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=strings_displayed, strings_displayed_count=strings_displayed_count ) def test_0070_edit_samples_of_new_request( self ): """Testing editing the sample information of new request1""" # logged in as regular_user1 # target data library - change it to library1 target_library_info = dict(library=self.security.encode_id( library1.id ), folder=self.security.encode_id( library1_folder1.id ) ) new_sample_value_tuples = \ [ ( 'Sample1_renamed', target_library_info, [ 'option2', 'sample1 field2 value edited', 'sample1 field3 value edited' ] ), ( 'Sample2_renamed', target_library_info, [ 'option1', 'sample2 field2 value edited', 'sample2 field3 value edited' ] ), ( 'Sample3_renamed', target_library_info, [ 'option2', 'sample3 field2 value edited', 'sample3 field3 value edited' ] ) ] strings_displayed_after_submit = [ 'Unsubmitted' ] for sample_name, lib_info, field_values in new_sample_value_tuples: strings_displayed_after_submit.append( sample_name ) # add the sample values too for values in field_values: strings_displayed_after_submit.append( values ) strings_displayed = [ 'Edit Current Samples of Sequencing Request "%s"' % request1.name, '', # sample name input field library2_folder1.name, # all the folders in library2 should show up in the folder selectlist library2_folder2.name, library2_folder3.name, library2_folder4.name ] # Add samples to the request self.edit_samples( cntrller='requests', request_id=self.security.encode_id( request1.id ), sample_value_tuples=new_sample_value_tuples, strings_displayed=strings_displayed, strings_displayed_after_submit=strings_displayed_after_submit ) # check the changed sample field values on the request page strings_displayed = [ 'Sequencing request "%s"' % request1.name ] strings_displayed.extend( strings_displayed_after_submit ) strings_displayed_count = [] strings_displayed_count.append( ( library1.name, len( new_sample_value_tuples ) ) ) strings_displayed_count.append( ( library1_folder1.name, len( new_sample_value_tuples ) ) ) self.view_request( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=strings_displayed, strings_displayed_count=strings_displayed_count ) def test_0075_submit_request( self ): """Testing submitting a sequencing request""" # logged in as regular_user1 self.submit_request( cntrller='requests', request_id=self.security.encode_id( request1.id ), request_name=request1.name, strings_displayed_after_submit=[ 'The sequencing request has been submitted.' ] ) refresh( request1 ) # Make sure the request is showing in the 'submitted' filter self.check_request_grid( cntrller='requests', state=request1.states.SUBMITTED, strings_displayed=[ request1.name ] ) # Make sure the request's state is now set to 'submitted' assert request1.state is not request1.states.SUBMITTED, "The state of the sequencing request '%s' should be set to '%s'" \ % ( request1.name, request1.states.SUBMITTED ) # the sample state should appear once for each sample strings_displayed_count = [ ( request1.type.states[0].name, len( request1.samples ) ) ] # after submission, these buttons should not appear strings_not_displayed = [ 'Add sample', 'Submit request' ] # check the request page self.view_request( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=[ request1.states.SUBMITTED ], strings_displayed_count=strings_displayed_count, strings_not_displayed=strings_not_displayed ) strings_displayed=[ 'History of sequencing request "%s"' % request1.name, 'Sequencing request submitted by %s' % regular_user1.email, 'Sequencing request created' ] strings_displayed_count = [ ( request1.states.SUBMITTED, 1 ) ] self.view_request_history( cntrller='requests', request_id=self.security.encode_id( request1.id ), strings_displayed=strings_displayed, strings_displayed_count=strings_displayed_count, strings_not_displayed=[ request1.states.COMPLETE, request1.states.REJECTED ] ) # ====== Sequencing request test methods - Admin perspective ================ def test_0080_receive_request_as_admin( self ): """Testing receiving a sequencing request and assigning it barcodes""" # logged in as regular_user1 self.logout() # login as a admin_user to assign bar codes to samples self.login( email=admin_user.email ) self.check_request_grid( cntrller='requests_admin', state=request1.states.SUBMITTED, strings_displayed=[ request1.name ] ) strings_displayed = [ request1.states.SUBMITTED, 'Reject this request', 'Add sample', 'Edit samples' ] self.view_request( cntrller='requests_admin', request_id=self.security.encode_id( request1.id ), strings_displayed=strings_displayed ) def test_0085_add_bar_codes( self ): # Set bar codes for the samples bar_codes = [ '10001', '10002', '10003' ] strings_displayed_after_submit = [ 'Changes made to the samples have been saved.' ] strings_displayed_after_submit.extend( bar_codes ) self.add_bar_codes( cntrller='requests_admin', request_id=self.security.encode_id( request1.id ), bar_codes=bar_codes, strings_displayed=[ 'Edit Current Samples of Sequencing Request "%s"' % request1.name ], strings_displayed_after_submit=strings_displayed_after_submit ) # the second sample state should appear once for each sample strings_displayed_count = [ ( request1.type.states[1].name, len( request1.samples ) ), ( request1.type.states[0].name, 0 ) ] # check the request page self.view_request( cntrller='requests_admin', request_id=self.security.encode_id( request1.id ), strings_displayed=bar_codes, strings_displayed_count=strings_displayed_count ) # the sample state descriptions of the future states should not appear # here the state names are not checked as all of them appear at the top of # the page like: state1 > state2 > state3 strings_not_displayed=[ request1.type.states[2].desc, request1.type.states[3].desc, request1.type.states[4].desc ] # check history of each sample for sample in request1.samples: strings_displayed = [ 'History of sample "%s"' % sample.name, 'Sequencing request submitted and sample state set to %s' % request1.type.states[0].name, request1.type.states[0].name, request1.type.states[1].name ] self.view_sample_history( cntrller='requests_admin', sample_id=self.security.encode_id( sample.id ), strings_displayed=strings_displayed, strings_not_displayed=strings_not_displayed ) def test_0090_request_lifecycle( self ): """Testing request life-cycle as it goes through all the states""" # logged in as admin_user self.check_request_grid( cntrller='requests_admin', state=request1.states.SUBMITTED, strings_displayed=[ request1.name ] ) strings_displayed=[ 'History of sequencing request "%s"' % request1.name ] # Change the states of all the samples of this request to ultimately be COMPLETE for index, state in enumerate( request_type1.states ): # start from the second state onwards if index > 1: # status message if index == len( request_type1.states ) - 1: status_msg = 'All samples of this sequencing request are in the final sample state (%s).' % state.name else: status_msg = 'All samples of this sequencing request are in the (%s) sample state. ' % state.name # check email notification message email_msg = '' if state.id in [ email_state.id for email_state in email_notification_sample_states ]: email_msg = 'Email notification failed as SMTP server not set in config file' self.change_sample_state( request_id=self.security.encode_id( request1.id ), sample_ids=[ sample.id for sample in request1.samples ], new_sample_state_id=self.security.encode_id( state.id ), strings_displayed=[ 'Edit Current Samples of Sequencing Request "%s"' % request1.name ], strings_displayed_after_submit = [ status_msg, email_msg ] ) # check request history page if index == len( request_type1.states ) - 1: strings_displayed.append( status_msg ) else: strings_displayed.append( status_msg ) self.view_request_history( cntrller='requests_admin', request_id=self.security.encode_id( request1.id ), strings_displayed=strings_displayed, strings_not_displayed=[ request1.states.REJECTED ] ) refresh( request1 ) # check if the request's state is now set to 'complete' self.check_request_grid( cntrller='requests_admin', state='Complete', strings_displayed=[ request1.name ] ) assert request1.state is not request1.states.COMPLETE, "The state of the sequencing request '%s' should be set to '%s'" \ % ( request1.name, request1.states.COMPLETE ) def test_0095_admin_create_request_on_behalf_of_regular_user( self ): """Testing creating and submitting a request as an admin on behalf of a regular user""" # Logged in as regular_user1 self.logout() self.login( email=admin_user.email ) # Create the request name = "Request2" desc = 'Request2 Description' # new address new_address_dict = dict( short_desc="Home", name="Sherlock Holmes", institution="None" , address="221B Baker Street", city="London", state="London", postal_code="34534", country="United Kingdom", phone="007-007-0007" ) # Set field values - the tuples in the field_values list include the field_value, and True if refresh_on_change # is required for that field. field_value_tuples = [ ( request_form_field_name1, 'option2', False ), ( request_form_field_name2, ( 'new', new_address_dict ) , True ), ( request_form_field_name3, 'field_2_value', False ) ] self.create_request( cntrller='requests_admin', request_type_id=self.security.encode_id( request_type1.id ), other_users_id=self.security.encode_id( regular_user1.id ), name=name, desc=desc, field_value_tuples=field_value_tuples, strings_displayed=[ 'Create a new sequencing request', request_field_label1, request_field_label2, request_field_label3 ], strings_displayed_after_submit=[ "The sequencing request has been created.", name, desc ] ) global request2 request2 = get_request_by_name( name ) global user_address2 user_address2 = get_user_address( regular_user1, new_address_dict[ 'short_desc' ] ) # Make sure the request is showing in the 'new' filter self.check_request_grid( cntrller='requests_admin', state=request2.states.NEW, strings_displayed=[ request2.name ] ) # Make sure the request's state is now set to 'new' assert request2.state is not request2.states.NEW, "The state of the request '%s' should be set to '%s'" \ % ( request2.name, request2.states.NEW ) def test_0100_add_samples_to_request( self ): target_library_info = dict( library=None, folder=None ) # Sample fields - the tuple represents a sample name and a list of sample form field values sample_value_tuples = \ [ ( 'Sample1', target_library_info, [ 'option1', 'sample1 field2 value', 'sample1 field3 value' ] ), ( 'Sample2', target_library_info, [ 'option2', 'sample2 field2 value', 'sample2 field3 value' ] ), ( 'Sample3', target_library_info, [ 'option1', 'sample3 field2 value', 'sample3 field3 value' ] ) ] strings_displayed_after_submit = [ 'Unsubmitted' ] for sample_name, lib_info, field_values in sample_value_tuples: strings_displayed_after_submit.append( sample_name ) for values in field_values: strings_displayed_after_submit.append( values ) # Add samples to the request self.add_samples( cntrller='requests_admin', request_id=self.security.encode_id( request2.id ), sample_value_tuples=sample_value_tuples, strings_displayed=[ 'Add samples to sequencing request "%s"' % request2.name, '' ], # sample name input field strings_displayed_after_submit=strings_displayed_after_submit ) # Submit the request self.submit_request( cntrller='requests_admin', request_id=self.security.encode_id( request2.id ), request_name=request2.name, strings_displayed_after_submit=[ 'The sequencing request has been submitted.' ] ) refresh( request2 ) # Make sure the request is showing in the 'submitted' filter self.check_request_grid( cntrller='requests_admin', state=request2.states.SUBMITTED, strings_displayed=[ request2.name ] ) # Make sure the request's state is now set to 'submitted' assert request2.state is not request2.states.SUBMITTED, "The state of the sequencing request '%s' should be set to '%s'" \ % ( request2.name, request2.states.SUBMITTED ) # Make sure both requests are showing in the 'All' filter self.check_request_grid( cntrller='requests_admin', state='All', strings_displayed=[ request1.name, request2.name ] ) def test_0105_set_request_target_library( self ): # list folders that populates folder selectfield when a data library is selected folder_options = [ library2_folder1.name, library2_folder2.name, library2_folder3.name, library2_folder4.name ] # set the target data library to library2 using sample operation user interface self.change_sample_target_data_library( cntrller='requests', request_id=self.security.encode_id( request2.id ), sample_ids=[ sample.id for sample in request2.samples ], new_library_id=self.security.encode_id( library2.id ), new_folder_id=self.security.encode_id( library2_folder1.id ), folder_options=folder_options, strings_displayed=[ 'Edit Current Samples of Sequencing Request "%s"' % request2.name ], strings_displayed_after_submit=[ 'Changes made to the samples have been saved.' ] ) # check the changed target data library & folder on the request page strings_displayed_count = [] strings_displayed_count.append( ( library2.name, len( request1.samples ) ) ) strings_displayed_count.append( ( library2_folder1.name, len( request1.samples ) ) ) strings_displayed = [ 'Sequencing request "%s"' % request2.name, regular_user1.email, request2.states.SUBMITTED ] select_target_library_message = "Select a target data library and folder for a sample before selecting its datasets to transfer from the external service" self.view_request( cntrller='requests', request_id=self.security.encode_id( request2.id ), strings_displayed=strings_displayed, strings_not_displayed=[ select_target_library_message ], strings_displayed_count=strings_displayed_count ) def test_0110_reject_request( self ): """Testing rejecting a request""" # Logged in as admin_user rejection_reason="This is why the sequencing request was rejected." self.reject_request( request_id=self.security.encode_id( request2.id ), request_name=request2.name, comment=rejection_reason, strings_displayed=[ 'Reject sequencing request "%s"' % request2.name ], strings_displayed_after_submit=[ 'Sequencing request (%s) has been rejected.' % request2.name ] ) refresh( request2 ) # Make sure the request is showing in the 'rejected' filter self.check_request_grid( cntrller='requests_admin', state=request2.states.REJECTED, strings_displayed=[ request2.name ] ) # Make sure the request's state is now set to REJECTED assert request2.state is not request2.states.REJECTED, "The state of the request '%s' should be set to '%s'" \ % ( request2.name, request2.states.REJECTED ) # The rejection reason should show up in the request page and the request history page rejection_message = 'Sequencing request marked rejected by %s. Reason: %s' % ( admin_user.email, rejection_reason ) strings_displayed = [ request2.states.REJECTED, rejection_message ] self.view_request( cntrller='requests', request_id=self.security.encode_id( request2.id ), strings_displayed=strings_displayed ) self.view_request_history( cntrller='requests', request_id=self.security.encode_id( request2.id ), strings_displayed=strings_displayed ) # login as the regular user to make sure that the request2 is fully editable self.logout() self.login( email=regular_user1.email ) strings_displayed=[ 'Sequencing request "%s"' % request2.name, request1.states.REJECTED, rejection_message ] visible_buttons = [ 'Add sample', 'Edit samples', 'Submit request' ] strings_displayed.extend( visible_buttons ) self.view_request( cntrller='requests', request_id=self.security.encode_id( request2.id ), strings_displayed=strings_displayed, strings_not_displayed=[ request1.states.SUBMITTED, request1.states.COMPLETE, request1.states.NEW] ) def __test_0115_select_datasets_for_transfer( self ): """Testing selecting datasets for data transfer""" # Logged in as admin_user self.logout() self.login( email=admin_user.email ) # Setup the dummy datasets for sample1 of request1 sample_datasets = [ '/path/to/sample1_dataset1', '/path/to/sample1_dataset2', '/path/to/sample1_dataset3' ] sample_dataset_file_names = [ dataset.split( '/' )[-1] for dataset in sample_datasets ] global request1_sample1 request1_sample1 = request1.get_sample( 'Sample1_renamed' ) external_service = request1_sample1.external_service strings_displayed_after_submit = [ 'Datasets (%s) have been selected for sample (%s)' % \ ( str( sample_dataset_file_names )[1:-1].replace( "'", "" ), request1_sample1.name ) ] strings_displayed = [ 'Select datasets to transfer from data directory configured for the sequencer' ] self.add_datasets_to_sample( request_id=self.security.encode_id( request2.id ), sample_id= self.security.encode_id( request1_sample1.id ), external_service_id=self.security.encode_id( external_serviceexternal_service.id ), sample_datasets=sample_datasets, strings_displayed=strings_displayed, strings_displayed_after_submit=strings_displayed_after_submit ) assert len( request1_sample1.datasets ) == len( sample_datasets ) # check the sample dataset info page for sample1_dataset in request1_sample1.datasets: strings_displayed = [ '"%s" Dataset' % request1_sample1.name, sample1_dataset.file_path, sample1_dataset.transfer_status.NOT_STARTED ] self.view_sample_dataset( sample_dataset_id=self.security.encode_id( sample1_dataset.id ), strings_displayed=strings_displayed ) def __test_0120_manage_sample_datasets( self ): """Testing renaming, deleting and initiating transfer of sample datasets""" # Logged in as admin_user # Check renaming datasets new_sample_dataset_names = [ ( 'path', request1_sample1.datasets[0].name ), ( 'to', request1_sample1.datasets[1].name+'/renamed' ), ( 'none', request1_sample1.datasets[2].name+'_renamed' ) ] sample_dataset_ids = [ self.security.encode_id( dataset.id ) for dataset in request1_sample1.datasets ] strings_displayed = [ 'Rename datasets for Sample "%s"' % request1_sample1.name ] strings_displayed_after_submit = [ 'Changes saved successfully.' ] strings_displayed_after_submit.extend( [ new_name for prefix, new_name in new_sample_dataset_names ] ) self.rename_sample_datasets( sample_id=self.security.encode_id( request1_sample1.id ), sample_dataset_ids=sample_dataset_ids, new_sample_dataset_names=new_sample_dataset_names, strings_displayed=strings_displayed ) # Check deletion sample_dataset_ids = [ self.security.encode_id( request1_sample1.datasets[0].id ) ] strings_displayed = [ 'Manage "%s" datasets' % request1_sample1.name ] strings_displayed_after_submit = [ '%i datasets have been deleted.' % len( sample_dataset_ids ) ] strings_not_displayed = [ request1_sample1.datasets[0].name ] self.delete_sample_datasets( sample_id=self.security.encode_id( request1_sample1.id ), sample_dataset_ids=sample_dataset_ids, strings_displayed=strings_displayed, strings_displayed_after_submit=strings_displayed_after_submit, strings_not_displayed=strings_not_displayed ) refresh( request1_sample1 ) assert len( request1_sample1.datasets ) == ( len( new_sample_dataset_names )-1 ) # Check data transfer # In this test we only test transfer initiation. For data transfer to complete # successfully we need RabbitMQ setup. Since that is not possible in the functional # tests framework, this checks if correct error message is displayed and the transfer # status of the sample datasets remains at 'Not started' when the Transfer button is clicked. sample_dataset_ids = [ self.security.encode_id( dataset.id ) for dataset in request1_sample1.datasets ] strings_displayed = [ 'Manage "%s" datasets' % request1_sample1.name ] strings_displayed_count = [ ( galaxy.model.SampleDataset.transfer_status.NOT_STARTED, len( request1_sample1.datasets ) ) ] strings_displayed_after_submit = [ "Error in sequencer login information. Please set your API Key in your User Preferences to transfer datasets." ] strings_not_displayed = [ galaxy.model.SampleDataset.transfer_status.IN_QUEUE, galaxy.model.SampleDataset.transfer_status.TRANSFERRING, galaxy.model.SampleDataset.transfer_status.ADD_TO_LIBRARY, galaxy.model.SampleDataset.transfer_status.COMPLETE ] self.start_sample_datasets_transfer( sample_id=self.security.encode_id( request1_sample1.id ), sample_dataset_ids=sample_dataset_ids, strings_displayed=strings_displayed, strings_displayed_after_submit=strings_displayed_after_submit, strings_not_displayed=strings_not_displayed, strings_displayed_count=strings_displayed_count ) # check the sample dataset info page for sample1_dataset in request1_sample1.datasets: strings_displayed = [ '"%s" Dataset' % request1_sample1.name, sample1_dataset.file_path, sample1_dataset.transfer_status.NOT_STARTED ] self.view_sample_dataset( sample_dataset_id=self.security.encode_id( sample1_dataset.id ), strings_displayed=strings_displayed ) def test_999_reset_data_for_later_test_runs( self ): """Reseting data to enable later test runs to pass""" # Logged in as admin_user self.logout() self.login( email=admin_user.email ) ################## # Purge all libraries ################## for library in [ library1, library2 ]: self.delete_library_item( 'library_admin', self.security.encode_id( library.id ), self.security.encode_id( library.id ), library.name, item_type='library' ) self.purge_library( self.security.encode_id( library.id ), library.name ) ################## # Mark all requests deleted and delete all their samples ################## for request in [ request1, request2 ]: # delete samples for sample in request.samples: # delete sample datasets for sample_dataset in sample.datasets: delete_obj( sample_dataset ) delete_obj( sample ) mark_obj_deleted( request ) ################## # Delete request_type permissions ################## for request_type in [ request_type1 ]: delete_request_type_permissions( request_type.id ) ################## # Mark all request_types deleted ################## for request_type in [ request_type1 ]: mark_obj_deleted( request_type ) ################## # Mark all forms deleted ################## for form in [ request_form_definition1, sample_form_definition1 ]: self.mark_form_deleted( self.security.encode_id( form.current.id ) ) ################## # Mark all user_addresses deleted ################## for user_address in [ user_address1, user_address2 ]: mark_obj_deleted( user_address ) ################## # Delete all non-private roles ################## for role in [ role1, role2 ]: self.mark_role_deleted( self.security.encode_id( role.id ), role.name ) self.purge_role( self.security.encode_id( role.id ), role.name ) # Manually delete the role from the database refresh( role ) delete_obj( role ) ################## # Delete all groups ################## for group in [ group1 ]: self.mark_group_deleted( self.security.encode_id( group.id ), group.name ) self.purge_group( self.security.encode_id( group.id ), group.name ) # Manually delete the group from the database refresh( group ) delete_obj( group )