SpringBoot3响应式编程入门
SpringBoot3响应式编程入门
博主在该知识点上不是特别熟练,在之前的工作经历中没有过多的使用场景,仅限于自己学习或者准备面试时突击一下,准备利用好这段闲暇时间记录学习阶段。
Lambda表达式
语法糖
Function函数式接口
Supplier<T> get();
Consumer<T> accept(T t);
Predicate<T> test(T t);
Function<T, R> apply(T t);
Runnable run();
StreamAPI
流的特性:懒加载
只有存在终止操作时才会启动
创建流
中间操作(intermediate operation)
parallel
开启并发,开启后自行解决多线程安全问题,推荐操作无状态数据
public class Mylambda {
public static void main(String[] args) {
var list = new ArrayList<Integer>();
list.add(1);
list.add(5);
list.add(8);
list.add(11);
list.add(23);
list.add(37);
list.stream()
.parallel() //开启并发
.filter(x -> (x & 1) == 0)
.max(Integer::compareTo)
.ifPresent(System.out::println);
}
}
filter
过滤操作
map
一对一映射
flatMap
一对多映射,打散、散列后成为新流
distinct
去重
sorted
排序
peek
偷看——消费者函数式接口
limit
限制——按照排序截取流
skip
跳过——按照排序跳过流
takeWhile
只保留流中第一个不满足条件的元素之前的所有元素
dropWhile
删除第一个不满足条件的元素之前的所有元素
等等
新流
终止操作(terminal operation)
forEach
toArray
reduce
collect
toList
min
max
count
anyMatch
findFirst
iterator
等等
结果
Reactive-Stream
JVM面向流的库的标准和规范
处理可能无限数量的元素
有序
在组件之间异步传递元素
强制性非阻塞背压模式
Publisher
发布者
Subscriber
订阅者
Subscription
订阅关系
Processor
处理器
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* MyFlow
*
* @author <b>孤寂灬无痕</b>
* <p>
* 994300880@qq.com
* @version 1.0
* @date 2024/2/5 13:17
*/
public class MyFlow {
//5.定义流中间处理器
static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println(Thread.currentThread().getName() + "#####中间处理器开始订阅");
this.subscription.request(1L);
}
@Override
public void onNext(String item) {
System.out.println(Thread.currentThread().getName() + "#####中间处理器接收到订阅消息" + item);
item = "{[加工]-" + item + "}";
submit(item);
this.subscription.request(1L);
}
@Override
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + "#####中间处理器订阅出错消息" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + "#####中间处理器订阅完成");
}
}
public static void main(String[] args) throws InterruptedException {
//1.定义一个发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MyProcessor myProcessor = new MyProcessor();
//2.定义一个订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription temp;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.temp = subscription;
//开始订阅
System.out.println(Thread.currentThread().getName() + "开始订阅");
this.temp.request(1L);
}
@Override
public void onNext(String item) {
//接收到订阅消息
System.out.println(Thread.currentThread().getName() + "接收到订阅消息" + item);
this.temp.request(1L);
}
@Override
public void onError(Throwable throwable) {
//订阅出错消息
System.out.println(Thread.currentThread().getName() + "订阅出错消息" + throwable.getMessage());
}
@Override
public void onComplete() {
//订阅完成
System.out.println(Thread.currentThread().getName() + "订阅完成");
}
};
//3.绑定订阅关系
publisher.subscribe(myProcessor);
myProcessor.subscribe(subscriber);
//4.发布者生产消息
for (int i = 0;i < 10; i++) {
publisher.submit("(p-" + i + ")");
}
Thread.sleep(3000L);
System.out.println("关闭发布者");
myProcessor.close();
publisher.close();
}
}
响应式编程
底层:数据缓冲队列+消息驱动模型+异步回调机制
编码:流式编程+链式调用+声明式API
效果:优雅全异步+消息实时处理+高吞吐量+占用少量资源
Reactor
Mono[0|1]
Flux[N]
响应式流:元素(内容) + 信号(完成/异常)
subscribe()
自定义流的信号感知回调
flux.subscribe(
v-> System.out.println("v = " + v), //流元素消费
throwable -> System.out.println("throwable = " + throwable), //感知异常结束
()-> System.out.println("流结束了...") //感知正常结束
);
自定义消费者
flux.subscribe(new BaseSubscriber<String>() {
// 生命周期钩子1: 订阅关系绑定的时候触发
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅的时候触发
System.out.println("绑定了..."+subscription);
//找发布者要数据
request(1); //要1个数据
// requestUnbounded(); //要无限数据
}
@Override
protected void hookOnNext(String value) {
System.out.println("数据到达,正在处理:"+value);
request(1); //要1个数据
}
// hookOnComplete、hookOnError 二选一执行
@Override
protected void hookOnComplete() {
System.out.println("流正常结束...");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流异常..."+throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消...");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终回调...一定会被执行");
}
});
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 孤寂灬无痕
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果