Flow.Publisher
that asynchronously issues submitted
(non-null) items to current subscribers until it is closed. Each
current subscriber receives newly submitted items in the same order
unless drops or exceptions are encountered. Using a
SubmissionPublisher allows item generators to act as compliant reactive-streams
Publishers relying on drop handling and/or blocking for flow
control.
A SubmissionPublisher uses the Executor
supplied in its
constructor for delivery to subscribers. The best choice of
Executor depends on expected usage. If the generator(s) of
submitted items run in separate threads, and the number of
subscribers can be estimated, consider using a Executors#newFixedThreadPool
. Otherwise consider using the
default, normally the ForkJoinPool#commonPool
.
Buffering allows producers and consumers to transiently operate
at different rates. Each subscriber uses an independent buffer.
Buffers are created upon first use and expanded as needed up to the
given maximum. (The enforced capacity may be rounded up to the
nearest power of two and/or bounded by the largest value supported
by this implementation.) Invocations of request
do not directly result in
buffer expansion, but risk saturation if unfilled requests exceed
the maximum capacity. The default value of Flow#defaultBufferSize()
may provide a useful starting point for
choosing a capacity based on expected rates, resources, and usages.
A single SubmissionPublisher may be shared among multiple sources. Actions in a source thread prior to publishing an item or issuing a signal happen-before actions subsequent to the corresponding access by each subscriber. But reported estimates of lag and demand are designed for use in monitoring, not for synchronization control, and may reflect stale or inaccurate views of progress.
Publication methods support different policies about what to do
when buffers are saturated. Method submit
blocks until resources are available. This is simplest, but least
responsive. The offer
methods may drop items (either
immediately or with bounded timeout), but provide an opportunity to
interpose a handler and then retry.
If any Subscriber method throws an exception, its subscription
is cancelled. If a handler is supplied as a constructor argument,
it is invoked before cancellation upon an exception in method
onNext
, but exceptions in methods
onSubscribe
,
onError
and
onComplete
are not recorded or
handled before cancellation. If the supplied Executor throws
RejectedExecutionException
(or any other RuntimeException
or Error) when attempting to execute a task, or a drop handler
throws an exception when processing a dropped item, then the
exception is rethrown. In these cases, not all subscribers will
have been issued the published item. It is usually good practice to
closeExceptionally
in these cases.
Method consume(Consumer)
simplifies support for a
common case in which the only action of a subscriber is to request
and process all items using a supplied function.
This class may also serve as a convenient base for subclasses that generate items, and use the methods in this class to publish them. For example here is a class that periodically publishes the items generated from a supplier. (In practice you might add methods to independently start and stop generation, to share Executors among publishers, and so on, or use a SubmissionPublisher as a component rather than a superclass.)
class PeriodicPublisher<T> extends SubmissionPublisher<T> {
final ScheduledFuture<?> periodicTask;
final ScheduledExecutorService scheduler;
PeriodicPublisher(Executor executor, int maxBufferCapacity,
Supplier<? extends T> supplier,
long period, TimeUnit unit) {
super(executor, maxBufferCapacity);
scheduler = new ScheduledThreadPoolExecutor(1);
periodicTask = scheduler.scheduleAtFixedRate(
() -> submit(supplier.get()), 0, period, unit);
}
public void close() {
periodicTask.cancel(false);
scheduler.shutdown();
super.close();
}
}
Here is an example of a Flow.
implementation.
It uses single-step requests to its publisher for simplicity of
illustration. A more adaptive version could monitor flow using the
lag estimate returned from submit
, along with other utility
methods.
class TransformProcessor<S,T> extends SubmissionPublisher<T>
implements Flow.Processor<S,T> {
final Function<? super S, ? extends T> function;
Flow.Subscription subscription;
TransformProcessor(Executor executor, int maxBufferCapacity,
Function<? super S, ? extends T> function) {
super(executor, maxBufferCapacity);
this.function = function;
}
public void onSubscribe(Flow.Subscription subscription) {
(this.subscription = subscription).request(1);
}
public void onNext(S item) {
subscription.request(1);
submit(function.apply(item));
}
public void onError(Throwable ex) { closeExceptionally(ex); }
public void onComplete() { close(); }
}
Modifier and Type | Class and Description |
---|---|
pack-priv static class | SubmissionPublisher.
A resizable array-based ring buffer with integrated control to start a consumer task whenever items are available. |
pack-priv static class | |
pack-priv static class | SubmissionPublisher.
A task for consuming buffer items and signals, created and executed whenever they become available. |
private static class | SubmissionPublisher.
Fallback if ForkJoinPool.commonPool() cannot support parallelism |
Modifier and Type | Field and Description |
---|---|
private static final Executor | ASYNC_POOL
Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism. |
pack-priv static final int | BUFFER_CAPACITY_LIMIT
The largest possible power of two array size. |
pack-priv SubmissionPublisher. | clients
Clients (BufferedSubscriptions) are maintained in a linked list (via their "next" fields). |
pack-priv volatile boolean | closed
Run status, updated only within locks |
pack-priv volatile Throwable | closedException
If non-null, the exception in closeExceptionally |
pack-priv final Executor | |
pack-priv static final int | INITIAL_CAPACITY
Initial buffer capacity used when maxBufferCapacity is greater. |
pack-priv final ReentrantLock | lock
Lock for exclusion across multiple sources |
pack-priv final int | |
pack-priv final BiConsumer | |
pack-priv Thread | owner
The first caller thread to subscribe, or null if thread ever changed |
pack-priv boolean | subscribed
Set true on first call to subscribe, to initialize possible owner |
Access | Constructor and Description |
---|---|
public | SubmissionPublisher(Executor
the executor to use for async delivery,
supporting creation of at least one independent thread executor, int the maximum capacity for each
subscriber's buffer (the enforced capacity may be rounded up to
the nearest power of two and/or bounded by the largest value
supported by this implementation; method maxBufferCapacity, BiConsumer<? super Flow.getMaxBufferCapacity
returns the actual value)if non-null, procedure to invoke upon exception
thrown in method handler)onNext Creates a new SubmissionPublisher using the given Executor for
async delivery to subscribers, with the given maximum buffer size
for each subscriber, and, if non-null, the given handler invoked
when any Subscriber throws an exception in method |
public | SubmissionPublisher(Executor
the executor to use for async delivery,
supporting creation of at least one independent thread executor, int the maximum capacity for each
subscriber's buffer (the enforced capacity may be rounded up to
the nearest power of two and/or bounded by the largest value
supported by this implementation; method maxBufferCapacity)getMaxBufferCapacity
returns the actual value)Creates a new SubmissionPublisher using the given Executor for
async delivery to subscribers, with the given maximum buffer size
for each subscriber, and no handler for Subscriber exceptions in
method |
public | SubmissionPublisher()
Creates a new SubmissionPublisher using the |
Modifier and Type | Method and Description |
---|---|
private int | |
public void | close()
Implements java. Unless already closed, issues |
public void | closeExceptionally(Throwable
the error)onError argument sent to subscribersUnless already closed, issues |
public CompletableFuture | |
private int | doOffer(T item, long nanos, BiPredicate<Flow.
Common implementation for all three forms of submit and offer. |
public int | Returns: the estimateReturns an estimate of the maximum number of items produced but not yet consumed among all current subscribers. |
public long | Returns: the estimate, or zero if no subscribersReturns an estimate of the minimum number of items requested
(via |
public Throwable | Returns: the exception, or null if noneReturns the exception associated with |
public Executor | Returns: the Executor used for asynchronous deliveryReturns the Executor used for asynchronous delivery. |
public int | Returns: the maximum per-subscriber buffer capacityReturns the maximum per-subscriber buffer capacity. |
public int | Returns: the number of current subscribersReturns the number of current subscribers. |
public List | Returns: list of current subscribersReturns a list of current subscribers for monitoring and
tracking purposes, not for invoking |
public boolean | Returns: true if this publisher has any subscribersReturns true if this publisher has any subscribers. |
public boolean | |
public boolean | Returns: true if currently subscribedthe subscriber subscriberReturns true if the given Subscriber is currently subscribed. |
public int | Returns: if negative, the (negative) number of drops; otherwise an estimate of maximum lagthe (non-null) item to publish item, BiPredicate<Flow.if non-null, the handler invoked upon a drop to a
subscriber, with arguments of the subscriber and item; if it
returns true, an offer is re-attempted (once) onDropPublishes the given item, if possible, to each current subscriber
by asynchronously invoking its |
public int | Returns: if negative, the (negative) number of drops; otherwise an estimate of maximum lagthe (non-null) item to publish item, long how long to wait for resources for any subscriber
before giving up, in units of timeout, TimeUnit unit a unit, BiPredicate<Flow.TimeUnit determining how to interpret the
timeout parameterif non-null, the handler invoked upon a drop to a
subscriber, with arguments of the subscriber and item; if it
returns true, an offer is re-attempted (once) onDropPublishes the given item, if possible, to each current subscriber
by asynchronously invoking its |
private int | retryOffer(T item, long nanos, BiPredicate<Flow.
Helps, (timed) waits for, and/or drops buffers on list; returns lag or negative drops (for use in offer). |
pack-priv static final int | |
public int | |
public void | subscribe(Flow.
the subscriber subscriberImplements java. Adds the given Subscriber unless already subscribed. |