Post

SpringEvent扩展性利器

使用Spring Event机制可以保证高扩展性:

使用Spring Event来发布应用内部领域事件,对于事件监听器可通过注解或类的方式来扩展,Spring Event内部使用观察者模式,但api使用层面可以完全解耦事件发布和事件监听:

常用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
@Slf4j
public class ClazzHourEventListener {
    
    // 默认同步调用该方法,@Order注解编排顺序
    @EventListener
    @Order
    public void listener1(ClazzHourDepletedMemEvent event) {
        // ...
    }
    
    
    // @Async注解实现异步调用
    @Async
    @EventListener
    public void listener2(ClazzHourDepletedMemEvent event) {
        // ...
    }
    
    // 事务监听,默认在事务提交后同步执行该方法
    @TransactionalEventListener
    public void listener3(ClazzHourDepletedMemEvent event) {
        // ...
    }
}

注解实现事件监听需要考虑一下三个方面的内容:

  • 异步:如何实现异步
  • 事务:调用者的事务和监听器事务关系,包括异步情况下
  • 异常处理:异常需要捕获吗?对事务有什么影响

测试如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Service
public class BizService {

    @Autowired
    StudentMapper mapper;

    @Autowired
    ApplicationEventPublisher publisher;

    @Transactional
    public void bizAction(){
        Student student = new Student();
        student.setId(120L);
        student.setName("test1");
        mapper.insert(student);

        System.out.println("bizAction +"+Thread.currentThread().getName());
        // 发布时间
        publisher.publishEvent(new TestEvent());
    }

}

@Component
public class TestTxListener {

    @Autowired
    StudentMapper studentMapper;

    // 该步抛异常会导致后续listener无法运行
    // 该步事务和BizService中是一个,抛异常会同时回滚
    // @Order(2) 
    @EventListener
    public void listener1(TestEvent event){
        System.out.println("TestTxListener 1 +"+Thread.currentThread().getName());

        Student student = new Student();
        student.setId(121L);
        student.setName("同步调用测试tx");
        studentMapper.insert(student);
        // throw new RuntimeException();

    }


    // 异步 情况下 该步事务和BizService中不是一个,这里抛异常不影响其他listenr执行,也不影响BizService事务
    // 如果order优先级高的同步listener抛异常,这里也会执行不到
    // @Order(1)
    @Async("testTPE")
    @EventListener
    public void listener2(TestEvent event){
        System.out.println("TestTxListener 2 +"+Thread.currentThread().getName());

        Student student = new Student();
        student.setId(122L);
        student.setName("异步调用测试tx");
        studentMapper.insert(student);
        
        // throw new RuntimeException();
    }
}


@Component
public class TestTxListener {

    @Autowired
    StudentMapper studentMapper;

    // 默认同步调用,这里面跑出异常不影响其他TransactionalEventListener执行
    // TransactionalEventListener 使用 TransactionSynchronization实现
    // 这里的事务需要指定REQUIRES_NEW,或者使用编程式事务。否则无法提交,详见TransactionSynchronization    
    // The transaction will have been committed already, but the 
    // transactional resources might still be active and accessible. 
    // As a consequence, any data access code triggered at this point 
    // will still "participate" in the original transaction, allowing 
    // to perform some cleanup (with no commit following anymore!), 
    // unless it explicitly declares that it needs to run in a 
    // separate transaction. Hence: Use PROPAGATION_REQUIRES_NEW for 
    // any transactional operation that is called from here.
    // 但是,如果使用@Async,就没有这个问题,因为事务是绑定线程的,多线程propogation无作用
    @Transactional(propogation=REQUIRES_NEW)
    @Order(1)
    @TransactionalEventListener
    public void listener1(TestEvent event){
        // 同步调用
        System.out.println("TestTxListener 1 +"+Thread.currentThread().getName());


        Student student = new Student();
        student.setId(161L);
        student.setName("同步调用测试tx");
        studentMapper.insert(student);
        // 不影响其他listener执行
        throw new RuntimeException();

    }


    @Order(2)
    // @Async 可异步
    @TransactionalEventListener
    public void listener2(TestEvent event){
        System.out.println("TestTxListener 2 +"+Thread.currentThread().getName());

        Student student = new Student();
        student.setId(162L);
        student.setName("异步调用测试tx");
        studentMapper.insert(student);
    }
}

由此可见 EventListener 执行过程中遇到异常终止,则后续的同步&异步EventListener都不会执行(之前的会执行,使用@Order控制顺序),而TransactionalEventListener相互之间不受影响。所以使用EventListener要做好异常处理。此外TransactionalEventListener方法内使用事务(默认afterCommit)需要注明@Transactional( propogation=REQUIRES_NEW)或使用编程式事务,但是如果@Async异步时,就不需要指定,因为事务是绑定线程的。

ps:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
org.springframework.modulith.events.ApplicationModuleListener
 
@Async
@Transactional(propagation=REQUIRES_NEW)
@TransactionalEventListener
@Documented
@Target({METHOD,ANNOTATION_TYPE})
@Retention(RUNTIME)
public @interface ApplicationModuleListener{
    @AliasFor(annotation=org.springframework.context.event.EventListener.class,attribute="id")
    String id();
    @AliasFor(annotation=org.springframework.transaction.annotation.Transactional.class,attribute="readOnly")
    boolean readOnlyTransaction();
}
This post is licensed under CC BY 4.0 by the author.