https://help.aliyun.com/document_detail/444764.html?spm=a2c4g.444763.0.0.6dfc51022FWeT8

SimpleConsumer

示例

 //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。
        ClientServiceProvider provider1 = ClientServiceProvider.loadService();
        String topic1 = "Your Topic";
        FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);

        SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder()
                //设置消费者分组。
                .setConsumerGroup("Your ConsumerGroup")
                //设置接入点。
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        List<MessageView> messageViewList = null;
        try {
            //SimpleConsumer需要主动获取消息,并处理。
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //消费处理完成后,需要主动调用ACK提交消费结果。
                try {
                	//如果处理失败,希望服务端重试,只需要忽略即可即不掉用ack方法,等待消息再次可见后即可重试获取。
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    e.printStackTrace();
                }
            });
        } catch (ClientException e) {
            //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
            e.printStackTrace();
        }

重点代码

计算从那个topic上获取消息
	String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()))
计算从那个MessageQueue得到消息
    public MessageQueueImpl takeMessageQueue() {
        final int next = index.getAndIncrement();
        return messageQueues.get(IntMath.mod(next, messageQueues.size()));
    }

获取消息

List receive(int maxMessageNum, Duration invisibleDuration)
  1. maxMessageNum:单次获取的最大消息数量
  2. invisibleDuration:获取消息后,获取的消息多长时间后才能再次被消费,即重试间隔
    public List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException {
        final ListenableFuture<List<MessageView>> future = receive0(maxMessageNum, invisibleDuration);
        return handleClientFuture(future);
    }
ListenableFuture<List> receive0(int maxMessageNum, Duration invisibleDuration)
  1. 获取topic
  2. 获取topic对应的SubscriptionLoadBalancer
  3. 从broker获取消息
	public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
		。。。。。。
        final HashMap<String, FilterExpression> copy = new HashMap<>(subscriptionExpressions);
        final ArrayList<String> topics = new ArrayList<>(copy.keySet());
        // 通过轮训的方式获取本次请求的topic
        final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
        final FilterExpression filterExpression = copy.get(topic);
        final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);
        final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
        	// 轮询MessageQueue集合
            final MessageQueueImpl mq = result.takeMessageQueue();
            // 封装请求
            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
                invisibleDuration, awaitDuration);
            // 通过GRPC从Broker中获取消息
            return receiveMessage(request, mq, awaitDuration);
        }, MoreExecutors.directExecutor());
        return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
            clientCallbackExecutor);
    }
ListenableFuture getSubscriptionLoadBalancer(final String topic)
  1. 如果subscriptionRouteDataCache有topic对应的SubscriptionLoadBalancer,直接返回
  2. 不存在则获取topic对应的路由信息并生成新的SubscriptionLoadBalancer,加入缓存并返回
    private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic) {
        final SubscriptionLoadBalancer loadBalancer = subscriptionRouteDataCache.get(topic);
        if (null != loadBalancer) {
            return Futures.immediateFuture(loadBalancer);
        }
        return Futures.transform(getRouteData(topic), topicRouteData -> updateSubscriptionLoadBalancer(topic,
            topicRouteData), MoreExecutors.directExecutor());
    }
ListenableFuture getRouteData(final String topic)
  1. 如果topicRouteCache存在topic对应的路由信息,将路由信息包装为future并返回
  2. 本地未缓存topic的路由信息,调用fetchTopicRoute方法从远端查询。
 protected ListenableFuture<TopicRouteData> getRouteData(final String topic) {
        SettableFuture<TopicRouteData> future0 = SettableFuture.create();
        TopicRouteData topicRouteData = topicRouteCache.get(topic);
        // If route result was cached before, get it directly.
        if (null != topicRouteData) {
            future0.set(topicRouteData);
            return future0;
        }
        // 对inflightRouteFutureTable加锁,防止并发
        inflightRouteFutureLock.lock();
        // try的逻辑:将上面生成的future0和topic加入inflightRouteFutureTable中。
        try {
            // If route was fetched by last in-flight request, get it directly.
            topicRouteData = topicRouteCache.get(topic);
            if (null != topicRouteData) {
                future0.set(topicRouteData);
                return future0;
            }
            Set<SettableFuture<TopicRouteData>> inflightFutures = inflightRouteFutureTable.get(topic);
            // Request is in-flight, return future directly.
            if (null != inflightFutures) {
                inflightFutures.add(future0);
                return future0;
            }
            inflightFutures = new HashSet<>();
            inflightFutures.add(future0);
            inflightRouteFutureTable.put(topic, inflightFutures);
        } finally {
            inflightRouteFutureLock.unlock();
        }
        // 调用请求topic路由信息的方法,返回future,底层是调用的GRPC,不了解GRPC
        final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
        // 对fetchTopicRoute返回的future加回调,有结果时,调用回调中的方法
        // 回调方法:从inflightRouteFutureTable中获取正在等待topic的路由信息的future0,并将路由信息设置给future0。
        Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
            @Override
            public void onSuccess(TopicRouteData topicRouteData) {
                inflightRouteFutureLock.lock();
                try {
                    final Set<SettableFuture<TopicRouteData>> newFutureSet =
                        inflightRouteFutureTable.remove(topic);
                    if (null == newFutureSet) {
                        // Should never reach here.
                        log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
                            clientId);
                        return;
                    }
                    log.debug("Fetch topic route successfully, topic={}, in-flight route future "
                        + "size={}, clientId={}", topic, newFutureSet.size(), clientId);
                    for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
                        newFuture.set(topicRouteData);
                    }
                } catch (Throwable t) {
                    // Should never reach here.
                    log.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
                        clientId, t);
                } finally {
                    inflightRouteFutureLock.unlock();
                }
            }

            @Override
            public void onFailure(Throwable t) {
                inflightRouteFutureLock.lock();
                try {
                    final Set<SettableFuture<TopicRouteData>> newFutureSet =
                        inflightRouteFutureTable.remove(topic);
                    if (null == newFutureSet) {
                        // Should never reach here.
                        log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
                        return;
                    }
                    log.debug("Failed to fetch topic route, topic={}, in-flight route future " +
                        "size={}, clientId={}", topic, newFutureSet.size(), clientId, t);
                    for (SettableFuture<TopicRouteData> future : newFutureSet) {
                        future.setException(t);
                    }
                } finally {
                    inflightRouteFutureLock.unlock();
                }
            }
        }, MoreExecutors.directExecutor());
        return future0;
    }
ListenableFuture receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq, Duration awaitDuration)
  • request:发送给broker的请求消息的请求
  • MessageQueueImpl mq:获取消息的MessageQueue
  • Duration awaitDuration:超时时间
protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
        MessageQueueImpl mq, Duration awaitDuration) {
        List<MessageViewImpl> messages = new ArrayList<>();
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final Duration tolerance = clientConfiguration.getRequestTimeout();
            final Duration timeout = awaitDuration.plus(tolerance);
            final ClientManager clientManager = this.getClientManager();
            final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> future =
                clientManager.receiveMessage(endpoints, request, timeout);
            return Futures.transformAsync(future, responses -> {
                Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                    .setMessage("status was not set by server")
                    .build();
                Long transportDeliveryTimestamp = null;
                List<Message> messageList = new ArrayList<>();
                for (ReceiveMessageResponse response : responses) {
                    switch (response.getContentCase()) {
                        case STATUS:
                            status = response.getStatus();
                            break;
                        case MESSAGE:
                            messageList.add(response.getMessage());
                            break;
                        case DELIVERY_TIMESTAMP:
                            final Timestamp deliveryTimestamp = response.getDeliveryTimestamp();
                            transportDeliveryTimestamp = Timestamps.toMillis(deliveryTimestamp);
                            break;
                        default:
                            log.warn("[Bug] Not recognized content for receive message response, mq={}, " +
                                "clientId={}, response={}", mq, clientId, response);
                    }
                }
                for (Message message : messageList) {
                    final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, transportDeliveryTimestamp);
                    messages.add(view);
                }
                StatusChecker.check(status, future);
                final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints, messages);
                return Futures.immediateFuture(receiveMessageResult);
            }, MoreExecutors.directExecutor());
        } catch (Throwable t) {
            // Should never reach here.
            log.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);
            return Futures.immediateFailedFuture(t);
        }
    }

RpcClient getRpcClient(Endpoints endpoints)
  1. 对rpcClientTable加读锁,如果rpcClientTable存在endpoints,获取对应的RpcClient并返回
  2. 对rpcClientTable加写锁,根据endpoints新建RpcClient,加入rpcClientTable中并返回RpcClient
	private RpcClient getRpcClient(Endpoints endpoints) throws ClientException {
        RpcClient rpcClient;
        rpcClientTableLock.readLock().lock();
        try {
            rpcClient = rpcClientTable.get(endpoints);
            if (null != rpcClient) {
                return rpcClient;
            }
        } finally {
            rpcClientTableLock.readLock().unlock();
        }
        rpcClientTableLock.writeLock().lock();
        try {
            rpcClient = rpcClientTable.get(endpoints);
            if (null != rpcClient) {
                return rpcClient;
            }
            try {
                rpcClient = new RpcClientImpl(endpoints, client.isSslEnabled());
            } catch (SSLException e) {
                log.error("Failed to get RPC client, endpoints={}, clientId={}", endpoints, client.getClientId(), e);
                throw new ClientException("Failed to generate RPC client", e);
            }
            rpcClientTable.put(endpoints, rpcClient);
            return rpcClient;
        } finally {
            rpcClientTableLock.writeLock().unlock();
        }
    }

给broker发送ack

将消息封装为AckMessageRequest然后发送给消息来源

void ack(MessageView messageView)
    public void ack(MessageView messageView) throws ClientException {
        final ListenableFuture<Void> future = ack0(messageView);
        // 处理发送结果
        handleClientFuture(future);
    }
ListenableFuture ack0(MessageView messageView)
private ListenableFuture<Void> ack0(MessageView messageView) {
		。。。。。。
        MessageViewImpl impl = (MessageViewImpl) messageView;
        final RpcFuture<AckMessageRequest, AckMessageResponse> future = ackMessage(impl);
        return Futures.transformAsync(future, response -> {
            final Status status = response.getStatus();
            // 检查响应结果,不是正常结果直接抛异常
            StatusChecker.check(status, future);
            return Futures.immediateVoidFuture();
        }, clientCallbackExecutor);
    }
RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(MessageViewImpl messageView)
protected RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(MessageViewImpl messageView) {
        final Endpoints endpoints = messageView.getEndpoints();
        RpcFuture<AckMessageRequest, AckMessageResponse> future;
        final List<GeneralMessage> generalMessages = Collections.singletonList(new GeneralMessageImpl(messageView));
        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.ACK);
        doBefore(context, generalMessages);
        try {
        	// 包装ack消息
            final AckMessageRequest request = wrapAckMessageRequest(messageView);
            final Duration requestTimeout = clientConfiguration.getRequestTimeout();
            future = this.getClientManager().ackMessage(endpoints, request, requestTimeout);
        } catch (Throwable t) {
            future = new RpcFuture<>(t);
        }
        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
            @Override
            public void onSuccess(AckMessageResponse response) {
                final Status status = response.getStatus();
                final Code code = status.getCode();
                MessageHookPointsStatus hookPointsStatus = Code.OK.equals(code) ?
                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, hookPointsStatus);
                doAfter(context0, generalMessages);
            }

            @Override
            public void onFailure(Throwable t) {
                MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                    MessageHookPointsStatus.ERROR);
                doAfter(context0, generalMessages);
            }
        }, MoreExecutors.directExecutor());
        return future;
    }
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐