云原生微服务架构实战精讲第六节 事务事件处理和项目实例
第15讲:什么是事务性消息模式第 14 课时介绍了事件驱动设计的基本概念,在使用了消息代理之后,应用中产生的事件以消息的形式进行发布,消息的消费者接收到事件并进行处理。如果消息代理可以在传递消息时提供至少有一次的保证性,那么只要消息被成功发布,就可以确保该消息对应的事件必定会得到处理。事务性消息(Transactional Messaging)的目的是保证数据的一致性。在示例应用中,当收到创建行程
第15讲:什么是事务性消息模式
第 14 课时介绍了事件驱动设计的基本概念,在使用了消息代理之后,应用中产生的事件以消息的形式进行发布,消息的消费者接收到事件并进行处理。如果消息代理可以在传递消息时提供至少有一次的保证性,那么只要消息被成功发布,就可以确保该消息对应的事件必定会得到处理。
事务性消息(Transactional Messaging)的目的是保证数据的一致性。在示例应用中,当收到创建行程的请求之后,行程服务会把行程信息保存在关系型数据库中,同时发布表示行程已创建的事件 TripCreatedEvent。很显然,行程信息的保存和 TripCreatedEvent 事件的发布,这两个动作要么同时发生,要么同时不发生。如果只有一个动作发生,那么必然会产生数据一致性的问题。而这两个动作都可能失败,为了保证原子性,通常需要用到事务。
对于关系数据库中的事务,我们都不陌生。如果上述的两个动作是对同一个数据库中表的操作,我们使用事务就可以轻松解决。两个动作在同一个事务中,如果这两个动作都成功,事务才会被提交,否则事务会自动回滚。如果两个动作是对两个不同数据库的操作,那么也可以使用 XA 事务的两阶段提交协议(Two-Phase Commit Protocol,2PC)。
在行程服务的实现中,行程信息被保存在 PostgreSQL 数据库中,而事件发布则由 Apache Kafka 来完成。Kafka 并不支持 XA 事务,这使得无法通过 XA 事务来解决问题。为了解决这个问题,就需要用到下面介绍的事务性发件箱模式。
事务性发件箱模式
事务性发件箱(Transactional Outbox)模式使用一个数据库表来保存需要发布的事件,这个表称为事件的发件箱。通过使用这种模式,发布事件的动作被转换成一个数据库操作,因此可以使用一个本地数据库事务来保证原子性。对于保存在发件箱表中的事件,需要一个独立的消息中继进程来转发给消息代理。
下图给出了事务性发件箱模式的示意图。在行程服务对行程表进行操作时,包括插入、更新和删除操作,会同时在发件箱表中插入对应的事件记录,对这两个表的操作在同一个数据库事务中。如果对行程表的操作成功,则发件箱表中必然有对应的事件;如果对行程表的操作失败,则发件箱表中必然没有对应的事件。消息中继负责读取发件箱表中的记录,并发送事件给消息代理。
实现事务性发件箱模式的一个重要问题是如何有效读取发件箱表中的记录,一般的做法是使用下面介绍的变化数据捕获技术。
变化数据捕获
消息中继需要监控发件箱表,当有记录插入时,就需要发布消息到消息代理,这种监控数据库变化的技术称为变化数据捕获(Change Data Capture,CDC)。有很多不同的方法可以捕获到数据库表中的改动,常见的做法如下所示。
- 更新时间戳。表中包含一个字段来记录每一行的更新时间戳。在检查数据变化时,更新时间戳大于上一次捕获的时间戳的行,都是这一次需要处理的内容。
- 版本号。表中包含一个字段来记录数据的版本号。当一行的数据发生变化时,这一行的版本号被更新为当前的版本号,每次捕获变化时,选择版本号与当前版本号相同的行。当捕获完成之后,当前版本号被更新为新的值,为下一次捕获做准备。
- 状态指示符。表中包含一个字段来标记每一行是否发生了变化。
- 触发器。当表中的数据产生变化时,数据库的触发器负责往另外一个历史记录表中插入数据来记录对应的事件。在捕获变化时,只需要查询这个历史记录表即可。
- 扫描事务日志。大部分数据库管理系统使用事务日志来记录对数据库的改动。通过扫描和解析事务日志的内容,可以捕获数据的变化。
上述方法可以根据是否使用事务日志划分成两类。事务日志的好处是对数据库没有影响,也不要求对应用的表结构和代码进行修改,另外还有更好的性能。事务日志的不足之处在于,事务日志的格式并没有统一的标准,不同的数据库系统有自己的私有实现,而且会随着版本更新而变化。这就要求解析事务日志的代码需不断更新。幸运的是,有不少开源库可供使用。
事务日志
下面对 MySQL 和 PostgreSQL 中的事务日志进行具体的介绍。
MySQL
MySQL 使用二进制日志(Binary Log,binlog)来记录数据库变化,二进制日志中包含的事件描述数据库中的变化,包括表创建和表中数据的变化。二进制日志有两个重要的作用。
第一个作用是复制(Replication)。主服务器(Master)上的二进制日志包含了数据变化的记录。在进行复制时,二进制日志被发送到从服务器(Slave),从服务器通过执行日志中包含的事件来完成复制。
第二个作用是进行数据恢复。当从备份中恢复数据之后,在二进制日志文件中,该备份创建之后产生的事件会被重新执行,从而把数据恢复到最近的状态。
MySQL 支持 3 种不同的二进制日志格式,通过参数 binlog-format 来指定,如下表所示。
语句和行格式各有优势。有些 SQL 语句可能不会产生对行的改动,如找不到匹配行的 UPDATE 或 DELETE 语句。这样的 SQL 语句可以记录在语句格式日志中,但是不会出现在行格式的日志中。语句格式的问题在于,某些情况下语句的执行并没有确定的结果。
MySQL 中的变化数据捕获技术一般通过二进制日志文件的复制来完成。具体的做法是把要监控的 MySQL 数据库作为复制的主服务器,而捕获变化的客户端作为复制的从服务器,这样就可以自动获取到二进制日志文件,并解析其中的事件。
下面给出了 MySQL 8 服务器的配置文件,用来配置二进制日志文件。对于日志文件的保留时间,旧版本的 MySQL 使用 expire_logs_days 配置项,该文件应该被添加到 MySQL 服务器的 /etc/mysql/conf.d 目录中。推荐的做法是创建自定义的 Docker 镜像,也可以使用我创建的 quay.io/alexcheng1982/mysql-cdc 镜像。
[mysqld]
default-authentication-plugin = mysql_native_password
server-id = 1
log_bin = mysql-bin # 日志文件名称的前缀
binlog_expire_logs_seconds = 86400 # 日志文件保留时间
binlog_format = row # 使用行格式
在 Java 中,读取 MySQL 二进制日志文件最常用的是 mysql-binlog-connector-java 库,该库实现了 MySQL 的二进制日志文件的复制协议,可以直接连接 MySQL 服务器并解析事件。需要注意的是,连接 MySQL 服务器的用户需要具有 REPLICATION SLAVE 和 REPLICATION CLIENT 权限。从测试的角度,可以直接使用 root 用户;在实际的生产环境,应该使用专有的 MySQL 用户并配置好权限。
下面的代码是 mysql-binlog-connector-java 库的使用示例。表示事件的 Event 对象有 EventHeader 和 EventData 两个属性:EventHeader 中包含的是事件的元数据;EventData 接口的不同实现类表示不同类型的事件数据。EventType 表示事件的类型,最常见的 3 种事件类型是 EXT_WRITE_ROWS、EXT_UPDATE_ROWS 和 EXT_DELETE_ROWS,分别对应于插入行、更新行和删除行。
BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "myrootpassword");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(event -> {
EventType type = event.getHeader().getEventType();
switch (type) {
case EXT_WRITE_ROWS:
WriteRowsEventData writeData = event.getData();
String writeResult = "Insert: " + writeData.getRows().stream()
.map(Arrays::toString).collect(Collectors.joining(",\n"));
System.out.println(writeResult);
break;
case EXT_UPDATE_ROWS:
UpdateRowsEventData updateData = event.getData();
String updateResult = "Update: " + updateData.getRows().stream()
.map(entry -> String.format("before: %s, after: %s",
Arrays.toString(entry.getKey()),
Arrays.toString(entry.getValue())))
.collect(Collectors.joining(",\n"));
System.out.println(updateResult);
break;
case EXT_DELETE_ROWS:
DeleteRowsEventData deleteData = event.getData();
String deleteResult = "Delete: " + deleteData.getRows().stream()
.map(Arrays::toString).collect(Collectors.joining(",\n"));
System.out.println(deleteResult);
break;
}
});
client.connect();
下面的代码用来启动运行时需要的 MySQL 8 容器:
docker run --rm -p 3306:3306 -e MYSQL_ROOT_PASSWORD=myrootpassword quay.io/alexcheng1982/mysql-cdc
PostgreSQL
PostgreSQL 使用写提前日志(Write-Ahead Logging,WAL)来保证数据的完整性。WAL 的核心思想是对数据文件的写入,必须发生在相应的改动被记录在日志之后。当数据库崩溃之后,可以从日志中进行恢复,只需要重新应用日志中记录的改动即可。
PostgreSQL 的逻辑解码(Logical Decoding)对 WAL 中的内容进行解码,转换成特定的格式。在进行逻辑复制时,一个复制位置(Replication Slot)代表一个变化的流,可以在一个客户端中进行重放。输出插件用来把 WAL 中的内容转换成复制位置的消费者所期望的格式,逻辑解码最早在 PostgreSQL 9.4 版本中引入。下表给出了常用的输出插件,其中 pgoutput 是 PostgreSQL 10 及以上版本自带的输出插件,除了它之外的其他插件都需要手动安装并启用。
输出插件 | 格式 |
---|---|
pgoutput | PostgreSQL 逻辑复制协议 |
wal2json | JSON |
decoderbufs | Protocol Buffers |
如果使用 pgoutput 之外的输出插件,在安装对应的插件之后,需要在 postgresql.conf 文件中配置 PostgreSQL 加载对应的插件,如下面的代码所示。
shared_preload_libraries = 'decoderbufs,wal2json'
接着需要配置 PostgreSQL 复制时的逻辑解码。下面的代码给出了 postgresql.conf 文件中的相关配置。
max_wal_senders = 4 # walsender进程的最大数量
wal_level = logical # WAL文件的级别
max_replication_slots = 4 # 复制位置的最大数量
推荐使用已有的 debezium/postgres 镜像,该镜像已经安装了 wal2json 和 decoderbufs 两个输出插件。
PostgreSQL 中的变化数据捕获技术一般使用 PostgreSQL 的流复制协议来读取输出插件产生的内容。在 Java 中,PostgreSQL 的 JDBC 驱动可以直接读取相应的事件。下面的代码展示了如何使用 PostgreSQL 的 JDBC 驱动来读取数据变化事件,在连接到 PostgreSQL 数据库之后,首先创建一个名为 demo_logical_slot 的复制位置,并指定输出插件为 wal2json,接着创建一个读取该复制位置的流,最后读取流中的内容并输出。
String url = "jdbc:postgresql://localhost:5432/postgres";
Properties props = new Properties();
PGProperty.USER.set(props, "postgres");
PGProperty.PASSWORD.set(props, "postgres");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
Connection con = DriverManager.getConnection(url, props);
PGConnection replConnection = con.unwrap(PGConnection.class);
String slogName = “demo_logical_slot”;
replConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(slogName)
.withOutputPlugin(“wal2json”)
.make();
PGReplicationStream stream = replConnection.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(slogName)
.withStatusInterval(20, TimeUnit.SECONDS)
.start();
while (true) {
ByteBuffer msg = stream.readPending();
if (msg == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
System.out.println(new String(source, offset, length));
}
下面的 JSON 数据对应的是往 demo 表中插入一行之后的输出,该 JSON 数据由 wal2json 插件生成。
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "demo",
"columnnames": [
"id",
"name"
],
"columntypes": [
"integer",
"character varying"
],
"columnvalues": [
5,
"a"
]
}
]
}
下面的代码用来启动运行时需要的 PostgreSQL 容器:
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=postgres debezium/postgres:12-alpine
数据库轮询
如果不能通过读取事务日志的方式来捕获数据变化,可以采用数据库轮询的形式,数据库轮询的做法是定期查询数据库中的表数据,来找出变化的行。这里需要在表中添加额外的字段,如更新时间戳、版本号或状态指示符等。
比如,可以在发件箱表中添加一个字段 published 来标明每一行对应的事件是否被发布。在每次查询时,总是选择 published 字段的值为 0 的行,并尝试发送事件到消息代理。当发送成功后,把对应行的 published 字段的值更新为 1。
CDC 库
直接读取事务日志的做法在很多时候都过于复杂,有一些开源 CDC 库可供使用,如下表所示。
名称 | 支持数据库 | 开发者 |
---|---|---|
Debezium | MySQL、PostgreSQL、Oracle、SQL Server、DB2、MongoDB、Cassandra | RedHat |
SpinalTap | MySQL | Airbnb |
maxwell | MySQL | Zendesk |
mysql_streamer | MySQL | Yelp |
DBLog | MySQL、PostgreSQL | Netflix |
Eventuate CDC | MySQL、PostgreSQL | Eventuate |
下面对 Debezium 和 Eventuate CDC 进行具体介绍。
Debezium
Debezium 是流行的开源 CDC 库,构建在 Apache Kafka 之上,提供了 Kafka Connect 兼容的连接器,可以把数据库中的变化事件发布成 Kafka 中的消息。Debezium 提供了对应不同类型的数据库连接器,只需要把连接器部署到 Kafka Connect 即可,下图是 Debezium 的架构图。
如果你的应用也使用 Kafka,那么 Debezium 是一个不错的选择。通过 Kafka Streams API 可以把 Debezium 发布的消息进行转换,并发布到其他主题中,还可以使用连接器输出到其他第三方消费者。Debezium 也支持嵌入在 Java 应用中运行。
Eventuate CDC
Eventuate CDC 是 Eventuate 平台的一部分,也是 Eventuate 提供的事务性消息框架的基础。Eventuate CDC 对 MySQL 和 PostgreSQL 使用事务日志,对其他数据库使用轮询。在发送消息时,Eventuate CDC 支持 Apache Kafka、Apache ActiveMQ、RabbitMQ 和 Redis 作为消息代理。
下图是 Eventuate CDC 的架构示意图。
Eventuate CDC 中重要组件的介绍如下表所示。
示例应用使用的是 Eventuate CDC。
实现事务性发件箱模式
在了解了变化数据捕获技术相关的内容之后,我们可以实现自己的事务性发件箱模式,不过更好的做法是使用已有的开源库。
在连接器读取到发件箱表中的数据变化之后,Debezium 可以对发布到 Kafka 中的消息进行转换,再发布到应用相关的主题中。不过 Debezium 的这个功能目前还处于开发阶段,不太适用于生产环境。
本专栏的示例应用使用的 Eventuate 提供的事务消息库。Eventuate 的事务性消息的具体内容将在下一课时中介绍。
需要注意的是,事务性发件箱模式会导致一个事件被发布至少一次。如果消息中继进程在发送事件之后崩溃,而没有机会记录下 CDC 相关的状态,当消息中继进程恢复之后,会重新处理发件箱表中的一些记录,这会导致对应的事件被重新发布。这并不是一个问题,因为 Kafka 也是提供至少有一次的消息传递的保证性,所以事件的重复是无法避免的。
总结
事务性消息对数据一致性有着至关重要的作用,它保证了对关系型数据库的修改和对应的事件的发布这两个动作的原子性。本课时介绍了事务性发件箱模式,以及实现该模式需要的变化数据捕获技术;同时还介绍了事务日志和数据库轮询这两种 CDC 实现技术,以及常用的 CDC 库 Debezium 和 Eventuate CDC。
第16讲:事件发布如何进行处理
在事件驱动的微服务中最基本的操作是发布和处理事件,事件以消息的形式发布到消息代理中,示例应用使用 Apache Kafka 作为消息代理,从实现的角度来说,我们只需要直接使用 Kafka 的客户端就可以发布和处理事件了。不过这种做法的开发效率不高,示例应用使用 Eventuate Tram 框架来提高开发效率。本课时将介绍如何使用 Eventuate Tram 框架来发布和处理事件。
事件描述
我们首要的任务是描述事件,事件一般由 3 个要素组成,即标识符、类型和载荷。标识符是事件的唯一标识,可以用来区分重复的事件;类型用来区分不同的事件,事件类型一般使用名词加上动词被动语态的形式,如 TripCreatedEvent,在 Java 中,一般使用事件类的全名作为事件的类型;事件的载荷由事件的类型来确定,事件类型可以没有载荷。
每个事件都有一个来源,表明产生该事件的对象。事件来源的类型一般是事件类型中作为前缀的名词部分,比如表示行程已创建的事件 TripCreatedEvent 的来源是行程。在介绍领域驱动设计时,我提到过聚合的概念,如果在建模时使用了聚合,那么事件的来源通常是聚合中的实体,这样的事件称为领域事件。领域事件除了上述 3 个基本属性之外,还包括事件的来源对象所在聚合的根实体的类型和标识符。比如,乘客管理服务中发布的事件,包含对应的乘客的标识符和乘客的实体类型。
在 Java 中,我们使用 Java 类来表示事件,不同的事件类之间共通的部分很少,一般使用一个标记接口(Marker Interface)来声明事件,所有的事件类只需要实现这个标记接口即可,你可能会认为事件的接口中应该包含一个 getId 方法来返回事件的标识符。实际上,事件的标识符对于事件对象本身来说并没有意义,我们只需要在发布事件的时候生成其标识符即可,并不需要把标识符添加到事件对象模型中。
Eventuate Tram 提供了标记接口 DomainEvent。下面代码中的接口 TripDomainEvent 是行程相关的事件的标记接口:
public interface TripDomainEvent extends DomainEvent {
}
事件类是实现了标记接口的 POJO 类,事件对象本身就是事件的载荷格式。当发布事件时,只需要把当前的事件对象序列化,就得到了事件的载荷。事件对象在序列化时,可以使用 JSON 和 XML 这样的文本格式,也可以使用 Protocol Buffers 和 Apache Avro 这样的二进制格式。
下面的代码给出了 TripCreatedEvent 事件类的定义:
@Data
@NoArgsConstructor
@RequiredArgsConstructor
public class TripCreatedEvent implements TripDomainEvent {
private TripDetails tripDetails;
}
发布事件
当发布事件时,只需要创建对应事件类的一个新对象即可,实际的事件发布由 Eventuate Tram 来完成。下面代码中的 TripDomainEventPublisher 类用来发布行程相关的领域事件。TripDomainEventPublisher 类继承自抽象类 AbstractAggregateDomainEventPublisher,AbstractAggregateDomainEventPublisher 类包含了发布聚合对应的领域事件的基本逻辑。
public class TripDomainEventPublisher extends
AbstractAggregateDomainEventPublisher<Trip, TripDomainEvent> {
public TripDomainEventPublisher(
DomainEventPublisher eventPublisher) {
super(eventPublisher, Trip.class, Trip::getId);
}
}
AbstractAggregateDomainEventPublisher<A, E extends DomainEvent> 类有两个类型参数,A 表示聚合的类型,E 表示领域事件的类型。它的构造器需要 3 个参数,如下表所示。
TripDomainEventPublisher 类的构造器只需要提供 DomainEventPublisher 接口的实现。aggregateType 参数的值为 Trip.class,而 idSupplier 函数则是调用 Trip 类的 getId 方法。
当需要发布事件时,使用 TripDomainEventPublisher 的 publish 方法。publish 方法有两个参数:第一个参数是聚合对象,第二个参数是需要发布的事件列表。
下面以创建行程的过程为例来进行说明,代码是 Trip 类中的 createTrip 方法,用来创建 Trip 对象和需要发布的事件。createTrip 方法的返回值类型 ResultWithDomainEvents 是聚合对象和事件列表的一个封装,这里封装了一个 Trip 对象和 TripCreatedEvent 事件。
public static ResultWithDomainEvents<Trip, TripDomainEvent> createTrip(String passengerId,
PositionVO startPos, PositionVO endPos) {
Trip trip = new Trip(passengerId, startPos, endPos);
TripCreatedEvent event = new TripCreatedEvent(new TripDetails(passengerId, startPos, endPos));
return new ResultWithDomainEvents<>(trip, event);
}
下面的代码是 TripService 中的 createTrip 方法。首先使用 Trip 类的 createTrip 方法创建出 Trip 对象和 TripCreatedEvent 事件,接着使用 TripRepository 来保存 Trip 对象,最后使用 TripDomainEventPublisher 来发布事件。TripService 类添加了 @Transactional 注解,保证了 createTrip 方法的调用在一个事务中完成。
public TripVO createTrip(String passengerId, PositionVO startPos, PositionVO endPos) {
ResultWithDomainEvents<Trip, TripDomainEvent> tripAndEvents = Trip
.createTrip(passengerId, startPos, endPos);
Trip trip = tripAndEvents.result;
tripRepository.save(trip);
tripDomainEventPublisher.publish(trip, tripAndEvents.events);
return trip.toTripVO();
}
TripDomainEventPublisher 在发布事件时通过 MessageProducer 接口来完成。MessageProducer 是一个通用的消息发送接口,它的 send 方法用来发送消息,该方法有两个参数:第一个参数是消息的目的地,第二个参数是 Message 接口表示的消息。Message 接口描述了消息中的 3 个组成部分,即标识符、消息头和载荷,消息头是一个哈希表,包含消息头的名称和对应的值,其作用是包含消息的元数据。
TripDomainEventPublisher 在发布事件时,需要从聚合对象和事件对象中构建出消息。事件对象的 JSON 序列化结果,作为消息的载荷。聚合对象的标识符和类型,以及事件对象的类型都添加到消息头中。聚合对象的类型作为消息的目的地。
由于使用了事务性发件箱模式,MessageProducer 发布的消息会被保存在关系型数据库中。发件箱表的名称是 message,下表给出了 message 表中的字段。
下面代码中的 JSON 数据是 message 表 headers 字段的内容示例:
{
"PARTITION_ID": "05f5e7e8-5a46-4da3-bd29-bb44d5f8b34d",
"event-aggregate-type": "io.vividcode.happyride.tripservice.domain.Trip",
"DATE": "Fri, 1 May 2020 01:06:14 GMT",
"event-aggregate-id": "05f5e7e8-5a46-4da3-bd29-bb44d5f8b34d",
"event-type": "io.vividcode.happyride.tripservice.api.events.TripCreatedEvent",
"DESTINATION": "io.vividcode.happyride.tripservice.domain.Trip",
"ID": "00000171cdc508b4-94b86dfe66ea0000"
}
对于发布到 Kafka 中的消息,主题来自于 message 表中的 destination 字段,分片 ID 来自于 headers 字段中消息头中的 PARTITION_ID,消息的内容是包含 headers 和 payload 两个属性的 JSON 数据。
处理事件
由于事件以消息的形式发布到 Kafka,那么处理事件时则需要订阅 Kafka 中的主题,并消费其中的消息。我们需要创建一个 DomainEventDispatcher 类的对象,来负责消费主题中的消息并处理。DomainEventDispatcher 类使用 MessageConsumer 接口来订阅主题,对事件的处理逻辑由 DomainEventHandlers 类来声明,DomainEventHandlers 对象包含了一个 DomainEventHandler 对象的列表,DomainEventHandler 类的构造器的参数如下表所示。其中 DomainEventEnvelope 接口封装了事件的相关信息,包括事件对象、聚合 ID、聚合类型、事件 ID 和消息对应的 Message 对象。
一般使用构建器 DomainEventHandlersBuilder 类来创建 DomainEventHandlers 对象。在下面的代码中,DomainEventHandlersBuilder 类的 forAggregateType 方法指定了 DomainEventHandler 对象的聚合类型,而 onEvent 方法则对不同类型的事件指定了处理器。这里我指定了 TripConfirmedEvent 事件的处理器是 onTripConfirmed 方法。
public class DispatchServiceEventConsumer {
@Autowired
DispatchService dispatchService;
private static final Logger LOGGER = LoggerFactory
.getLogger(DispatchServiceEventConsumer.class);
public DomainEventHandlers domainEventHandlers() {
return DomainEventHandlersBuilder
.forAggregateType(“io.vividcode.happyride.tripservice.domain.Trip”)
.onEvent(TripConfirmedEvent.class, this::onTripConfirmed)
.build();
}
private void onTripConfirmed(DomainEventEnvelope<TripConfirmedEvent> envelope) {
TripDetails tripDetails = envelope.getEvent().getTripDetails();
try {
dispatchService.dispatchTrip(envelope.getAggregateId(), tripDetails);
} catch (Exception e) {
LOGGER.warn(“Failed to dispatch trip {}”, envelope.getAggregateId(), e);
}
}
}
下面代码是与事件处理相关的 Spring 配置类。DomainEventDispatcher 类的对象由 DomainEventDispatcherFactory 类的 make 方法来创建,make 方法的第一个参数是 DomainEventDispatcher 对象的标识符,第二个参数是 DomainEventHandlers 对象。
@Configuration
@Import(TramEventSubscriberConfiguration.class)
public class DispatchServiceMessageHandlersConfiguration {
public DispatchServiceEventConsumer dispatchServiceEventConsumer() {
return new DispatchServiceEventConsumer();
}
public DomainEventDispatcher domainEventDispatcher(
DispatchServiceEventConsumer dispatchServiceEventConsumer,
DomainEventDispatcherFactory domainEventDispatcherFactory) {
return domainEventDispatcherFactory
.make(“dispatchServiceEvents”,
dispatchServiceEventConsumer.domainEventHandlers());
}
}
重复事件处理
消息在传递时提供的是至少一次的保证性,虽然不会丢失消息,但是会产生重复消息。这就意味着,对于同一个事件,它的处理器可能会被调用多次,对于重复事件的问题,一般有两种解决办法。
第一种做法是使用幂等( Idempotent)处理器,其含义是,对于同一个事件,多次调用处理器不会产生副作用。如果一个事件处理器是幂等的,那就不需要对重复事件进行额外处理,并不是所有的处理器都是幂等的。幂等的处理器需要满足业务逻辑和具体实现两方面的要求。业务逻辑指的是对事件的重复处理在业务上是可行的,具体实现指的是代码实现对于重复事件在处理时不会出错。以订单取消的事件为例,从业务逻辑上来说,一个订单被取消多次是没有问题的,取消一个已经被取消的订单并没有什么影响。在实现上,代码也需要考虑到处理重复事件的情况。
第二种做法是去掉重复的事件。每个事件都有自己的标识符,只需要记录下已经处理过的事件标识符,就可以去掉重复的事件。Eventuate Tram 提供了检测重复消息的功能,DuplicateMessageDetector 接口用来检测重复的消息,接收到消息的标识符被保存在 received_messages 表中。当需要处理新消息时,首先尝试往 received_messages 表中插入新的记录,如果插入时出现重复键的异常,就说明消息已经被处理过。
命令
命令与一般消息的不同之处在于,命令有对应的返回结果,命令的返回结果用另外一个消息来表示。命令类都需要实现标记接口 Command,命令通过 CommandProducer 接口的 send 方法来发送。下面代码给出了 CommandProducer 接口的声明,两个 send 方法的区别在于是否可以使用 resource 参数。
public interface CommandProducer {
String send(String channel, Command command, String replyTo, Map<String, String> headers);
String send(String channel, String resource, Command command, String replyTo, Map<String, String> headers);
}
下表给出了 send 方法的参数及其说明。
CommandProducer 接口的实现在发送命令时,也创建出了对应的 Message 对象,只不过消息的头中包含的内容不同。消息的目的地是发送命令时指定的通道。
处理命令的工作由 CommandDispatcher 类来完成。CommandDispatcher 类使用 MessageConsumer 接口来接收消息,同时使用 MessageProducer 接口来发送命令的回应消息。对命令的处理逻辑由 CommandHandlers 类来声明,CommandHandlers 对象包含一个 CommandHandler 对象的列表。CommandHandler 类的构造器参数如下表所示。
命令处理器对应的函数有两个参数:第一个参数是封装命令的 CommandMessage 对象,第二个参数是包含资源路径的实际参数值的 PathVariables 对象。函数的返回值是作为回应消息的 Message 对象的列表。
一般使用构建器 CommandHandlersBuilder 类来创建 CommandHandlers 对象。在下面的代码中,CommandHandlers 对象对 QueryWeatherCommand 类型的命令,使用 queryWeather 方法进行处理。withSuccess 方法的作用是创建一个成功的回应消息,与之对应的 withFailure 方法用来创建一个失败的回应消息。
@Component
public class WeatherCommandHandlers {
public CommandHandlers commandHandlers() {
return CommandHandlersBuilder.fromChannel(“weather”)
.onMessage(QueryWeatherCommand.class, this::queryWeather)
.build();
}
private Message queryWeather(CommandMessage<QueryWeatherCommand> cm,
PathVariables pathVariables) {
return withSuccess(
new QueryWeatherResult(cm.getCommand().getCity(), “Rain”));
}
}
有了 CommandHandlers 对象之后,就可以创建 CommandDispatcher 对象。下面的代码是对应的 Spring 配置类,CommandDispatcherFactory 类的 make 方法用来创建 CommandDispatcher 对象。
@Configuration
@Import({TramCommandProducerConfiguration.class,
TramCommandConsumerConfiguration.class})
public class WeatherConfiguration {
public CommandDispatcher weatherCommandDispatcher(
CommandDispatcherFactory commandDispatcherFactory,
WeatherCommandHandlers weatherCommandHandlers) {
return commandDispatcherFactory
.make(“weatherCommandDispatcher”,
weatherCommandHandlers.commandHandlers());
}
}
下面的代码展示了如何发布命令和获取回应。使用 CommandProducer 接口发送 QueryWeatherCommand 命令到通道 weather,而命令回应的发送通道是 weather-reply。使用 MessageConsumer 接口订阅 weather-reply 通道就可以得到命令的回应消息。
public void testWeatherCommand() {
messageConsumer
.subscribe("weather-subscriber", Collections.singleton("weather-reply"),
message -> {
QueryWeatherResult result = JSonMapper
.fromJson(message.getPayload(), QueryWeatherResult.class);
System.out.println(result.getResult());
});
commandProducer
.send(“weather”, new QueryWeatherCommand(“Beijing”), “weather-reply”,
new HashMap<>());
}
资源路径的作用类似于 REST API 中的 URL 路径,资源路径中可以包含参数。在处理命令时,可以获取到资源路径中参数的实际值。
在下面的代码中,CommandHandlers 对象的资源路径是 /user/{username},其中 username 是参数。在对应的处理方法 echo 中,可以使用 PathVariables 对象来获取到参数 username 的实际值。
@Component
public class EchoCommandHandlers {
public CommandHandlers commandHandlers() {
return CommandHandlersBuilder.fromChannel(“echo”)
.resource(“/user/{username}”)
.onMessage(EchoCommand.class, this::echo)
.build();
}
private Message echo(CommandMessage<EchoCommand> cm,
PathVariables pathVariables) {
return withSuccess("echo -> " + pathVariables.getString(“username”));
}
}
在下面的代码中,当使用 CommandProducer 接口的 send 方法发送命令时,提供了资源路径 /user/alex,其中 alex 是参数 username 的实际值。
public void testEchoCommand() {
messageConsumer
.subscribe("echo-subscriber", Collections.singleton("echo-reply"),
message -> {
System.out.println(message.getPayload());
});
commandProducer
.send(“echo”, “/user/alex”, new EchoCommand(), “echo-reply”,
new HashMap<>());
}
配置 CDC
使用 Eventuate Tram 发布和处理事件时,必须使用 Eventuate CDC 服务,我们只需要启动 CDC 服务对应的 Docker 容器即可。
下面的代码是开发环境的 Docker Compose 文件中与 CDC 服务相关的内容。CDC 服务的配置通过环境变量来完成,以 EVENTUATE_CDC_READER_TRIP 开头的环境变量用来配置读取行程数据库的名为 TRIP 的读取器,以 EVENTUATE_CDC_PIPELINE_PIPELINE1 开头的环境变量用来配置名为 PIPELINE1 的流水线。流水线和读取器的对应关系由 EVENTUATE_CDC_PIPELINE_PIPELINE1_READER 变量来指定。
cdc-service:
image: eventuateio/eventuate-cdc-service:0.6.0.RELEASE
ports:
- "9090:8080"
depends_on:
- postgres-trip
- kafka
- zookeeper
environment:
SPRING_PROFILES_ACTIVE: PostgresWal
LOGGING_LEVEL_IO_EVENTUATE: INFO
EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:<span class="hljs-number">9092</span>
EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:<span class="hljs-number">2181</span>
EVENTUATE_CDC_PIPELINE_PIPELINE1_TYPE: eventuate-tram
EVENTUATE_CDC_PIPELINE_PIPELINE1_READER: Trip
EVENTUATE_CDC_PIPELINE_PIPELINE1_EVENTUATEDATABASESCHEMA: eventuate
EVENTUATE_CDC_READER_TRIP_TYPE: postgres-wal
EVENTUATE_CDC_READER_TRIP_DATASOURCEURL: jdbc:postgresql:<span class="hljs-comment">//postgres-trip/happyride-trip</span>
EVENTUATE_CDC_READER_TRIP_DATASOURCEUSERNAME: postgres
EVENTUATE_CDC_READER_TRIP_DATASOURCEPASSWORD: postgres
EVENTUATE_CDC_READER_TRIP_DATASOURCEDRIVERCLASSNAME: org.postgresql.Driver
EVENTUATE_CDC_READER_TRIP_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/trip
总结
事件驱动的微服务在实现中离不开事件的发布和处理,Eventuate Tram 框架简化了与事件的发布和处理相关的操作。本课时介绍了如何描述事件、发布事件和处理事件,还介绍了如何对重复事件进行处理,以及如何使用命令,最后介绍了如何配置 Eventuate CDC 服务。
第17讲:如何设计与实现事件源(Event ourcing)
本课时主要讲解如何设计与实现事件源(Event Sourcing)。
在第 15 课时,我介绍了事务性消息模式的使用。事务性消息模式的出发点是解决应用中可能会出现的数据一致性问题,数据一致性问题在微服务架构的应用中尤其明显。这是因为微服务相互独立,并且一般使用各自独立的数据存储,每个微服务负责维护各自的数据集,同时与其他微服务进行协作来更新相关的数据。
在事务性消息模式中,对当前微服务数据的修改由数据库操作来完成,而与其他微服务的协作则由事件来完成。这种把数据和事件分离的做法,有其实现上的复杂度。本课时介绍的事件源(Event Sourcing)技术从另外一个维度来解决这个问题,下面介绍事件源技术的基本概念。
事件源技术
数据一致性问题的根源在于对象状态与事件的分离,对象的当前状态保存在数据库中,而事件则在对象的状态发生变化时被发布。事件源技术的核心在于使用事件来捕获对对象状态的修改,这些事件按照发生的时间顺序来保存。当需要获取对象的当前状态时,只需要从一个初始状态的对象开始,然后对该对象依次应用保存的事件即可,这个过程的最终结果就是对象的当前状态。
最典型的事件源技术的例子是银行的账户管理操作,银行账户对象 Account 维护着当前的余额这一状态值。取款和存款这两个不同的操作会对账户对象的状态产生影响,并改变当前的余额。在一般的面向对象实现中,Account 对象的 balance 属性维护当前的余额。
下面的代码是 Account 类的声明,其中 credit 和 debit 方法分别对应于存款和取款操作,这两个方法都会对 balance 属性进行修改。这也是常见的面向对象设计的实现方式。
public class Account {
private final String id;
private MonetaryAmount balance = this.ofAmount(BigDecimal.ZERO);
public Account(final String id) {
this.id = id;
}
public String getId() {
return this.id;
}
public MonetaryAmount getBalance() {
return this.balance;
}
public void credit(final BigDecimal amount) {
this.balance = this.balance.add(this.ofAmount(amount));
}
public void debit(final BigDecimal amount) {
this.balance = this.balance.subtract(this.ofAmount(amount));
}
private MonetaryAmount ofAmount(final BigDecimal amount) {
return Money.of(amount, “CNY”);
}
}
下面代码中的 TransactionalMessagingAccountService 使用了事务性消息模式,在修改 Account 对象的状态之后,发布对应的事件。第 16 课时已经详细介绍了事务性消息模式的实现,这里只是使用了这个模式。
public class TransactionalMessagingAccountService implements AccountService {
private final AccountRepository accountRepository;
private final EventPublisher eventPublisher;
public TransactionalMessagingAccountService(
final AccountRepository accountRepository,
final EventPublisher eventPublisher) {
this.accountRepository = accountRepository;
this.eventPublisher = eventPublisher;
}
public void credit(final String accountId, final BigDecimal amount) {
this.accountRepository.findById(accountId).ifPresent(account -> {
account.credit(amount);
this.accountRepository.save(account);
this.eventPublisher
.publish(new AccountCreditedEvent(accountId, this.ofAmount(amount)));
});
}
public void debit(final String accountId, final BigDecimal amount) {
this.accountRepository.findById(accountId).ifPresent(account -> {
account.debit(amount);
this.accountRepository.save(account);
this.eventPublisher
.publish(new AccountDebitedEvent(accountId, this.ofAmount(amount)));
});
}
private MonetaryAmount ofAmount(final BigDecimal amount) {
return Money.of(amount, “CNY”);
}
}
在使用事件源技术时,我们使用事件来描述对对象状态的修改。下面代码中的 DomainEvent 接口是所有事件类的接口,其中的 getTimestamp 方法的作用是返回事件发生的时间戳。
public interface DomainEvent {
long getTimestamp();
}
下面代码中的 AccountEvent 接口是与账户相关的事件类公共接口,其中 getAccountId 方法返回产生该事件的 Account 对象标识符,而 getAmount 方法则返回事件相关的金额。
public interface AccountEvent extends DomainEvent {
String getAccountId();
MonetaryAmount getAmount();
}
下面代码中的 AccountCreditedEvent 类表示账户存款事件。
public class AccountCreditedEvent extends AbstractAccountEvent {
public AccountCreditedEvent(final String accountId,
final MonetaryAmount amount) {
super(accountId, amount);
}
}
下面代码中的 EventSourcingAccountService 类是使用事件源技术的 AccountService 接口的实现,其中的 credit 和 debit 方法都只是调用 EventRepository 类的 addEvent 方法来保存事件。通过比较两个不同的 AccountService 实现可以发现,EventSourcingAccountService 并不保存对象状态,而只是发布事件。这样就避免了对象状态和事件发布之间可能存在的不一致问题。
public class EventSourcingAccountService implements AccountService {
private final EventRepository eventRepository;
public EventSourcingAccountService(
final EventRepository eventRepository) {
this.eventRepository = eventRepository;
}
public void credit(final String accountId, final BigDecimal amount) {
this.eventRepository
.addEvent(new AccountCreditedEvent(accountId, this.ofAmount(amount)));
}
public void debit(final String accountId, final BigDecimal amount) {
this.eventRepository
.addEvent(new AccountDebitedEvent(accountId, this.ofAmount(amount)));
}
private MonetaryAmount ofAmount(final BigDecimal amount) {
return Money.of(amount, “CNY”);
}
}
查询对象状态
事件源技术实现中的一个重要的问题是如何查询对象的当前状态,对于银行账户对象来说,我们需要知道账户的当前余额是多少。我们只需要从对象的初始状态开始,按照时间顺序依次应用不同事件所对应的改动,最终得到的结果就是对象的当前状态。
下面代码中的 Account 是表示银行账户的对象类,其中,apply 方法表示应用不同类型的事件所对应的改动。比如,AccountCreditedEvent 表示的是存款事件,在对应的 apply 方法中,余额 balance 需要加上事件中包含的金额。而 AccountDebitedEvent 事件对应的 apply 方法中,余额 balance 被减去事件中包含的金额。
public class Account {
private final String id;
private MonetaryAmount balance = Money.of(BigDecimal.ZERO, “CNY”);
public Account(final String id) {
this.id = id;
}
public String getId() {
return this.id;
}
public MonetaryAmount getBalance() {
return this.balance;
}
public void apply(final AccountCreditedEvent event) {
this.balance = this.balance.add(event.getAmount());
}
public void apply(final AccountDebitedEvent event) {
this.balance = this.balance.subtract(event.getAmount());
}
}
在获取对象的状态之前,首先需要找到相关的事件。在下面的代码中,EventRepository 类的 query 方法用来查询与某个 Account 对象相关的事件。在查询时,除了 Account 对象的标识符之外,还可以提供一个时间戳,用来查询特定时间点之前或之后发生的事件。如果提供两个时间戳,还可以查询两个时间点中间发生的事件。
public class EventRepository {
private final List<AccountEvent> events = new ArrayList<>();
public void addEvent(final AccountEvent event) {
this.events.add(event);
}
public List<AccountEvent> query(final String accountId) {
return Stream.ofAll(this.events)
.filter(event -> Objects.equals(accountId, event.getAccountId()))
.toJavaList();
}
public List<AccountEvent> queryBefore(final String accountId,
final long timestamp) {
return Stream.ofAll(this.events).takeWhile(
event -> Objects.equals(accountId, event.getAccountId())
&& event.getTimestamp() <= timestamp
).toJavaList();
}
public List<AccountEvent> queryAfter(final String accountId,
final long timestamp) {
return Stream.ofAll(this.events)
.dropWhile(event -> event.getTimestamp() <= timestamp)
.filter(event -> Objects.equals(accountId, event.getAccountId()))
.toJavaList();
}
public List<AccountEvent> queryBetween(final String accountId,
final long startTimestamp,
final long endTimestamp) {
return Stream.ofAll(this.events)
.dropWhile(event -> event.getTimestamp() < startTimestamp)
.takeWhile(event -> event.getTimestamp() < endTimestamp)
.filter(event -> Objects.equals(accountId, event.getAccountId()))
.toJavaList();
}
}
下面代码中的 AccountQuery 类用来查询 Account 对象。首先使用 EventRepository 类的 query 方法查询到相关的事件列表,再调用 applyEvents 方法来应用事件。在应用事件时,从一个新建的 Account 对象开始,再对事件列表中的每个 AccountEvent 事件,调用 apply 方法来应用事件对应的改动。在 apply 方法中,通过 Java 的反射 API 来找到 Account 对象中用来处理对应类型的事件的方法,并调用该方法来处理事件。applyEvents 方法的返回值就是包含了最新状态的 Account 对象。
public class AccountQuery {
private final EventRepository eventRepository;
private static final Logger LOGGER = LoggerFactory
.getLogger(AccountQuery.class);
public AccountQuery(
final EventRepository eventRepository) {
this.eventRepository = eventRepository;
}
public Account getAccount(final String accountId) {
return this.applyEvents(accountId,
this.eventRepository.query(accountId));
}
public Account getAccount(final String accountId, final long timestamp) {
return this.applyEvents(accountId,
this.eventRepository.queryBefore(accountId, timestamp));
}
private Account applyEvents(final String accountId, final List<AccountEvent> events) {
final Account account = new Account(accountId);
events.forEach(event -> this.apply(account, event));
return account;
}
private void apply(final Account account, final AccountEvent event) {
try {
final Method method = Account.class.getMethod(“apply”, event.getClass());
method.invoke(account, event);
} catch (final NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
LOGGER.warn(“Ignore event without handler {}”, event, e);
}
}
}
由于每个事件都有自己的时间戳,我们可以查询到一个对象在任何时间点上的状态,只需要从一个初始状态开始,然后仅应用发生在给定时间点之前的事件即可。EventRepository 类已经提供了 queryBefore 方法来指定查询时间戳,这给了我们一个强大的类似时间机器的追溯功能。如果用户想知道上个月底时的账户余额是多少,只需要把查询的时间戳设置为上个月底的午夜,查询的返回结果就是包含了当时的余额的 Account 对象。
快照
使用事件来表示对对象状态的修改之后,查询对象的状态变得复杂,需要依次应用所有的事件。当事件的数量非常大时,查询操作的性能会变低,这是因为每次都需要从初始状态开始遍历全部的事件。快照(Snapshot)的作用是提高查询状态时的性能,快照可以看成是一次状态查询的结果,在执行查询操作之后得到的对象状态被保存成快照。之后的查询操作不再需要从初始状态开始,而是从最近的快照开始,再应用快照保存之后产生的事件即可。在使用了快照之后,每次查询操作所要处理的事件数量可以控制在一个合理的范围。
快照可以保存在内存中或磁盘上。由于快照可以从历史事件中重新创建,丢失快照并不是一个严重的问题,因此保存在内存中是很合理的,性能也很好。也可以定期把快照保存在磁盘上,这样当应用崩溃之后,可以从磁盘上保存的快照中快速恢复状态,而不用从头开始应用全部事件来创建新的快照。
下面代码中的 Snapshot 是实现快照的一个基本接口。Snapshot 接口封装了一个类型为 V 的对象,以及快照创建的时间戳。
public interface Snapshot<V> {
V getValue();
long getTimestamp();
}
对于 Account 对象相关的快照,下面代码中的 AccountSnapshotRepository 类提供了基于内存中的哈希表的存储。AccountSnapshot.createBlank 方法的作用是创建一个新的 Account 对象的快照,其中时间戳的值为 0。
public class AccountSnapshotRepository {
private final Map<String, AccountSnapshot> snapshots = new HashMap<>();
public void save(final Account account) {
this.snapshots.put(account.getId(), new AccountSnapshot(account));
}
public AccountSnapshot get(final String accountId) {
return Optional.ofNullable(this.snapshots.get(accountId))
.orElse(AccountSnapshot.createBlank(accountId));
}
}
进行对象查询的 AccountQuery 类也需要进行相应的修改,来使用 AccountSnapshotRepository 对象中保存的快照。在 doGetAccount 方法中,首先从 AccountSnapshotRepository 对象中得到对应的 Account 对象最近的快照,然后使用 EventRepository 对象的 queryAfter 方法查询到快照创建的时间戳之后产生的事件,最后在快照中的 Account 对象上应用这些事件,就得到了 Account 对象的最新状态。在 postProcess 方法中,把上一次查询时得到的 Account 对象保存在 AccountSnapshotRepository 中,提供给下一次查询使用。
public class AccountQuery {
public Account getAccount(final String accountId) {
return this.postProcess(this.doGetAccount(accountId));
}
private Account doGetAccount(final String accountId) {
final AccountSnapshot snapshot = this.snapshotRepository.get(accountId);
return this.applyEvents(snapshot.getValue(),
this.eventRepository.queryAfter(accountId, snapshot.getTimestamp()));
}
private Account postProcess(final Account account) {
this.snapshotRepository.save(account);
return account;
}
}
事件反转
由于所有对对象状态的修改都由事件对象来保存,如果产生了错误的事件,可以很容易就进行纠正。对于一个事件,除了可以应用事件所对应的修改之外,还可以反转事件所对应的修改。比如,对于 AccountCreditedEvent 事件,在进行反转时,执行的是取款操作。在一个事件序列中,如果某个事件的产生是错误的,只需要对这个事件及其之后的事件都进行反转操作,再重新应用正确的事件以及之后产生的事件,所得到的结果就是正确的状态。
举例来说,如果事件序列中的一个 AccountCreditedEvent 事件的金额发生了错误,那么这个事件及其之后的事件都要被反转,再重新应用一个新的金额正确的 AccountCreditedEvent 事件,最后再重新应用该 AccountCreditedEvent 事件之后产生的相关事件,就可以得到 Account 对象的正确状态。
在查询对象的状态时,可以从初始状态开始,依次应用事件。当处理到错误的事件时,用正确的事件替换即可。如果保存了对象的快照,可以从错误事件发生之前的最近一个快照开始。
在下面的代码中,Account 类的 reverse 方法用来对不同类型的事件进行反转操作。
public class Account {
public void reverse(final AccountCreditedEvent event) {
this.balance = this.balance.subtract(event.getAmount());
}
public void reverse(final AccountDebitedEvent event) {
this.balance = this.balance.add(event.getAmount());
}
}
与外部系统交互
事件源技术的最大优势在于可以随时重放事件,有些事件在应用时会调用外部系统提供的服务来进行修改操作。当进行事件重放时,这些对外部系统服务的调用是不应该发生的,这就需要在事件的正常处理和重放时,对外部系统的调用采用不同的策略。比如,AccountCreditedEvent 事件的处理逻辑会发送短信通知给用户,告知有新的资金入账。当这个事件被重放时,并不需要发送短信通知。
推荐的做法是把所有与外部系统的交互都封装在网关(Gateway)中,网关的实现会根据事件处理的状态来确定是否发送实际的调用给外部系统。
与调用外部系统服务相关的是,事件处理时依赖外部系统提供的数据。比如,如果银行账户的存款操作支持不同的货币,假设 AccountCreditedEvent 事件中的金额使用的不是人民币,在处理该事件时,则需要根据当时的汇率来得到人民币的金额。当进行事件重放时,我们需要的是事件产生时的汇率值来完成处理,而不是重放事件时的汇率值,这就要求外部系统支持历史数据的查询。如果外部系统不支持查询历史数据,可以在网关中保存全部调用的结果。
下面代码中的 ExchangeRateGateway 展示了获取汇率网关的实现。ExchangeRateGateway 会保存每一个 AccountCreditedEvent 对象对应的汇率查询结果,不论在什么时候执行事件重放操作,ExchangeRateGateway 会保证 query 方法返回正确的结果。
public class ExchangeRateGateway {
private final ExternalExchangeRateService exchangeRateService = new ExternalExchangeRateService();
private final Map<AccountCreditedEvent, BigDecimal> results = new HashMap<>();
public BigDecimal query(final AccountCreditedEvent event) {
final BigDecimal oldResult = this.results.get(event);
if (oldResult != null) {
return oldResult;
}
final BigDecimal result = this.doQuery(event);
this.results.put(event, result);
return result;
}
public BigDecimal doQuery(final AccountCreditedEvent event) {
final String currencyCode = event.getAmount().getCurrency()
.getCurrencyCode();
if (!Objects.equals(currencyCode, “CNY”)) {
return this.exchangeRateService.query(currencyCode, “CNY”);
}
return BigDecimal.ONE;
}
}
代码更新
随着应用版本的更新,对于同样类型的事件处理逻辑也有可能发生变化。下面讨论三种不同的代码更新方式。
第一种代码更新是增加新功能,新功能并不影响之前已经处理过的事件。当代码更新之后,新增的功能会对更新之后产生的事件生效。如果新功能对之前的事件也适用,只需要重放之前的事件即可,这是事件源技术的一个强大功能。比如,银行系统添加了一个新的功能,可以智能分析付款记录并进行归类。在代码更新之后,只需要重放所有的 AccountDebitedEvent 事件,就可以完成对历史付款记录的处理。
第二种代码更新是修复 bug。在 bug 被修复之后,只需要重放事件,对象的状态就会被自动修复,如果 bug 涉及到外部系统,那么需要根据 bug 的情况来具体分析,采取不同的策略。比如,如果系统对 AccountDebitedEvent 事件的处理方式出现了 bug,导致了错误的短信通知被发送给客户,那么 bug 修复完成之后,则需要发送新的通知来告知用户之前的通知是错误的。在另外的情况下,网关会需要计算 bug 修复前后的差异性来进行补偿。比如,如果转账事件的处理代码中的 bug 造成了调用第三方服务时的金额产生了错误,则需要计算出相应的差额来执行多退少补的操作。这些补偿操作由网关来完成,对之前所有受到 bug 影响的事件都需要执行一次。
第三种代码更新是与时间相关的代码处理逻辑。比如 AccountDebitedEvent 事件的处理逻辑中需要扣除取款操作的手续费,而手续费的金额会随着时间而变化,这就要求领域模型可以根据事件的发生时间来应用对应的处理策略。最简单的做法是用一系列 if-else 语句来根据事件的发生时间,返回不同的值。
审计日志
事件源技术的一个非常重要的应用场景是实现审计日志(Audit Log),审计日志在很多涉及敏感数据的系统中至关重要。这些系统要求对数据的所有修改都需要记录下来,方便以后查询。在使用事件源技术之后,保存的事件序列实际上就形成了审计日志,这是使用事件源技术带来的直接好处。
事件存储
事件源技术在实现时的一个重要考虑是事件的持久化存储。由于事件是有序,而且不可变的,我们可以利用这些特性实现高效的事件存储,典型的实现是采用只追加(Apppend Only)的数据存储。当新的事件产生,只是往事件日志中追加记录,由于事件是不可变的,不需要考虑已有事件的更新。从实现上来说,事件存储类似于数据库中的事务日志,以及时间序列数据库(Time Series Database)对数据的存储。
总结
事件源技术使用事件来保存所有对状态的修改。通过事件的重放,可以实现很多强大的功能,如查询对象在任意时刻的状态。本课时介绍了事件源技术的基本概念,包括事件的发布和对象状态的查询。除此之外,还讨论了快照的使用、事件反转、与外部系统交互、代码更新、审计日志和事件存储等相关的内容。
第18讲:如何处理司机位置更新事件
在第 17 课时中,我介绍了事件源技术的基本概念,并提供了基本的 Java 实现。在实际的开发中,我们通常使用已有的开源框架来实现事件源,不同的编程语言和平台都有事件源的开源实现。下表给出了几个比较流行的开源实现,示例应用使用的是 Axon。
示例应用中使用事件源技术的场景是司机位置的更新。本课时将介绍如何使用 Axon 来更新司机的位置,除了本课时之外,在介绍 CQRS 技术的课时中,也会用到 Axon。
在打车应用中,司机需要实时更新其当前位置。司机的位置信息是派发行程的重要依据,同时也是很多安全相关的功能的基础,打车应用的司机 App 会定期读取 GPS 中的位置信息,并发送给服务器。这属于典型的事件应用的场景。示例应用中虽然没有司机 App,但是有司机模拟器来模拟司机的行为,也会同样发送被模拟的司机位置信息给服务器。
Axon
Axon 是一个事件驱动的微服务的完整解决方案,我们可以完全基于 Axon 来开发事件驱动的微服务架构的应用。Axon 包括 Axon 框架和 Axon 服务器两个部分。Axon 框架是 Axon 中的编程模型,提供了 SDK 来构建应用,支持事件源和 CQRS 等技术;Axon 服务器则是一个高可用性和可伸缩的事件存储。Axon 框架并不一定要使用 Axon 服务器作为事件存储,也可以使用关系型数据库、MongoDB 或内存作为事件存储方式。不过 Axon 服务器提供了额外的功能,适合于生产环境。
Axon 支持 3 种不同类型的消息,这 3 种消息模式,可以适用于不同的应用场景,如下表所示。
Axon 服务器
Axon 服务器是一个通用的分布式消息处理平台,不同的应用可以连接到 Axon 服务器来进行消息传递。一个应用可以发布事件给 Axon 服务器,其他应用可以声明事件的处理器来处理事件。从这个功能上来说,Axon 服务器的作用类似于分布式的事件总线,一个应用也可以发送命令给 Axon 服务器,由其他应用来提供回应。命令只会发送给一个应用。这个功能类似于常见的远程过程调用(RPC)模式。查询会被发送给所有能够回答该查询的应用。
Axon 服务器提供两种交互方式,分别是 8024 端口上的 HTTP 和 8124 端口上的 gRPC。Axon 服务器基于 Spring Boot 实现,打包成单个 JAR 文件,下载之后可以直接运行。推荐的做法是使用 Docker 来运行。下面的命令用来启动 Axon 服务器的 Docker 容器。
docker run -p 8024:8024 -p 8124:8124 axoniq/axonserver:4.3.3
Axon 框架
Axon 框架是 Java 应用使用的开发框架,支持 Axon 中 3 种类型的消息发布和处理。在 Spring Boot 应用中使用 Axon 框架非常简单,只需要添加相关的 Maven 依赖,就可以利用 Axon 框架提供的自动配置功能,如下面的代码所示。完成配置之后,Axon 框架中的对象实例都可以通过 @Autowired 来声明。
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.3.3</version>
</dependency>
在 Spring Boot 应用中,可以使用 Spring 标准的方式来配置 Axon 框架。在下面的 YAML 文件中,axon.axonserver.servers 用来配置 Axon 服务器的连接方式,这里连接的是 8124 端口上的 gRPC 接口。axon.serializer 用来配置事件对象的序列化格式。Axon 框架默认使用 XStream 序列化成 XML 格式。下面代码中的配置值 jackson 用来指定使用 Jackson 序列化成 JSON 格式。
axon:
axonserver:
servers: ${DOCKER_HOST_IP:localhost}:8124
serializer:
events: jackson
司机模拟器
司机模拟器是示例应用提供的一个辅助工具,其作用是代替司机 App 在打车应用中的作用,司机模拟器可以同时模拟多个司机的行为。每个被模拟的司机有固定的行为模式,每隔 5 秒钟改变一次位置,位置改变的规则如下所示:
- 从当前位置开始进行模拟,第一次模拟时使用初始位置;
- 随机确定是否应该转向,如果转向,随机确定是左转还是右转;
- 以随机的速度前进一段距离,作为新的位置。
除了位置之外,司机还可能处于不同的状态,如下表所示。代码中使用枚举类型 DriverState 来表示。
下图是司机模拟器的界面,上面展示了每个司机的状态,以及可以进行的操作。
发布事件
Axon 框架中有两类不同的事件,分别是从聚合中发布的领域事件,以及从其他组件发布的普通事件。司机模拟器只需要发布普通事件即可。领域事件相关的内容,将会在介绍 CQRS 技术的课时(第 22 课时)中进行说明。
Axon 框架 EventGateway 的 publish 方法可以发布多个事件。EventGateway 接口使用 Object 作为事件类型,因此任何对象都可以作为事件来发布。在司机模拟器中,每个被模拟的司机都会定期发送其位置,位置变化的事件由 DriverLocationUpdatedEvent 类表示,如下面的代码所示。
@Data
public class DriverLocationUpdatedEvent {
private DriverLocation location;
private DriverState state;
private long timestamp;
}
在下面的代码中,DriverSimulator 类中的 sendLocation 方法使用 EventGateway 对象来发布 DriverLocationUpdatedEvent 对象。
public class DriverSimulator {
private final EventGateway eventGateway;
private void sendLocation() {
final DriverLocationUpdatedEvent event = new DriverLocationUpdatedEvent();
event.setTimestamp(System.currentTimeMillis());
event.setLocation(this.currentLocation);
event.setState(this.state);
this.eventGateway.publish(event);
}
}
Axon 中所有组件的通讯都是通过消息对象来完成的。Message 是所有消息对象的接口,其类型参数 T 表示载荷对象的类型。所有消息对象都由标识符、载荷和元数据这 3 个部分组成,标识符的类型是 String,元数据由 MetaData 类来表示,而载荷则根据消息的类型来确定。MetaData 类实现了 Map<String, Object> 接口,实际是一个名值对的哈希表,用来包含与消息相关的辅助信息。需要注意的是,消息对象都是不可变,对消息对象的修改,都会创建新的消息对象。
对于 Axon 支持的 3 种类型的消息,都有与之对应的 Message 子接口,表示事件的 EventMessage 接口在 Message 接口的基础上,增加了事件发生的时间戳,以 Instant 对象表示。通过 EventGateway 发布的事件对象,都会被自动封装成 EventMessage 对象,再发送给 Axon 服务器。
可以访问 Axon 服务器的 8024 端口来查看 Axon 服务器提供的管理界面,如下图所示,可以查看所有发布的事件的详细信息。
处理事件
事件的处理器通过在处理方法上添加 @EventHandler 注解来声明。事件处理方法的第一个参数是事件消息的载荷类型,这个载荷类型用来确定可以被处理的事件类型。一个事件处理类中可以包含多个方法来处理不同类型的事件,对于一个事件处理对象,最多只有一个处理方法被调用。如果有多个方法可以匹配,那么参数类型最具体的那个方法会被调用;如果处理方法不需要访问事件的载荷对象,则可以通过 @EventHandler 注解的 payloadType 属性来声明载荷对象的类型,而不需要添加额外的方法参数。
除了事件的载荷之外,处理方法还可以声明其他类型的参数来自动获取事件对象中的其他值,如下表所示。
从司机模拟器中发布的 DriverLocationUpdatedEvent 事件,会被行程派发服务来处理。在下面的代码中,DriverLocationUpdater 类中的 handle 方法用来处理 DriverLocationUpdatedEvent 事件。根据事件中司机状态的不同,调用 DriverLocationService 对象中的不同方法。
@Component
public class DriverLocationUpdater {
DriverLocationService driverLocationService;
public void handle(final DriverLocationUpdatedEvent event) {
final DriverLocation location = event.getLocation();
if (event.getState() == DriverState.AVAILABLE) {
this.driverLocationService.addAvailableDriver(location);
} else {
this.driverLocationService.removeAvailableDriver(location.getDriverId());
}
}
}
位置查询
在行程派发服务中,司机的位置信息被保存在 Redis 中。通过 Redis 提供的地理位置查询功能,可以从一个点出发,找到指定距离内的其他点。在下面的代码中,DriverLocationService 服务类负责管理所有处于可用状态的司机,其中 addAvailableDriver 和 removeAvailableDriver 方法分别用来添加和删除可用的司机,由上一节中 DriverLocationUpdatedEvent 的 handle 方法来调用。访问 Redis 时使用的是 Spring Data Redis 中的 RedisTemplate。
在 findAvailableDrivers 方法中,两个参数是作为查找起始点的地理位置坐标。接着通过调用 Redis 的 GEORADIUS 命令来查询以起始位置为圆心,半径 10 公里内的全部可用司机,并以 AvailableDriver 对象的形式返回。
@Service
public class DriverLocationService {
RedisTemplate<String, String> redisTemplate;
private final String key = “available_drivers”;
public static final Distance searchRadius = new Distance(10,
DistanceUnit.KILOMETERS);
public void addAvailableDriver(final DriverLocation location) {
this.redisTemplate.opsForGeo()
.add(this.key, new Point(location.getLng().doubleValue(),
location.getLat().doubleValue()),
location.getDriverId());
}
public void removeAvailableDriver(final String driverId) {
this.redisTemplate.opsForGeo().remove(this.key, driverId);
}
public Set<AvailableDriver> findAvailableDrivers(final BigDecimal lng,
final BigDecimal lat) {
final GeoResults<GeoLocation<String>> results = this.redisTemplate.opsForGeo()
.radius(this.key, new Circle(new Point(lng.doubleValue(), lat.doubleValue()),
searchRadius),
GeoRadiusCommandArgs.newGeoRadiusArgs().includeCoordinates());
if (results != null) {
return results.getContent().stream().filter(Objects::nonNull)
.map(result -> {
final GeoLocation<String> content = result.getContent();
final Point point = content.getPoint();
return new AvailableDriver(content.getName(),
BigDecimal.valueOf(point.getX()),
BigDecimal.valueOf(point.getY()));
})
.collect(Collectors.toSet());
}
return Collections.emptySet();
}
}
总结
示例应用中的司机模拟器用来模拟司机的行为,并定时发送其位置信息,示例应用使用 Axon 框架和 Axon 服务器来作为事件源技术的实现。本课时介绍了 Axon 服务器及其框架,以及如何使用 Axon 来发布和处理司机位置更新事件,并使用 Redis 来查询可用司机。
更多推荐
所有评论(0)