Source code for trackhub.upload

import tempfile
import os
import sys
import shlex
import subprocess as sp
import logging
from . import track
from . import genome
from . import base
from . import trackdb
from . import compatibility

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

RSYNC_OPTIONS = "--progress -rvL"


def run(cmds, **kwargs):
    """
    Wrapper around subprocess.run, with unicode decoding of output.

    Additional kwargs are passed to subprocess.run.
    """
    proc = sp.Popen(
        cmds,
        bufsize=-1,
        stdout=sp.PIPE,
        stderr=sp.STDOUT,
        close_fds=sys.platform != "win32",
    )
    for line in proc.stdout:
        print(line[:-1].decode())
    retcode = proc.wait()
    if retcode:
        raise sp.CalledProcessError(retcode, cmds)


def symlink(target, linkname):
    """
    Create a symlink to `target` called `linkname`.

    Converts `target` and `linkname` to absolute paths; creates
    `dirname(linkname)` if needed.
    """
    target = os.path.abspath(target)
    linkname = os.path.abspath(linkname)

    if not os.path.exists(target):
        raise ValueError("target {} not found".format(target))

    link_dir = os.path.dirname(linkname)
    if not os.path.exists(link_dir):
        os.makedirs(link_dir)

    if os.path.exists(linkname):
        if os.path.islink(linkname):
            os.remove(linkname)

    os.symlink(target, linkname)

    # If possible, we modify the symlink's mtime to be that of the target.
    mtime = os.stat(target).st_mtime

    # Python 2.7 does not support modifying symlink modification time
    if compatibility.PY == 3:
        try:
            os.utime(target, (mtime, mtime), follow_symlinks=False)

        # In some cases (the reason is still unclear), in Python 3 we can still
        # get a PermissionError. That's still OK, and we'll handle it as if it
        # were Py27, that is, leave the symlink itself unchanged.
        except (NotImplementedError, PermissionError):
            pass

    return linkname


def upload(host, user, local_dir, remote_dir, rsync_options=RSYNC_OPTIONS):
    """
    Upload a file or directory via rsync.

    Parameters
    ----------
    host : str or None
        If None, omit the host part and just transfer locally

    user : str or None
        If None, omit the user part

    local_dir : str
        If a directory, a trailing "/" will be added.

    remote_dir : str
        If a directory, a trailing "/" will be added.
    """
    if user is None:
        user = ""
    else:
        user = user + "@"
    if host is None or host == "localhost":
        host = ""
    else:
        host = host + ":"

    if not local_dir.endswith("/"):
        local_dir = local_dir + "/"

    if not remote_dir.endswith("/"):
        remote_dir = remote_dir + "/"

    remote_string = "{user}{host}{remote_dir}".format(**locals())
    cmds = ["rsync"]
    cmds += shlex.split(rsync_options)
    cmds += [local_dir, remote_string]
    run(cmds)
    return [remote_string]


def local_link(local_fn, remote_fn, staging):
    """
    Creates a symlink to a local staging area.

    The link name is built from `remote_fn`, but the absolute path is put
    inside the staging directory.

    Example
    -------

    If we have the following initial setup::

        cwd="/home/user"
        local="data/sample1.bw"
        remote="/hubs/hg19/a.bw"
        staging="__staging__"

    Then this function does the equivalent of::

        mkdir -p __staging__/hubs/hg19
        ln -sf \\
            /home/user/data/sample1.bw \\
            /home/user/__staging__/hubs/hg19/a.bw
    """
    linkname = os.path.join(staging, remote_fn.lstrip(os.path.sep))
    return symlink(local_fn, linkname)


def stage(x, staging):
    """
    Stage an object to the `staging` directory.

    If the object is a Track and is one of the types that needs an index file
    (bam, vcfTabix), then the index file will be staged as well.

    Returns a list of the linknames created.
    """
    linknames = []

    # Objects that don't represent a file shouldn't be staged
    non_file_objects = (
        track.ViewTrack,
        track.CompositeTrack,
        track.AggregateTrack,
        track.SuperTrack,
        genome.Genome,
    )
    if isinstance(x, non_file_objects):
        return linknames

    # If it's an object representing a file, then render it.
    #
    # Track objects don't represent files, but their documentation does
    linknames.append(x.render(staging))

    if hasattr(x, "source") and hasattr(x, "filename"):

        def _stg(x, ext=""):
            # A remote track hosted elsewhere does not need staging. This is
            # defined by a track with a url, but no source or filename.
            if (
                x.source is None
                and x.filename is None
                and getattr(x, "url", None) is not None
            ):
                return

            linknames.append(local_link(x.source + ext, x.filename + ext, staging))

        _stg(x)

        if isinstance(x, track.Track):
            if x.tracktype == "bam":
                _stg(x, ext=".bai")
            if x.tracktype == "vcfTabix":
                _stg(x, ext=".tbi")

        if isinstance(x, track.CompositeTrack):
            if x._html:
                _stg(x._html)

    return linknames


[docs] def stage_hub(hub, staging=None): """ Stage a hub by symlinking all its connected files to a local directory. """ linknames = [] if staging is None: staging = tempfile.mkdtemp() for obj, level in hub.leaves(base.HubComponent, intermediate=True): linknames.extend(stage(obj, staging)) return staging, linknames
[docs] def upload_hub( hub, host, remote_dir, user=None, port=22, rsync_options=RSYNC_OPTIONS, staging=None ): """ Renders, stages, and uploads a hub. """ hub.render() if staging is None: staging = tempfile.mkdtemp() staging, linknames = stage_hub(hub, staging=staging) local_dir = os.path.join(staging) upload( host, user, local_dir=local_dir, remote_dir=remote_dir, rsync_options=rsync_options, ) return linknames