import os
import json
import unittest
import tools_support
from galaxy import model
from galaxy import util
from galaxy.tools.parameters import output_collect
DEFAULT_TOOL_OUTPUT = "out1"
DEFAULT_EXTRA_NAME = "test1"
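
# These tests exercise Tool.collect_primary_datasets, the job-finish hook that
# scans a finished job's directories and registers matching files as new
# primary datasets on the tool's declared outputs.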
class CollectPrimaryDatasetsTestCase( unittest.TestCase, tools_support.UsesApp, tools_support.UsesTools ):
def setUp( self ):
self.setup_app( mock_model=False )
object_store = MockObjectStore()
self.app.object_store = object_store
self._init_tool( tools_support.SIMPLE_TOOL_CONTENTS )
self._setup_test_output( )
self.app.config.collect_outputs_from = "job_working_directory"
self.app.model.Dataset.object_store = object_store
def tearDown( self ):
if self.app.model.Dataset.object_store is self.app.object_store:
self.app.model.Dataset.object_store = None
def test_empty_collect( self ):
assert len( self._collect() ) == 0
def test_collect_multiple( self ):
path1 = self._setup_extra_file( name="test1" )
path2 = self._setup_extra_file( name="test2" )
datasets = self._collect()
assert DEFAULT_TOOL_OUTPUT in datasets
        self.assertEqual( len( datasets[ DEFAULT_TOOL_OUTPUT ] ), 2 )
created_hda_1 = datasets[ DEFAULT_TOOL_OUTPUT ][ "test1" ]
self.app.object_store.assert_created_with_path( created_hda_1.dataset, path1 )
created_hda_2 = datasets[ DEFAULT_TOOL_OUTPUT ][ "test2" ]
self.app.object_store.assert_created_with_path( created_hda_2.dataset, path2 )
        # Verify default metadata on the collected datasets.
assert created_hda_1.visible
assert created_hda_1.dbkey == "?"
def test_collect_hidden( self ):
self._setup_extra_file( visible="hidden" )
created_hda = self._collect_default_extra()
assert not created_hda.visible
def test_collect_ext( self ):
self._setup_extra_file( ext="txt" )
created_hda = self._collect_default_extra()
assert created_hda.ext == "txt"
def test_copied_to_imported_histories( self ):
self._setup_extra_file( )
cloned_hda = self.hda.copy()
history_2 = self._new_history( hdas=[ cloned_hda ])
assert len( history_2.datasets ) == 1
self._collect()
        # Make sure the discovered primary dataset was also copied into the
        # history containing the cloned output.
assert len( history_2.datasets ) == 2
def test_dbkey_from_filename( self ):
self._setup_extra_file( dbkey="hg19" )
created_hda = self._collect_default_extra()
assert created_hda.dbkey == "hg19"
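
    # A tool can also append JSON lines to galaxy.json in the job directory to
    # set attributes (dbkey, name, info, ext) on a discovered primary dataset;
    # _append_job_json below emulates that.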
def test_dbkey_from_galaxy_json( self ):
path = self._setup_extra_file( )
self._append_job_json( dict( dbkey="hg19" ), output_path=path )
created_hda = self._collect_default_extra()
assert created_hda.dbkey == "hg19"
def test_name_from_galaxy_json( self ):
path = self._setup_extra_file( )
self._append_job_json( dict( name="test_from_json" ), output_path=path )
created_hda = self._collect_default_extra()
assert "test_from_json" in created_hda.name
def test_info_from_galaxy_json( self ):
path = self._setup_extra_file( )
self._append_job_json( dict( info="extra output info" ), output_path=path )
created_hda = self._collect_default_extra()
assert created_hda.info == "extra output info"
def test_extension_from_galaxy_json( self ):
path = self._setup_extra_file( )
self._append_job_json( dict( ext="txt" ), output_path=path )
created_hda = self._collect_default_extra()
assert created_hda.ext == "txt"
def test_new_file_path_collection( self ):
self.app.config.collect_outputs_from = "new_file_path"
self.app.config.new_file_path = self.test_directory
self._setup_extra_file( )
created_hda = self._collect_default_extra( job_working_directory="/tmp" )
assert created_hda
def test_job_param( self ):
self._setup_extra_file( )
assert len( self.job.output_datasets ) == 1
self._collect_default_extra()
assert len( self.job.output_datasets ) == 2
        extra_job_assoc = next( job_assoc for job_assoc in self.job.output_datasets if job_assoc.name.startswith( "__" ) )
assert extra_job_assoc.name == "__new_primary_file_out1|test1__"
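
    # The tests below swap in <discover_datasets> XML definitions (via
    # _replace_output_collectors) to exercise pattern-based discovery of
    # primary datasets.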
def test_pattern_override_designation( self ):
        self._replace_output_collectors( '''<output>
            <discover_datasets pattern="__designation__" directory="subdir" ext="txt" />
        </output>''' )
self._setup_extra_file( subdir="subdir", filename="foo.txt" )
primary_outputs = self._collect( )[ DEFAULT_TOOL_OUTPUT ]
assert len( primary_outputs ) == 1
        created_hda = list( primary_outputs.values() )[ 0 ]
assert "foo.txt" in created_hda.name
assert created_hda.ext == "txt"
def test_name_and_ext_pattern( self ):
        self._replace_output_collectors( '''<output>
            <discover_datasets pattern="__name_and_ext__" directory="subdir" />
        </output>''' )
self._setup_extra_file( subdir="subdir", filename="foo1.txt" )
self._setup_extra_file( subdir="subdir", filename="foo2.tabular" )
primary_outputs = self._collect( )[ DEFAULT_TOOL_OUTPUT ]
assert len( primary_outputs ) == 2
assert primary_outputs[ "foo1" ].ext == "txt"
assert primary_outputs[ "foo2" ].ext == "tabular"
def test_custom_pattern( self ):
# Hypothetical oral metagenomic classifier that populates a directory
# of files based on name and genome. Use custom regex pattern to grab
# and classify these files.
        self._replace_output_collectors( '''<output>
            <discover_datasets pattern="(?P&lt;designation&gt;.*)__(?P&lt;dbkey&gt;.*).fasta" directory="genome_breakdown" ext="fasta" />
        </output>''' )
self._setup_extra_file( subdir="genome_breakdown", filename="samp1__hg19.fasta" )
self._setup_extra_file( subdir="genome_breakdown", filename="samp2__lactLact.fasta" )
self._setup_extra_file( subdir="genome_breakdown", filename="samp3__hg19.fasta" )
self._setup_extra_file( subdir="genome_breakdown", filename="samp4__lactPlan.fasta" )
self._setup_extra_file( subdir="genome_breakdown", filename="samp5__fusoNucl.fasta" )
        # Add a file the regex will not match to the same directory, just to
        # make sure it does not get picked up by the pattern.
self._setup_extra_file( subdir="genome_breakdown", filename="overview.txt" )
primary_outputs = self._collect( )[ DEFAULT_TOOL_OUTPUT ]
assert len( primary_outputs ) == 5
genomes = dict( samp1="hg19", samp2="lactLact", samp3="hg19", samp4="lactPlan", samp5="fusoNucl" )
        for key, hda in primary_outputs.items():
assert hda.dbkey == genomes[ key ]
def test_name_versus_designation( self ):
""" This test demonstrates the difference between name and desgination
in grouping patterns and named patterns such as __designation__,
__name__, __designation_and_ext__, and __name_and_ext__.
"""
        self._replace_output_collectors( '''<output>
            <discover_datasets pattern="__name_and_ext__" directory="subdir_for_name_discovery" />
            <discover_datasets pattern="__designation_and_ext__" directory="subdir_for_designation_discovery" />
        </output>''' )
self._setup_extra_file( subdir="subdir_for_name_discovery", filename="example1.txt" )
self._setup_extra_file( subdir="subdir_for_designation_discovery", filename="example2.txt" )
primary_outputs = self._collect( )[ DEFAULT_TOOL_OUTPUT ]
name_output = primary_outputs[ "example1" ]
designation_output = primary_outputs[ "example2" ]
        # While a name is reused as the designation, a designation is not used
        # directly as the name - it only feeds into the computed name.
assert name_output.name == "example1"
assert designation_output.name == "%s (%s)" % ( self.hda.name, "example2" )
def test_cannot_read_files_outside_job_directory( self ):
        self._replace_output_collectors( '''<output>
            <discover_datasets pattern="__name_and_ext__" directory="../../secrets" />
        </output>''' )
        with self.assertRaises( Exception ):
            self._collect( )
def _collect_default_extra( self, **kwargs ):
return self._collect( **kwargs )[ DEFAULT_TOOL_OUTPUT ][ DEFAULT_EXTRA_NAME ]
def _collect( self, job_working_directory=None ):
if not job_working_directory:
job_working_directory = self.test_directory
return self.tool.collect_primary_datasets( self.outputs, job_working_directory, "txt" )
def _replace_output_collectors( self, xml_str ):
        # Rewrite the tool as if its output had been defined with the supplied
        # dataset collector XML.
elem = util.parse_xml_string( xml_str )
self.tool.outputs[ DEFAULT_TOOL_OUTPUT ].dataset_collectors = output_collect.dataset_collectors_from_elem( elem )
    def _append_job_json( self, obj, output_path=None, line_type="new_primary_dataset" ):
        obj[ "type" ] = line_type
        if output_path:
            name = os.path.basename( output_path )
            obj[ "filename" ] = name
        line = json.dumps( obj )
with open( os.path.join( self.test_directory, "galaxy.json" ), "a" ) as f:
f.write( "%s\n" % line )
def _setup_extra_file( self, **kwargs ):
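        # By default, fabricate a file following Galaxy's primary dataset
        # naming convention: primary_<hda_id>_<designation>_<visible>_<ext>,
        # with an optional trailing _<dbkey>.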
path = kwargs.get( "path", None )
filename = kwargs.get( "filename", None )
if not path and not filename:
name = kwargs.get( "name", DEFAULT_EXTRA_NAME )
visible = kwargs.get( "visible", "visible" )
ext = kwargs.get( "ext", "data" )
template_args = ( self.hda.id, name, visible, ext )
directory = kwargs.get( "directory", self.test_directory )
path = os.path.join( directory, "primary_%s_%s_%s_%s" % template_args )
if "dbkey" in kwargs:
path = "%s_%s" % ( path, kwargs[ "dbkey" ] )
if not path:
assert filename
subdir = kwargs.get( "subdir", "." )
path = os.path.join( self.test_directory, subdir, filename )
directory = os.path.dirname( path )
if not os.path.exists( directory ):
os.makedirs( directory )
contents = kwargs.get( "contents", "test contents" )
        with open( path, "w" ) as f:
            f.write( contents )
return path
def _setup_test_output( self ):
dataset = model.Dataset()
dataset.external_filename = "example_output" # This way object store isn't asked about size...
self.hda = model.HistoryDatasetAssociation( name="test", dataset=dataset )
job = model.Job()
job.add_output_dataset( DEFAULT_TOOL_OUTPUT, self.hda )
self.app.model.context.add( job )
self.job = job
self.history = self._new_history( hdas=[ self.hda ] )
self.outputs = { DEFAULT_TOOL_OUTPUT: self.hda }
    def _new_history( self, hdas=None, flush=True ):
        history = model.History()
        self.app.model.context.add( history )
        for hda in hdas or []:
            history.add_dataset( hda, set_hid=False )
        if flush:
            self.app.model.context.flush( )
return history
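
# Minimal stand-in covering only the slice of the object store interface that
# dataset collection exercises (update_from_file, size, get_filename).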
class MockObjectStore( object ):
def __init__( self ):
self.created_datasets = {}
def update_from_file( self, dataset, file_name, create ):
if create:
self.created_datasets[ dataset ] = file_name
def size( self, dataset ):
path = self.created_datasets[ dataset ]
return os.stat( path ).st_size
def get_filename( self, dataset ):
return self.created_datasets[ dataset ]
def assert_created_with_path( self, dataset, file_name ):
assert self.created_datasets[ dataset ] == file_name
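
# Convenience entry point so the module can be run directly; Galaxy's test
# runner discovers the TestCase on its own.
if __name__ == "__main__":
    unittest.main()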