Module jobs_queue

Default queue behaviour for JOBS (using ordered_set ets).

This module defines the jobs_queue behaviour.
Required callback functions: new/2, delete/1, in/3, peek/1, out/2, all/1, info/2.

Authors: Ulf Wiger (ulf@wiger.net).

Description

Default queue behaviour for JOBS (using ordered_set ets).

This module implements the default queue behaviour for JOBS, and also specifies the behaviour itself.

Data Types

counter()

abstract datatype: counter()

entry()

entry() = {timestamp(), job()}

info_item()

info_item() = max_time | oldest_job | length

job()

job() = {pid(), reference()}

mod_args()

mod_args() = {atom(), list()}

q_check_interval()

q_check_interval() = integer() | infinity | mfa()

q_opt_action()

q_opt_action() = approve | reject

q_opt_type()

q_opt_type() = fifo | lifo | {producer, q_producer()} | {action, q_opt_action()}

q_producer()

q_producer() = function() | mfa() | mod_args()

reg_obj()

abstract datatype: reg_obj()

regulator()

regulator() = #rr{} | #cr{} | regulator_ref()

regulator_ref()

regulator_ref() = #group_rate{} | #counter{}

timestamp()

timestamp() = integer()

Function Index

all/1 - Return all the job entries in the queue, not removing them from the queue.
behaviour_info/1
delete/1 - Queue is being deleted; remove any external data structures.
empty/1
in/3 - Enqueue a job reference; return the updated queue.
info/2 - Return information about the queue.
is_empty/1
new/2 - Instantiate a new queue.
out/2 - Dequeue a batch of N jobs; return the modified queue.
peek/1 - Looks at the first item in the queue, without removing it.
representation/1 - A representation of a queue which can be inspected.
timedout/1 - Return all entries that have been in the queue longer than MaxTime.
timedout/2

Function Details

all/1

all(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> [entry()]

Return all the job entries in the queue, not removing them from the queue.

behaviour_info/1

behaviour_info(X1) -> any()

delete/1

delete(Queue::#queue{}) -> any()

Queue is being deleted; remove any external data structures.

If the queue behaviour has created an ETS table or similar, this is the place to get rid of it.

empty/1

empty(Queue) -> any()

in/3

in(TS::timestamp(), Job::job(), Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> #queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}

Enqueue a job reference; return the updated queue.

This puts a job into the queue. The callback function is responsible for updating the #queue.oldest_job attribute, if needed. The #queue.oldest_job attribute shall either contain the Timestamp of the oldest job in the queue, or undefined if the queue is empty. It may be noted that, especially in the fairly trivial case of the in/3 function, the oldest job would be erlang:min(Timestamp, PreviousOldest), even if PreviousOldest == undefined: in Erlang's term order every number compares less than every atom, so erlang:min/2 returns the integer Timestamp rather than the atom undefined.

info/2

info(X1::info_item(), Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> any()

Return information about the queue.

is_empty/1

is_empty(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> boolean()

new/2

new(Options, Q::#queue{}) -> #queue{}

Instantiate a new queue.

Options is the list of options provided when defining the queue. Q is an initial #queue{} record. It can be used directly by including jobs/include/jobs.hrl, or by using exprecs-style record accessors in the module jobs_info. See parse_trans for more info on exprecs. In the new/2 function, the #queue.st attribute will normally be used to keep track of the queue data structure.

out/2

out(N::integer(), Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> {[entry()], #queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}}

Dequeue a batch of N jobs; return the modified queue.

Note that this function may need to update the #queue.oldest_job attribute, especially if the queue becomes empty.

peek/1

peek(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> entry()

Looks at the first item in the queue, without removing it.

representation/1

representation(Queue) -> any()

A representation of a queue which can be inspected.

timedout/1

timedout(Queue::#queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}) -> {[entry()], #queue{name = any(), mod = atom(), type = fifo | lifo | #producer{} | #passive{type = fifo} | #action{a = q_opt_action()} | q_opt_type(), group = atom(), regulators = [regulator() | regulator_ref()], max_time = integer() | undefined, max_size = integer() | undefined, latest_dispatch = integer(), approved = any(), queued = any(), check_interval = q_check_interval() | undefined, oldest_job = integer() | undefined, timer = any(), link_ref = undefined | reference(), check_counter = integer(), empty = boolean(), depleted = boolean(), waiters = [{pid(), reference()}], stateful = any(), st = any()}}

Return all entries that have been in the queue longer than MaxTime.

NOTE: This is an inspection function; it doesn't remove the job entries.

timedout/2

timedout(TO, Queue) -> any()


Generated by EDoc