Module riak_dt_map

A multi-CRDT holder.

Behaviours: riak_dt.

Description

A multi-CRDT holder: a struct- or document-like type. It uses the same tombstone-less, Observed-Remove semantics as riak_dt_orswot. A Map is a set of Fields, where a Field is a two-tuple {Name::binary(), CRDTModule::module()} and the second element is the name of an embeddable CRDT module. When a CRDT stored inside the Map is updated, its update/3 or update/4 function is called with a riak_dt:dot(), rather than an actor, as the second argument. This way the embedded CRDT shares the Map's causal context, even when fields are removed and subsequently re-added.
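To make "sharing the causal context" concrete, here is a toy model (hypothetical module `shared_dot_sketch`, not riak_dt's implementation). It assumes the Map owns a single version vector and mints one dot per field update from it. Because removing a field keeps the Map's clock, a re-added field gets a fresh, strictly newer dot rather than reusing an old one.

```erlang
%% Hypothetical toy module: NOT riak_dt's implementation.
-module(shared_dot_sketch).
-export([demo/0]).

%% Toy Map state: {Clock, Fields}. Clock :: #{Actor => Counter} is owned by
%% the Map; Fields :: #{Name => Dot} records the dot of each field's update.
new() -> {#{}, #{}}.

%% Mint the next dot for Actor from the Map's own clock.
next_dot(Actor, Clock) ->
    Cnt = maps:get(Actor, Clock, 0) + 1,
    {{Actor, Cnt}, Clock#{Actor => Cnt}}.

%% In riak_dt_map the dot would be handed to the embedded CRDT's update;
%% this toy simply stores it against the field name.
update_field(Name, Actor, {Clock, Fields}) ->
    {Dot, Clock1} = next_dot(Actor, Clock),
    {Clock1, Fields#{Name => Dot}}.

%% Removing a field drops its entry but keeps the Map's clock.
remove_field(Name, {Clock, Fields}) ->
    {Clock, maps:remove(Name, Fields)}.

demo() ->
    M1 = update_field(<<"f">>, a, new()),   % field gets dot {a, 1}
    M2 = remove_field(<<"f">>, M1),
    M3 = update_field(<<"f">>, a, M2),      % re-added field gets {a, 2}
    {_Clock, #{<<"f">> := Dot}} = M3,
    Dot.
```

`shared_dot_sketch:demo()` returns `{a, 2}`: the re-added field cannot be confused with the removed one, because both dots come from the same clock.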

The contents of the Map are modeled as a dictionary of field_name() to field_value() mappings, where field_name() is a two-tuple of an opaque binary() name and one of the embeddable CRDT types (currently riak_dt_orswot, riak_dt_emcntr, riak_dt_lwwreg, riak_dt_od_flag, and riak_dt_map). The reason for this limitation is that embedded types must support embedding: a shared, dot-based causal context, and a reset-remove semantic (more on these below). The field_value() is a two-tuple of crdts() (a list of entry()) and a tombstone(). The presence of a tombstone() in a "tombstone-less" Map may seem confusing, but the tombstone() is only stored for fields that are currently in the map; removing a field also removes its tombstone.

To use the Map, create a new() Map. When you call update/3 or update/4, you pass a list of operations and, for update/4, a causal context (see update/3 and update/4 for details). The list of operations is applied atomically: either all of them succeed and the new state is returned, or none are applied and an error is returned.
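The all-or-nothing behaviour can be sketched as a fold that stops at the first error. This is a hypothetical module (`atomic_ops_sketch`) operating on a plain Erlang map, not riak_dt code; only the error shape is borrowed from precondition_error() in the Data Types below. Because Erlang data is immutable, returning the error simply discards every intermediate state.

```erlang
%% Hypothetical sketch of all-or-nothing op application; not riak_dt code.
-module(atomic_ops_sketch).
-export([apply_all/2]).

%% Apply Ops in order to State (a plain map standing in for the Map's
%% fields). On the first error the error is returned and every partially
%% updated state is dropped, so no partial update escapes.
apply_all([], State) ->
    {ok, State};
apply_all([Op | Rest], State) ->
    case apply_op(Op, State) of
        {ok, State1} -> apply_all(Rest, State1);
        {error, _} = Error -> Error
    end.

%% Error shape borrowed from precondition_error() in this module's types.
apply_op({update, Key, Value}, State) ->
    {ok, State#{Key => Value}};
apply_op({remove, Key}, State) ->
    case maps:is_key(Key, State) of
        true -> {ok, maps:remove(Key, State)};
        false -> {error, {precondition, {not_present, Key}}}
    end.
```

For example, `atomic_ops_sketch:apply_all([{update, f, 1}, {remove, g}], #{})` returns `{error, {precondition, {not_present, g}}}`; the update to `f` is not visible anywhere.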

Semantics

The semantics of this Map are Observed-Remove-Reset-Remove. In practice this means that if a field is removed while that same field is concurrently updated, the field is _in_ the merged Map (a remove only removes the updates it observed), but the remove still takes effect on the observed state, so only the concurrent update survives. A concrete example helps: if a Map contains a field that is a set with 5 elements in it, and concurrently the replica at A removes that field while the replica at B adds an item to the set, then on merge there is a field for the set, but it contains only the one item B added. Removing the field is semantically equivalent to removing all elements in the set and then removing the field. The same goes for an embedded Map: if a Map field is removed while one of its sub-fields is concurrently updated, only the updated sub-field(s) survive the reset-remove.
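The set example above can be reproduced with a toy model (hypothetical module `reset_remove_sketch`, not riak_dt code). It assumes each element carries the dot of the add that created it and that merge follows the ORSWOT rule: a dot survives if both sides hold it, or if only one side holds it and the other side's clock has not seen it.

```erlang
%% Hypothetical toy model of reset-remove for a set field; NOT riak_dt code.
-module(reset_remove_sketch).
-export([demo/0]).

%% State: {Clock, Entries}. Clock :: #{Actor => Counter};
%% Entries :: #{Elem => [Dot]}, where Dot = {Actor, Counter}.
add(Elem, Actor, {Clock, Entries}) ->
    Cnt = maps:get(Actor, Clock, 0) + 1,
    {Clock#{Actor => Cnt}, Entries#{Elem => [{Actor, Cnt}]}}.

%% Removing the whole field drops every observed dot but keeps the clock,
%% so the removal is remembered without a tombstone.
remove_field({Clock, _Entries}) -> {Clock, #{}}.

seen({Actor, Cnt}, Clock) -> Cnt =< maps:get(Actor, Clock, 0).

%% ORSWOT-style merge: a dot survives if both sides hold it, or if only one
%% side holds it and the other side's clock has not seen it.
merge({C1, E1}, {C2, E2}) ->
    Elems = lists:usort(maps:keys(E1) ++ maps:keys(E2)),
    Entries = lists:foldl(
                fun(Elem, Acc) ->
                        X = maps:get(Elem, E1, []),
                        Y = maps:get(Elem, E2, []),
                        Keep = [D || D <- X, lists:member(D, Y) orelse not seen(D, C2)]
                            ++ [D || D <- Y, not lists:member(D, X), not seen(D, C1)],
                        case Keep of
                            [] -> Acc;
                            _ -> Acc#{Elem => Keep}
                        end
                end, #{}, Elems),
    Clock = maps:fold(fun(A, N, Acc) -> Acc#{A => max(N, maps:get(A, Acc, 0))} end,
                      C1, C2),
    {Clock, Entries}.

value({_Clock, Entries}) -> lists:sort(maps:keys(Entries)).

demo() ->
    Base = lists:foldl(fun(E, S) -> add(E, a, S) end, {#{}, #{}},
                       lists:seq(1, 5)),    % set {1..5}, all added by a
    A = remove_field(Base),                 % replica A removes the field
    B = add(6, b, Base),                    % replica B concurrently adds 6
    value(merge(A, B)).
```

`reset_remove_sketch:demo()` returns `[6]`. Replica A's clock has seen the dots for 1..5, so they are dropped. It has not seen `{b, 1}`, so only B's concurrent add survives.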

There is an anomaly: embedded counters do not fully support reset-remove. Embedded counters (see riak_dt_emcntr) are different from a normal pn-counter. Embedded counters map dots to {P, N} pairs. When a counter is incremented, a new dot is created that replaces the old dot and carries the new value. A pn-counter usually merges by taking the max of each actor's P and N entries, but this does not work in an embedded context: when a counter field is removed and then _re_-added, the new P and N entries may be lower than the old ones, and merging by max loses the remove information. If, however, a dot is stored with the value and merge keeps the entry with the greater dot, new updates win over removed updates. So far so good. Here is the problem: if replica B removes a counter field and does not re-add it, and replica A concurrently updates its entry for that field, then the reset-remove does not occur. Replica B has not observed any of the new dots, so none of them are removed. Each new dot also carries the totals of the previous dots (the old dot is discarded), so the whole pre-remove value survives. Achieving reset-remove would require giving every increment its own dot and retaining all of them, which would be very costly in terms of space.

One way to accept this anomaly is to think of a Map as a file system: removing a directory while concurrently adding a file means that the directory is present and only the new file remains in it. Updating a counter while concurrently removing it means the counter remains with the full updated value. This is much like appending to a file in the file-system analogy: you don't expect only the diff to survive, but the whole updated file.
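The anomaly can be seen in a toy model (hypothetical module `emcntr_anomaly_sketch`, not the code of riak_dt_emcntr). It assumes one `{Dot, P, N}` entry per actor, where each increment replaces the actor's dot and carries the running totals. Merge keeps the entry with the greater dot, or a one-sided entry whose dot the other side has not seen.

```erlang
%% Hypothetical toy model of the embedded-counter anomaly; NOT the code of
%% riak_dt_emcntr. State: {Clock, Entries}, Entries :: #{Actor => {Dot, P, N}}.
-module(emcntr_anomaly_sketch).
-export([demo/0]).

%% Each increment mints a new dot that replaces the actor's old dot and
%% carries the actor's running P total.
increment(Actor, {Clock, Entries}) ->
    Cnt = maps:get(Actor, Clock, 0) + 1,
    {_OldDot, P, N} = maps:get(Actor, Entries, {none, 0, 0}),
    {Clock#{Actor => Cnt}, Entries#{Actor => {{Actor, Cnt}, P + 1, N}}}.

%% Field remove: drop all observed entries, keep the clock.
remove_field({Clock, _Entries}) -> {Clock, #{}}.

seen({Actor, Cnt}, Clock) -> Cnt =< maps:get(Actor, Clock, 0).

%% Per actor: if both sides have an entry keep the one with the greater dot;
%% if only one side has it, keep it unless the other side's clock saw it.
merge({C1, E1}, {C2, E2}) ->
    Actors = lists:usort(maps:keys(E1) ++ maps:keys(E2)),
    Merged = lists:foldl(
               fun(Actor, Acc) ->
                       case {maps:find(Actor, E1), maps:find(Actor, E2)} of
                           {{ok, X}, {ok, Y}} -> Acc#{Actor => max_dot(X, Y)};
                           {{ok, X}, error} -> keep_unseen(Actor, X, C2, Acc);
                           {error, {ok, Y}} -> keep_unseen(Actor, Y, C1, Acc)
                       end
               end, #{}, Actors),
    {merge_clocks(C1, C2), Merged}.

max_dot({{_, N1}, _, _} = X, {{_, N2}, _, _} = Y) ->
    case N1 >= N2 of true -> X; false -> Y end.

keep_unseen(Actor, {Dot, _, _} = Entry, OtherClock, Acc) ->
    case seen(Dot, OtherClock) of
        true -> Acc;
        false -> Acc#{Actor => Entry}
    end.

merge_clocks(C1, C2) ->
    maps:fold(fun(A, N, Acc) -> Acc#{A => max(N, maps:get(A, Acc, 0))} end,
              C1, C2).

value({_Clock, Entries}) ->
    maps:fold(fun(_Actor, {_Dot, P, N}, Sum) -> Sum + P - N end, 0, Entries).

demo() ->
    Base = lists:foldl(fun(_, S) -> increment(a, S) end, {#{}, #{}},
                       lists:seq(1, 5)),   % counter = 5, dot {a, 5}
    B = remove_field(Base),                % replica B removes the field
    A = increment(a, Base),                % replica A: dot {a, 6}, P = 6
    value(merge(A, B)).
```

`emcntr_anomaly_sketch:demo()` returns `6`, not the `1` a full reset-remove would give. B's clock has not seen `{a, 6}`, and that single dot carries all six increments.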

Merging/Size

When any pair of Maps is merged, the embedded CRDTs are _not_ merged; instead, each concurrent dot->crdt() entry is kept. This leads to a greater size for Maps that are highly divergent. Updating a field in the map, however, merges all entries for that field into a single CRDT that is stored against the new dot. As mentioned above, there is also a tombstone entry per present field. This is the bottom CRDT for the field type, with a clock that contains all seen and removed dots. These tombstones are merged at merge time, so only one is present per field. Clearly, the repetition of actor information (the clock, each embedded CRDT, the field dots, the tombstones) is a serious issue with regard to the size/bloat of this data type. At present, we mitigate this by using Erlang's term_to_binary/2 function, which can compress the data (see to_binary/1).
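The "collapse on update" step can be sketched with a toy model (hypothetical module `collapse_sketch`, not riak_dt code). Here ordsets stand in for embedded CRDTs and `ordsets:union/2` stands in for their merge. The divergent `{Dot, Value}` entries of one field are folded into a single value, the operation is applied, and the result is stored against the one new dot.

```erlang
%% Hypothetical toy: ordsets stand in for embedded CRDTs and
%% ordsets:union/2 stands in for their merge. NOT riak_dt's code.
-module(collapse_sketch).
-export([demo/0]).

%% Entries is a list of concurrent {Dot, Set} entries for one field, as kept
%% after a merge. An update folds them into one value, applies the op, and
%% stores the result against the single new Dot.
update_field(Elem, NewDot, Entries) ->
    Merged = lists:foldl(fun({_Dot, Set}, Acc) -> ordsets:union(Set, Acc) end,
                         ordsets:new(), Entries),
    [{NewDot, ordsets:add_element(Elem, Merged)}].

demo() ->
    Divergent = [{{a, 3}, [x]}, {{b, 2}, [y]}],   % two concurrent entries
    update_field(z, {c, 1}, Divergent).
```

`collapse_sketch:demo()` returns `[{{c, 1}, [x, y, z]}]`: two entries before the update, one after.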

Context and Deferred operations

For CRDTs that use version vectors and dots (this Map and all CRDTs that may be embedded in it), the size of the CRDT is influenced by the number of actors updating it. In some systems (like Riak!) we attempt to minimize the number of actors by only having the database update CRDTs. This leads to a kind of "action at a distance", where a client sends operations to the database, and an actor in the database system performs the operations. The purpose is to ship minimal state between database and client, and to limit the number of actors in the system. There is a problem with action at a distance and the OR semantic: the client _must_ be able to tell the database what it has observed when it sends a remove operation. There is a further problem: a replica that handles an operation may not have all the state the client observed. We address both problems by asking the client to provide a causal context for operations (see update/4). Context operations solve the OR problem, but they don't solve the problem of lagging replicas handling operations.
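How a context limits a remove to what the client observed can be sketched as follows. This is a hypothetical module (`context_remove_sketch`), not riak_dt code. It assumes a field's state is a list of dots and the client's context is a version vector `#{Actor => Counter}`.

```erlang
%% Hypothetical toy: a field's state is a list of dots, and the client's
%% context is a version vector #{Actor => Counter}. NOT riak_dt's code.
-module(context_remove_sketch).
-export([demo/0]).

seen({Actor, Cnt}, Ctx) -> Cnt =< maps:get(Actor, Ctx, 0).

%% A context remove drops only the dots the client had observed.
context_remove(FieldDots, Ctx) ->
    [Dot || Dot <- FieldDots, not seen(Dot, Ctx)].

demo() ->
    FieldDots = [{a, 1}, {b, 1}],   % replica holds a's and b's updates
    ClientCtx = #{a => 1},          % client read before b's update
    context_remove(FieldDots, ClientCtx).
```

`context_remove_sketch:demo()` returns `[{b, 1}]`: b's update, which the client never saw, survives the remove. Without a context, the replica could only remove everything it holds.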

Lagging replicas, deferred operations

In a system like Riak, a replica that is not up to date (including one that has never seen any state for a CRDT) may be asked to perform an operation. If no context is given, and the operation is a field remove or a "remove"-like operation on an embedded CRDT, the operation may fail with a precondition error (for example, when removing a field that is not present). It may also succeed and remove more state than intended (a field remove with no context may remove updates unseen by the client). When a context is provided and the Field to be removed is absent, the Map stores the context and Field name in a list of deferred operations. When, eventually, through propagation and merging, the Map's clock descends the context for the operation, the operation is executed. It is important to note that _only_ actorless (field remove) operations can be deferred this way.
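A simplified model of deferral follows (hypothetical module `deferred_sketch`, not riak_dt code). It assumes a remove is deferred whenever the replica's clock does not yet descend the operation's context, and that every deferred remove is retried after each merge. The merge uses the same OR rule as for dots elsewhere in this page.

```erlang
%% Hypothetical toy of deferred field removes; NOT riak_dt's code.
%% State: {Clock, Fields, Deferred}, Fields :: #{Name => [Dot]},
%% Deferred :: [{Ctx, Name}].
-module(deferred_sketch).
-export([demo/0]).

seen({Actor, Cnt}, Clock) -> Cnt =< maps:get(Actor, Clock, 0).

%% Clock descends Ctx if it has seen every event Ctx has seen.
descends(Clock, Ctx) ->
    maps:fold(fun(A, N, Acc) -> Acc andalso N =< maps:get(A, Clock, 0) end,
              true, Ctx).

%% Apply the remove if this replica has seen the context, else defer it.
remove(Name, Ctx, {Clock, Fields, Deferred}) ->
    case descends(Clock, Ctx) of
        true -> {Clock, remove_seen(Name, Ctx, Fields), Deferred};
        false -> {Clock, Fields, [{Ctx, Name} | Deferred]}
    end.

remove_seen(Name, Ctx, Fields) ->
    case [D || D <- maps:get(Name, Fields, []), not seen(D, Ctx)] of
        [] -> maps:remove(Name, Fields);
        Left -> Fields#{Name => Left}
    end.

%% OR-style merge of dots, then retry every deferred remove.
merge({C1, F1, D1}, {C2, F2, D2}) ->
    Names = lists:usort(maps:keys(F1) ++ maps:keys(F2)),
    Fields = lists:foldl(
               fun(Name, Acc) ->
                       X = maps:get(Name, F1, []),
                       Y = maps:get(Name, F2, []),
                       Keep = [D || D <- X, lists:member(D, Y) orelse not seen(D, C2)]
                           ++ [D || D <- Y, not lists:member(D, X), not seen(D, C1)],
                       case Keep of
                           [] -> Acc;
                           _ -> Acc#{Name => Keep}
                       end
               end, #{}, Names),
    Clock = maps:fold(fun(A, N, Acc) -> Acc#{A => max(N, maps:get(A, Acc, 0))} end,
                      C1, C2),
    lists:foldl(fun({Ctx, Name}, S) -> remove(Name, Ctx, S) end,
                {Clock, Fields, []}, lists:usort(D1 ++ D2)).

demo() ->
    Full = {#{a => 1}, #{<<"f">> => [{a, 1}]}, []},  % up-to-date replica
    Fresh = {#{}, #{}, []},                         % replica with no state
    Ctx = #{a => 1},                                % client's context
    Lagging = remove(<<"f">>, Ctx, Fresh),          % deferred, not dropped
    {_, _, [{Ctx, <<"f">>}]} = Lagging,
    {_, Fields, Deferred} = merge(Lagging, Full),
    {maps:is_key(<<"f">>, Fields), Deferred}.
```

`deferred_sketch:demo()` returns `{false, []}`. Once the lagging replica merges in the state the client had seen, the deferred remove runs and the field is gone. If the remove had been dropped instead, the merge would have brought the field back.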

Embedded CRDTs Deferred Operations

There is a bug with embedded types and deferred operations. Imagine a client has seen a Map with a Set field, and the set contains {a, b, c}. The client sends an operation to remove {a} from the set. A replica that is new takes the operation. It will create a new Map and a Field for the Set, and store the remove operation as part of the Set's state. A client reads this new state and sends a field remove operation, which is executed by the same replica. Now the deferred operation is lost, since the field is removed. We're working on ways to fix this. One idea is not to remove a field with "undelivered" operations, but instead to "hide" it.

See riak_dt_orswot for more on the OR semantic.

See riak_dt_emcntr for more on the embedded counter.

Data Types

any_map()

any_map() = riak_dt_map() | ord_map()

binary_map()

binary_map() = binary()

A binary that from_binary/1 will accept

context()

context() = riak_dt_vclock:vclock() | undefined

crdt()

crdt() = riak_dt_emcntr:emcntr() | riak_dt_od_flag:od_flag() | riak_dt_lwwreg:lwwreg() | riak_dt_orswot:orswot() | riak_dt_map:riak_dt_map()

crdt_mod()

crdt_mod() = riak_dt_emcntr | riak_dt_lwwreg | riak_dt_od_flag | riak_dt_map | riak_dt_orswot

crdt_op()

crdt_op() = riak_dt_emcntr:emcntr_op() | riak_dt_lwwreg:lwwreg_op() | riak_dt_orswot:orswot_op() | riak_dt_od_flag:od_flag_op() | riak_dt_map:map_op()

crdts()

crdts() = [entry()]

deferred()

deferred() = dict(context(), [field()])

dict()

dict(_A, _B) = dict()

entries()

entries() = dict(field_name(), field_value())

entry()

entry() = {riak_dt:dot(), crdt()}

field()

field() = {field_name(), field_value()}

field_name()

field_name() = {Name::binary(), CRDTModule::crdt_mod()}

field_value()

field_value() = {crdts(), tombstone()}

map_field_op()

map_field_op() = {remove, field()}

map_field_update()

map_field_update() = {update, field(), crdt_op()}

map_op()

map_op() = {update, [map_field_update() | map_field_op()]}

ord_map()

ord_map() = {riak_dt_vclock:vclock(), orddict:orddict(), orddict:orddict()}

precondition_error()

precondition_error() = {error, {precondition, {not_present, field()}}}

riak_dt_map()

riak_dt_map() = {riak_dt_vclock:vclock(), entries(), deferred()}

tombstone()

tombstone() = crdt()

value()

value() = {field(), riak_dt_map:values() | integer() | [term()] | boolean() | term()}

values()

values() = [value()]

Function Index

equal/2: compare two riak_dt_map()s for equality of structure. Both schemas and value lists must be equal.
from_binary/1: when the argument is a binary_map() produced by to_binary/1, returns the original riak_dt_map().
merge/2: merge two riak_dt_map()s.
new/0: create a new, empty Map.
parent_clock/2: set the clock in the map to Clock.
precondition_context/1: an opaque context that can be passed to update/4 to ensure that only seen fields are removed.
stat/2
stats/1: stats on the internal state of the Map.
to_binary/1: returns a binary representation of the provided riak_dt_map().
to_version/2
update/3: update the riak_dt_map(), or a field in it, by executing the map_op().
update/4: the same as update/3, except that the context ensures no unseen field updates are removed, and removal of unseen updates is deferred.
value/1: get the current set of values for this Map.
value/2: query map (not implemented yet).

Function Details

equal/2

equal(LHS::riak_dt_map(), RHS::riak_dt_map()) -> boolean()

Compare two riak_dt_map()s for equality of structure. Both schemas and value lists must be equal. Performs a pairwise equality check on all values in the value lists.

from_binary/1

from_binary(B::binary_map()) -> {ok, riak_dt_map()} | {error, unsupported_version, Vers::pos_integer()} | {error, invalid_binary}

When the argument is a binary_map() produced by to_binary/1, returns the original riak_dt_map().

See also: to_binary/1.

merge/2

merge(LHS::riak_dt_map(), RHS::riak_dt_map()) -> riak_dt_map()

merge two riak_dt_map()s.

new/0

new() -> riak_dt_map()

Create a new, empty Map.

parent_clock/2

parent_clock(Clock::riak_dt_vclock:vclock(), Map::riak_dt_map()) -> riak_dt_map()

Sets the clock in the map to Clock. Used by a containing Map for sub-CRDTs.

precondition_context/1

precondition_context(X1::riak_dt_map()) -> riak_dt:context()

an opaque context that can be passed to update/4 to ensure that only seen fields are removed. If a field removal operation has a context that the Map has not seen, it will be deferred until causally relevant.

stat/2

stat(Stat::atom(), Map::riak_dt_map()) -> number() | undefined

stats/1

stats(Map::riak_dt_map()) -> [{atom(), integer()}]

Stats on the internal state of the Map, returned as a proplist of {StatName :: atom(), Value :: integer()}. The stats exposed are:

actor_count: the number of actors in the clock for the Map.

field_count: the total number of fields in the Map (including divergent field entries).

duplication: the number of duplicate entries in the Map across all fields, that is, field_count minus the number of unique fields.

deferred_length: the number of operations on the deferred list, a reasonable expression of lag/staleness.
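The arithmetic behind field_count and duplication can be sketched over a simplified input (hypothetical module `stats_sketch`). It assumes fields are given as `#{FieldName => [ConcurrentEntry]}`, which is not riak_dt's internal layout.

```erlang
%% Hypothetical helper; the input layout is an assumption, NOT riak_dt's
%% internal state. Fields :: #{FieldName => [ConcurrentEntry]}.
-module(stats_sketch).
-export([stats/1]).

stats(Fields) ->
    %% Every concurrent entry counts towards field_count.
    FieldCount = lists:sum([length(Entries) || Entries <- maps:values(Fields)]),
    %% Anything beyond one entry per unique field counts as duplication.
    [{field_count, FieldCount},
     {duplication, FieldCount - map_size(Fields)}].
```

For a field `f` with two divergent entries and a field `g` with one, `stats_sketch:stats(#{f => [e1, e2], g => [e3]})` returns `[{field_count, 3}, {duplication, 1}]`.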

to_binary/1

to_binary(Map::riak_dt_map()) -> binary_map()

Returns a binary representation of the provided riak_dt_map(). The resulting binary is tagged and versioned for ease of future upgrade. Calling from_binary/1 with the result of this function will return the original map. Use the application env var binary_compression to turn term_to_binary (t2b) compression on (true) and off (false).
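Why compression pays off for repetitive actor information can be shown with the standard `term_to_binary/1,2` and `binary_to_term/1` BIFs. The module name and the repetitive term below are made up for illustration; this is not how to_binary/1 is implemented.

```erlang
%% Hypothetical illustration using only Erlang/OTP BIFs; not riak_dt code.
-module(t2b_sketch).
-export([sizes/0]).

sizes() ->
    %% Made-up, highly repetitive term, similar in spirit to the repeated
    %% actor ids and counters in a divergent Map.
    Term = lists:duplicate(500, {<<"actor-a">>, 42}),
    Plain = byte_size(term_to_binary(Term)),
    Compressed = byte_size(term_to_binary(Term, [compressed])),
    %% binary_to_term/1 recognises the compressed format transparently.
    Term = binary_to_term(term_to_binary(Term, [compressed])),
    {Plain, Compressed}.
```

On a term this repetitive, the compressed size is far smaller than the plain one. The gain on real Maps depends on how much actor information they repeat.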

See also: from_binary/1.

to_version/2

to_version(X1::pos_integer(), Map::any_map()) -> any_map()

update/3

update(Op::map_op(), ActorOrDot::riak_dt:actor() | riak_dt:dot(), Map::riak_dt_map()) -> {ok, riak_dt_map()} | precondition_error()

update the riak_dt_map() or a field in the riak_dt_map() by executing the map_op(). Ops is a list of one or more of the following ops:

{update, field(), Op}, where Op is a valid update operation for a CRDT of type Mod from the field name {Name, Mod}. If there is no local value for the field, a new CRDT is created, the operation is applied, and the result is inserted; otherwise, the operation is applied to the local value.

{remove, field()}, where field() is {Name, Type}, results in the CRDT at that field, and the field itself, being removed. A concurrent update will "win" over a remove, so that the field is still present and its value will contain the concurrent update.

Atomic: either all of Ops are performed successfully, or none are.
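The following hypothetical helper (`map_op_sketch`) only builds a map_op() term of the shape described above; it does not call riak_dt. The inner operations (`{add, E}` for riak_dt_orswot, `increment` for riak_dt_emcntr) are assumptions to be checked against those modules' own documentation.

```erlang
%% Hypothetical helper that only builds map_op() terms; it does not call
%% riak_dt. Inner op shapes ({add, E} for riak_dt_orswot, increment for
%% riak_dt_emcntr) are assumptions to check against those modules' docs.
-module(map_op_sketch).
-export([ops/0]).

ops() ->
    Tags = {<<"tags">>, riak_dt_orswot},      % field name: {Name, Mod}
    Visits = {<<"visits">>, riak_dt_emcntr},
    Old = {<<"old">>, riak_dt_lwwreg},
    {update, [{update, Tags, {add, <<"erlang">>}},
              {update, Visits, increment},
              {remove, Old}]}.

%% Intended use (requires riak_dt on the code path):
%%   riak_dt_map:update(map_op_sketch:ops(), actor_a, riak_dt_map:new())
%% Because <<"old">> is not present in a new Map, the whole op is expected
%% to fail with a precondition_error() and none of the updates are applied.
```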

update/4

update(Op::map_op(), ActorOrDot::riak_dt:actor() | riak_dt:dot(), Map::riak_dt_map(), Ctx::riak_dt:context()) -> {ok, riak_dt_map()}

The same as update/3, except that the context ensures no unseen field updates are removed, and removal of unseen updates is deferred. The Context is also passed down as the context for any nested types, which is why the Map and its embedded CRDTs share a common clock.

See also: parent_clock/2.

value/1

value(Map::riak_dt_map()) -> values()

Get the current set of values for this Map.

value/2

value(X1::term(), Map::riak_dt_map()) -> values()

Query map (not implemented yet).


Generated by EDoc