模块  java.base
软件包  java.util.concurrent

Class Flow


  • public final class Flow
    extends Object
    相互关联的接口和用于建立流量控制组件的静态方法,其中Publishers产生由一个或多个消费的项目Subscribers ,各由一个管理Subscription

    这些接口对应于reactive-streams规范。 它们适用于并发和分布式异步设置:所有(七种)方法都以void “单向”消息样式定义。 通信依赖于简单形式的流控制(方法Flow.Subscription.request(long) ),其可用于避免在“基于推送”的系统中可能发生的资源管理问题。

    例子。 Flow.Publisher通常定义自己的Flow.Subscription实现; 在方法subscribe构造一个并将其发布到调用Flow.Subscriber 它以异步方式向订户发布项目,通常使用Executor 例如,这是一个非常简单的发布者,只向单个订阅者发出(如果请求)单个TRUE项目。 由于订阅者只接收单个项目,因此该类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher )。

       class OneShotPublisher implements Publisher<Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based private boolean subscribed; // true after first subscribe public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException()); // only one allowed else { subscribed = true; subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; // to allow cancellation private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this.subscriber = subscriber; this.executor = executor; } public synchronized void request(long n) { if (!completed) { completed = true; if (n <= 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel() { completed = true; if (future != null) future.cancel(false); } } } 

    Flow.Subscriber安排要求和处理物品。 除非有要求,否则不会发出项目( Flow.Subscriber.onNext(T)调用),但可能会要求提供多个项目。 许多订阅者实现可以按照以下示例的样式进行排列,其中缓冲区大小为1个单步,较大的大小通常允许更有效的重叠处理和更少的通信; 例如,值为64,这将使未完成的请求总数保持在32到64之间。由于对给定Flow.Subscription订阅者方法调用是严格排序的,因此除非订阅者维护多个订阅,否则这些方法不需要使用锁定或易失性(在在哪种情况下,最好定义多个订阅者,每个订阅者都有自己的订阅)。

       class SampleSubscriber<T> implements Subscriber<T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this.bufferSize = bufferSize; this.consumer = consumer; } public void onSubscribe(Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2; // re-request when half consumed (this.subscription = subscription).request(initialRequestSize); } public void onNext(T item) { if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2); consumer.accept(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} } 

    默认值defaultBufferSize()可以提供一个有用的起点,用于根据预期的费率,资源和用途选择Flow组件中的请求大小和容量。 或者,当永远不需要流量控制时,订户最初可以请求有效无限数量的项目,如:

       class UnboundedSubscriber<T> implements Subscriber<T> { public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); // effectively unbounded } public void onNext(T item) { use(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} void use(T item) { ... } } 
    从以下版本开始:
    9
    • 方法详细信息

      • defaultBufferSize

        public static int defaultBufferSize()
        返回Publisher或订阅服务器缓冲的默认值,可以在没有其他约束的情况下使用。
        Implementation Note:
        返回的当前值是256。
        结果
        缓冲区大小值