WebFlux Backpressure
In a reactive system, data is often produced and consumed at different rates. When consumption is faster than production, every item is consumed as soon as it is produced and the system runs well. When production is faster than consumption, the items that cannot be consumed in time may overwhelm the system. Backpressure was introduced to handle this case: production outpacing consumption in an asynchronous setting.

1. What is backpressure

Backpressure is a concept borrowed from fluid dynamics, where it originally means the resistance or force opposing the desired flow of fluid through a pipe. In software, it is a mechanism by which the downstream of a reactive data stream reports its processing capacity to its upstream.

Backpressure exists to solve the problem of an upstream component sending so many messages that the downstream component cannot process them in time and the system crashes. When the downstream is overloaded, it should have some way to tell the upstream so, in the hope that the upstream slows down and the load is reduced. This feedback is what allows a system to keep running well under high load. Conversely, when the downstream is idle, it can request more messages from the upstream.

Backpressure is essentially a form of flow control: the upstream decides how fast to send messages based on the capacity reported by the downstream.

2. Backpressure in the Reactive Streams specification

Reactive Streams is an API specification for asynchronous stream processing, intended for interaction between threads. It has two main goals:

- Govern the exchange of stream data across an asynchronous boundary (that is, passing elements on to another thread or thread pool).
- Ensure that the receiving side is not forced to buffer arbitrary amounts of data. Backpressure is introduced so that the queues between threads stay bounded.

Reactive Streams defines four components:

- Publisher: produces messages.
- Subscriber: processes messages.
- Subscription: binds a Publisher to a Subscriber.
- Processor: an intermediate processing stage that is both a Publisher and a Subscriber.

The specification states that a Subscriber must signal demand through Subscription.request(long n) in order to receive onNext signals. In other words, the Subscriber is responsible for deciding how many elements it can accept and for communicating that to the Publisher. The request method of Subscription is therefore the key to implementing backpressure.

Reactive Streams defines how backpressure is propagated, but it does not prescribe what to do with elements that the upstream emits and the downstream cannot process in time. That is left to each implementation.

3. Backpressure strategies in Reactor

Reactor is an implementation of Reactive Streams. It provides the following five backpressure strategies:

- BUFFER (default): buffer all signals when the downstream cannot keep up. The buffer is unbounded, so this may lead to OutOfMemoryError.
- DROP: drop the incoming signal if the downstream is not ready to receive it.
- LATEST: the downstream only gets the latest signals from the upstream.
- ERROR: signal an error (IllegalStateException) when the downstream cannot keep up.
- IGNORE: completely ignore downstream backpressure requests. This may cause an IllegalStateException when downstream queues fill up.

The following simple example tries out these five strategies.

(1) First define the event structure Event, an event source EventSource, and an event listener interface EventListener:

    import java.util.ArrayList;
    import java.util.List;
    import lombok.AllArgsConstructor;
    import lombok.Data;

    public class EventSource {
        private List<EventListener> listeners = new ArrayList<>();

        public void registry(EventListener listener) {
            listeners.add(listener);
        }

        // Push an event to every registered listener.
        public void next(Event event) {
            System.out.println("send event: " + event.getMsg());
            listeners.forEach(l -> l.onNext(event));
        }

        public void complete() {
            listeners.forEach(EventListener::onComplete);
        }

        @Data
        @AllArgsConstructor
        public static class Event {
            private String msg;
        }
    }

    interface EventListener {
        void onNext(EventSource.Event event);
        void onComplete();
    }

(2) Then define a slow consumer, SlowSubscriber. It maintains a thread pool; when it receives an event, it hands the event to the pool so that consumption is asynchronous:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;

    public class SlowSubscriber extends BaseSubscriber<EventSource.Event> {
        private int capacity;
        private int processTime;
        private ThreadPoolExecutor pool;

        public SlowSubscriber(int capacity, int processTime) {
            this.capacity = capacity;
            this.processTime = processTime;
            this.pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(capacity));
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // Initial demand: as many events as the work queue can hold.
            System.out.println("request " + capacity);
            request(capacity);
        }

        @Override
        protected void hookOnNext(EventSource.Event event) {
            pool.submit(() -> {
                System.out.println("receive event: " + event.getMsg());
                try {
                    TimeUnit.MILLISECONDS.sleep(processTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Ask for one more event only after this one has been processed.
                System.out.println("request 1");
                request(1);
            });
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("Complete");
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        protected void hookOnCancel() {
            System.out.println("Cancel");
        }
    }

(3) Define a fast producer, FastPublisher:

    import java.util.concurrent.TimeUnit;

    public class FastPublisher {
        private EventSource source;
        private int processTime;

        public FastPublisher(EventSource source, int processTime) {
            this.source = source;
            this.processTime = processTime;
        }

        // Emit 10 events, pausing processTime milliseconds after each one.
        public void send() {
            for (int i = 0; i < 10; i++) {
                source.next(new EventSource.Event("event " + i));
                try {
                    TimeUnit.MILLISECONDS.sleep(processTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            source.complete();
        }
    }

(4) Finally, the test class, with one method for each of the five strategies:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    public class Test {
        private static int SUBSCRIBER_CAPACITY = 5;
        private static int PUBLISH_TIME = 200;
        private static int CONSUME_TIME = 1000;

        private EventSource source = new EventSource();

        // Bridge the listener-based EventSource into a Flux with the given strategy.
        public Flux<EventSource.Event> createFlux(FluxSink.OverflowStrategy backpressure) {
            return Flux.create(sink -> {
                source.registry(new EventListener() {
                    @Override
                    public void onNext(EventSource.Event event) {
                        sink.next(event);
                    }

                    @Override
                    public void onComplete() {
                        sink.complete();
                    }
                });
            }, backpressure);
        }

        public void testBuffer() {
            createFlux(FluxSink.OverflowStrategy.BUFFER).subscribe(new SlowSubscriber(SUBSCRIBER_CAPACITY, CONSUME_TIME));
            new FastPublisher(source, PUBLISH_TIME).send();
        }

        public void testDrop() {
            createFlux(FluxSink.OverflowStrategy.DROP).subscribe(new SlowSubscriber(SUBSCRIBER_CAPACITY, CONSUME_TIME));
            new FastPublisher(source, PUBLISH_TIME).send();
        }

        public void testError() {
            createFlux(FluxSink.OverflowStrategy.ERROR).subscribe(new SlowSubscriber(SUBSCRIBER_CAPACITY, CONSUME_TIME));
            new FastPublisher(source, PUBLISH_TIME).send();
        }

        public void testLatest() {
            createFlux(FluxSink.OverflowStrategy.LATEST).subscribe(new SlowSubscriber(SUBSCRIBER_CAPACITY, CONSUME_TIME));
            new FastPublisher(source, PUBLISH_TIME).send();
        }

        public void testIgnore() {
            createFlux(FluxSink.OverflowStrategy.IGNORE).subscribe(new SlowSubscriber(SUBSCRIBER_CAPACITY, CONSUME_TIME));
            new FastPublisher(source, PUBLISH_TIME).send();
        }

        public static void main(String[] args) {
            new Test().testBuffer();
            new Test().testDrop();
            new Test().testError();
            new Test().testLatest();
            new Test().testIgnore();
        }
    }

Now for the results:

(1) With the BUFFER strategy, every event is consumed normally. Events that cannot be processed in time are buffered and handled once the consumer has finished the earlier ones.

(2) With the DROP strategy, only 6 events are consumed. The remaining 4 are dropped because the consumer has no capacity left to process them.

The results for the other strategies are not shown here; interested readers can run the code and inspect the output.

4. How Reactor implements backpressure

This section walks through the Reactor source code to see how backpressure is implemented.

Start with create, the method that builds the reactive stream. It takes two parameters: a Consumer that emits messages downstream through a FluxSink, and the backpressure strategy. There is no complex logic here; the method just instantiates FluxCreate and assigns its fields:

    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
        return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
    }

    final class FluxCreate<T> extends Flux<T> implements SourceProducer<T> {
        ......
        FluxCreate(Consumer<? super FluxSink<T>> source,
                FluxSink.OverflowStrategy backpressure,
                CreateMode createMode) {
            this.source = Objects.requireNonNull(source, "source");
            this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
            this.createMode = createMode;
        }
        ......
    }

Next, the subscribe method of FluxCreate, which takes a Subscriber:

    public void subscribe(CoreSubscriber<? super T> actual) {
        BaseSink<T> sink = createSink(actual, backpressure);
        actual.onSubscribe(sink);
        try {
            source.accept(
                    createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) : sink);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            sink.error(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

subscribe first calls createSink to create a BaseSink. From the type hierarchy of BaseSink, we can tell that it is a Subscription. createSink picks the concrete sink to instantiate according to the backpressure strategy:

    static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t, OverflowStrategy backpressure) {
        switch (backpressure) {
            case IGNORE: {
                return new IgnoreSink<>(t);
            }
            case ERROR: {
                return new ErrorAsyncSink<>(t);
            }
            case DROP: {
                return new DropAsyncSink<>(t);
            }
            case LATEST: {
                return new LatestAsyncSink<>(t);
            }
            default: {
                return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
            }
        }
    }

Take the BUFFER strategy as the example. BufferAsyncSink maintains an unbounded queue that holds messages not yet consumed. Its parent class BaseSink maintains a requested field, which records how many messages the downstream has requested so far and is what backpressure control is built on:

    static final class BufferAsyncSink<T> extends BaseSink<T> {
        final Queue<T> queue;
        Throwable error;
        volatile boolean done; //done is still useful to be able to drain before the terminated handler is executed
        volatile int wip;
        @SuppressWarnings("rawtypes")
        static final AtomicIntegerFieldUpdater<BufferAsyncSink> WIP =
                AtomicIntegerFieldUpdater.newUpdater(BufferAsyncSink.class, "wip");

        BufferAsyncSink(CoreSubscriber<? super T> actual, int capacityHint) {
            super(actual);
            this.queue = Queues.<T>unbounded(capacityHint).get();
        }
        ......
    }

    static abstract class BaseSink<T> extends AtomicBoolean implements FluxSink<T>, InnerProducer<T> {
        ......
        volatile long requested;
        @SuppressWarnings("rawtypes")
        static final AtomicLongFieldUpdater<BaseSink> REQUESTED =
                AtomicLongFieldUpdater.newUpdater(BaseSink.class, "requested");
        ......
        BaseSink(CoreSubscriber<? super T> actual) {
            this.actual = actual;
            this.ctx = actual.currentContext();
        }
    }

Back in subscribe, the next step is to call the Subscriber's onSubscribe method to start consumption. In the example above, the Subscriber is the custom SlowSubscriber, whose onSubscribe logic is simple: through hookOnSubscribe it calls request, which reaches the request method of BufferAsyncSink and tells the upstream how many messages it can handle. The last step of subscribe invokes the Consumer that was passed in when FluxCreate was constructed, which starts message production.

Now look at how the request method of BufferAsyncSink implements backpressure control. The method is defined in the parent class BaseSink:

    public final void request(long n) {
        if (Operators.validate(n)) {
            Operators.addCap(REQUESTED, this, n);
            ......
            onRequestedFromDownstream();
        }
    }

The logic is simple. It first updates requested by adding n, the number of messages requested by the downstream, and then delegates the actual backpressure handling to onRequestedFromDownstream. In BufferAsyncSink, onRequestedFromDownstream simply calls drain:

    void onRequestedFromDownstream() {
        drain();
    }

The next method of BufferAsyncSink puts the received message into the queue and likewise calls drain to deliver messages downstream:

    public FluxSink<T> next(T t) {
        queue.offer(t);
        drain();
        return this;
    }

So drain is the core of the backpressure strategy. Its implementation is:

    void drain() {
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }

        final Subscriber<? super T> a = actual;
        final Queue<T> q = queue;

        for (; ; ) {
            long r = requested;
            long e = 0L;

            while (e != r) {
                if (isCancelled()) {
                    Operators.onDiscardQueueWithClear(q, ctx, null);
                    if (WIP.decrementAndGet(this) != 0) {
                        continue;
                    } else {
                        return;
                    }
                }

                boolean d = done;
                T o = q.poll();
                boolean empty = o == null;

                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        super.error(ex);
                    } else {
                        super.complete();
                    }
                    return;
                }

                if (empty) {
                    break;
                }

                a.onNext(o);
                e++;
            }

            if (e == r) {
                if (isCancelled()) {
                    Operators.onDiscardQueueWithClear(q, ctx, null);
                    if (WIP.decrementAndGet(this) != 0) {
                        continue;
                    } else {
                        return;
                    }
                }

                boolean d = done;
                boolean empty = q.isEmpty();

                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        super.error(ex);
                    } else {
                        super.complete();
                    }
                    return;
                }
            }

            if (e != 0) {
                Operators.produced(REQUESTED, this, e);
            }

            if (WIP.decrementAndGet(this) == 0) {
                break;
            }
        }
    }

Ignoring some of the control flow in between, the method does two main things:

- In a while loop, it polls messages from the internal queue and passes each one to the Subscriber's onNext method, until the number of messages delivered reaches the recorded requested count or the queue is empty.
- It subtracts the number of messages delivered from requested.

5. Backpressure examples

Backpressure is flow control. Reactor's backpressure strategies are specified by the OverflowStrategy enum:

- IGNORE: completely ignore downstream backpressure requests.
- ERROR: signal an IllegalStateException when the downstream cannot keep up.
- DROP: drop the incoming signal if the downstream is not ready to receive it.
- LATEST: the downstream only gets the latest signals from the upstream.
- BUFFER: buffer all signals if the downstream cannot keep up.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;
    import reactor.core.scheduler.Schedulers;

    @Test
    public void test() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final int[] a = {0};
        Flux.push(t -> {
                    // Fast publisher: emit 0..9, sleeping 10 microseconds after each value.
                    for (int i = 0; i < 10; i++) {
                        t.next(a[0]++);
                        try {
                            TimeUnit.MICROSECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    System.out.println("generate thread:" + Thread.currentThread().getName());
                    t.complete();
                }, FluxSink.OverflowStrategy.BUFFER)
                .publishOn(Schedulers.newSingle("publish-thread-"), 1)
                .subscribeOn(Schedulers.newSingle("subscribe-thread-"))
                .subscribe(new Subscriber<Object>() {
                    private Subscription subscription = null;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        subscription.request(1);
                    }

                    @Override
                    public void onNext(Object o) {
                        // Slow subscriber: sleep 30 microseconds before requesting the next value.
                        System.out.println(Thread.currentThread().getName() + ":consumed:" + o);
                        try {
                            TimeUnit.MICROSECONDS.sleep(30);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        this.subscription.request(1);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.out.println("error occurred");
                        throwable.printStackTrace();
                        countDownLatch.countDown();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Complete");
                        countDownLatch.countDown();
                    }
                });
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

Flux.push is the publisher; it sleeps 10 microseconds before producing the next number. The subscriber sleeps 30 microseconds before requesting more data from the upstream. Together they simulate a fast publisher and a slow subscriber. publishOn and subscribeOn put the publisher and the subscriber on different threads. Remember to set the second argument of publishOn, the prefetch count, to 1.

The second argument of Flux.push is the backpressure strategy. Here it is set to FluxSink.OverflowStrategy.BUFFER. Result:

    publish-thread--2:consumed:0
    generate thread:subscribe-thread--1
    publish-thread--2:consumed:1
    publish-thread--2:consumed:2
    publish-thread--2:consumed:3
    publish-thread--2:consumed:4
    publish-thread--2:consumed:5
    publish-thread--2:consumed:6
    publish-thread--2:consumed:7
    publish-thread--2:consumed:8
    publish-thread--2:consumed:9
    Complete

All of the data is consumed.

With the second argument of Flux.push set to FluxSink.OverflowStrategy.DROP, the result is:

    publish-thread--2:consumed:0
    generate thread:subscribe-thread--1
    publish-thread--2:consumed:5
    Complete

Apart from 0 and 5, everything is dropped.

With the second argument of Flux.push set to FluxSink.OverflowStrategy.LATEST, the result is:

    publish-thread--2:consumed:0
    generate thread:subscribe-thread--1
    publish-thread--2:consumed:4
    publish-thread--2:consumed:9
    Complete

Each request to the upstream receives the latest value available at that moment. Here those are 0, 4, and 9.

With the second argument of Flux.push set to FluxSink.OverflowStrategy.ERROR, the result is:

    publish-thread--2:consumed:0
    error occurred
    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:224)
        at reactor.core.publisher.FluxCreate$ErrorAsyncSink.onOverflow(FluxCreate.java:708)
        at reactor.core.publisher.FluxCreate$NoOverflowBaseAsyncSink.next(FluxCreate.java:673)
        at com.example.FluxTest.lambda$test$71(FluxTest.java:609)
        at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
        at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

The exception thrown is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

With the second argument of Flux.push set to FluxSink.OverflowStrategy.IGNORE, the result is:

    publish-thread--2:consumed:0
    generate thread:subscribe-thread--1
    publish-thread--2:consumed:1
    error occurred
    reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:237)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:233)
        at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:639)
        at com.example.FluxTest.lambda$test$71(FluxTest.java:609)
        at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
        at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

The exception thrown is reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure

If the second argument of Flux.push is omitted, the same strategies can be applied with the backpressure operators that Reactor provides:

- onBackpressureBuffer() corresponds to FluxSink.OverflowStrategy.BUFFER
- onBackpressureDrop() corresponds to FluxSink.OverflowStrategy.DROP
- onBackpressureLatest() corresponds to FluxSink.OverflowStrategy.LATEST
- onBackpressureError() corresponds to FluxSink.OverflowStrategy.ERROR

The marble diagrams in the Reactor documentation make the behavior more intuitive. In summary:

- onBackpressureBuffer: buffers elements until requests arrive from its downstream.
- onBackpressureDrop: when an element is ready, emits it only if the downstream has an unfulfilled request; otherwise the element is dropped.
- onBackpressureLatest: when a new request arrives, emits the most recent element.
- onBackpressureError: signals an error when an element is ready but there is no demand for it.
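
To make the four operator behaviors above more concrete, here is a minimal single-threaded sketch in plain Java. It is only a model of the semantics as described in this article, not Reactor's implementation: the names OverflowModel, Strategy, and run are hypothetical and invented for this illustration, and the real operators additionally deal with concurrency, cancellation, and completion. The scenario is also an assumption: the downstream requests 2 elements, the upstream emits 0 to 5, the downstream requests 2 more, and the upstream emits 6.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of overflow handling. Not Reactor code.
class OverflowModel {
    enum Strategy { BUFFER, DROP, LATEST, ERROR }

    private final Strategy strategy;
    private final Deque<Integer> buffer = new ArrayDeque<>(); // used by BUFFER only
    private Integer latest;                                    // used by LATEST only
    private long requested;                                    // outstanding downstream demand
    private boolean failed;                                    // set once ERROR has fired
    private final List<String> delivered = new ArrayList<>(); // what the downstream saw

    OverflowModel(Strategy strategy) {
        this.strategy = strategy;
    }

    // Downstream announces it can take n more elements; deliver anything held back.
    void request(long n) {
        requested += n;
        while (requested > 0 && !failed) {
            Integer value = buffer.poll();
            if (value == null) {
                value = latest;
                latest = null;
            }
            if (value == null) {
                break; // nothing is waiting
            }
            emit(value);
        }
    }

    // Upstream produces a value; the strategy decides what happens without demand.
    void next(int value) {
        if (failed) {
            return;
        }
        if (requested > 0) {
            emit(value);
            return;
        }
        switch (strategy) {
            case BUFFER: buffer.add(value); break;   // keep everything
            case DROP:   break;                      // discard the value
            case LATEST: latest = value; break;      // keep only the newest value
            case ERROR:  failed = true; delivered.add("error"); break;
        }
    }

    private void emit(int value) {
        requested--;
        delivered.add(String.valueOf(value));
    }

    // Assumed scenario: request 2, emit 0..5, request 2 more, emit 6.
    static List<String> run(Strategy strategy) {
        OverflowModel model = new OverflowModel(strategy);
        model.request(2);
        for (int i = 0; i < 6; i++) {
            model.next(i);
        }
        model.request(2);
        model.next(6);
        return model.delivered;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + run(s));
        }
    }
}
```

In this model, BUFFER delivers 0, 1, 2, 3 and keeps the rest queued; DROP delivers 0, 1, 6; LATEST delivers 0, 1, 5, 6; and ERROR delivers 0, 1 followed by an error. The differences come entirely from what next does when requested is zero, which mirrors the role of the requested counter in BaseSink.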