
rocketmq-client-java 5.x SimpleConsumer
rocketmq-client-java 5.x SimpleConsumer
·
https://help.aliyun.com/document_detail/444764.html?spm=a2c4g.444763.0.0.6dfc51022FWeT8
SimpleConsumer
示例
// Consumption example: use SimpleConsumer to consume normal messages by actively
// receiving them, processing them and then acknowledging them.
ClientServiceProvider provider1 = ClientServiceProvider.loadService();
String topic1 = "Your Topic";
FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder()
    // Set the consumer group.
    .setConsumerGroup("Your ConsumerGroup")
    // Set the access endpoint.
    .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
    // Set the pre-bound subscription. The key and value must be the variables declared
    // above (topic1 / filterExpression1); the snippet previously referenced undeclared names.
    .setSubscriptionExpressions(Collections.singletonMap(topic1, filterExpression1))
    .build();
List<MessageView> messageViewList = null;
try {
    // SimpleConsumer has to fetch messages actively and process them itself.
    messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        // Once processing is done, the result has to be committed by calling ack explicitly.
        try {
            // If processing failed and a server-side retry is wanted, simply skip the ack call;
            // the message can be received again once it becomes visible again.
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            e.printStackTrace();
        }
    });
} catch (ClientException e) {
    // If receiving failed (for example because of server-side flow control),
    // issue the receive request again.
    e.printStackTrace();
}
重点代码
计算从哪个topic上获取消息
String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()))
计算从哪个MessageQueue得到消息
/**
 * Picks the next message queue in round-robin order.
 *
 * <p>The shared {@code index} counter is advanced on every call and reduced modulo the
 * number of queues to select the element to return.
 *
 * @return the message queue at the selected position of {@code messageQueues}.
 */
public MessageQueueImpl takeMessageQueue() {
    final int slot = IntMath.mod(index.getAndIncrement(), messageQueues.size());
    return messageQueues.get(slot);
}
获取消息
List<MessageView> receive(int maxMessageNum, Duration invisibleDuration)
- maxMessageNum:单次获取的最大消息数量
- invisibleDuration:获取消息后,获取的消息多长时间后才能再次被消费,即重试间隔
/**
 * Receives messages by delegating to {@code receive0} and passing the resulting future
 * to {@code handleClientFuture}, whose result is returned.
 *
 * @param maxMessageNum maximum number of messages to fetch in this call.
 * @param invisibleDuration how long the fetched messages stay invisible before they can be
 *                          consumed again (described in the article as the retry interval).
 * @return the list of received message views.
 * @throws ClientException declared by this method; presumably raised by
 *                         {@code handleClientFuture} when the future fails.
 */
public List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException {
    return handleClientFuture(receive0(maxMessageNum, invisibleDuration));
}
ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration)
- 获取topic
- 获取topic对应的SubscriptionLoadBalancer
- 从broker获取消息
/**
 * Asynchronously receives messages for one of the subscribed topics.
 *
 * <p>Each call snapshots the subscription expressions, picks one topic from the snapshot in
 * round-robin order via {@code topicIndex}, resolves the {@code SubscriptionLoadBalancer} of
 * that topic, takes a message queue from it and issues a receive request against that queue.
 *
 * @param maxMessageNum maximum number of messages to fetch; passed to the receive request.
 * @param invisibleDuration invisibility duration; passed to the receive request.
 * @return a future yielding the message views carried by the receive result.
 */
public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
。。。。。。
// NOTE(review): the line above is an elision marker from the article; the statements it
// replaces are not visible here (presumably argument/state validation - confirm upstream).
// Work on a private copy so the topic list and the filter lookup below see the same snapshot.
final HashMap<String, FilterExpression> copy = new HashMap<>(subscriptionExpressions);
final ArrayList<String> topics = new ArrayList<>(copy.keySet());
// Pick the topic for this request in round-robin order.
final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
final FilterExpression filterExpression = copy.get(topic);
final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);
final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
// Take the next message queue from the load balancer.
final MessageQueueImpl mq = result.takeMessageQueue();
// Build the receive request. awaitDuration is not declared in this excerpt
// (presumably a field of the consumer - confirm).
final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
invisibleDuration, awaitDuration);
// Fetch the messages from the broker (the article states this goes over gRPC).
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
// Unwrap the receive result into its message views on the client callback executor.
return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
clientCallbackExecutor);
}
ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic)
- 如果subscriptionRouteDataCache有topic对应的SubscriptionLoadBalancer,直接返回
- 不存在则获取topic对应的路由信息并生成新的SubscriptionLoadBalancer,加入缓存并返回
/**
 * Resolves the {@code SubscriptionLoadBalancer} for a topic.
 *
 * <p>A load balancer already present in {@code subscriptionRouteDataCache} is returned as an
 * immediately completed future. Otherwise the topic's route data is requested and the result
 * is passed to {@code updateSubscriptionLoadBalancer}, whose return value completes the future.
 *
 * @param topic the topic whose load balancer is wanted.
 * @return a future yielding the load balancer for {@code topic}.
 */
private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic) {
    final SubscriptionLoadBalancer cached = subscriptionRouteDataCache.get(topic);
    if (cached == null) {
        // Cache miss: derive the load balancer from the topic's route data once it is available.
        final ListenableFuture<TopicRouteData> routeFuture = getRouteData(topic);
        return Futures.transform(routeFuture,
            route -> updateSubscriptionLoadBalancer(topic, route),
            MoreExecutors.directExecutor());
    }
    return Futures.immediateFuture(cached);
}
ListenableFuture<TopicRouteData> getRouteData(final String topic)
- 如果topicRouteCache存在topic对应的路由信息,将路由信息包装为future并返回
- 本地未缓存topic的路由信息,调用fetchTopicRoute方法从远端查询。
/**
 * Returns the route data of a topic, from the local cache when present and otherwise by
 * fetching it through {@code fetchTopicRoute}.
 *
 * <p>Concurrent callers asking for the same uncached topic share one fetch: the first caller
 * registers a set of waiting futures in {@code inflightRouteFutureTable} and starts the fetch;
 * later callers only add their future to that set. When the fetch finishes, the callback
 * removes the set from the table and completes every future in it with the result or failure.
 *
 * @param topic the topic whose route data is wanted.
 * @return a future completed with the route data, or failed with the fetch error.
 */
protected ListenableFuture<TopicRouteData> getRouteData(final String topic) {
SettableFuture<TopicRouteData> future0 = SettableFuture.create();
TopicRouteData topicRouteData = topicRouteCache.get(topic);
// If route result was cached before, get it directly.
if (null != topicRouteData) {
future0.set(topicRouteData);
return future0;
}
// Lock inflightRouteFutureTable so concurrent callers cannot race on it.
inflightRouteFutureLock.lock();
// Inside the try: re-check the cache, then either join the waiting set of an in-flight
// request or register future0 in a new set for this topic.
try {
// If route was fetched by last in-flight request, get it directly.
topicRouteData = topicRouteCache.get(topic);
if (null != topicRouteData) {
future0.set(topicRouteData);
return future0;
}
Set<SettableFuture<TopicRouteData>> inflightFutures = inflightRouteFutureTable.get(topic);
// Request is in-flight, return future directly.
if (null != inflightFutures) {
inflightFutures.add(future0);
return future0;
}
inflightFutures = new HashSet<>();
inflightFutures.add(future0);
inflightRouteFutureTable.put(topic, inflightFutures);
} finally {
inflightRouteFutureLock.unlock();
}
// Request the topic's route data; this is called after the lock has been released.
// The article notes the call ultimately goes over gRPC.
final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
// Attach a callback to the fetch future. When it completes, the callback takes the futures
// waiting for this topic out of inflightRouteFutureTable and completes each of them.
// NOTE(review): this method never writes topicRouteCache itself; presumably
// fetchTopicRoute populates it - confirm.
Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@Override
public void onSuccess(TopicRouteData topicRouteData) {
inflightRouteFutureLock.lock();
try {
final Set<SettableFuture<TopicRouteData>> newFutureSet =
inflightRouteFutureTable.remove(topic);
if (null == newFutureSet) {
// Should never reach here.
log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
clientId);
return;
}
log.debug("Fetch topic route successfully, topic={}, in-flight route future "
+ "size={}, clientId={}", topic, newFutureSet.size(), clientId);
// Hand the fetched route data to every waiting caller.
for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
newFuture.set(topicRouteData);
}
} catch (Throwable t) {
// Should never reach here.
// NOTE(review): the set was already removed from the table above, so any future not yet
// completed when this is hit is left pending.
log.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
clientId, t);
} finally {
inflightRouteFutureLock.unlock();
}
}
@Override
public void onFailure(Throwable t) {
inflightRouteFutureLock.lock();
try {
final Set<SettableFuture<TopicRouteData>> newFutureSet =
inflightRouteFutureTable.remove(topic);
if (null == newFutureSet) {
// Should never reach here.
log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
return;
}
log.debug("Failed to fetch topic route, topic={}, in-flight route future " +
"size={}, clientId={}", topic, newFutureSet.size(), clientId, t);
// Propagate the fetch failure to every waiting caller.
for (SettableFuture<TopicRouteData> future : newFutureSet) {
future.setException(t);
}
} finally {
inflightRouteFutureLock.unlock();
}
}
}, MoreExecutors.directExecutor());
return future0;
}
ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq, Duration awaitDuration)
- request:发送给broker的请求消息的请求
- MessageQueueImpl mq:获取消息的MessageQueue
- Duration awaitDuration:超时时间
/**
 * Sends a receive request to the broker that owns the given message queue and converts the
 * responses into a {@code ReceiveMessageResult}.
 *
 * <p>Each response carries a status, a message or a delivery timestamp. They are collected
 * first; the messages are then converted to views, the status is checked, and the result
 * is built.
 *
 * @param request the receive request to send.
 * @param mq the message queue to receive from; its broker endpoints are the RPC target.
 * @param awaitDuration await duration; the RPC timeout is this value plus the configured
 *                      request timeout.
 * @return a future yielding the receive result, or a failed future if issuing the request threw.
 */
protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
    MessageQueueImpl mq, Duration awaitDuration) {
    try {
        final Endpoints endpoints = mq.getBroker().getEndpoints();
        // The configured request timeout is added on top of the await duration as tolerance.
        final Duration timeout = awaitDuration.plus(clientConfiguration.getRequestTimeout());
        final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> future =
            this.getClientManager().receiveMessage(endpoints, request, timeout);
        return Futures.transformAsync(future, responses -> {
            // Fallback status, used when no response entry carries one.
            Status status = Status.newBuilder()
                .setCode(Code.INTERNAL_SERVER_ERROR)
                .setMessage("status was not set by server")
                .build();
            Long deliveryMillis = null;
            final List<Message> rawMessages = new ArrayList<>();
            for (ReceiveMessageResponse response : responses) {
                switch (response.getContentCase()) {
                    case MESSAGE:
                        rawMessages.add(response.getMessage());
                        break;
                    case DELIVERY_TIMESTAMP:
                        deliveryMillis = Timestamps.toMillis(response.getDeliveryTimestamp());
                        break;
                    case STATUS:
                        status = response.getStatus();
                        break;
                    default:
                        log.warn("[Bug] Not recognized content for receive message response, mq={}, " +
                            "clientId={}, response={}", mq, clientId, response);
                        break;
                }
            }
            // Convert only after all responses were read, so every view sees the final timestamp.
            final List<MessageViewImpl> views = new ArrayList<>();
            for (Message raw : rawMessages) {
                views.add(MessageViewImpl.fromProtobuf(raw, mq, deliveryMillis));
            }
            StatusChecker.check(status, future);
            return Futures.immediateFuture(new ReceiveMessageResult(endpoints, views));
        }, MoreExecutors.directExecutor());
    } catch (Throwable t) {
        // Should never reach here.
        log.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);
        return Futures.immediateFailedFuture(t);
    }
}
RpcClient getRpcClient(Endpoints endpoints)
- 先对rpcClientTable加读锁,如果rpcClientTable中已存在endpoints对应的RpcClient,直接返回
- 否则对rpcClientTable加写锁并再次检查缓存;若仍不存在,则根据endpoints新建RpcClient,加入rpcClientTable中并返回
/**
 * Returns the RPC client for the given endpoints, creating and caching one if none exists.
 *
 * <p>The table is first consulted under the read lock. On a miss, the write lock is taken,
 * the table is consulted again, and only if the entry is still absent a new client is built
 * and stored.
 *
 * @param endpoints the endpoints the client should talk to.
 * @return the cached or newly created RPC client.
 * @throws ClientException if constructing the client fails with an {@code SSLException}.
 */
private RpcClient getRpcClient(Endpoints endpoints) throws ClientException {
    rpcClientTableLock.readLock().lock();
    try {
        final RpcClient cached = rpcClientTable.get(endpoints);
        if (cached != null) {
            return cached;
        }
    } finally {
        rpcClientTableLock.readLock().unlock();
    }

    rpcClientTableLock.writeLock().lock();
    try {
        // Look again: another thread may have stored a client between the two lock sections.
        RpcClient client0 = rpcClientTable.get(endpoints);
        if (client0 == null) {
            try {
                client0 = new RpcClientImpl(endpoints, client.isSslEnabled());
            } catch (SSLException e) {
                log.error("Failed to get RPC client, endpoints={}, clientId={}", endpoints, client.getClientId(), e);
                throw new ClientException("Failed to generate RPC client", e);
            }
            rpcClientTable.put(endpoints, client0);
        }
        return client0;
    } finally {
        rpcClientTableLock.writeLock().unlock();
    }
}
给broker发送ack
将消息封装为AckMessageRequest然后发送给消息来源
void ack(MessageView messageView)
/**
 * Acknowledges a message by delegating to {@code ack0} and passing the resulting future
 * to {@code handleClientFuture}.
 *
 * @param messageView the message to acknowledge.
 * @throws ClientException declared by this method; presumably raised by
 *                         {@code handleClientFuture} when the acknowledgement fails.
 */
public void ack(MessageView messageView) throws ClientException {
    // handleClientFuture processes the outcome of the asynchronous ack.
    handleClientFuture(ack0(messageView));
}
ListenableFuture<Void> ack0(MessageView messageView)
/**
 * Asynchronously acknowledges a message.
 *
 * <p>The message view is cast to {@code MessageViewImpl}, sent through {@code ackMessage},
 * and the status of the response is checked on the client callback executor.
 *
 * @param messageView the message to acknowledge; cast to {@code MessageViewImpl}.
 * @return a future that completes with no value once the response status has been checked.
 */
private ListenableFuture<Void> ack0(MessageView messageView) {
。。。。。。
// NOTE(review): the line above is an elision marker from the article; the statements it
// replaces are not visible here.
MessageViewImpl impl = (MessageViewImpl) messageView;
final RpcFuture<AckMessageRequest, AckMessageResponse> future = ackMessage(impl);
return Futures.transformAsync(future, response -> {
final Status status = response.getStatus();
// Check the response status; per the article, a non-normal status raises an exception.
StatusChecker.check(status, future);
return Futures.immediateVoidFuture();
}, clientCallbackExecutor);
}
RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(MessageViewImpl messageView)
/**
 * Sends an ack request for the given message to the endpoints recorded on the message view,
 * invoking the message interceptor hooks before the request and after its completion.
 *
 * @param messageView the message to acknowledge.
 * @return the RPC future of the ack request; if building or sending the request throws,
 *         a future constructed from that throwable.
 */
protected RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(MessageViewImpl messageView) {
    final Endpoints target = messageView.getEndpoints();
    final List<GeneralMessage> hookMessages = Collections.singletonList(new GeneralMessageImpl(messageView));
    final MessageInterceptorContextImpl beforeContext = new MessageInterceptorContextImpl(MessageHookPoints.ACK);
    doBefore(beforeContext, hookMessages);

    RpcFuture<AckMessageRequest, AckMessageResponse> ackFuture;
    try {
        // Wrap the message into an ack request.
        final AckMessageRequest ackRequest = wrapAckMessageRequest(messageView);
        final Duration timeout = clientConfiguration.getRequestTimeout();
        ackFuture = this.getClientManager().ackMessage(target, ackRequest, timeout);
    } catch (Throwable t) {
        // Surface the failure through the returned future so the after-hook below still runs.
        ackFuture = new RpcFuture<>(t);
    }

    Futures.addCallback(ackFuture, new FutureCallback<AckMessageResponse>() {
        @Override
        public void onSuccess(AckMessageResponse response) {
            // A response with any code other than OK is reported to the hooks as an error.
            final MessageHookPointsStatus outcome;
            if (Code.OK.equals(response.getStatus().getCode())) {
                outcome = MessageHookPointsStatus.OK;
            } else {
                outcome = MessageHookPointsStatus.ERROR;
            }
            doAfter(new MessageInterceptorContextImpl(beforeContext, outcome), hookMessages);
        }

        @Override
        public void onFailure(Throwable t) {
            final MessageInterceptorContextImpl afterContext =
                new MessageInterceptorContextImpl(beforeContext, MessageHookPointsStatus.ERROR);
            doAfter(afterContext, hookMessages);
        }
    }, MoreExecutors.directExecutor());
    return ackFuture;
}
更多推荐
所有评论(0)