Publishers
produce items consumed by one or more Subscribers
, each managed by a Subscription
.
These interfaces correspond to the reactive-streams
specification. They apply in both concurrent and distributed
asynchronous settings: All (seven) methods are defined in void
"one-way" message style. Communication relies on a simple form
of flow control (method Subscription#request
) that can be
used to avoid resource management problems that may otherwise occur
in "push" based systems.
Examples. A Publisher
usually defines its own
Subscription
implementation; constructing one in method
subscribe
and issuing it to the calling Subscriber
. It publishes items to the subscriber asynchronously,
normally using an Executor
. For example, here is a very
simple publisher that only issues (when requested) a single TRUE
item to a single subscriber. Because the subscriber receives
only a single item, this class does not use buffering and ordering
control required in most implementations (for example SubmissionPublisher
).
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (!completed) {
completed = true;
if (n <= 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
A Subscriber
arranges that items be requested and
processed. Items (invocations of Subscriber#onNext
) are
not issued unless requested, but multiple items may be requested.
Many Subscriber implementations can arrange this in the style of
the following example, where a buffer size of 1 single-steps, and
larger sizes usually allow for more efficient overlapped processing
with less communication; for example with a value of 64, this keeps
total outstanding requests between 32 and 64.
Because Subscriber method invocations for a given Subscription
are strictly ordered, there is no need for these
methods to use locks or volatiles unless a Subscriber maintains
multiple Subscriptions (in which case it is better to instead
define multiple Subscribers, each with its own Subscription).
class SampleSubscriber<T> implements Subscriber<T> {
final Consumer<? super T> consumer;
Subscription subscription;
final long bufferSize;
long count;
SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
this.bufferSize = bufferSize;
this.consumer = consumer;
}
public void onSubscribe(Subscription subscription) {
long initialRequestSize = bufferSize;
count = bufferSize - bufferSize / 2; // re-request when half consumed
(this.subscription = subscription).request(initialRequestSize);
}
public void onNext(T item) {
if (--count <= 0)
subscription.request(count = bufferSize - bufferSize / 2);
consumer.accept(item);
}
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
}
The default value of defaultBufferSize
may provide a
useful starting point for choosing request sizes and capacities in
Flow components based on expected rates, resources, and usages.
Or, when flow control is never needed, a subscriber may initially
request an effectively unbounded number of items, as in:
class UnboundedSubscriber<T> implements Subscriber<T> {
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // effectively unbounded
}
public void onNext(T item) { use(item); }
public void onError(Throwable ex) { ex.printStackTrace(); }
public void onComplete() {}
void use(T item) { ... }
}
Modifier and Type | Class and Description |
---|---|
public static interface | Flow.
the subscribed item type T, the published item type R>A component that acts as both a Subscriber and Publisher. |
public static interface | Flow.
the published item type T>A producer of items (and related control messages) received by Subscribers. |
public static interface | |
public static interface | Flow.
Message control linking a |
Modifier and Type | Field and Description |
---|---|
pack-priv static final int |
Access | Constructor and Description |
---|---|
private |
Modifier and Type | Method and Description |
---|---|
public static int | Returns: the buffer size valueReturns a default value for Publisher or Subscriber buffering, that may be used in the absence of other constraints. |
DEFAULT_BUFFER_SIZE | back to summary |
---|---|
pack-priv static final int DEFAULT_BUFFER_SIZE |
Flow | back to summary |
---|---|
private Flow() |
defaultBufferSize | back to summary |
---|---|
public static int defaultBufferSize() Returns a default value for Publisher or Subscriber buffering, that may be used in the absence of other constraints. Implementation Note The current value returned is 256.
|
Subscriber
receives the same
items (via method onNext
) in the same order, unless
drops or errors are encountered. If a Publisher encounters an
error that does not allow items to be issued to a Subscriber,
that Subscriber receives onError
, and then receives no
further messages. Otherwise, when it is known that no further
messages will be issued to it, a subscriber receives onComplete
. Publishers ensure that Subscriber method
invocations for each subscription are strictly ordered in happens-before
order.
Publishers may vary in policy about whether drops (failures to issue an item because of resource limitations) are treated as unrecoverable errors. Publishers may also vary about whether Subscribers receive items that were produced or available before they subscribed.
Modifier and Type | Method and Description |
---|---|
public void | subscribe(Flow.
the subscriber subscriberAdds the given Subscriber if possible. |
subscribe | back to summary |
---|---|
public void subscribe(Flow. Adds the given Subscriber if possible. If already
subscribed, or the attempt to subscribe fails due to policy
violations or errors, the Subscriber's
|
Subscription
.
Modifier and Type | Method and Description |
---|---|
public void | onComplete()
Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription. |
public void | |
public void | |
public void | onSubscribe(Flow.
a new subscription subscriptionMethod invoked prior to invoking any other Subscriber methods for the given Subscription. |
onComplete | back to summary |
---|---|
public void onComplete() Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription. If this method throws an exception, resulting behavior is undefined. |
onError | back to summary |
---|---|
public void onError(Throwable throwable) Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription. If this method itself throws an exception, resulting behavior is undefined.
|
onNext | back to summary |
---|---|
public void onNext(T item) Method invoked with a Subscription's next item. If this method throws an exception, resulting behavior is not guaranteed, but may cause the Subscription to be cancelled.
|
onSubscribe | back to summary |
---|---|
public void onSubscribe(Flow. Method invoked prior to invoking any other Subscriber methods for the given Subscription. If this method throws an exception, resulting behavior is not guaranteed, but may cause the Subscription not to be established or to be cancelled. Typically, implementations of this method invoke
|
Publisher
and Subscriber
. Subscribers receive items only when requested,
and may cancel at any time. The methods in this interface are
intended to be invoked only by their Subscribers; usages in
other contexts have undefined effects.
Modifier and Type | Method and Description |
---|---|
public void | |
public void | request(long
the increment of demand; a value of n)Long.MAX_VALUE may be considered as effectively unboundedAdds the given number |
cancel | back to summary |
---|---|
public void cancel() Causes the Subscriber to (eventually) stop receiving
messages. Implementation is best-effort -- additional
messages may be received after invoking this method.
A cancelled subscription need not ever receive an
|
request | back to summary |
---|---|
public void request(long n) Adds the given number
|