import logging
import os
import shutil
import galaxy.tools
from galaxy import util
from galaxy.datatypes import checkers
from galaxy.util.expressions import ExpressionContext
from galaxy.web.form_builder import SelectField
from tool_shed.util import basic_util
log = logging.getLogger( __name__ )
def build_shed_tool_conf_select_field( app ):
"""Build a SelectField whose options are the keys in app.toolbox.shed_tool_confs."""
options = []
for dynamic_tool_conf_filename in app.toolbox.dynamic_conf_filenames():
if dynamic_tool_conf_filename.startswith( './' ):
option_label = dynamic_tool_conf_filename.replace( './', '', 1 )
else:
option_label = dynamic_tool_conf_filename
options.append( ( option_label, dynamic_tool_conf_filename ) )
select_field = SelectField( name='shed_tool_conf' )
for option_tup in options:
select_field.add_option( option_tup[ 0 ], option_tup[ 1 ] )
return select_field
def build_tool_panel_section_select_field( app ):
"""Build a SelectField whose options are the sections of the current in-memory toolbox."""
options = []
for section_id, section_name in app.toolbox.get_sections():
options.append( ( section_name, section_id ) )
select_field = SelectField( name='tool_panel_section_id', display='radio' )
for option_tup in options:
select_field.add_option( option_tup[ 0 ], option_tup[ 1 ] )
return select_field
def copy_sample_file( app, filename, dest_path=None ):
"""
Copy xxx.sample to dest_path/xxx.sample and dest_path/xxx. The default value for dest_path
is ~/tool-data.
"""
if dest_path is None:
dest_path = os.path.abspath( app.config.tool_data_path )
sample_file_name = basic_util.strip_path( filename )
copied_file = sample_file_name.replace( '.sample', '' )
full_source_path = os.path.abspath( filename )
full_destination_path = os.path.join( dest_path, sample_file_name )
# Don't copy a file to itself - not sure how this happens, but sometimes it does...
if full_source_path != full_destination_path:
# It's ok to overwrite the .sample version of the file.
shutil.copy( full_source_path, full_destination_path )
# Only create the .loc file if it does not yet exist. We don't overwrite it in case it
# contains stuff proprietary to the local instance.
if not os.path.exists( os.path.join( dest_path, copied_file ) ):
shutil.copy( full_source_path, os.path.join( dest_path, copied_file ) )
def copy_sample_files( app, sample_files, tool_path=None, sample_files_copied=None, dest_path=None ):
"""
Copy all appropriate files to dest_path in the local Galaxy environment that have not
already been copied. Those that have been copied are contained in sample_files_copied.
The default value for dest_path is ~/tool-data. We need to be careful to copy only
appropriate files here because tool shed repositories can contain files ending in .sample
that should not be copied to the ~/tool-data directory.
"""
filenames_not_to_copy = [ 'tool_data_table_conf.xml.sample' ]
sample_files_copied = util.listify( sample_files_copied )
for filename in sample_files:
filename_sans_path = os.path.split( filename )[ 1 ]
if filename_sans_path not in filenames_not_to_copy and filename not in sample_files_copied:
if tool_path:
filename = os.path.join( tool_path, filename )
# Attempt to ensure we're copying an appropriate file.
if is_data_index_sample_file( filename ):
copy_sample_file( app, filename, dest_path=dest_path )
def generate_message_for_invalid_tools( app, invalid_file_tups, repository, metadata_dict, as_html=True,
displaying_invalid_tool=False ):
if as_html:
new_line = '
'
bold_start = ''
bold_end = ''
else:
new_line = '\n'
bold_start = ''
bold_end = ''
message = ''
if app.name == 'galaxy':
tip_rev = str( repository.changeset_revision )
else:
tip_rev = str( repository.tip( app ) )
if not displaying_invalid_tool:
if metadata_dict:
message += "Metadata may have been defined for some items in revision '%s'. " % tip_rev
message += "Correct the following problems if necessary and reset metadata.%s" % new_line
else:
message += "Metadata cannot be defined for revision '%s' so this revision cannot be automatically " % tip_rev
message += "installed into a local Galaxy instance. Correct the following problems and reset metadata.%s" % new_line
for itc_tup in invalid_file_tups:
tool_file, exception_msg = itc_tup
if exception_msg.find( 'No such file or directory' ) >= 0:
exception_items = exception_msg.split()
missing_file_items = exception_items[ 7 ].split( '/' )
missing_file = missing_file_items[ -1 ].rstrip( '\'' )
if missing_file.endswith( '.loc' ):
sample_ext = '%s.sample' % missing_file
else:
sample_ext = missing_file
correction_msg = "This file refers to a missing file %s%s%s. " % \
( bold_start, str( missing_file ), bold_end )
correction_msg += "Upload a file named %s%s%s to the repository to correct this error." % \
( bold_start, sample_ext, bold_end )
else:
if as_html:
correction_msg = exception_msg
else:
correction_msg = exception_msg.replace( '
', new_line ).replace( '', bold_start ).replace( '', bold_end )
message += "%s%s%s - %s%s" % ( bold_start, tool_file, bold_end, correction_msg, new_line )
return message
def get_headers( fname, sep, count=60, is_multi_byte=False ):
"""Returns a list with the first 'count' lines split by 'sep'."""
headers = []
for idx, line in enumerate( file( fname ) ):
line = line.rstrip( '\n\r' )
if is_multi_byte:
line = unicode( line, 'utf-8' )
sep = sep.encode( 'utf-8' )
headers.append( line.split( sep ) )
if idx == count:
break
return headers
def get_tool_path_install_dir( partial_install_dir, shed_tool_conf_dict, tool_dict, config_elems ):
for elem in config_elems:
if elem.tag == 'tool':
if elem.get( 'guid' ) == tool_dict[ 'guid' ]:
tool_path = shed_tool_conf_dict[ 'tool_path' ]
relative_install_dir = os.path.join( tool_path, partial_install_dir )
return tool_path, relative_install_dir
elif elem.tag == 'section':
for section_elem in elem:
if section_elem.tag == 'tool':
if section_elem.get( 'guid' ) == tool_dict[ 'guid' ]:
tool_path = shed_tool_conf_dict[ 'tool_path' ]
relative_install_dir = os.path.join( tool_path, partial_install_dir )
return tool_path, relative_install_dir
return None, None
def handle_missing_index_file( app, tool_path, sample_files, repository_tools_tups, sample_files_copied ):
"""
Inspect each tool to see if it has any input parameters that are dynamically
generated select lists that depend on a .loc file. This method is not called
from the tool shed, but from Galaxy when a repository is being installed.
"""
for index, repository_tools_tup in enumerate( repository_tools_tups ):
tup_path, guid, repository_tool = repository_tools_tup
params_with_missing_index_file = repository_tool.params_with_missing_index_file
for param in params_with_missing_index_file:
options = param.options
missing_file_name = basic_util.strip_path( options.missing_index_file )
if missing_file_name not in sample_files_copied:
# The repository must contain the required xxx.loc.sample file.
for sample_file in sample_files:
sample_file_name = basic_util.strip_path( sample_file )
if sample_file_name == '%s.sample' % missing_file_name:
copy_sample_file( app, sample_file )
if options.tool_data_table and options.tool_data_table.missing_index_file:
options.tool_data_table.handle_found_index_file( options.missing_index_file )
sample_files_copied.append( options.missing_index_file )
break
# Reload the tool into the local list of repository_tools_tups.
repository_tool = app.toolbox.load_tool( os.path.join( tool_path, tup_path ), guid=guid )
repository_tools_tups[ index ] = ( tup_path, guid, repository_tool )
return repository_tools_tups, sample_files_copied
def is_column_based( fname, sep='\t', skip=0, is_multi_byte=False ):
"""See if the file is column based with respect to a separator."""
headers = get_headers( fname, sep, is_multi_byte=is_multi_byte )
count = 0
if not headers:
return False
for hdr in headers[ skip: ]:
if hdr and hdr[ 0 ] and not hdr[ 0 ].startswith( '#' ):
if len( hdr ) > 1:
count = len( hdr )
break
if count < 2:
return False
for hdr in headers[ skip: ]:
if hdr and hdr[ 0 ] and not hdr[ 0 ].startswith( '#' ):
if len( hdr ) != count:
return False
return True
def is_data_index_sample_file( file_path ):
"""
Attempt to determine if a .sample file is appropriate for copying to ~/tool-data when
a tool shed repository is being installed into a Galaxy instance.
"""
# Currently most data index files are tabular, so check that first. We'll assume that
# if the file is tabular, it's ok to copy.
if is_column_based( file_path ):
return True
# If the file is any of the following, don't copy it.
if checkers.check_html( file_path ):
return False
if checkers.check_image( file_path ):
return False
if checkers.check_binary( name=file_path ):
return False
if checkers.is_bz2( file_path ):
return False
if checkers.is_gzip( file_path ):
return False
if checkers.check_zip( file_path ):
return False
# Default to copying the file if none of the above are true.
return True
def new_state( trans, tool, invalid=False ):
"""Create a new `DefaultToolState` for the received tool. Only inputs on the first page will be initialized."""
state = galaxy.tools.DefaultToolState()
state.inputs = {}
if invalid:
# We're attempting to display a tool in the tool shed that has been determined to have errors, so is invalid.
return state
inputs = tool.inputs_by_page[ 0 ]
context = ExpressionContext( state.inputs, parent=None )
for input in inputs.itervalues():
try:
state.inputs[ input.name ] = input.get_initial_value( trans, context )
except:
state.inputs[ input.name ] = []
return state
def panel_entry_per_tool( tool_section_dict ):
# Return True if tool_section_dict looks like this.
# { :
# [{ tool_config : ,
# id: ,
# version : ,
# name : }]}
# But not like this.
# { id: , version : , name : }
if not tool_section_dict:
return False
if len( tool_section_dict ) != 3:
return True
for k, v in tool_section_dict:
if k not in [ 'id', 'version', 'name' ]:
return True
return False
def reload_upload_tools( app ):
if hasattr( app, 'toolbox' ):
app.toolbox.handle_datatypes_changed()