在页面上实时展示 Kafka 队列消息的 ID、状态(消费中 / 已消费 / 消费失败)和内容,核心是实现服务端实时推送消费状态到前端页面。我会基于 Goravel 框架,提供一套完整的「后端状态追踪 + WebSocket 实时推送 + 前端页面展示」的解决方案,适配你的 Go 语言技术栈。

一、整体实现思路

  1. 后端层
    • 维护一个内存级的消息状态存储(适配少量消息场景),记录每条消息的 ID、内容、消费状态;
    • 通过 Kafka 消费者实时更新消息状态;
    • 基于 WebSocket 向前端推送状态变更(Goravel 推荐用 goravel/websocket 或原生 gorilla/websocket)。
  2. 前端层
    • 建立 WebSocket 连接,实时接收后端推送的状态;
    • 渲染消息列表,动态更新每条消息的状态样式。

二、后端实现(Goravel + WebSocket + Kafka 消费)

1. 第一步:安装依赖

bash

运行

# 1. WebSocket依赖(Goravel生态推荐)
go get github.com/goravel/websocket
# 2. Kafka依赖(已安装可忽略)
go get github.com/segmentio/kafka-go
2. 第二步:定义消息状态模型(存储消息信息)

在 app/models 下创建 kafka_message.go

go

运行

package models

// KafkaMessage 消息状态模型(内存存储,少量消息场景够用)
type KafkaMessage struct {
	ID     int64  `json:"id"`      // 消息ID(对应Kafka偏移量)
	Content string `json:"content"` // 消息内容
	Status  string `json:"status"`  // 状态:pending(待消费)、consuming(消费中)、success(已消费)、failed(消费失败)
}

// 全局内存存储消息状态(适配少量消息场景,生产环境可替换为Redis)
var KafkaMessageStatus = make(map[int64]*KafkaMessage)

// UpdateMessageStatus 更新消息状态
func UpdateMessageStatus(msgID int64, content string, status string) {
	KafkaMessageStatus[msgID] = &KafkaMessage{
		ID:      msgID,
		Content: content,
		Status:  status,
	}
}

// GetAllMessages 获取所有消息状态
func GetAllMessages() []*KafkaMessage {
	var messages []*KafkaMessage
	for _, msg := range KafkaMessageStatus {
		messages = append(messages, msg)
	}
	return messages
}
3. 第三步:WebSocket 服务(实时推送状态)

在 app/services 下创建 websocket_service.go

go

运行

package services

import (
	"encoding/json"
	"github.com/goravel/websocket"
	"github.com/goravel/framework/facades"
	"your-project-path/app/models" // 替换为你的项目路径
)

// WebSocketService WebSocket推送服务
type WebSocketService struct {
	wsManager *websocket.Manager
}

// NewWebSocketService 实例化
func NewWebSocketService() *WebSocketService {
	// 初始化WebSocket管理器
	manager := websocket.NewManager()
	facades.Route().WebSocket("/ws/message", manager)
	return &WebSocketService{
		wsManager: manager,
	}
}

// PushMessageStatus 推送单条消息状态到前端
func (s *WebSocketService) PushMessageStatus(msg *models.KafkaMessage) {
	// 将消息转为JSON
	data, _ := json.Marshal(msg)
	// 广播给所有连接的客户端
	s.wsManager.Broadcast([]byte(data))
}

// PushAllMessages 推送所有消息状态(前端首次连接时)
func (s *WebSocketService) PushAllMessages(conn *websocket.Conn) {
	messages := models.GetAllMessages()
	data, _ := json.Marshal(messages)
	conn.WriteMessage(websocket.TextMessage, data)
}
4. 第四步:改造 Kafka 消费者(更新状态 + 推送)

修改之前的 kafka_consumer_service.go,集成状态更新和 WebSocket 推送:

go

运行

package services

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/goravel/framework/contracts/log"
	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"
	"your-project-path/app/models" // 替换为你的项目路径
)

type KafkaConsumerService struct {
	logger        log.Log
	wsService     *WebSocketService // 注入WebSocket服务
}

// 注意:更新实例化方法,注入WebSocket服务
func NewKafkaConsumerService(logger log.Log, wsService *WebSocketService) *KafkaConsumerService {
	return &KafkaConsumerService{
		logger:    logger,
		wsService: wsService,
	}
}

func (s *KafkaConsumerService) ConsumeWithMonitor(topic, broker, groupID string) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{broker},
		Topic:       topic,
		GroupID:     groupID,
		StartOffset: kafka.FirstOffset,
		MaxWait:     100 * time.Millisecond,
		Logger:      kafka.LoggerFunc(log.Printf),
	})
	defer reader.Close()

	s.logger.Info("Kafka消费者启动成功,开始监控消息消费")

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			if err.Error() == "kafka: reader closed" {
				break
			}
			s.logger.Error("拉取Kafka消息失败", zap.Error(err))
			continue
		}

		// 1. 更新消息状态为「消费中」
		models.UpdateMessageStatus(msg.Offset, string(msg.Value), "consuming")
		// 2. 推送状态到前端
		s.wsService.PushMessageStatus(&models.KafkaMessage{
			ID:      msg.Offset,
			Content: string(msg.Value),
			Status:  "consuming",
		})

		// 3. 执行业务消费逻辑
		consumeSuccess := s.processMessage(msg)

		// 4. 更新最终状态并推送
		var finalStatus string
		if consumeSuccess {
			finalStatus = "success"
		} else {
			finalStatus = "failed"
		}
		models.UpdateMessageStatus(msg.Offset, string(msg.Value), finalStatus)
		s.wsService.PushMessageStatus(&models.KafkaMessage{
			ID:      msg.Offset,
			Content: string(msg.Value),
			Status:  finalStatus,
		})

		// 消费完3条消息停止(适配你的场景)
		if msg.Offset >= 2 {
			s.logger.Info("已消费完3条消息,停止消费")
			break
		}
	}
}

// 模拟业务逻辑(保持不变)
func (s *KafkaConsumerService) processMessage(msg kafka.Message) bool {
	s.logger.Info("正在处理消息", zap.String("内容", string(msg.Value)))
	return true // 模拟消费成功,可根据业务修改
}
5. 第五步:注册服务和路由
  • 在 app/providers/app_service_provider.go 中注册依赖:

go

运行

package providers

import (
	"github.com/goravel/framework/facades"
	"your-project-path/app/services"
)

func AppServiceProvider() {
	// 注册WebSocket服务(单例)
	facades.App().BindSingleton(services.NewWebSocketService)
	// 注册Kafka消费服务
	facades.App().Bind(func() *services.KafkaConsumerService {
		return services.NewKafkaConsumerService(facades.Log, facades.App().Make(&services.WebSocketService{}).(*services.WebSocketService))
	})
}
  • 在 routes/web.go 中添加前端页面路由:

go

运行

package routes

import (
	"github.com/goravel/framework/facades"
	"net/http"
)

func Web() {
	// 消息监控页面
	facades.Route().Get("/message-monitor", func(ctx http.ResponseWriter, req *http.Request) {
		// 渲染HTML页面(也可使用Goravel模板引擎)
		ctx.Header().Set("Content-Type", "text/html; charset=utf-8")
		html := `<!-- 前端页面内容,见下文 -->`
		ctx.Write([]byte(html))
	})
}

三、前端页面实现(实时展示)

将以下 HTML 代码替换到上面的 /message-monitor 路由中,实现消息列表的实时渲染:

html

预览

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Kafka消息消费状态监控</title>
    <style>
        .message-list { width: 800px; margin: 50px auto; }
        .message-item { 
            padding: 15px; margin: 10px 0; 
            border: 1px solid #ddd; border-radius: 8px;
        }
        .status-pending { background-color: #f5f5f5; color: #999; }
        .status-consuming { background-color: #fff3cd; color: #856404; }
        .status-success { background-color: #d4edda; color: #155724; }
        .status-failed { background-color: #f8d7da; color: #721c24; }
    </style>
</head>
<body>
    <div class="message-list">
        <h2>Kafka消息消费状态(实时更新)</h2>
        <div id="messageContainer"></div>
    </div>

    <script>
        // 1. 建立WebSocket连接
        const ws = new WebSocket("ws://" + window.location.host + "/ws/message");
        const messageContainer = document.getElementById("messageContainer");

        // 2. 存储所有消息(避免重复渲染)
        let messageMap = new Map();

        // 3. 初始化:首次连接时加载所有消息
        ws.onopen = function() {
            console.log("WebSocket连接成功");
        };

        // 4. 接收后端推送的消息
        ws.onmessage = function(event) {
            try {
                const data = JSON.parse(event.data);
                // 处理单条消息推送
                if (data.id !== undefined) {
                    messageMap.set(data.id, data);
                } 
                // 处理首次连接的所有消息
                else if (Array.isArray(data)) {
                    data.forEach(msg => messageMap.set(msg.id, msg));
                }
                // 渲染消息列表
                renderMessages();
            } catch (e) {
                console.error("解析消息失败", e);
            }
        };

        // 5. 渲染消息列表
        function renderMessages() {
            messageContainer.innerHTML = "";
            // 按消息ID排序
            const sortedMessages = Array.from(messageMap.values()).sort((a, b) => a.id - b.id);
            
            sortedMessages.forEach(msg => {
                const item = document.createElement("div");
                item.className = `message-item status-${msg.status}`;
                item.innerHTML = `
                    <div><strong>消息ID:</strong>${msg.id}</div>
                    <div><strong>消息内容:</strong>${msg.content}</div>
                    <div><strong>消费状态:</strong>${getStatusText(msg.status)}</div>
                `;
                messageContainer.appendChild(item);
            });
        }

        // 6. 状态文本转换
        function getStatusText(status) {
            const map = {
                "pending": "待消费",
                "consuming": "消费中",
                "success": "已消费(成功)",
                "failed": "消费失败"
            };
            return map[status] || "未知状态";
        }

        // 7. 关闭连接处理
        ws.onclose = function() {
            console.log("WebSocket连接关闭,5秒后重连");
            setTimeout(() => window.location.reload(), 5000);
        };
    </script>
</body>
</html>

四、启动服务并测试

  1. 启动 Goravel 服务

bash

运行

go run main.go serve
  1. 启动 Kafka 消费命令(新开终端):

bash

运行

go run main.go kafka:consume-monitor
  1. 访问监控页面:打开浏览器访问 http://localhost:3000/message-monitor(Goravel 默认端口 3000),当 Kafka 产生消息时,页面会实时更新每条消息的状态:
  • 消息被拉取时:状态显示「消费中」(黄色背景);
  • 消费成功时:状态变为「已消费(成功)」(绿色背景);
  • 消费失败时:状态变为「消费失败」(红色背景)。

总结

  1. 核心流程:Kafka 消费者更新消息状态 → WebSocket 实时推送状态 → 前端动态渲染;
  2. 状态管理:通过内存 Map 存储少量消息状态(生产环境可替换为 Redis);
  3. 实时性保障:基于 WebSocket 实现服务端主动推送,而非前端轮询,延迟更低。

这套方案完全适配你的场景:页面能实时展示每条消息的 ID、内容和消费状态,且和 Goravel 框架深度集成,符合 Go 语言开发习惯。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐