Top Description Inners Fields Constructors Methods
java.util.concurrent

public Class SubmissionPublisher<T>

extends Object
implements Flow.Publisher<T>, AutoCloseable
Class Inheritance
All Implemented Interfaces
java.lang.AutoCloseable, java.util.concurrent.Flow.Publisher
Type Parameters
<T>
the published item type
Imports
java.lang.invoke.MethodHandles, .VarHandle, java.util.ArrayList, .Arrays, .List, java.util.concurrent.locks.LockSupport, .ReentrantLock, java.util.function.BiConsumer, .BiPredicate, .Consumer

A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

A SubmissionPublisher uses the Executor supplied in its constructor for delivery to subscribers. The best choice of Executor depends on expected usage. If the generator(s) of submitted items run in separate threads, and the number of subscribers can be estimated, consider using a Executors#newFixedThreadPool. Otherwise consider using the default, normally the ForkJoinPool#commonPool.

Buffering allows producers and consumers to transiently operate at different rates. Each subscriber uses an independent buffer. Buffers are created upon first use and expanded as needed up to the given maximum. (The enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation.) Invocations of request do not directly result in buffer expansion, but risk saturation if unfilled requests exceed the maximum capacity. The default value of Flow#defaultBufferSize() may provide a useful starting point for choosing a capacity based on expected rates, resources, and usages.

A single SubmissionPublisher may be shared among multiple sources. Actions in a source thread prior to publishing an item or issuing a signal happen-before actions subsequent to the corresponding access by each subscriber. But reported estimates of lag and demand are designed for use in monitoring, not for synchronization control, and may reflect stale or inaccurate views of progress.

Publication methods support different policies about what to do when buffers are saturated. Method submit blocks until resources are available. This is simplest, but least responsive. The offer methods may drop items (either immediately or with bounded timeout), but provide an opportunity to interpose a handler and then retry.

If any Subscriber method throws an exception, its subscription is cancelled. If a handler is supplied as a constructor argument, it is invoked before cancellation upon an exception in method onNext, but exceptions in methods onSubscribe, onError and onComplete are not recorded or handled before cancellation. If the supplied Executor throws RejectedExecutionException (or any other RuntimeException or Error) when attempting to execute a task, or a drop handler throws an exception when processing a dropped item, then the exception is rethrown. In these cases, not all subscribers will have been issued the published item. It is usually good practice to closeExceptionally in these cases.

Method consume(Consumer) simplifies support for a common case in which the only action of a subscriber is to request and process all items using a supplied function.

This class may also serve as a convenient base for subclasses that generate items, and use the methods in this class to publish them. For example here is a class that periodically publishes the items generated from a supplier. (In practice you might add methods to independently start and stop generation, to share Executors among publishers, and so on, or use a SubmissionPublisher as a component rather than a superclass.)

 class PeriodicPublisher<T> extends SubmissionPublisher<T> {
  final ScheduledFuture<?> periodicTask;
  final ScheduledExecutorService scheduler;
  PeriodicPublisher(Executor executor, int maxBufferCapacity,
                    Supplier<? extends T> supplier,
                    long period, TimeUnit unit) {
    super(executor, maxBufferCapacity);
    scheduler = new ScheduledThreadPoolExecutor(1);
    periodicTask = scheduler.scheduleAtFixedRate(
      () -> submit(supplier.get()), 0, period, unit);
  }
  public void close() {
    periodicTask.cancel(false);
    scheduler.shutdown();
    super.close();
  }
}

Here is an example of a Flow.Processor implementation. It uses single-step requests to its publisher for simplicity of illustration. A more adaptive version could monitor flow using the lag estimate returned from submit, along with other utility methods.

 class TransformProcessor<S,T> extends SubmissionPublisher<T>
  implements Flow.Processor<S,T> {
  final Function<? super S, ? extends T> function;
  Flow.Subscription subscription;
  TransformProcessor(Executor executor, int maxBufferCapacity,
                     Function<? super S, ? extends T> function) {
    super(executor, maxBufferCapacity);
    this.function = function;
  }
  public void onSubscribe(Flow.Subscription subscription) {
    (this.subscription = subscription).request(1);
  }
  public void onNext(S item) {
    subscription.request(1);
    submit(function.apply(item));
  }
  public void onError(Throwable ex) { closeExceptionally(ex); }
  public void onComplete() { close(); }
}
Author
Doug Lea
Since
9

Nested and Inner Type Summary

Modifier and TypeClass and Description
pack-priv static class
SubmissionPublisher.BufferedSubscription<T>

A resizable array-based ring buffer with integrated control to start a consumer task whenever items are available.

pack-priv static class
SubmissionPublisher.ConsumerSubscriber<T>

Subscriber for method consume

pack-priv static class
SubmissionPublisher.ConsumerTask<T>

A task for consuming buffer items and signals, created and executed whenever they become available.

private static class
SubmissionPublisher.ThreadPerTaskExecutor

Fallback if ForkJoinPool.commonPool() cannot support parallelism

Field Summary

Modifier and TypeField and Description
private static final Executor
ASYNC_POOL

Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism.

pack-priv static final int
BUFFER_CAPACITY_LIMIT

The largest possible power of two array size.

pack-priv SubmissionPublisher.BufferedSubscription<T>
clients

Clients (BufferedSubscriptions) are maintained in a linked list (via their "next" fields).

pack-priv volatile boolean
closed

Run status, updated only within locks

pack-priv volatile Throwable
closedException

If non-null, the exception in closeExceptionally

pack-priv final Executor
pack-priv static final int
INITIAL_CAPACITY

Initial buffer capacity used when maxBufferCapacity is greater.

pack-priv final ReentrantLock
lock

Lock for exclusion across multiple sources

pack-priv final int
pack-priv final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable>
pack-priv Thread
owner

The first caller thread to subscribe, or null if thread ever changed

pack-priv boolean
subscribed

Set true on first call to subscribe, to initialize possible owner

Constructor Summary

AccessConstructor and Description
public
SubmissionPublisher(Executor
the executor to use for async delivery, supporting creation of at least one independent thread
executor
,
int
the maximum capacity for each subscriber's buffer (the enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation; method getMaxBufferCapacity returns the actual value)
maxBufferCapacity
,
BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable>
if non-null, procedure to invoke upon exception thrown in method onNext
handler
)

Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method onNext.

public
SubmissionPublisher(Executor
the executor to use for async delivery, supporting creation of at least one independent thread
executor
,
int
the maximum capacity for each subscriber's buffer (the enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation; method getMaxBufferCapacity returns the actual value)
maxBufferCapacity
)

Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method onNext.

public
SubmissionPublisher()

Creates a new SubmissionPublisher using the ForkJoinPool#commonPool() for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task), with maximum buffer capacity of Flow#defaultBufferSize, and no handler for Subscriber exceptions in method onNext.

Method Summary

Modifier and TypeMethod and Description
private int
cleanAndCount()

Returns current list count after removing closed subscribers.

public void
close()

Implements java.lang.AutoCloseable.close.

Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish.

public void
closeExceptionally(Throwable
the onError argument sent to subscribers
error
)

Unless already closed, issues onError signals to current subscribers with the given error, and disallows subsequent attempts to publish.

public CompletableFuture<Void>

Returns:

a CompletableFuture that is completed normally when the publisher signals onComplete, and exceptionally upon any error or cancellation
consume
(Consumer<? super T>
the function applied to each onNext item
consumer
)

Processes all published items using the given Consumer function.

private int
doOffer(T item, long nanos, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop)

Common implementation for all three forms of submit and offer.

public int

Returns:

the estimate
estimateMaximumLag
()

Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers.

public long

Returns:

the estimate, or zero if no subscribers
estimateMinimumDemand
()

Returns an estimate of the minimum number of items requested (via request) but not yet produced, among all current subscribers.

public Throwable

Returns:

the exception, or null if none
getClosedException
()

Returns the exception associated with closeExceptionally, or null if not closed or if closed normally.

public Executor

Returns:

the Executor used for asynchronous delivery
getExecutor
()

Returns the Executor used for asynchronous delivery.

public int

Returns:

the maximum per-subscriber buffer capacity
getMaxBufferCapacity
()

Returns the maximum per-subscriber buffer capacity.

public int

Returns:

the number of current subscribers
getNumberOfSubscribers
()

Returns the number of current subscribers.

public List<Flow.Subscriber<? super T>>

Returns:

list of current subscribers
getSubscribers
()

Returns a list of current subscribers for monitoring and tracking purposes, not for invoking Flow.Subscriber methods on the subscribers.

public boolean

Returns:

true if this publisher has any subscribers
hasSubscribers
()

Returns true if this publisher has any subscribers.

public boolean

Returns:

true if closed
isClosed
()

Returns true if this publisher is not accepting submissions.

public boolean

Returns:

true if currently subscribed
isSubscribed
(Flow.Subscriber<? super T>
the subscriber
subscriber
)

Returns true if the given Subscriber is currently subscribed.

public int

Returns:

if negative, the (negative) number of drops; otherwise an estimate of maximum lag
offer
(T
the (non-null) item to publish
item
,
BiPredicate<Flow.Subscriber<? super T>, ? super T>
if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
onDrop
)

Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method.

public int

Returns:

if negative, the (negative) number of drops; otherwise an estimate of maximum lag
offer
(T
the (non-null) item to publish
item
,
long
how long to wait for resources for any subscriber before giving up, in units of unit
timeout
,
TimeUnit
a TimeUnit determining how to interpret the timeout parameter
unit
,
BiPredicate<Flow.Subscriber<? super T>, ? super T>
if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
onDrop
)

Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once.

private int
retryOffer(T item, long nanos, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop, SubmissionPublisher.BufferedSubscription<T> retries, int lag, boolean cleanMe)

Helps, (timed) waits for, and/or drops buffers on list; returns lag or negative drops (for use in offer).

pack-priv static final int
roundCapacity(int cap)

Round capacity to power of 2, at most limit.

public int

Returns:

the estimated maximum lag among subscribers
submit
(T
the (non-null) item to publish
item
)

Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable.

public void
subscribe(Flow.Subscriber<? super T>
the subscriber
subscriber
)

Implements java.util.concurrent.Flow.Publisher.subscribe.

Adds the given Subscriber unless already subscribed.

Inherited from java.lang.Object:
cloneequalsfinalizegetClasshashCodenotifynotifyAlltoStringwaitwaitwait