Class MpscUnboundedXaddArrayQueue<E>

java.lang.Object
java.util.AbstractCollection<E>
java.util.AbstractQueue<E>
org.jctools.queues.MpscUnboundedXaddArrayQueue<E>
All Implemented Interfaces:
Iterable<E>, Collection<E>, Queue<E>, IndexedQueueSizeUtil.IndexedQueue, MessagePassingQueue<E>, QueueProgressIndicators

public class MpscUnboundedXaddArrayQueue<E> extends AbstractQueue<E>
An MPSC array queue which grows unbounded in linked chunks.
Unlike MpscUnboundedArrayQueue, it is designed to scale better when many producers are offering concurrently.
Users should be aware that poll() may spin while waiting for a new element to become available. To avoid this behaviour, use relaxedPoll() instead, accounting for the semantic differences between the two.
Author:
https://github.com/franz1981
  • Constructor Details

    • MpscUnboundedXaddArrayQueue

      public MpscUnboundedXaddArrayQueue(int chunkSize, int maxPooledChunks)
      Parameters:
      chunkSize - The buffer size to be used in each chunk of this queue
      maxPooledChunks - The maximum number of reused chunks kept around to avoid allocation; chunks are pre-allocated
    • MpscUnboundedXaddArrayQueue

      public MpscUnboundedXaddArrayQueue(int chunkSize)
  • Method Details

    • offer

      public boolean offer(E e)
      Description copied from interface: MessagePassingQueue
      Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
      Parameters:
      e - not null, will throw NPE if it is
      Returns:
      true if element was inserted into the queue, false iff full
    • poll

      public E poll()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
      Returns:
      a message from the queue if one is available, null iff empty
    • peek

      public E peek()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
      Returns:
      a message from the queue if one is available, null iff empty
    • relaxedPoll

      public E relaxedPoll()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.poll() this method may return null without the queue being empty.
      Returns:
      a message from the queue if one is available, null if unable to poll
    • relaxedPeek

      public E relaxedPeek()
      Description copied from interface: MessagePassingQueue
      Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.peek() this method may return null without the queue being empty.
      Returns:
      a message from the queue if one is available, null if unable to peek
    • fill

      public int fill(MessagePassingQueue.Supplier<E> s)
      Description copied from interface: MessagePassingQueue
      Stuff the queue with elements from the supplier. Semantically similar to:
      while(relaxedOffer(s.get()));
      
      There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.

      Unbounded queues will fill up the queue with a fixed amount rather than fill up to oblivion. WARNING: Explicit assumptions are made with regard to MessagePassingQueue.Supplier.get(); make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
      Returns:
      the number of offered elements
    • drain

      public int drain(MessagePassingQueue.Consumer<E> c, int limit)
      Description copied from interface: MessagePassingQueue
      Remove up to limit elements from the queue and hand them to the consumer. This should be semantically similar to:

        M m;
        int i = 0;
        for(;i < limit && (m = relaxedPoll()) != null; i++){
          c.accept(m);
        }
        return i;
      

      There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regard to MessagePassingQueue.Consumer.accept(T); make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
      Returns:
      the number of polled elements
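      As an illustration only, the equivalent loop above can be made runnable against java.util.Queue. This is a sketch, not the queue's implementation: DrainLimitSketch and its drain helper are hypothetical names, Queue.poll() stands in for relaxedPoll(), and java.util.function.Consumer stands in for MessagePassingQueue.Consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class DrainLimitSketch {
    // Hypothetical helper mirroring the documented loop.
    // Assumption: Queue.poll() stands in for relaxedPoll().
    static <E> int drain(Queue<E> queue, Consumer<E> c, int limit) {
        E m;
        int i = 0;
        // The limit is checked first, so no element is removed once it is reached.
        for (; i < limit && (m = queue.poll()) != null; i++) {
            c.accept(m);
        }
        return i; // number of elements handed to the consumer
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(List.of(1, 2, 3, 4, 5));
        List<Integer> seen = new ArrayList<>();
        int drained = drain(queue, seen::add, 3);
        System.out.println(drained + " drained: " + seen + ", left: " + queue);
    }
}
```

      The return value can be smaller than limit when the queue runs dry first, which matches the note that a drain makes no strong commitment about the final state of the queue.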
    • fill

      public int fill(MessagePassingQueue.Supplier<E> s, int limit)
      Description copied from interface: MessagePassingQueue
      Stuff the queue with up to limit elements from the supplier. Semantically similar to:

        for(int i=0; i < limit && relaxedOffer(s.get()); i++);
      

      There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation. WARNING: Explicit assumptions are made with regard to MessagePassingQueue.Supplier.get(); make sure you have read and understood these before using this method.

      Returns:
      the number of offered elements
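      A minimal runnable sketch of the equivalent loop above, written against java.util.Queue for illustration. FillLimitSketch and its fill helper are hypothetical names, Queue.offer() stands in for relaxedOffer(), and java.util.function.Supplier stands in for MessagePassingQueue.Supplier. A bounded ArrayBlockingQueue is used only to show what happens when an offer fails.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class FillLimitSketch {
    // Hypothetical helper mirroring the documented loop.
    // Assumption: Queue.offer() stands in for relaxedOffer().
    static <E> int fill(Queue<E> queue, Supplier<E> s, int limit) {
        int i = 0;
        for (; i < limit && queue.offer(s.get()); i++) {
            // Intentionally empty: the work happens in the loop condition.
        }
        return i; // number of elements actually offered
    }

    public static void main(String[] args) {
        AtomicInteger supplierCalls = new AtomicInteger();
        Queue<Integer> bounded = new ArrayBlockingQueue<>(2);
        int offered = fill(bounded, supplierCalls::incrementAndGet, 4);
        System.out.println("offered=" + offered + ", supplier calls=" + supplierCalls.get());
    }
}
```

      In this sketch the supplier is invoked before the offer is attempted, so an element obtained for a failed offer is dropped. That is a property of the loop as written here, and it suggests why the assumptions around Supplier.get() deserve a careful read.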
    • chunkSize

      public final int chunkSize()
    • maxPooledChunks

      public final int maxPooledChunks()
    • currentProducerIndex

      public long currentProducerIndex()
      Description copied from interface: QueueProgressIndicators
      This method has no concurrent visibility semantics. The value returned may be negative. Under normal circumstances 2 consecutive calls to this method can offer an idea of progress made by producer threads by subtracting the 2 results though in extreme cases (if producers have progressed by more than 2^64) this may also fail.
      This value will normally indicate number of elements passed into the queue, but may under some circumstances be a derivative of that figure. This method should not be used to derive size or emptiness.
      Specified by:
      currentProducerIndex in interface QueueProgressIndicators
      Returns:
      the current value of the producer progress index
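      The subtraction of two samples described above can be sketched as follows. ProgressSketch and progress are hypothetical names used for illustration; in real use the two arguments would be values returned by consecutive calls to currentProducerIndex().

```java
final class ProgressSketch {
    // Hypothetical helper: estimated progress between two samples of an index.
    // Plain long subtraction wraps in two's complement, so the difference stays
    // meaningful even when a sample is negative or the index has overflowed once.
    static long progress(long earlierSample, long laterSample) {
        return laterSample - earlierSample;
    }

    public static void main(String[] args) {
        System.out.println(progress(10L, 25L));
        // The later sample has wrapped past Long.MAX_VALUE into negative values.
        System.out.println(progress(Long.MAX_VALUE, Long.MIN_VALUE + 4));
    }
}
```

      As the description states, the result is only an indication of progress and should not be used to derive size or emptiness.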
    • currentConsumerIndex

      public long currentConsumerIndex()
      Description copied from interface: QueueProgressIndicators
      This method has no concurrent visibility semantics. The value returned may be negative. Under normal circumstances 2 consecutive calls to this method can offer an idea of progress made by consumer threads by subtracting the 2 results though in extreme cases (if consumers have progressed by more than 2^64) this may also fail.
      This value will normally indicate number of elements taken out of the queue, but may under some circumstances be a derivative of that figure. This method should not be used to derive size or emptiness.
      Specified by:
      currentConsumerIndex in interface QueueProgressIndicators
      Returns:
      the current value of the consumer progress index
    • appendNextChunks

      protected final MpscUnboundedXaddChunk<E> appendNextChunks(MpscUnboundedXaddChunk<E> currentChunk, long currentChunkIndex, long chunksToAppend)
    • iterator

      public Iterator<E> iterator()
      Specified by:
      iterator in interface Collection<E>
      Specified by:
      iterator in interface Iterable<E>
      Specified by:
      iterator in class AbstractCollection<E>
    • size

      public int size()
      Description copied from interface: MessagePassingQueue
      This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
      Specified by:
      size in interface Collection<E>
      Specified by:
      size in interface MessagePassingQueue<E>
      Specified by:
      size in class AbstractCollection<E>
      Returns:
      number of messages in the queue, between 0 and Integer.MAX_VALUE but less than or equal to capacity (if bounded).
    • isEmpty

      public boolean isEmpty()
      Description copied from interface: MessagePassingQueue
      This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
      Specified by:
      isEmpty in interface Collection<E>
      Specified by:
      isEmpty in interface MessagePassingQueue<E>
      Overrides:
      isEmpty in class AbstractCollection<E>
      Returns:
      true if empty, false otherwise
    • capacity

      public int capacity()
      Specified by:
      capacity in interface IndexedQueueSizeUtil.IndexedQueue
      Specified by:
      capacity in interface MessagePassingQueue<E>
      Returns:
      the capacity of this queue or MessagePassingQueue.UNBOUNDED_CAPACITY if not bounded
    • relaxedOffer

      public boolean relaxedOffer(E e)
      Description copied from interface: MessagePassingQueue
      Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.offer(Object) this method may return false without the queue being full.
      Specified by:
      relaxedOffer in interface MessagePassingQueue<E>
      Parameters:
      e - not null, will throw NPE if it is
      Returns:
      true if element was inserted into the queue, false if unable to offer
    • drain

      public int drain(MessagePassingQueue.Consumer<E> c)
      Description copied from interface: MessagePassingQueue
      Remove all available items from the queue and hand them to the consumer. This should be semantically similar to:

        M m;
        while((m = relaxedPoll()) != null){
          c.accept(m);
        }
      
      There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regard to MessagePassingQueue.Consumer.accept(T); make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
      Returns:
      the number of polled elements
    • drain

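      public void drain(MessagePassingQueue.Consumer<E> c, MessagePassingQueue.WaitStrategy wait, MessagePassingQueue.ExitCondition exit)
      Description copied from interface: MessagePassingQueue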
      Remove elements from the queue and hand them to the consumer forever. Semantically similar to:

       int idleCounter = 0;
       while (exit.keepRunning()) {
           E e = relaxedPoll();
           if(e==null){
               idleCounter = wait.idle(idleCounter);
               continue;
           }
           idleCounter = 0;
           c.accept(e);
       }
      

      Called from a consumer thread subject to the restrictions appropriate to the implementation.

      WARNING: Explicit assumptions are made with regard to MessagePassingQueue.Consumer.accept(T); make sure you have read and understood these before using this method.

      Specified by:
      drain in interface MessagePassingQueue<E>
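      The idle and exit handling in the loop above can be tried out with a small stand-alone sketch. Everything here is an assumption made for illustration: DrainLoopSketch is a hypothetical class, its nested WaitStrategy and ExitCondition interfaces are local stand-ins declaring only the idle and keepRunning methods used in the loop, and Queue.poll() stands in for relaxedPoll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class DrainLoopSketch {
    // Local stand-ins for the wait strategy and exit condition used in the loop.
    interface WaitStrategy { int idle(int idleCounter); }
    interface ExitCondition { boolean keepRunning(); }

    // Hypothetical helper mirroring the documented loop.
    static <E> void drain(Queue<E> queue, Consumer<E> c, WaitStrategy wait, ExitCondition exit) {
        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = queue.poll(); // assumption: stands in for relaxedPoll()
            if (e == null) {
                // Nothing available: let the strategy decide how to back off.
                idleCounter = wait.idle(idleCounter);
                continue;
            }
            idleCounter = 0; // progress was made, reset the back-off state
            c.accept(e);
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(List.of("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        // Yield while idle; stop once three elements have been consumed.
        drain(queue, seen::add,
              idleCounter -> { Thread.yield(); return idleCounter + 1; },
              () -> seen.size() < 3);
        System.out.println(seen);
    }
}
```

      The exit condition is evaluated once per iteration, so in this sketch the loop can only stop between elements, never in the middle of handing one to the consumer.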
    • fill

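      public void fill(MessagePassingQueue.Supplier<E> s, MessagePassingQueue.WaitStrategy wait, MessagePassingQueue.ExitCondition exit)
      Description copied from interface: MessagePassingQueue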
      Stuff the queue with elements from the supplier forever. Semantically similar to:

      
       int idleCounter = 0;
       while (exit.keepRunning()) {
           E e = s.get();
           while (!relaxedOffer(e)) {
               idleCounter = wait.idle(idleCounter);
               continue;
           }
           idleCounter = 0;
       }
      
      

      Called from a producer thread subject to the restrictions appropriate to the implementation. The main difference from the loop above is that implementors MUST ensure room is available in the queue BEFORE calling MessagePassingQueue.Supplier.get(). WARNING: Explicit assumptions are made with regard to MessagePassingQueue.Supplier.get(); make sure you have read and understood these before using this method.

      Specified by:
      fill in interface MessagePassingQueue<E>
    • toString

      public String toString()
      Overrides:
      toString in class AbstractCollection<E>
    • lvConsumerIndex

      public final long lvConsumerIndex()
    • lvProducerIndex

      public final long lvProducerIndex()