Source code for trackhub.track

from __future__ import absolute_import

import os
import re
import warnings
from docutils.core import publish_parts
from trackhub.base import HubComponent, deprecation_handler
from trackhub import hub
from trackhub import constants
from trackhub import settings


TRACKTYPES = [
    "bigWig",
    "bam",
    "bigBed",
    "vcfTabix",
    "bigNarrowPeak",
    None,
    "bigBarChart",
    "bigChain",
    "bigGenePred",
    "bigNarrowPeak",
    "bigMaf",
    "bigPsl",
    "halSnake",
]


def _check_name(name):
    regex = re.compile("[^a-zA-Z0-9-_]")
    if regex.search(name):
        raise ValueError('Non-alphanumeric character in name "%s"' % name)


class ParameterError(Exception):
    pass


def update_list(existing, new, first=constants.initial_params):
    """
    Extend a list, but with constraints.

    Returned list is sorted alphabetically, except for any items that are in
    `first`, which will come first regardless of sorting.

    Parameters
    ----------
    existing : list
        List to extend

    new : list
        Update `existing` with this

    first : list or None
        If provided, ensure that this list occurs at the beginning. Items must
        already be in `existing` or `new`, others are ignored.
    """
    if first is None:
        first = []

    combined = set(existing + new)
    beginning = [i for i in first if i in combined]
    end = sorted(combined.difference(first))
    return beginning + end


[docs] class SubGroupDefinition(object):
[docs] def __init__(self, name, label, mapping, default="none"): """ Represents a subgroup line in a composite track. Instances of this class are provided to a composite track in order to define options for the subtracks' groups. Parameters ---------- name : str Name for the subgroup (e.g., "celltype"). label : str The label that will be displayed (e.g., "Cell_Type") mapping : dict Dictionary of {tag: title}, where `tag` will be how subtracks access this group and `title` is how it will be displayed in the browser, e.g.:: { "ES": "Embryonic stem cell", "MEF": "Mouse embryonic fibroblast" } Upon appending this SubGroupDefinition to a composite track, the options for the subtracks' subgroups are required to come from the keys in the mapping. Continuing the example, "celltype=ES" would be a valid subgroup for a subtrack, but "celltype=other" would not since it's not in the mapping dict above. default : str Value to be used by subtracks if they don't explicitly define this subgroup. Continuing the example, if a subtrack didn't specify the "celltype" subgroup, then by default a "celltype=none" value will be added. This is necessary because subtracks must define a value for all groups. """ self.name = name self.label = label self.mapping = mapping self.default = default
def __str__(self): s = [] s.append(self.name) s.append(self.label) s.extend("%s=%s" % (k, v) for k, v in self.mapping.items()) return " ".join(s)
[docs] class BaseTrack(HubComponent):
[docs] def __init__( self, name, tracktype=None, short_label=None, long_label=None, subgroups=None, source=None, filename=None, html_string=None, html_string_format="rst", **kwargs ): """ Represents a single track stanza, base class for other track types. Parameters ---------- name : str Name of the track tracktype : str Type of the track (e.g., "bam", "bigWig"). The UCSC parameter name is "type" which is a reserved Python keyword, hence using "tracktype" here. short_label : str Used for the left-hand side track label; alias for UCSC parameter "shortLabel" long_label : str Used for the longer middle labels; if None will copy short_label. Alias for UCSC parameter "longLabel". subgroups : dict A dictionary of `{name: tag}` where each `name` is the name of a SubGroupDefinition in a parent :class:`CompositeTrack` and each `tag` is a key in the SubGroupDefinition.mapping dictionary. The dictionary `{'celltype': 'ES'}` would end up looking like this in the string representation:: subGroups celltype=ES or like this, if the track had been added to a ViewTrack whose name is `aln`:: subGroups view=aln celltype=ES source : str or None Local path to the file. If None, then `url` must instead be used to point to an already-existing filename or URL. filename : str or None Path to upload the file to, over rsync and ssh, relative to the hub directory. Typically only used when you need extensive control over the remote filename. If None, will use a filename of "<name>.tracktype>" in the same directory as the TrackDb. By default, TrackDb goes in a directory named after the assembly of its parent Genome object. html_string : str String containing documentation for a track. By default, the format is assumed to be ReStructured Text format, use `html_string_format="html"` if the documentation is already in HTML format. html_string_format : 'html' or 'rst' Indicates the format of `html_string`. If `"html"`, then use as-is; if `"rst"` then convert ReST to HTML. """ source, filename = deprecation_handler(source, filename, kwargs) HubComponent.__init__(self) _check_name(name) self.name = name # Dictionary where keys are parameter names (e.g., "color") and values # are Param objects. These are defined in the constants module. To # start, we add the params valid for all tracks. # # The Track subclass will add its own parameters when the track type is # set. Other subclasses (Composite and View) will add their own special # params in the class definition. self.track_field_order = [] self.track_field_order = update_list( self.track_field_order, constants.track_fields["all"] ) # NOTE: when setting track type, it will update the track field order # according to the known params for that track...so # self.track_field_order needs to exist first. self.tracktype = tracktype if short_label is None: short_label = name self.short_label = short_label if long_label is None: long_label = short_label self.long_label = long_label self._source = source self._filename = filename self.html_string = html_string self.html_string_format = html_string_format self.subgroups = {} self.add_subgroups(subgroups) # Convert pythonic strings to UCSC versions kwargs["track"] = name kwargs["type"] = tracktype kwargs["longLabel"] = kwargs.get("longLabel", long_label) kwargs["shortLabel"] = kwargs.get("shortLabel", short_label) self.kwargs = kwargs self._orig_kwargs = kwargs.copy()
@property def _html(self): if not self.html_string: return None _html = HTMLDoc(self.html_string, self.html_string_format) _html.add_parent(self) return _html @property def trackdb(self): from trackhub import TrackDb return self.root(TrackDb)[0] @property def hub(self): return self.root(hub.Hub)[0] @property def source(self): if self._source is not None: return self._source return None @source.setter def source(self, fn): self._source = fn @property def filename(self): if self._filename is not None: return self._filename # If filename hasn't been assigned then make one automatically based # on the track name and the trackhub's filename (which, by the way, # acts similarly, deferring up to the genomes_file.filename . . . and # so on up to the hub's filename). # # However, if source is None and URL is set, then this is an # already-existing remote file and so should not have a filename. if self.trackdb: if self.source is None and self._url is not None: return None return os.path.join( os.path.dirname(self.trackdb.filename), self.name + "." + self.tracktype.split(" ")[0], ) return None @filename.setter def filename(self, fn): self._filename = fn @property def tracktype(self): return self._tracktype @tracktype.setter def tracktype(self, tracktype): """ When setting the track type, the valid parameters for this track type need to be set as well. """ self._tracktype = tracktype # E.g., bigBed 6+3 base_tracktype = tracktype.split()[0] fields = [] fields.extend(constants.track_fields[base_tracktype]) self.track_field_order = update_list(self.track_field_order, fields)
[docs] def add_trackdb(self, trackdb): """ Attach this track to a parent TrackDb object. """ self.add_parent(trackdb)
[docs] def add_params(self, **kw): """ Add [possibly many] parameters to the track. Parameters will be checked against known UCSC parameters and their supported formats. E.g.:: add_params(color='128,0,0', visibility='dense') """ for k, v in kw.items(): if k not in self.track_field_order and constants.VALIDATE: raise ParameterError( '"{0}" is not a valid parameter for {1} with ' "tracktype {2}".format(k, self.__class__.__name__, self.tracktype) ) if not constants.param_dict[k].validate(v) and constants.VALIDATE: raise ParameterError( 'value "{0}" did not validate for parameter "{1}"'.format(k, v) ) self._orig_kwargs.update(kw) self.kwargs = self._orig_kwargs.copy()
[docs] def remove_params(self, *args): """ Remove [possibly many] parameters from the track. E.g.:: remove_params('color', 'visibility') """ for a in args: self._orig_kwargs.pop(a) self.kwargs = self._orig_kwargs.copy()
[docs] def add_subgroups(self, subgroups): """ Update the subgroups for this track. Note that in contrast to :meth:`CompositeTrack`, which takes a list of :class:`SubGroupDefinition` objects representing the allowed subgroups, this method takes a single dictionary indicating the particular subgroups for this track. Parameters ---------- subgroups : dict Dictionary of subgroups, e.g., {'celltype': 'K562', 'treatment': 'a'}. Each key must match a SubGroupDefinition name in the composite's subgroups list. Each value must match a key in that SubGroupDefinition.mapping dictionary. """ if subgroups is None: subgroups = {} assert isinstance(subgroups, dict) self.subgroups.update(subgroups)
def __str__(self): s = [] kwargs = self.kwargs.copy() for name in self.track_field_order: value = kwargs.pop(name, None) if name == "parent": if isinstance(self.parent, BaseTrack): if value is not None: s.append("parent {0} {1}".format(self.parent.name, value)) else: s.append("parent {0}".format(self.parent.name)) continue if name == "bigDataUrl" and value is None: # fall back to `url` if set value = getattr(self, "url", None) if value is not None: if constants.param_dict[name].validate(value) or not settings.VALIDATE: s.append("%s %s" % (name, value)) else: raise ParameterError( "The value '{0}' did not validate for parameter '{1}'".format( value, name ) ) # Handle subgroups differently depending on if this is a composite # track or not. s.extend(self._str_subgroups()) if settings.VALIDATE: if len(kwargs) > 0: raise ParameterError( "The following parameters are unknown for track type {0}: " "{1}".format(self.tracktype, kwargs) ) else: for k, v in kwargs.items(): s.append("%s %s" % (k, v)) self.kwargs = self._orig_kwargs.copy() return "\n".join(s) def _render(self, staging="staging"): if self._html: self._html.render(staging) def _str_subgroups(self): """ helper function to render subgroups as a string """ if not self.subgroups: return "" return [ "subGroups %s" % " ".join(["%s=%s" % (k, v) for (k, v) in self.subgroups.items()]) ]
[docs] def validate(self): pass
@property def html_fn(self): if self.filename and self.trackdb: return os.path.join( os.path.dirname(self.trackdb.filename), self.name + ".html" ) else: raise ValueError(self.filename)
[docs] class Track(BaseTrack):
[docs] def __init__(self, url=None, *args, **kwargs): """ Represents a single track stanza along with the file it describes. See :class:`BaseTrack` for details on arguments. Additional arguments supported by this class: Parameters ---------- url : str Full URL for the track (i.e., bigDataUrl). Typically this is only used when using a remote track from some other provider or when you need lots of control over the URL. Otherwise the url will be automatically created based on `filename`. See :class:`BaseTrack` for details on other arguments. """ if "bigDataUrl" in kwargs and url is not None: raise ValueError("Only one of bigDataUrl or url should be specified") kwargs["bigDataUrl"] = kwargs.get("bigDataUrl", url) super(Track, self).__init__(*args, **kwargs) self._url = kwargs["bigDataUrl"]
@property def url(self): if self._url is not None: return self._url if self.filename is None: return None return os.path.relpath( self.filename, start=os.path.dirname(self.trackdb.filename) ) @url.setter def url(self, fn): self._url = fn
[docs] class CompositeTrack(BaseTrack):
[docs] def __init__(self, *args, **kwargs): """ Represents a composite track. Subclasses :class:`BaseTrack`, and adds some extras. Add a view or subtrack to this composite with :meth:`add_tracks`. Eventually, you'll need to make a :class:`trackdb.TrackDb` instance and add this composite to it with :meth:`trackdb.TrackDb.add_tracks()`. If you're using subgroups, use the :meth:`CompositeTrack.add_subgroups()` method. See :class:`BaseTrack` for details on arguments. There are no additional arguments supported by this class. """ super(CompositeTrack, self).__init__(*args, **kwargs) self.track_field_order = update_list( self.track_field_order, constants.track_fields["compositeTrack"] ) self.track_field_order = update_list( self.track_field_order, constants.track_fields["subGroups"] ) # TODO: are subtracks and views mutually exclusive, or can a composite # have both "view-ed" and "non-view-ed" subtracks? self.subtracks = [] self.views = []
[docs] def add_subgroups(self, subgroups): """ Add a list of SubGroupDefinition objects to this composite. Note that in contrast to :meth:`BaseTrack`, which takes a single dictionary indicating the particular subgroups for the track, this method takes a list of :class:`SubGroupDefinition` objects representing the allowed subgroups for the composite. :param subgroups: List of SubGroupDefinition objects. """ if subgroups is None: subgroups = {} _subgroups = {} for sg in subgroups: assert isinstance(sg, SubGroupDefinition) _subgroups[sg.name] = sg self.subgroups = _subgroups
[docs] def add_subtrack(self, subtrack): """ Add a child :class:`Track`. Deprecated in favor of the more generic `add_tracks` method, but maintained for backwards compatibility. """ self.add_tracks(subtrack)
[docs] def add_view(self, view): """ Add a ViewTrack object to this composite. Deprecated in favor of the more generic `add_tracks` method, but maintained for backwards compatibility. :param view: A ViewTrack object. """ self.add_tracks(view)
[docs] def add_tracks(self, *args): """ This method allows for both view and subtracks to be added to a composite at the same time. `args` can be arbitrary BaseTrack objects (typically Track or View objects), either singly or as a list. For example any of the following are supported:: add_tracks(view) add_tracks(view, [track1, track2]) add_tracks(track1, track2) """ for arg in args: if isinstance(arg, BaseTrack): arg = [arg] for track in arg: if isinstance(track, ViewTrack): self.add_child(track) self.views.append(track) if isinstance(track, Track): self.add_child(track) self.subtracks.append(track)
def _str_subgroups(self): """ renders subgroups to a list of strings """ s = [] i = 0 # if there are any views, there must be a subGroup1 view View tag=val # as the first one. So create it automatically here if len(self.views) > 0: mapping = dict((i.view, i.view) for i in self.views) view_subgroup = SubGroupDefinition( name="view", label="Views", mapping=mapping ) i += 1 s.append("subGroup%s %s" % (i, view_subgroup)) for subgroup in self.subgroups.values(): i += 1 s.append("subGroup%s %s" % (i, subgroup)) return s def __str__(self): s = [] s.append(super(CompositeTrack, self).__str__()) s.append("compositeTrack on") for view in self.views: s.append("") for line in str(view).splitlines(False): s.append(constants.INDENT + line) for subtrack in self.subtracks: s.append("") for line in str(subtrack).splitlines(False): s.append(constants.INDENT + line) return "\n".join(s)
[docs] class ViewTrack(BaseTrack):
[docs] def __init__(self, view, *args, **kwargs): """ Represents a View track. Subclasses :class:`BaseTrack`, and adds some extras. This will need to be added to a :class:`track.CompositeTrack` with :meth:`track.CompositeTrack.add_view()`. Add tracks to this view with :meth:`track.ViewTrack.add_tracks()`. See :class:`BaseTrack` for details on arguments. Additional arguments supported by this class: Parameters ---------- view : str Unique name to use for the view. See :class:`BaseTrack` for details on other arguments. """ self.view = view kwargs["view"] = view super(ViewTrack, self).__init__(*args, **kwargs) self.track_field_order = update_list( self.track_field_order, constants.track_fields["view"] ) self.subtracks = []
[docs] def add_tracks(self, *args): """ Add one or more tracks to this view. Parameters ---------- args : Track or iterable of Tracks """ for arg in args: if isinstance(arg, BaseTrack): arg = [arg] for track in arg: track.subgroups["view"] = self.view self.add_child(track) self.subtracks.append(track)
def __str__(self): s = [] s.append(super(ViewTrack, self).__str__()) for subtrack in self.subtracks: s.append("") for line in str(subtrack).splitlines(False): s.append(constants.INDENT + line) return "\n".join(s)
[docs] class SuperTrack(BaseTrack):
[docs] def __init__(self, *args, **kwargs): """ Represents a Super track. Subclasses :class:`Track`, and adds some extras. Super tracks are container tracks (folders) that group tracks. They are used to control visualization of a set of related data. Eventually, you'll need to make a :class:`trackdb.TrackDb` instance and add this supertrack to it with that instance's :meth:`add_tracks` method. See :class:`BaseTrack` for details on arguments. """ super(SuperTrack, self).__init__(tracktype="superTrack", *args, **kwargs) self.track_field_order = update_list( self.track_field_order, constants.track_fields["superTrack"] ) self.subtracks = []
[docs] def add_tracks(self, *args): """ Add one or more tracks. Parameters ---------- args : Track or iterable of Tracks """ for arg in args: if isinstance(arg, BaseTrack): arg = [arg] for track in arg: self.add_child(track) self.subtracks.append(track)
def __str__(self): s = [] s.append(super(SuperTrack, self).__str__()) s.append("superTrack on") for subtrack in self.subtracks: s.append("") for line in str(subtrack).splitlines(False): s.append(constants.INDENT + line) return "\n".join(s)
[docs] class AggregateTrack(BaseTrack):
[docs] def __init__(self, aggregate, *args, **kwargs): """ Represents an Aggregate or Overlay track. Subclasses :class:`Track`, adds some extras. Aggregate tracks allow closley related tracks to be viewed as a single track. Eventually, you'll need to make a :class:`trackdb.TrackDb` instance and add this aggregate track to it with that instance's :meth:`add_tracks` method. Parameters ---------- aggregate : str Aggregate type. One of "transparentOverlay", "stacked", "solidOverlay". See https://genome.ucsc.edu/goldenpath/help/trackDb/trackDbHub.html#aggregate for details. See :class:`BaseTrack` for details on other arguments. """ self.aggregate = aggregate kwargs["aggregate"] = aggregate super(AggregateTrack, self).__init__(*args, **kwargs) self.track_field_order = update_list( self.track_field_order, constants.track_fields["multiWig"] ) self.subtracks = []
[docs] def add_subtrack(self, subtrack): """ Add a child :class:`SubTrack` to this aggregate. """ self.add_tracks(subtrack)
[docs] def add_tracks(self, *args): """ Add one or more tracks. Parameters ---------- args : Track or iterable of Tracks """ for arg in args: if isinstance(arg, BaseTrack): arg = [arg] for track in arg: self.add_child(track) self.subtracks.append(track)
def __str__(self): s = [] s.append(super(AggregateTrack, self).__str__()) s.append("container multiWig") for subtrack in self.subtracks: s.append("") for line in str(subtrack).splitlines(False): s.append(constants.INDENT + line) return "\n".join(s)
class HTMLDoc(HubComponent): def __init__(self, contents, html_string_format, filename=None): """ Represents an HTML file used for documentation. Handles local/remote/url filenames when connected to a Track and CompositeTrack Parameters ---------- contents : str String of contents for HTML file. Expected format determined by `html_string_format`. html_string_format : 'html' | 'rst' If "html", write an HTML file with no additional modification. If "rst", assumes `contents` is in ReStructured Text format and is converted to HTML. filename : str or None If None, the rendered HTML filename will be the name of the parent track with an ".html" extension, in the same directory as the parent TrackDb. """ self.contents = contents self.html_string_format = html_string_format self._filename = None super(HTMLDoc, self).__init__() @property def filename(self): if self._filename is not None: return self._filename if self.trackdb is None or self.track is None: return None return os.path.join( os.path.dirname(self.trackdb.filename), self.track.name + ".html" ) @filename.setter def filename(self, fn): self._filename = fn @property def trackdb(self): from trackhub import TrackDb obj, level = self.root(cls=TrackDb) return obj @property def track(self): return self.parent def _render(self, staging="staging"): self.validate() dirname = os.path.dirname(self.filename) if not os.path.exists(dirname): os.makedirs(dirname) fout = open(os.path.join(staging, self.filename), "w") fout.write(str(self)) fout.close() return fout.name def validate(self): if not self.trackdb: raise ValueError( "HTMLDoc object must be connected to a " "BaseTrack subclass instance and a TrackDb " "instance" ) return True def __str__(self): if self.html_string_format == "html": return self.contents elif self.html_string_format == "rst": # docutils still internally uses a "U" mode for opening files. with warnings.catch_warnings(): warnings.simplefilter("ignore") parts = publish_parts( self.contents, writer_name="html", settings_overrides={"output_encoding": "unicode"}, ) return parts["html_body"] else: raise ValueError( "html_string_format '{}' not supported".format(self.html_string_format) ) return self.contents