Source code for pybedtools.parallel

import sys
import multiprocessing
from . import helpers
import pybedtools


def _parallel_wrap(
    orig_bedtool,
    shuffle_kwargs,
    genome_fn,
    method,
    method_args,
    method_kwargs,
    sort=False,
    shuffle=True,
    reduce_func=None,
):
    """
    Given a BedTool object `orig_bedtool`, call its `method` with
    `method_args` and `method_kwargs`, then call `reduce_func` on the
    result.

    See the `parallel_apply` docstring for details.

    """

    # note: be careful about cleaning up tempfiles
    if not shuffle:
        to_use = orig_bedtool
    else:
        shuffled = orig_bedtool.shuffle(g=genome_fn, **shuffle_kwargs)
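        # Sorting creates a second temp file; the unsorted shuffle is
        # deleted as soon as the sorted copy exists.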
        if sort:
            to_use = shuffled.sort()
            helpers.close_or_delete(shuffled)
        else:
            to_use = shuffled

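    # Call the requested method on either the original or the shuffled
    # (and possibly sorted) BedTool.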
    result = getattr(to_use, method)(*method_args, **method_kwargs)

    if shuffle:
        helpers.close_or_delete(to_use)

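    # Optionally collapse the result (e.g., to a count or a dict), then
    # delete the intermediate BedTool so temp files don't accumulate
    # across iterations.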
    if reduce_func:
        reduced = reduce_func(result)
        if isinstance(result, pybedtools.BedTool):
            helpers.close_or_delete(result)
        return reduced
    else:
        return result


def parallel_apply(
    orig_bedtool,
    method,
    genome=None,
    genome_fn=None,
    method_args=None,
    method_kwargs=None,
    shuffle_kwargs=None,
    shuffle=True,
    reduce_func=None,
    processes=1,
    sort=False,
    _orig_pool=None,
    iterations=1000,
    debug=False,
    report_iterations=False,
):
    """
    Call an arbitrary BedTool method many times in parallel.

    An example use-case is to generate a null distribution of intersections
    and then compare it to the actual intersections.

    **Important:** due to a known file handle leak in BedTool.__len__, it's
    best to simply check the number of lines in the file, as in the
    `reduce_func` below. This works because BEDTools programs strip any
    non-interval lines from their results.

    >>> # set up example BedTools
    >>> a = pybedtools.example_bedtool('a.bed')
    >>> b = pybedtools.example_bedtool('b.bed')

    >>> # Method of `a` to call:
    >>> method = 'intersect'

    >>> # Kwargs provided to `a.intersect` each iteration
    >>> method_kwargs = dict(b=b, u=True)

    >>> # Function that will be called on the results of
    >>> # `a.intersect(**method_kwargs)`.
    >>> def reduce_func(x):
    ...     return sum(1 for _ in open(x.fn))

    >>> # Create a small artificial genome for this test (generally you'd
    >>> # use an assembly name, like "hg19"):
    >>> genome = dict(chr1=(0, 1000))

    >>> # Do 10 iterations using 1 process for this test (generally you'd
    >>> # use 1000+ iterations, and as many processes as you have CPUs)
    >>> results = pybedtools.parallel.parallel_apply(a, method, genome=genome,
    ...     method_kwargs=method_kwargs, iterations=10, processes=1,
    ...     reduce_func=reduce_func, debug=True, report_iterations=True)

    >>> # get results
    >>> print(list(results))
    [1, 0, 1, 2, 4, 2, 2, 1, 2, 4]

    >>> # We can compare this to the actual intersection:
    >>> reduce_func(a.intersect(**method_kwargs))
    3

    Alternatively, we could use the `a.jaccard` method, which already does
    the reduction to a dictionary. However, the Jaccard method requires the
    input to be sorted. Here, we specify `sort=True` to sort each shuffled
    BedTool before calling its `jaccard` method.

    >>> from pybedtools.parallel import parallel_apply
    >>> a = pybedtools.example_bedtool('a.bed')
    >>> results = parallel_apply(a, method='jaccard', method_args=(b,),
    ...     genome=genome, iterations=3, processes=1, sort=True, debug=True)
    >>> for i in results:
    ...     print(sorted(i.items()))
    [('intersection', 12), ('jaccard', 0.0171184), ('n_intersections', 1), ('union', 701)]
    [('intersection', 0), ('jaccard', 0.0), ('n_intersections', 0), ('union', 527)]
    [('intersection', 73), ('jaccard', 0.137996), ('n_intersections', 1), ('union', 529)]

    Parameters
    ----------
    orig_bedtool : BedTool

    method : str
        The method of `orig_bedtool` to run.

    method_args : tuple
        Passed directly to getattr(orig_bedtool, method)().

    method_kwargs : dict
        Passed directly to getattr(orig_bedtool, method)().

    shuffle : bool
        If True, then `orig_bedtool` will be shuffled at each iteration, and
        that shuffled version's `method` will be called with `method_args`
        and `method_kwargs`.

    shuffle_kwargs : dict
        If `shuffle` is True, these are passed to `orig_bedtool.shuffle()`.
        You do not need to pass the genome here; that's handled separately
        by the `genome` and `genome_fn` kwargs.

    iterations : int
        Number of iterations to perform.

    genome : string or dict
        If a string, it is assumed to be an assembly name (e.g., "hg19");
        the chromsizes for that assembly are looked up and converted to a
        filename.

    genome_fn : str
        Mutually exclusive with `genome`; `genome_fn` must be an existing
        file containing the chromsizes. Use the `genome` kwarg instead if
        you'd rather supply an assembly name or a dict.

    reduce_func : callable
        Function or other callable object that accepts, as its only
        argument, the results from `orig_bedtool.method()`. For example, if
        you care about the number of results, then you can use
        `reduce_func=len`.

    processes : int
        Number of processes to run. If `processes=1`, then multiprocessing
        is not used (making it much easier to debug). This argument is
        ignored if `_orig_pool` is provided.

    sort : bool
        If both `shuffle` and `sort` are True, then the shuffled BedTool
        will be sorted before use. Use this if `method` requires sorted
        input.

    _orig_pool : multiprocessing.Pool instance
        If provided, use `_orig_pool` instead of creating a new pool. In
        this case, `processes` is ignored.

    debug : bool
        If True, use the current iteration index as the seed for each
        shuffle.

    report_iterations : bool
        If True, report the current iteration number to stderr.
    """
    shuffle_kwargs = shuffle_kwargs or {}
    method_args = method_args or ()
    if not isinstance(method_args, (list, tuple)):
        raise ValueError(
            "method_args must be a list or tuple, got %s" % type(method_args)
        )
    method_kwargs = method_kwargs or {}

    if genome_fn and genome:
        raise ValueError("only one of genome_fn or genome should be provided")

    if shuffle:
        if not genome_fn:
            if not genome:
                raise ValueError(
                    "shuffle=True, so either genome_fn or genome must be provided"
                )
            genome_fn = pybedtools.chromsizes_to_file(genome)

    _parallel_wrap_kwargs = dict(
        orig_bedtool=orig_bedtool,
        shuffle_kwargs=shuffle_kwargs,
        genome_fn=genome_fn,
        method=method,
        method_args=method_args,
        method_kwargs=method_kwargs,
        shuffle=shuffle,
        reduce_func=reduce_func,
        sort=sort,
    )

    def add_seed(i, kwargs):
        if debug and shuffle:
            kwargs["shuffle_kwargs"]["seed"] = i
        return kwargs

    if processes == 1:
        for it in range(iterations):
            yield _parallel_wrap(**add_seed(it, _parallel_wrap_kwargs))
        return

    if _orig_pool:
        p = _orig_pool
    else:
        p = multiprocessing.Pool(processes)

    results = [
        p.apply_async(_parallel_wrap, (), add_seed(it, _parallel_wrap_kwargs))
        for it in range(iterations)
    ]
    for i, r in enumerate(results):
        yield r.get()
        if report_iterations:
            sys.stderr.write("%s\r" % i)
            sys.stderr.flush()
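

# A minimal usage sketch (illustrative only; not part of the original
# module). It builds an empirical null distribution of intersection counts
# across several worker processes and compares it to the observed count.
# `_count_intervals` is a hypothetical helper defined at module level
# (rather than as a lambda) so that multiprocessing can pickle it; the
# input files are the example data bundled with pybedtools, and the toy
# genome matches the one used in the doctests above.
def _count_intervals(x):
    # Count lines in the result file instead of calling len(x), to avoid
    # the BedTool.__len__ file handle leak noted in the docstring above.
    with open(x.fn) as f:
        return sum(1 for _ in f)


if __name__ == "__main__":
    a = pybedtools.example_bedtool("a.bed")
    b = pybedtools.example_bedtool("b.bed")
    null_counts = list(
        parallel_apply(
            a,
            method="intersect",
            genome=dict(chr1=(0, 1000)),
            method_kwargs=dict(b=b, u=True),
            reduce_func=_count_intervals,
            iterations=100,
            processes=4,
        )
    )
    observed = _count_intervals(a.intersect(b=b, u=True))
    print(
        "observed: %d, null mean: %.2f"
        % (observed, sum(null_counts) / len(null_counts))
    )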