import ConfigParser
import logging
import os
import re
from galaxy import util
from galaxy import web
from galaxy.web.form_builder import build_select_field
from galaxy.webapps.tool_shed.model import directory_hash_id
from tool_shed.util.web_util import escape
from tool_shed.dependencies.repository import relation_builder
from tool_shed.util import common_util
from tool_shed.util import hg_util
from tool_shed.util import shed_util_common as suc
log = logging.getLogger( __name__ )
VALID_REPOSITORYNAME_RE = re.compile( "^[a-z0-9\_]+$" )
def build_allow_push_select_field( trans, current_push_list, selected_value='none' ):
options = []
for user in trans.sa_session.query( trans.model.User ):
if user.username not in current_push_list:
options.append( user )
return build_select_field( trans,
objs=options,
label_attr='username',
select_field_name='allow_push',
selected_value=selected_value,
refresh_on_change=False,
multiple=True )
def change_repository_name_in_hgrc_file( hgrc_file, new_name ):
config = ConfigParser.ConfigParser()
config.read( hgrc_file )
config.read( hgrc_file )
config.set( 'web', 'name', new_name )
new_file = open( hgrc_file, 'wb' )
config.write( new_file )
new_file.close()
def check_or_update_tool_shed_status_for_installed_repository( app, repository ):
updated = False
tool_shed_status_dict = suc.get_tool_shed_status_for_installed_repository( app, repository )
if tool_shed_status_dict:
ok = True
if tool_shed_status_dict != repository.tool_shed_status:
repository.tool_shed_status = tool_shed_status_dict
app.install_model.context.add( repository )
app.install_model.context.flush()
updated = True
else:
ok = False
return ok, updated
def create_repo_info_dict( app, repository_clone_url, changeset_revision, ctx_rev, repository_owner, repository_name=None,
repository=None, repository_metadata=None, tool_dependencies=None, repository_dependencies=None ):
"""
Return a dictionary that includes all of the information needed to install a repository into a local
Galaxy instance. The dictionary will also contain the recursive list of repository dependencies defined
for the repository, as well as the defined tool dependencies.
This method is called from Galaxy under four scenarios:
1. During the tool shed repository installation process via the tool shed's get_repository_information()
method. In this case both the received repository and repository_metadata will be objects, but
tool_dependencies and repository_dependencies will be None.
2. When getting updates for an installed repository where the updates include newly defined repository
dependency definitions. This scenario is similar to 1. above. The tool shed's get_repository_information()
method is the caller, and both the received repository and repository_metadata will be objects, but
tool_dependencies and repository_dependencies will be None.
3. When a tool shed repository that was uninstalled from a Galaxy instance is being reinstalled with no
updates available. In this case, both repository and repository_metadata will be None, but tool_dependencies
and repository_dependencies will be objects previously retrieved from the tool shed if the repository includes
definitions for them.
4. When a tool shed repository that was uninstalled from a Galaxy instance is being reinstalled with updates
available. In this case, this method is reached via the tool shed's get_updated_repository_information()
method, and both repository and repository_metadata will be objects but tool_dependencies and
repository_dependencies will be None.
"""
repo_info_dict = {}
repository = suc.get_repository_by_name_and_owner( app, repository_name, repository_owner )
if app.name == 'tool_shed':
# We're in the tool shed.
repository_metadata = suc.get_repository_metadata_by_changeset_revision( app,
app.security.encode_id( repository.id ),
changeset_revision )
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
tool_shed_url = str( web.url_for( '/', qualified=True ) ).rstrip( '/' )
rb = relation_builder.RelationBuilder( app, repository, repository_metadata, tool_shed_url )
# Get a dictionary of all repositories upon which the contents of the received repository depends.
repository_dependencies = rb.get_repository_dependencies_for_changeset_revision()
tool_dependencies = metadata.get( 'tool_dependencies', {} )
if tool_dependencies:
new_tool_dependencies = {}
for dependency_key, requirements_dict in tool_dependencies.items():
if dependency_key in [ 'set_environment' ]:
new_set_environment_dict_list = []
for set_environment_dict in requirements_dict:
set_environment_dict[ 'repository_name' ] = repository_name
set_environment_dict[ 'repository_owner' ] = repository_owner
set_environment_dict[ 'changeset_revision' ] = changeset_revision
new_set_environment_dict_list.append( set_environment_dict )
new_tool_dependencies[ dependency_key ] = new_set_environment_dict_list
else:
requirements_dict[ 'repository_name' ] = repository_name
requirements_dict[ 'repository_owner' ] = repository_owner
requirements_dict[ 'changeset_revision' ] = changeset_revision
new_tool_dependencies[ dependency_key ] = requirements_dict
tool_dependencies = new_tool_dependencies
# Cast unicode to string, with the exception of description, since it is free text and can contain special characters.
repo_info_dict[ str( repository.name ) ] = ( repository.description,
str( repository_clone_url ),
str( changeset_revision ),
str( ctx_rev ),
str( repository_owner ),
repository_dependencies,
tool_dependencies )
return repo_info_dict
def create_repository( app, name, type, description, long_description, user_id, category_ids=[], remote_repository_url=None, homepage_url=None ):
"""Create a new ToolShed repository"""
sa_session = app.model.context.current
# Add the repository record to the database.
repository = app.model.Repository( name=name,
type=type,
remote_repository_url=remote_repository_url,
homepage_url=homepage_url,
description=description,
long_description=long_description,
user_id=user_id )
# Flush to get the id.
sa_session.add( repository )
sa_session.flush()
# Create an admin role for the repository.
repository_admin_role = create_repository_admin_role( app, repository )
# Determine the repository's repo_path on disk.
dir = os.path.join( app.config.file_path, *directory_hash_id( repository.id ) )
# Create directory if it does not exist.
if not os.path.exists( dir ):
os.makedirs( dir )
# Define repo name inside hashed directory.
repository_path = os.path.join( dir, "repo_%d" % repository.id )
# Create local repository directory.
if not os.path.exists( repository_path ):
os.makedirs( repository_path )
# Create the local repository.
repo = hg_util.get_repo_for_repository( app, repository=None, repo_path=repository_path, create=True )
# Add an entry in the hgweb.config file for the local repository.
lhs = "repos/%s/%s" % ( repository.user.username, repository.name )
app.hgweb_config_manager.add_entry( lhs, repository_path )
# Create a .hg/hgrc file for the local repository.
hg_util.create_hgrc_file( app, repository )
flush_needed = False
if category_ids:
# Create category associations
for category_id in category_ids:
category = sa_session.query( app.model.Category ) \
.get( app.security.decode_id( category_id ) )
rca = app.model.RepositoryCategoryAssociation( repository, category )
sa_session.add( rca )
flush_needed = True
if flush_needed:
sa_session.flush()
# Update the repository registry.
app.repository_registry.add_entry( repository )
message = "Repository %s has been created." % escape( str( repository.name ) )
return repository, message
def create_repository_admin_role( app, repository ):
"""
Create a new role with name-spaced name based on the repository name and its owner's public user
name. This will ensure that the tole name is unique.
"""
sa_session = app.model.context.current
name = get_repository_admin_role_name( str( repository.name ), str( repository.user.username ) )
description = 'A user or group member with this role can administer this repository.'
role = app.model.Role( name=name, description=description, type=app.model.Role.types.SYSTEM )
sa_session.add( role )
sa_session.flush()
# Associate the role with the repository owner.
ura = app.model.UserRoleAssociation( repository.user, role )
# Associate the role with the repository.
rra = app.model.RepositoryRoleAssociation( repository, role )
sa_session.add( rra )
sa_session.flush()
return role
def get_installed_tool_shed_repository( app, id ):
"""Get a tool shed repository record from the Galaxy database defined by the id."""
return app.install_model.context.query( app.install_model.ToolShedRepository ) \
.get( app.security.decode_id( id ) )
def get_repo_info_dict( app, user, repository_id, changeset_revision ):
repository = suc.get_repository_in_tool_shed( app, repository_id )
repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False )
repository_clone_url = common_util.generate_clone_url_for_repository_in_tool_shed( user, repository )
repository_metadata = suc.get_repository_metadata_by_changeset_revision( app,
repository_id,
changeset_revision )
if not repository_metadata:
# The received changeset_revision is no longer installable, so get the next changeset_revision
# in the repository's changelog. This generally occurs only with repositories of type
# repository_suite_definition or tool_dependency_definition.
next_downloadable_changeset_revision = \
suc.get_next_downloadable_changeset_revision( repository, repo, changeset_revision )
if next_downloadable_changeset_revision:
repository_metadata = suc.get_repository_metadata_by_changeset_revision( app,
repository_id,
next_downloadable_changeset_revision )
if repository_metadata:
# For now, we'll always assume that we'll get repository_metadata, but if we discover our assumption
# is not valid we'll have to enhance the callers to handle repository_metadata values of None in the
# returned repo_info_dict.
metadata = repository_metadata.metadata
if 'tools' in metadata:
includes_tools = True
else:
includes_tools = False
includes_tools_for_display_in_tool_panel = repository_metadata.includes_tools_for_display_in_tool_panel
repository_dependencies_dict = metadata.get( 'repository_dependencies', {} )
repository_dependencies = repository_dependencies_dict.get( 'repository_dependencies', [] )
has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td = \
suc.get_repository_dependency_types( repository_dependencies )
if 'tool_dependencies' in metadata:
includes_tool_dependencies = True
else:
includes_tool_dependencies = False
else:
# Here's where we may have to handle enhancements to the callers. See above comment.
includes_tools = False
has_repository_dependencies = False
has_repository_dependencies_only_if_compiling_contained_td = False
includes_tool_dependencies = False
includes_tools_for_display_in_tool_panel = False
ctx = hg_util.get_changectx_for_changeset( repo, changeset_revision )
repo_info_dict = create_repo_info_dict( app=app,
repository_clone_url=repository_clone_url,
changeset_revision=changeset_revision,
ctx_rev=str( ctx.rev() ),
repository_owner=repository.user.username,
repository_name=repository.name,
repository=repository,
repository_metadata=repository_metadata,
tool_dependencies=None,
repository_dependencies=None )
return repo_info_dict, includes_tools, includes_tool_dependencies, includes_tools_for_display_in_tool_panel, \
has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td
def get_repository_admin_role_name( repository_name, repository_owner ):
return '%s_%s_admin' % ( str( repository_name ), str( repository_owner ) )
def get_role_by_id( app, role_id ):
"""Get a Role from the database by id."""
sa_session = app.model.context.current
return sa_session.query( app.model.Role ).get( app.security.decode_id( role_id ) )
def handle_role_associations( app, role, repository, **kwd ):
sa_session = app.model.context.current
message = escape( kwd.get( 'message', '' ) )
status = kwd.get( 'status', 'done' )
repository_owner = repository.user
if kwd.get( 'manage_role_associations_button', False ):
in_users_list = util.listify( kwd.get( 'in_users', [] ) )
in_users = [ sa_session.query( app.model.User ).get( x ) for x in in_users_list ]
# Make sure the repository owner is always associated with the repostory's admin role.
owner_associated = False
for user in in_users:
if user.id == repository_owner.id:
owner_associated = True
break
if not owner_associated:
in_users.append( repository_owner )
message += "The repository owner must always be associated with the repository's administrator role. "
status = 'error'
in_groups_list = util.listify( kwd.get( 'in_groups', [] ) )
in_groups = [ sa_session.query( app.model.Group ).get( x ) for x in in_groups_list ]
in_repositories = [ repository ]
app.security_agent.set_entity_role_associations( roles=[ role ],
users=in_users,
groups=in_groups,
repositories=in_repositories )
sa_session.refresh( role )
message += "Role %s has been associated with %d users, %d groups and %d repositories. " % \
( escape( str( role.name ) ), len( in_users ), len( in_groups ), len( in_repositories ) )
in_users = []
out_users = []
in_groups = []
out_groups = []
for user in sa_session.query( app.model.User ) \
.filter( app.model.User.table.c.deleted==False ) \
.order_by( app.model.User.table.c.email ):
if user in [ x.user for x in role.users ]:
in_users.append( ( user.id, user.email ) )
else:
out_users.append( ( user.id, user.email ) )
for group in sa_session.query( app.model.Group ) \
.filter( app.model.Group.table.c.deleted==False ) \
.order_by( app.model.Group.table.c.name ):
if group in [ x.group for x in role.groups ]:
in_groups.append( ( group.id, group.name ) )
else:
out_groups.append( ( group.id, group.name ) )
associations_dict = dict( in_users=in_users,
out_users=out_users,
in_groups=in_groups,
out_groups=out_groups,
message=message,
status=status )
return associations_dict
def validate_repository_name( app, name, user ):
# Repository names must be unique for each user, must be at least four characters
# in length and must contain only lower-case letters, numbers, and the '_' character.
if name in [ 'None', None, '' ]:
return 'Enter the required repository name.'
if name in [ 'repos' ]:
return "The term %s is a reserved word in the tool shed, so it cannot be used as a repository name." % name
check_existing = suc.get_repository_by_name_and_owner( app, name, user.username )
if check_existing is not None:
if check_existing.deleted:
return 'You have a deleted repository named %s, so choose a different name.' % name
else:
return "You already have a repository named %s, so choose a different name." % name
if len( name ) < 2:
return "Repository names must be at least 2 characters in length."
if len( name ) > 80:
return "Repository names cannot be more than 80 characters in length."
if not( VALID_REPOSITORYNAME_RE.match( name ) ):
return "Repository names must contain only lower-case letters, numbers and underscore _."
return ''