后端状态追踪 + WebSocket 实时推送 + 前端页面展示
服务端实时推送消费状态到前端页面,后端状态追踪 + WebSocket 实时推送 + 前端页面展示
·
在页面上实时展示 Kafka 队列消息的 ID、状态(消费中 / 已消费 / 消费失败)和内容,核心是实现服务端实时推送消费状态到前端页面。我会基于 Goravel 框架,提供一套完整的「后端状态追踪 + WebSocket 实时推送 + 前端页面展示」的解决方案,适配你的 Go 语言技术栈。
一、整体实现思路
- 后端层:
- 维护一个内存级的消息状态存储(适配少量消息场景),记录每条消息的 ID、内容、消费状态;
- 通过 Kafka 消费者实时更新消息状态;
- 基于 WebSocket 向前端推送状态变更(Goravel 推荐用
goravel/websocket或原生gorilla/websocket)。
- 前端层:
- 建立 WebSocket 连接,实时接收后端推送的状态;
- 渲染消息列表,动态更新每条消息的状态样式。
二、后端实现(Goravel + WebSocket + Kafka 消费)
1. 第一步:安装依赖
bash
运行
# 1. WebSocket依赖(Goravel生态推荐)
go get github.com/goravel/websocket
# 2. Kafka依赖(已安装可忽略)
go get github.com/segmentio/kafka-go
2. 第二步:定义消息状态模型(存储消息信息)
在 app/models 下创建 kafka_message.go:
go
运行
package models
// KafkaMessage 消息状态模型(内存存储,少量消息场景够用)
type KafkaMessage struct {
ID int64 `json:"id"` // 消息ID(对应Kafka偏移量)
Content string `json:"content"` // 消息内容
Status string `json:"status"` // 状态:pending(待消费)、consuming(消费中)、success(已消费)、failed(消费失败)
}
// 全局内存存储消息状态(适配少量消息场景,生产环境可替换为Redis)
var KafkaMessageStatus = make(map[int64]*KafkaMessage)
// UpdateMessageStatus 更新消息状态
func UpdateMessageStatus(msgID int64, content string, status string) {
KafkaMessageStatus[msgID] = &KafkaMessage{
ID: msgID,
Content: content,
Status: status,
}
}
// GetAllMessages 获取所有消息状态
func GetAllMessages() []*KafkaMessage {
var messages []*KafkaMessage
for _, msg := range KafkaMessageStatus {
messages = append(messages, msg)
}
return messages
}
3. 第三步:WebSocket 服务(实时推送状态)
在 app/services 下创建 websocket_service.go:
go
运行
package services
import (
"encoding/json"
"github.com/goravel/websocket"
"github.com/goravel/framework/facades"
"your-project-path/app/models" // 替换为你的项目路径
)
// WebSocketService WebSocket推送服务
type WebSocketService struct {
wsManager *websocket.Manager
}
// NewWebSocketService 实例化
func NewWebSocketService() *WebSocketService {
// 初始化WebSocket管理器
manager := websocket.NewManager()
facades.Route().WebSocket("/ws/message", manager)
return &WebSocketService{
wsManager: manager,
}
}
// PushMessageStatus 推送单条消息状态到前端
func (s *WebSocketService) PushMessageStatus(msg *models.KafkaMessage) {
// 将消息转为JSON
data, _ := json.Marshal(msg)
// 广播给所有连接的客户端
s.wsManager.Broadcast([]byte(data))
}
// PushAllMessages 推送所有消息状态(前端首次连接时)
func (s *WebSocketService) PushAllMessages(conn *websocket.Conn) {
messages := models.GetAllMessages()
data, _ := json.Marshal(messages)
conn.WriteMessage(websocket.TextMessage, data)
}
4. 第四步:改造 Kafka 消费者(更新状态 + 推送)
修改之前的 kafka_consumer_service.go,集成状态更新和 WebSocket 推送:
go
运行
package services
import (
"context"
"fmt"
"log"
"time"
"github.com/goravel/framework/contracts/log"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
"your-project-path/app/models" // 替换为你的项目路径
)
type KafkaConsumerService struct {
logger log.Log
wsService *WebSocketService // 注入WebSocket服务
}
// 注意:更新实例化方法,注入WebSocket服务
func NewKafkaConsumerService(logger log.Log, wsService *WebSocketService) *KafkaConsumerService {
return &KafkaConsumerService{
logger: logger,
wsService: wsService,
}
}
func (s *KafkaConsumerService) ConsumeWithMonitor(topic, broker, groupID string) {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{broker},
Topic: topic,
GroupID: groupID,
StartOffset: kafka.FirstOffset,
MaxWait: 100 * time.Millisecond,
Logger: kafka.LoggerFunc(log.Printf),
})
defer reader.Close()
s.logger.Info("Kafka消费者启动成功,开始监控消息消费")
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
if err.Error() == "kafka: reader closed" {
break
}
s.logger.Error("拉取Kafka消息失败", zap.Error(err))
continue
}
// 1. 更新消息状态为「消费中」
models.UpdateMessageStatus(msg.Offset, string(msg.Value), "consuming")
// 2. 推送状态到前端
s.wsService.PushMessageStatus(&models.KafkaMessage{
ID: msg.Offset,
Content: string(msg.Value),
Status: "consuming",
})
// 3. 执行业务消费逻辑
consumeSuccess := s.processMessage(msg)
// 4. 更新最终状态并推送
var finalStatus string
if consumeSuccess {
finalStatus = "success"
} else {
finalStatus = "failed"
}
models.UpdateMessageStatus(msg.Offset, string(msg.Value), finalStatus)
s.wsService.PushMessageStatus(&models.KafkaMessage{
ID: msg.Offset,
Content: string(msg.Value),
Status: finalStatus,
})
// 消费完3条消息停止(适配你的场景)
if msg.Offset >= 2 {
s.logger.Info("已消费完3条消息,停止消费")
break
}
}
}
// 模拟业务逻辑(保持不变)
func (s *KafkaConsumerService) processMessage(msg kafka.Message) bool {
s.logger.Info("正在处理消息", zap.String("内容", string(msg.Value)))
return true // 模拟消费成功,可根据业务修改
}
5. 第五步:注册服务和路由
- 在
app/providers/app_service_provider.go中注册依赖:
go
运行
package providers
import (
"github.com/goravel/framework/facades"
"your-project-path/app/services"
)
func AppServiceProvider() {
// 注册WebSocket服务(单例)
facades.App().BindSingleton(services.NewWebSocketService)
// 注册Kafka消费服务
facades.App().Bind(func() *services.KafkaConsumerService {
return services.NewKafkaConsumerService(facades.Log, facades.App().Make(&services.WebSocketService{}).(*services.WebSocketService))
})
}
- 在
routes/web.go中添加前端页面路由:
go
运行
package routes
import (
"github.com/goravel/framework/facades"
"net/http"
)
func Web() {
// 消息监控页面
facades.Route().Get("/message-monitor", func(ctx http.ResponseWriter, req *http.Request) {
// 渲染HTML页面(也可使用Goravel模板引擎)
ctx.Header().Set("Content-Type", "text/html; charset=utf-8")
html := `<!-- 前端页面内容,见下文 -->`
ctx.Write([]byte(html))
})
}
三、前端页面实现(实时展示)
将以下 HTML 代码替换到上面的 /message-monitor 路由中,实现消息列表的实时渲染:
html
预览
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Kafka消息消费状态监控</title>
<style>
.message-list { width: 800px; margin: 50px auto; }
.message-item {
padding: 15px; margin: 10px 0;
border: 1px solid #ddd; border-radius: 8px;
}
.status-pending { background-color: #f5f5f5; color: #999; }
.status-consuming { background-color: #fff3cd; color: #856404; }
.status-success { background-color: #d4edda; color: #155724; }
.status-failed { background-color: #f8d7da; color: #721c24; }
</style>
</head>
<body>
<div class="message-list">
<h2>Kafka消息消费状态(实时更新)</h2>
<div id="messageContainer"></div>
</div>
<script>
// 1. 建立WebSocket连接
const ws = new WebSocket("ws://" + window.location.host + "/ws/message");
const messageContainer = document.getElementById("messageContainer");
// 2. 存储所有消息(避免重复渲染)
let messageMap = new Map();
// 3. 初始化:首次连接时加载所有消息
ws.onopen = function() {
console.log("WebSocket连接成功");
};
// 4. 接收后端推送的消息
ws.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
// 处理单条消息推送
if (data.id !== undefined) {
messageMap.set(data.id, data);
}
// 处理首次连接的所有消息
else if (Array.isArray(data)) {
data.forEach(msg => messageMap.set(msg.id, msg));
}
// 渲染消息列表
renderMessages();
} catch (e) {
console.error("解析消息失败", e);
}
};
// 5. 渲染消息列表
function renderMessages() {
messageContainer.innerHTML = "";
// 按消息ID排序
const sortedMessages = Array.from(messageMap.values()).sort((a, b) => a.id - b.id);
sortedMessages.forEach(msg => {
const item = document.createElement("div");
item.className = `message-item status-${msg.status}`;
item.innerHTML = `
<div><strong>消息ID:</strong>${msg.id}</div>
<div><strong>消息内容:</strong>${msg.content}</div>
<div><strong>消费状态:</strong>${getStatusText(msg.status)}</div>
`;
messageContainer.appendChild(item);
});
}
// 6. 状态文本转换
function getStatusText(status) {
const map = {
"pending": "待消费",
"consuming": "消费中",
"success": "已消费(成功)",
"failed": "消费失败"
};
return map[status] || "未知状态";
}
// 7. 关闭连接处理
ws.onclose = function() {
console.log("WebSocket连接关闭,5秒后重连");
setTimeout(() => window.location.reload(), 5000);
};
</script>
</body>
</html>
四、启动服务并测试
- 启动 Goravel 服务:
bash
运行
go run main.go serve
- 启动 Kafka 消费命令(新开终端):
bash
运行
go run main.go kafka:consume-monitor
- 访问监控页面:打开浏览器访问
http://localhost:3000/message-monitor(Goravel 默认端口 3000),当 Kafka 产生消息时,页面会实时更新每条消息的状态:
- 消息被拉取时:状态显示「消费中」(黄色背景);
- 消费成功时:状态变为「已消费(成功)」(绿色背景);
- 消费失败时:状态变为「消费失败」(红色背景)。
总结
- 核心流程:Kafka 消费者更新消息状态 → WebSocket 实时推送状态 → 前端动态渲染;
- 状态管理:通过内存 Map 存储少量消息状态(生产环境可替换为 Redis);
- 实时性保障:基于 WebSocket 实现服务端主动推送,而非前端轮询,延迟更低。
这套方案完全适配你的场景:页面能实时展示每条消息的 ID、内容和消费状态,且和 Goravel 框架深度集成,符合 Go 语言开发习惯。
更多推荐

所有评论(0)