pybedtools.parallel.parallel_apply

pybedtools.parallel.parallel_apply(orig_bedtool, method, genome=None, genome_fn=None, method_args=None, method_kwargs=None, shuffle_kwargs=None, shuffle=True, reduce_func=None, processes=1, sort=False, _orig_pool=None, iterations=1000, debug=False, report_iterations=False)

Call an arbitrary BedTool method many times in parallel.

An example use-case is to generate a null distribution of intersections, and then compare this to the actual intersections.

Important: due to a known file handle leak in BedTool.__len__, it's best to simply count the number of lines in the result file, as in the function below. This works because BEDTools programs strip any non-interval lines from their results.

>>> import pybedtools
>>> # Set up example BedTools:
>>> a = pybedtools.example_bedtool('a.bed')
>>> b = pybedtools.example_bedtool('b.bed')
>>> # Method of `a` to call:
>>> method = 'intersect'
>>> # Kwargs provided to `a.intersect` each iteration
>>> method_kwargs = dict(b=b, u=True)
>>> # Function that will be called on the results of
>>> # `a.intersect(**method_kwargs)`.
>>> def reduce_func(x):
...     return sum(1 for _ in open(x.fn))
>>> # Create a small artificial genome for this test (generally you'd
>>> # use an assembly name, like "hg19"):
>>> genome = dict(chr1=(0, 1000))
>>> # Do 10 iterations using 1 process for this test (generally you'd
>>> # use 1000+ iterations, and as many processes as you have CPUs)
>>> results = pybedtools.parallel.parallel_apply(a, method, genome=genome,
... method_kwargs=method_kwargs, iterations=10, processes=1,
... reduce_func=reduce_func, debug=True, report_iterations=True)
>>> # get results
>>> print(list(results))
[1, 0, 1, 2, 4, 2, 2, 1, 2, 4]
>>> # We can compare this to the actual intersection:
>>> reduce_func(a.intersect(**method_kwargs))
3
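A common follow-up is to turn the null distribution into an empirical p-value for the observed count. The helper below is an illustrative sketch, not part of the pybedtools API; the sample counts are copied from the doctest output above.

```python
# Hedged sketch: convert a null distribution of intersection counts
# into an empirical p-value. `empirical_pvalue` is an illustrative
# helper, not part of pybedtools.
def empirical_pvalue(null_counts, observed):
    # Count null iterations at least as extreme as the observed value;
    # the +1 correction keeps the p-value strictly above zero, as is
    # standard for permutation tests.
    hits = sum(1 for n in null_counts if n >= observed)
    return (hits + 1) / (len(null_counts) + 1)

null_counts = [1, 0, 1, 2, 4, 2, 2, 1, 2, 4]  # from the doctest above
print(empirical_pvalue(null_counts, 3))  # 2 hits >= 3, so (2+1)/(10+1)
```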

Alternatively, we could use the a.jaccard method, which already reduces the results to a dictionary. However, jaccard requires sorted input, so here we specify sort=True to sort each shuffled BedTool before calling its jaccard method.

>>> from pybedtools.parallel import parallel_apply
>>> a = pybedtools.example_bedtool('a.bed')
>>> results = parallel_apply(a, method='jaccard', method_args=(b,),
... genome=genome, iterations=3, processes=1, sort=True, debug=True)
>>> for i in results:
...     print(sorted(i.items()))
[('intersection', 12), ('jaccard', 0.0171184), ('n_intersections', 1), ('union', 701)]
[('intersection', 0), ('jaccard', 0.0), ('n_intersections', 0), ('union', 527)]
[('intersection', 73), ('jaccard', 0.137996), ('n_intersections', 1), ('union', 529)]
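Since each iteration yields a full statistics dictionary, a typical next step is to pull out just the statistic of interest. The helper below is an illustrative sketch (not part of pybedtools), applied to the dictionaries printed above.

```python
# Hedged sketch: summarize the per-iteration jaccard dicts yielded by
# parallel_apply. `mean_jaccard` is an illustrative helper, not part
# of the pybedtools API; the sample dicts mirror the doctest output.
def mean_jaccard(results):
    values = [r['jaccard'] for r in results]
    return sum(values) / len(values)

null_dicts = [
    {'intersection': 12, 'jaccard': 0.0171184, 'n_intersections': 1, 'union': 701},
    {'intersection': 0, 'jaccard': 0.0, 'n_intersections': 0, 'union': 527},
    {'intersection': 73, 'jaccard': 0.137996, 'n_intersections': 1, 'union': 529},
]
print(mean_jaccard(null_dicts))
```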
Parameters
----------
orig_bedtool : BedTool
    The BedTool whose method will be called.
method : str
    The method of orig_bedtool to run.
method_args : tuple
    Passed directly to getattr(orig_bedtool, method)().
method_kwargs : dict
    Passed directly to getattr(orig_bedtool, method)().
shuffle : bool
    If True, orig_bedtool will be shuffled at each iteration, and that
    shuffled version's method will be called with method_args and
    method_kwargs.
shuffle_kwargs : dict
    If shuffle is True, these are passed to orig_bedtool.shuffle().
    You do not need to pass the genome here; that's handled separately
    by the genome and genome_fn kwargs.
iterations : int
    Number of iterations to perform.
genome : str or dict
    If a string, it is assumed to be an assembly name (e.g., "hg19");
    a dictionary of chromsizes for that assembly is retrieved and then
    converted to a filename.
genome_fn : str
    Mutually exclusive with genome; genome_fn must be an existing
    filename containing the chromsizes. Use the genome kwarg instead
    if you'd rather supply an assembly name or dict.
reduce_func : callable
    Function or other callable object that accepts, as its only
    argument, the results from orig_bedtool.method(). For example, if
    you only care about the number of results, you can use
    reduce_func=len.
processes : int
    Number of processes to run. If processes=1, multiprocessing is not
    used (making it much easier to debug). This argument is ignored if
    _orig_pool is provided.
sort : bool
    If both shuffle and sort are True, the shuffled BedTool will then
    be sorted. Use this if method requires sorted input.
_orig_pool : multiprocessing.Pool instance
    If provided, uses _orig_pool instead of creating one. In this
    case, processes will be ignored.
debug : bool
    If True, use the current iteration index as the seed for each
    shuffle.
report_iterations : bool
    If True, report the number of completed iterations to stderr.