This module defines the jobs_queue behaviour.
Required callback functions: new/2, delete/1, in/3, peek/1, out/2, all/1, info/2.
Authors: Ulf Wiger (ulf@wiger.net).
Default queue behaviour for JOBS (using ordered_set ets).
This module implements the default queue behaviour for JOBS, and also specifies the behaviour itself.
abstract datatype: counter()
entry() = {timestamp(), job()}
info_item() = max_time | oldest_job | length
job() = {pid(), reference()}
mod_args() = {atom(), list()}
q_check_interval() = integer() | infinity | mfa()
q_opt_action() = approve | reject
q_opt_type() = fifo | lifo | {producer, q_producer()} | {action, q_opt_action()}
q_producer() = function() | mfa() | mod_args()
abstract datatype: reg_obj()
regulator() = #rr{} | #cr{} | regulator_ref()
regulator_ref() = #group_rate{} | #counter{}
timestamp() = integer()
| all/1 | Return all the job entries in the queue, not removing them from the queue. |
| behaviour_info/1 | |
| delete/1 | Queue is being deleted; remove any external data structures. |
| empty/1 | |
| in/3 | Enqueue a job reference; return the updated queue. |
| info/2 | Return information about the queue. |
| is_empty/1 | |
| new/2 | Instantiate a new queue. |
| out/2 | Dequeue a batch of N jobs; return the modified queue. |
| peek/1 | Looks at the first item in the queue, without removing it. |
| representation/1 | A representation of a queue which can be inspected. |
| timedout/1 | Return all entries that have been in the queue longer than MaxTime. |
| timedout/2 | |
all(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> [entry()]
Return all the job entries in the queue, not removing them from the queue.
behaviour_info(X1) -> any()
delete(Queue::#queue{}) -> any()
Queue is being deleted; remove any external data structures.
If the queue behaviour has created an ETS table or similar, this is the place to get rid of it.
empty(Queue) -> any()
in(TS::timestamp(), Job::job(), Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> #queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}
Enqueue a job reference; return the updated queue.
This puts a job into the queue. The callback function is responsible for updating the #queue.oldest_job attribute, if needed. The #queue.oldest_job attribute shall either contain the Timestamp of the oldest job in the queue, or undefined if the queue is empty. It may be noted that, especially in the
fairly trivial case of the in/3 function, the oldest job would be
erlang:min(Timestamp, PreviousOldest), even if PreviousOldest == undefined (in Erlang term order, any integer compares less than any atom, so the timestamp wins).
info(X1::info_item(), Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> any()
Return information about the queue.
is_empty(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> boolean()
new(Options, Q::#queue{}) -> #queue{}
Instantiate a new queue.
Options is the list of options provided when defining the queue. Q is an initial #queue{} record. It can be used directly by including jobs/include/jobs.hrl, or by using exprecs-style record accessors in the
module jobs_info.
See parse_trans for more info
on exprecs. In the new/2 function, the #queue.st attribute will normally be
used to keep track of the queue data structure.
out(N::integer(), Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> {[entry()], #queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}}
Dequeue a batch of N jobs; return the modified queue.
Note that this function may need to update the #queue.oldest_job attribute, especially if the queue becomes empty.
peek(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> entry()
Looks at the first item in the queue, without removing it.
representation(Queue) -> any()
A representation of a queue which can be inspected.
timedout(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> {[entry()], #queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}}
Return all entries that have been in the queue longer than MaxTime.
NOTE: This is an inspection function; it doesn't remove the job entries.
timedout(TO, Queue) -> any()
Generated by EDoc