pybedtools.parallel.parallel_apply

pybedtools.parallel.parallel_apply(orig_bedtool, method, genome=None, genome_fn=None, method_args=None, method_kwargs=None, shuffle_kwargs=None, shuffle=True, reduce_func=None, processes=1, sort=False, _orig_pool=None, iterations=1000, debug=False, report_iterations=False)
Call an arbitrary BedTool method many times in parallel.
An example use-case is to generate a null distribution of intersections, and then compare this to the actual intersections.
Important: due to a known file-handle leak in BedTool.__len__, it's best to simply count the number of lines in the file, as in the function below. This works because BEDTools programs strip any non-interval lines from their results.
>>> # set up example BedTools
>>> import pybedtools
>>> a = pybedtools.example_bedtool('a.bed')
>>> b = pybedtools.example_bedtool('b.bed')

>>> # Method of `a` to call:
>>> method = 'intersect'

>>> # Kwargs provided to `a.intersect` each iteration
>>> method_kwargs = dict(b=b, u=True)

>>> # Function that will be called on the results of
>>> # `a.intersect(**method_kwargs)`.
>>> def reduce_func(x):
...     return sum(1 for _ in open(x.fn))

>>> # Create a small artificial genome for this test (generally you'd
>>> # use an assembly name, like "hg19"):
>>> genome = dict(chr1=(0, 1000))

>>> # Do 10 iterations using 1 process for this test (generally you'd
>>> # use 1000+ iterations, and as many processes as you have CPUs)
>>> results = pybedtools.parallel.parallel_apply(a, method, genome=genome,
...     method_kwargs=method_kwargs, iterations=10, processes=1,
...     reduce_func=reduce_func, debug=True, report_iterations=True)

>>> # get results
>>> print(list(results))
[1, 0, 1, 2, 4, 2, 2, 1, 2, 4]

>>> # We can compare this to the actual intersection:
>>> reduce_func(a.intersect(**method_kwargs))
3
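With the null distribution and the observed count in hand, the comparison mentioned above can be made concrete as a one-sided empirical p-value. This is a sketch, not part of parallel_apply itself; the numbers are copied from the example output above.

```python
# Null distribution from the 10 shuffled iterations above, plus the
# observed intersection count (3).
null = [1, 0, 1, 2, 4, 2, 2, 1, 2, 4]
observed = 3

# One-sided empirical p-value: the fraction of shuffled iterations at
# least as extreme as the observed count, with a +1 correction so the
# p-value is never exactly zero.
p = (sum(1 for n in null if n >= observed) + 1) / (len(null) + 1)
```

In practice you would use 1000+ iterations so the p-value has meaningful resolution.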
Alternatively, we could use the a.jaccard method, which already does the reduction to a dictionary. However, the jaccard method requires the input to be sorted. Here, we specify sort=True to sort each shuffled BedTool before calling its jaccard method.

>>> from pybedtools.parallel import parallel_apply
>>> a = pybedtools.example_bedtool('a.bed')
>>> results = parallel_apply(a, method='jaccard', method_args=(b,),
...     genome=genome, iterations=3, processes=1, sort=True, debug=True)
>>> for i in results:
...     print(sorted(i.items()))
[('intersection', 12), ('jaccard', 0.0171184), ('n_intersections', 1), ('union', 701)]
[('intersection', 0), ('jaccard', 0.0), ('n_intersections', 0), ('union', 527)]
[('intersection', 73), ('jaccard', 0.137996), ('n_intersections', 1), ('union', 529)]
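Because each iteration yields a dictionary rather than a single number, a small post-processing step can pull out the statistic of interest. This sketch (not part of the library) reduces the per-iteration jaccard dicts to a null distribution of the jaccard statistic; the values are copied from the doctest output above.

```python
# Per-iteration results as returned by parallel_apply with
# method='jaccard' (values from the example above).
results = [
    {'intersection': 12, 'jaccard': 0.0171184, 'n_intersections': 1, 'union': 701},
    {'intersection': 0, 'jaccard': 0.0, 'n_intersections': 0, 'union': 527},
    {'intersection': 73, 'jaccard': 0.137996, 'n_intersections': 1, 'union': 529},
]

# Extract the jaccard statistic from each iteration to build the null
# distribution, then summarize it.
null_jaccard = [r['jaccard'] for r in results]
mean_jaccard = sum(null_jaccard) / len(null_jaccard)
```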
Parameters:

orig_bedtool : BedTool

method : str
    The method of orig_bedtool to run.

method_args : tuple
    Passed directly to getattr(orig_bedtool, method)().

method_kwargs : dict
    Passed directly to getattr(orig_bedtool, method)().

shuffle : bool
    If True, then orig_bedtool will be shuffled at each iteration and
    that shuffled version's method will be called with method_args and
    method_kwargs.

shuffle_kwargs : dict
    If shuffle is True, these are passed to orig_bedtool.shuffle().
    You do not need to pass the genome here; that's handled separately
    by the genome and genome_fn kwargs.

iterations : int
    Number of iterations to perform.

genome : string or dict
    If a string, assume it is an assembly name (e.g., "hg19"), get a
    dictionary of chromsizes for that assembly, and convert it to a
    filename. If a dict of chromsizes (as in the example above), it is
    converted to a filename directly.

genome_fn : str
    Mutually exclusive with genome; genome_fn must be an existing
    filename containing the chromsizes. Use the genome kwarg instead
    if you'd rather supply an assembly name or dict.

reduce_func : callable
    Function or other callable object that accepts, as its only
    argument, the results from orig_bedtool.method(). For example, if
    you care about the number of results, you can use reduce_func=len.

processes : int
    Number of processes to run. If processes=1, then multiprocessing
    is not used (making it much easier to debug). This argument is
    ignored if _orig_pool is provided.

sort : bool
    If both shuffle and sort are True, then the shuffled BedTool will
    then be sorted. Use this if method requires sorted input.

_orig_pool : multiprocessing.Pool instance
    If provided, uses _orig_pool instead of creating one. In this
    case, processes will be ignored.

debug : bool
    If True, use the current iteration index as the seed for each
    shuffle, making runs reproducible.

report_iterations : bool
    If True, report the number of iterations to stderr.
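The interaction between shuffle, reduce_func, and debug described above can be illustrated with a toy stand-in. This is a simplified sketch, not the real implementation (which lives in pybedtools.parallel and operates on BedTool objects); it only shows how seeding each iteration's shuffle with the iteration index makes runs reproducible.

```python
import random

def toy_parallel_apply(items, iterations, debug=False):
    """Toy per-iteration flow: shuffle -> method -> reduce_func."""
    results = []
    for i in range(iterations):
        # debug=True: seed with the iteration index, as parallel_apply does
        rng = random.Random(i) if debug else random.Random()
        shuffled = list(items)
        rng.shuffle(shuffled)  # stands in for orig_bedtool.shuffle()
        # Stand-in for method + reduce_func: count items that remained
        # in their original position after shuffling.
        results.append(sum(a == b for a, b in zip(items, shuffled)))
    return results

# With debug=True, two runs produce identical null distributions.
r1 = toy_parallel_apply(range(5), iterations=3, debug=True)
r2 = toy_parallel_apply(range(5), iterations=3, debug=True)
```

Without debug=True each iteration draws a fresh seed, so repeated runs generally differ, which is what you want for a real null distribution.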