This module contains a small parallel dispatch queue that takes a list of jobs and runs as many of them in parallel as there are schedulers online.
Original design by Max Fedorov in the rebar compiler, then generalised and extracted here to be reused in other circumstances.
It also contains an asynchronous version of the queue built with a naive pool, which provides similar semantics in worker definitions, but without requiring all the tasks to be known ahead of time.

| pool/4 | Create a pool using as many workers as there are schedulers, and for which tasks can be added by calling pool_task_async/2. |
| pool/5 | Create a pool using PoolSize workers and for which tasks can be added by calling pool_task_async/2. |
| pool_task_async/2 | Add a task to a pool. |
| pool_terminate/1 | Mark the pool as terminated. |
| queue/5 | Create a queue using as many workers as there are schedulers, that will spread all Task entries among them based on whichever is available first. |
pool(WorkF, WArgs, Handler, HArgs) -> pid()
WorkF = fun((Task, WArgs) -> TmpRet)
Task = term()
WArgs = term()
Handler = fun((TmpRet, HArgs) -> NoRet | AccVal)
HArgs = term()
NoRet = ok
AccVal = {ok, term()}
Create a pool using as many workers as there are schedulers,
and for which tasks can be added by calling pool_task_async/2.
The value returned by the worker function WorkF for each task
is then passed to a Handler, which either discards the result
after having done a side-effectful operation (by returning ok),
as in a lists:foreach/2 call, or returns a value that gets
added to an accumulator (by returning {ok, Val}). The handler
can return both types as required.
The accumulated list of values is in no specific order, depends
on how tasks were scheduled and completed, and can only
be obtained by calling pool_terminate/1.
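A minimal sketch of the pool lifecycle described above. The module name `parallel` is an assumption (the actual module name is not shown in this extract); the worker squares each task and the handler accumulates every result:

```erlang
run() ->
    WorkF = fun(Task, _WArgs) -> Task * Task end,
    %% Returning {ok, Val} adds Val to the accumulator;
    %% returning plain `ok' would discard the result instead.
    Handler = fun(TmpRet, _HArgs) -> {ok, TmpRet} end,
    Pid = parallel:pool(WorkF, undefined, Handler, undefined),
    ok = parallel:pool_task_async(Pid, 1),
    ok = parallel:pool_task_async(Pid, 2),
    ok = parallel:pool_task_async(Pid, 3),
    %% Returns once all scheduled tasks are done; results come
    %% back in no specific order, hence the sort.
    Squares = parallel:pool_terminate(Pid),
    lists:sort(Squares).  % [1,4,9]
```

Note that because tasks are added asynchronously, the accumulator's order reflects completion order, not submission order.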
pool(WorkF, WArgs, Handler, HArgs, PoolSize) -> pid()
WorkF = fun((Task, WArgs) -> TmpRet)
Task = term()
WArgs = term()
Handler = fun((TmpRet, HArgs) -> NoRet | AccVal)
HArgs = term()
PoolSize = pos_integer()
NoRet = ok
AccVal = {ok, term()}
Create a pool using PoolSize workers and for which tasks can be
added by calling pool_task_async/2.
The value returned by the worker function WorkF for each task
is then passed to a Handler, which either discards the result
after having done a side-effectful operation (by returning ok),
as in a lists:foreach/2 call, or returns a value that gets
added to an accumulator (by returning {ok, Val}). The handler
can return both types as required.
The accumulated list of values is in no specific order, depends
on how tasks were scheduled and completed, and can only
be obtained by calling pool_terminate/1.
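A sketch of the fixed-size variant, showing a handler that mixes both return types: even results are only logged (returning ok), odd results are accumulated (returning {ok, Val}). The module name `parallel` is an assumption:

```erlang
run() ->
    WorkF = fun(Task, _WArgs) -> Task + 1 end,
    Handler = fun(N, _HArgs) when N rem 2 =:= 0 ->
                      io:format("even: ~p~n", [N]),
                      ok;  % side effect only, nothing accumulated
                 (N, _HArgs) ->
                      {ok, N}  % added to the accumulator
              end,
    %% Fixed pool of 4 workers, regardless of scheduler count.
    Pid = parallel:pool(WorkF, undefined, Handler, undefined, 4),
    [parallel:pool_task_async(Pid, T) || T <- lists:seq(1, 10)],
    parallel:pool_terminate(Pid).
```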
pool_task_async(Pid::pid(), Task::term()) -> ok
Add a task to a pool. This call is asynchronous and performs no validation of whether the pool process exists. If the pool has already been terminated or is in the process of being terminated, the task may cause the pool to abort and error out, in order to point out invalid usage.
pool_terminate(Pid::pid()) -> [term()]
Mark the pool as terminated. At this point it will stop accepting new tasks but will keep processing those that have been scheduled.
Once all tasks are complete and workers have shut down, the accumulated value will be returned.
Any process may terminate the pool, and the pool may only be terminated once.

queue(Tasks::[Task], WorkF, WArgs, Handler, HArgs) -> [Ret]

Task = term()
WorkF = fun((Task, WArgs) -> TmpRet)
WArgs = term()
Handler = fun((TmpRet, HArgs) -> NoRet | AccVal)
HArgs = term()
NoRet = ok
AccVal = {ok, Ret}
Ret = term()
Create a queue using as many workers as there are schedulers,
that will spread all Task entries among them based on whichever
is available first.
The value returned by the worker function WorkF for each task
is then passed to a Handler, which either discards the result
after having done a side-effectful operation (by returning ok),
as in a lists:foreach/2 call, or returns a value that gets
added to an accumulator (by returning {ok, Val}). The handler
can return both types as required.
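A sketch of queue/5, where the full task list is known up front and spread over as many workers as there are schedulers. The module name `parallel` is an assumption; here each worker parses an integer and the handler keeps every result:

```erlang
run() ->
    Tasks = ["1", "2", "3", "4"],
    WorkF = fun(Str, _WArgs) -> list_to_integer(Str) end,
    Handler = fun(Int, _HArgs) -> {ok, Int} end,
    %% Blocks until all tasks are processed, then returns the
    %% accumulated results (in no specific order).
    Ints = parallel:queue(Tasks, WorkF, undefined, Handler, undefined),
    lists:sort(Ints).  % [1,2,3,4]
```

Unlike pool/4, the queue needs no separate terminate step: the call returns once every task in the input list has been handled.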
Generated by EDoc