即时通讯系统后台服务端开发(2)
系统采用模块化设计,包含网关、文件存储、好友管理、消息转发、消息存储、用户管理和语音识别等子服务。网关服务负责请求分发和用户鉴权,文件服务处理非文本消息存储,好友服务管理社交关系,消息服务负责消息转发和持久化存储,用户服务处理用户注册登录等操作。系统采用多种技术栈:使用brpc实现服务间通信,Redis管理会话状态,MySQL存储核心数据,ElasticSearch支持高效搜索,RabbitMQ实
对上篇博客中的各个技术进行封装测试没有问题后,接着完成各个子服务模块的搭建。
本项目服务端设计如图所示:

网关子服务
负责业务:
网关子服务通过httplib负责对客户端发来的请求进行回调处理:
#define PUT_MULTI_FILE "/service/file/put_multi_file"
#define RECOGNITION "/service/speech/recognition"
_http_server.Post(PUT_MULTI_FILE, (httplib::Server::Handler)std::bind(&GatewayServer::PutMultiFile, this, std::placeholders::_1, std::placeholders::_2));
_http_server.Post(RECOGNITION, (httplib::Server::Handler)std::bind(&GatewayServer::SpeechRecognition, this, std::placeholders::_1, std::placeholders::_2));
void PutMultiFile(const httplib::Request &request, httplib::Response &response)
{
LOG_DEBUG("PutMultiFile");
// 对 上传多个文件 的处理
PutMultiFileReq req;
PutMultiFileRsp rsp;
auto err_response = [&rsp, &response](const std::string &errmsg)
{
rsp.set_success(false);
rsp.set_errmsg(errmsg);
response.set_content(rsp.SerializeAsString(), "application/x-protbuf");
};
// 1.取出消息正文,进行反序列化 && 将反序列化后的内容填充好请求
bool res = req.ParseFromString(request.body);
if (res == false)
{
LOG_ERROR("上传多个文件 请求信息反序列化失败");
err_response("上传多个文件 请求信息反序列化失败");
return;
}
// 2.用户鉴权
auto uid = _session_redis->Uid(req.session_id());
req.set_user_id(*uid);
if (!uid)
{
LOG_ERROR("在会话信息不存在的时候,发起了 上传多个文件 的请求, ssid: {}", req.session_id());
err_response("在会话信息不存在的时候,发起了 上传多个文件 的请求");
return;
}
// 3.发起服务请求
myim::ServiceChannel::ChannelPtr channel = _service_manager->choose(_file_service_key);
if (!channel)
{
LOG_ERROR("文件管理 子服务未能寻到,无法提供 上传多个文件 服务");
rsp.set_request_id(req.request_id());
err_response("文件管理 子服务未能寻到,无法提供 上传多个文件 服务");
return;
}
myim::FileService_Stub stub(channel.get());
brpc::Controller cntl;
stub.PutMultiFile(&cntl, &req, &rsp, nullptr);
if (cntl.Failed())
{
LOG_ERROR("文件管理 子服务调用失败,无法处理 上传多个文件 请求");
rsp.set_request_id(req.request_id());
err_response("文件管理 子服务调用失败,无法处理 上传多个文件 请求");
return;
}
// 4.发回响应
response.set_content(rsp.SerializeAsString(), "application/x-protbuf");
}
void SpeechRecognition(const httplib::Request &request, httplib::Response &response)
{
LOG_DEBUG("SpeechRecognitionc");
// 对 语音识别 的处理
SpeechRecognitionReq req;
SpeechRecognitionRsp rsp;
auto err_response = [&rsp, &response](const std::string &errmsg)
{
rsp.set_success(false);
rsp.set_errmsg(errmsg);
response.set_content(rsp.SerializeAsString(), "application/x-protbuf");
};
// 1.取出消息正文,进行反序列化 && 将反序列化后的内容填充好请求
bool res = req.ParseFromString(request.body);
if (res == false)
{
LOG_ERROR("语音识别 请求信息反序列化失败");
err_response("语音识别 请求信息反序列化失败");
return;
}
// 2.用户鉴权
auto uid = _session_redis->Uid(req.session_id());
req.set_user_id(*uid);
if (!uid)
{
LOG_ERROR("在会话信息不存在的时候,发起了 语音识别 的请求, ssid: {}", req.session_id());
err_response("在会话信息不存在的时候,发起了 语音识别 的请求");
return;
}
// 3.发起服务请求
myim::ServiceChannel::ChannelPtr channel = _service_manager->choose(_speech_service_key);
if (!channel)
{
LOG_ERROR("语音识别 子服务未能寻到,无法提供 语音识别 服务");
rsp.set_request_id(req.request_id());
err_response("语音识别 子服务未能寻到,无法提供 语音识别 服务");
return;
}
myim::SpeechService_Stub stub(channel.get());
brpc::Controller cntl;
stub.SpeechRecognition(&cntl, &req, &rsp, nullptr);
if (cntl.Failed())
{
LOG_ERROR("语音识别 子服务调用失败,无法处理 语音识别 请求");
rsp.set_request_id(req.request_id());
err_response("语音识别 子服务调用失败,无法处理 语音识别 请求");
return;
}
// 4.发回响应
response.set_content(rsp.SerializeAsString(), "application/x-protbuf");
}
亮点:
1.在对客户端发来的诸如 昵称修改/群聊创建 等请求时,对请求进行用户鉴权(通过会话ID获取用户ID再通过brpc进行服务的调用)。
2.使用etcd动态调整服务注册端的host(实时发现服务上线/下线)。
3.不负责任何具体业务,仅做客户端和服务端交互的 “门口”,减少性能消耗。
4.使用websocket,使得群聊创建时可以主动对涉及的群聊成员进行通知(若用户不在线,下次登录的时候也会自动获取聊天会话信息)。
文件存储子服务
负责业务:
负责处理非文本消息(如图片、语音等)的持久化存储和高效传输:
file.proto
service FileService {
rpc GetSingleFile(GetSingleFileReq) returns
(GetSingleFileRsp);
rpc GetMultiFile(GetMultiFileReq) returns (GetMultiFileRsp);
rpc PutSingleFile(PutSingleFileReq) returns
(PutSingleFileRsp);
rpc PutMultiFile(PutMultiFileReq) returns (PutMultiFileRsp);
}
file_server.hpp
virtual void PutSingleFile(google::protobuf::RpcController *cntl_base,
const myim::PutSingleFileReq *request,
myim::PutSingleFileRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
response->set_request_id(request->request_id());
// 上传文件
// 首先给文件创建一个唯一的uuid
std::string fileid = uuid();
std::string file_pathnameid = _storage_path + fileid;
// 然后将文件数据进行写入
bool ret = writefile(file_pathnameid, request->file_data().file_content());
if (ret == false)
{
response->set_success(false);
response->set_errmsg("写入文件失败");
LOG_ERROR("写入文件 {} 失败", file_pathnameid.c_str());
return;
}
// 返回响应
response->set_success(true);
response->mutable_file_info()->set_file_id(file_pathnameid);
response->mutable_file_info()->set_file_size(request->file_data().file_size());
response->mutable_file_info()->set_file_name(request->file_data().file_name());
}
virtual void GetSingleFile(google::protobuf::RpcController *cntl_base,
const myim::GetSingleFileReq *request,
myim::GetSingleFileRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
// 单个文件的下载
// 1.先获取到文件名(其实也就是文件ID)
response->set_request_id(request->request_id());
std::string fileid = request->file_id();
std::string body;
// 2.再获取到文件内部数据
bool ret = readfile(fileid, body);
if (ret == false)
{
response->set_success(false);
response->set_errmsg("读取文件失败");
LOG_ERROR("读取文件 {} 失败", fileid.c_str());
return;
}
// 3.将文件数据写入响应即可
response->set_success(true);
response->mutable_file_data()->set_file_id(fileid);
response->mutable_file_data()->set_file_content(body);
}
亮点:
1.使用UUID生成唯一的文件ID,确保文件存储的安全性和唯一性。
2.提供了服务器内部的文件批量上传/下载接口,减少网络交互和系统开销。
3.通过配置文件.conf设置文件的存储路径。
好友管理子服务
负责业务:
负责维护用户间的社交关系(好友添加/删除/搜索)和聊天会话(单聊/群聊的创建与维护):
Friend.proto
service FriendService {
rpc GetFriendList(GetFriendListReq) returns
(GetFriendListRsp);
rpc FriendRemove(FriendRemoveReq) returns (FriendRemoveRsp);
rpc FriendAdd(FriendAddReq) returns (FriendAddRsp);
rpc FriendAddProcess(FriendAddProcessReq) returns
(FriendAddProcessRsp);
rpc FriendSearch(FriendSearchReq) returns (FriendSearchRsp);
rpc GetChatSessionList(GetChatSessionListReq) returns
(GetChatSessionListRsp);
rpc ChatSessionCreate(ChatSessionCreateReq) returns
(ChatSessionCreateRsp);
rpc GetChatSessionMember(GetChatSessionMemberReq) returns
(GetChatSessionMemberRsp);
rpc GetPendingFriendEventList(GetPendingFriendEventListReq)
returns (GetPendingFriendEventListRsp);
}
friend_server.hpp
virtual void FriendRemove(::google::protobuf::RpcController *controller, // 删除用户指定好友
const ::myim::FriendRemoveReq *request,
::myim::FriendRemoveRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
std::string rid = request->request_id();
auto error_func = [response, rid](const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
// 1.提取用户ID以及好友ID
std::string uid = request->user_id();
std::string pid = request->peer_id();
// 2.删除relation中好友信息
bool flag = _relationtable->Remove(uid, pid);
if (flag == false)
{
LOG_ERROR("删除好友关系信息失败");
error_func("删除好友关系信息失败");
return;
}
// 3.删除单聊会话 && 删除chat_session_member中好友信息
_chatsessiontable->Remove(uid, pid);
// 4.返回响应
response->set_request_id(rid);
response->set_success(true);
}
virtual void FriendAdd(::google::protobuf::RpcController *controller, // 用户user_id 对 用户respondent_id 发起好友申请
const ::myim::FriendAddReq *request,
::myim::FriendAddRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
std::string rid = request->request_id();
auto error_func = [response, rid](const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
// 1.提取关键信息
std::string uid = request->user_id();
std::string pid = request->respondent_id();
// 1.5判断此时是否已经是好友 或 是否已经发送过未处理的好友请求
bool res = _relationtable->Exists(uid, pid);
if (res == true)
{
LOG_ERROR("发起好友申请 {} 失败, 两人 {} - {} 已经是好友", rid, uid, pid);
error_func("发起好友申请失败, 两人已经是好友");
return;
}
res = _friendapplytable->Exists(uid, pid);
if (res == true)
{
LOG_ERROR("发起好友申请 {} 失败, {} - {} 已经发起了好友申请", rid, uid, pid);
error_func("发起好友申请失败, 已经发起了好友申请");
return;
}
// 2.往apply中添加好友申请事件
std::string eid = uuid();
FriendApply friendapply(eid, uid, pid);
res = _friendapplytable->Insert(friendapply);
if (res == false)
{
LOG_ERROR("发起好友申请 {} 失败, 添加好友申请 {} - {} 事件失败", rid, uid, pid);
error_func("发起好友申请失败, 添加好友申请事件失败");
return;
}
// 3.返回响应
response->set_request_id(rid);
response->set_success(true);
response->set_notify_event_id(eid);
}
virtual void ChatSessionCreate(::google::protobuf::RpcController *controller, // 创建聊天会话(用于创建多聊,单聊在用户同意了好友申请后自动创建)
const ::myim::ChatSessionCreateReq *request,
::myim::ChatSessionCreateRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
std::string rid = request->request_id();
auto error_func = [response, rid](const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
// 1.提取关键信息(群聊名称、群聊成员信息)
std::string cssname = request->chat_session_name();
// 2.添加member成员信息以及chatsession聊天会话信息
std::string cssid = uuid();
ChatSession cs(cssid, cssname, ChatSessionType::GROUP);
_chatsessiontable->Append(cs);
std::vector<ChatSessionMember> csms;
for (int i = 0; i < request->member_id_list_size(); i++)
{
csms.push_back({cssid, request->member_id_list(i)});
}
_chatsessionmembertable->Append(csms);
// 3.组织响应
response->set_success(true);
response->set_request_id(rid);
response->mutable_chat_session_info()->set_chat_session_id(cssid);
response->mutable_chat_session_info()->set_chat_session_name(cssname);
}
virtual void FriendSearch(::google::protobuf::RpcController *controller, // 对其他用户进行搜索,用以完成好友的添加
const ::myim::FriendSearchReq *request,
::myim::FriendSearchRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
std::string rid = request->request_id();
auto error_func = [response, rid](const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
// 1.提取关键信息(搜索字段)
std::string uid = request->user_id();
std::string skey = request->search_key();
// 2.进行请求发起人的好友搜索(达到已经是好友就不用再搜索的目的)
std::vector<std::string> friends = _relationtable->Friends(uid);
// 3.通过关键字使用ES搜索获取到用户ID
friends.push_back(uid);
std::vector<User> users = _es_user->Search(skey, friends);
// 4.通过用户ID批量获取用户信息
unordered_set<std::string> uids; // 通过set对users进行去重
for (auto &user : users)
{
uids.insert(user.user_id());
}
std::unordered_map<std::string, myim::UserInfo> users_list;
auto res = Get_User_Info(rid, uids, users_list);
if (res == false)
{
LOG_ERROR("{}, 通过用户管理子服务批量获取用户信息失败", rid);
error_func("通过用户管理子服务批量获取用户信息失败");
return;
}
// 5.组织响应
for (auto &e : users_list)
{
auto res = response->add_user_info();
res->CopyFrom(e.second);
}
response->set_request_id(rid);
response->set_success(true);
}
亮点:
1.使用ES作为好友搜索关键字的查询工具,查询速度快。
2.通过mysql使用用户ID进行好友关系的储存。
3.群聊创建支持批量添加成员。
4.好友申请通过后,自动创建单聊会话并发送通知。
消息转发子服务
负责业务:
负责消息的转发(获取消息的用户转发列表)
transmite.proto
service MsgTransmitService {
rpc GetTransmitTarget(NewMessageReq) returns
(GetTransmitTargetRsp);
}
transmite_server.hpp
virtual void GetTransmitTarget(google::protobuf::RpcController *cntl_base,
const myim::NewMessageReq *request,
myim::GetTransmitTargetRsp *response,
::google::protobuf::Closure *done)
{
std::string rid = request->request_id();
auto error_func = [response, rid](const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
brpc::ClosureGuard done_closure(done);
// 1.从请求中获取关键信息:用户ID,所属会话ID,消息内容
std::string uid = request->user_id();
std::string ssid = request->session_id();
std::string chat_ssid = request->chat_session_id();
auto msg = request->message();
// 2.发送者向用户子服务获取用户信息
auto channel = _service_manager->choose(_user_register_key);
if (!channel)
{
LOG_DEBUG("获取用户子服务信道失败!");
error_func("获取用户子服务信道失败!");
return;
}
UserService_Stub stub(channel.get());
brpc::Controller cntl;
GetUserInfoReq req;
GetUserInfoRsp rsp;
req.set_request_id(uuid());
req.set_session_id(ssid);
req.set_user_id(uid);
stub.GetUserInfo(&cntl, &req, &rsp, nullptr);
if (cntl.Failed() == true || rsp.success() == false)
{
LOG_DEBUG("调用用户子服务失败!");
error_func("调用用户子服务失败!");
return;
}
auto user_info = rsp.user_info();
// 3.进行消息组织并获取消息转发客户端用户列表
MessageInfo message;
message.set_message_id(uuid());
message.set_chat_session_id(chat_ssid);
message.set_timestamp(time(nullptr));
message.mutable_sender()->CopyFrom(user_info);
message.mutable_message()->CopyFrom(msg);
auto uids = _csmt->Members(chat_ssid);
// 4.将封装完毕的消息送入消息存储子服务中进行消息持久化保存
_rabbitmqclient->Publish(_msg_exchange, message.SerializePartialAsString(), _msg_rkey);
// 5.返回响应
for (auto &uid : uids)
{
response->add_target_id_list(uid);
}
response->set_request_id(rid);
response->set_success(true);
response->mutable_message()->CopyFrom(message);
}
亮点:
1.具有GetTransmitTargetRsp结构,方便网关对该消息所发送用户进行一对一的通知。
2.对消息进行完整性填充(如消息产生时间、发送用户、消息ID)。
3.将消息通过RabbitMQ转发给消息存储子服务从而对消息进行持久化存储。
消息存储子服务
负责业务:
负责获取消息、消息搜索以及对消息进行持久化存储。
message.proto
service MsgStorageService {
rpc GetHistoryMsg(GetHistoryMsgReq) returns
(GetHistoryMsgRsp);
rpc GetRecentMsg(GetRecentMsgReq) returns (GetRecentMsgRsp);
rpc MsgSearch(MsgSearchReq) returns (MsgSearchRsp);
}
message_server.hpp
// 获取最近n条消息
virtual void GetRecentMsg(google::protobuf::RpcController *cntl_base,
const myim::GetRecentMsgReq *request,
myim::GetRecentMsgRsp *response,
::google::protobuf::Closure *done)
{
std::string rid = request->request_id();
auto error_func = [response, rid](const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
brpc::ClosureGuard done_closure(done);
// 1.首先收集元信息
std::string chat_session_id = request->chat_session_id();
int64_t msg_count = request->msg_count();
// 2.从mysql处获取最近msg_count条信息的集合
std::vector<Message> recent_list = _messagetable->Recent(chat_session_id, msg_count);
if (recent_list.size() == 0)
{
response->set_request_id(rid);
response->set_success(true);
return;
}
// 3.对非文本信息,从文件管理子服务处批量下载文件/图片/语音信息
std::unordered_set<std::string> file_id_list;
for (auto &msg : recent_list)
{
if (msg.message_type() == MessageType::STRING)
continue;
file_id_list.insert(msg.file_id());
}
unordered_map<std::string, FileDownloadData> file_id_data; // 存储的是全部非文本id, 对应的非文本数据
bool res = Get_Files(rid, file_id_list, file_id_data);
if (res == false)
{
LOG_ERROR("批量文件获取失败, rid:", rid);
error_func("批量文件获取失败");
return;
}
// 4.对消息的发送者,从用户管理子服务处获取用户信息
std::unordered_set<std::string> user_id_list;
for (auto &e : recent_list)
{
user_id_list.insert(e.sender_id());
}
unordered_map<std::string, UserInfo> user_id_data; // 存储的每一个消息发送者的用户数据
res = Get_Users(rid, user_id_list, user_id_data);
if (res == false)
{
LOG_ERROR("批量用户信息获取失败, rid:", rid);
error_func("批量用户信息获取失败");
return;
}
// 5.组织响应
response->set_success(true);
response->set_request_id(rid);
for (auto &e : recent_list)
{
auto msg = response->add_msg_list();
msg->set_message_id(e.message_id());
msg->set_chat_session_id(e.session_id());
msg->set_timestamp(boost::posix_time::to_time_t(e.create_time()));
msg->mutable_sender()->set_user_id(e.sender_id());
msg->mutable_sender()->set_nickname(user_id_data[e.sender_id()].nickname());
msg->mutable_sender()->set_description(user_id_data[e.sender_id()].description());
msg->mutable_sender()->set_phone(user_id_data[e.sender_id()].phone());
msg->mutable_sender()->set_avatar(user_id_data[e.sender_id()].avatar());
msg->mutable_message()->set_message_type(MessageType(e.message_type()));
switch (MessageType(e.message_type()))
{
case MessageType::STRING:
{
msg->mutable_message()->mutable_string_message()->set_content(e.content());
break;
}
case MessageType::IMAGE:
{
msg->mutable_message()->mutable_image_message()->set_image_content(file_id_data[e.file_id()].file_content());
msg->mutable_message()->mutable_image_message()->set_file_id(e.file_id());
break;
}
case MessageType::FILE:
{
msg->mutable_message()->mutable_file_message()->set_file_contents(file_id_data[e.file_id()].file_content());
msg->mutable_message()->mutable_file_message()->set_file_id(e.file_id());
msg->mutable_message()->mutable_file_message()->set_file_size(e.file_size());
msg->mutable_message()->mutable_file_message()->set_file_name(e.file_name());
break;
}
case MessageType::SPEECH:
{
msg->mutable_message()->mutable_speech_message()->set_file_contents(file_id_data[e.file_id()].file_content());
msg->mutable_message()->mutable_speech_message()->set_file_id(e.file_id());
break;
}
default:
{
LOG_ERROR("组织响应时消息类型出错, rid:", rid);
error_func("组织响应时消息类型出错");
return;
}
}
}
}
void Rabbitmqclient_Cb(const char *body, size_t size)
{
// 1.首先提取元信息并进行反序列化
MessageInfo messageinfo;
bool res = messageinfo.ParseFromArray(body, size);
if (res == false)
{
LOG_ERROR("对消息队列消息进行反序列化失败");
return;
}
// 2.根据不同消息进行不同处理
std::string file_id, file_name, content; // 文本消息存储在content,非文本消息的content为空,其存储在"file_id"里
int64_t file_size;
switch (messageinfo.message().message_type())
{
case MessageType::STRING:
{
// 文本信息保存在ES
content = messageinfo.message().string_message().content();
res = _es_message->Append_Message(messageinfo.chat_session_id(),
messageinfo.message_id(),
messageinfo.sender().user_id(),
messageinfo.timestamp(),
content);
break;
}
// 非文本信息上传到文件管理子服务中进行报错
case MessageType::IMAGE:
{
file_size = messageinfo.message().image_message().image_content().size();
res = Put_File("",
file_size,
messageinfo.message().image_message().image_content(),
file_id);
break;
}
case MessageType::FILE:
{
file_size = messageinfo.message().file_message().file_size();
file_name = messageinfo.message().file_message().file_name();
res = Put_File(file_name,
file_size,
messageinfo.message().file_message().file_contents(),
file_id);
break;
}
case MessageType::SPEECH:
{
file_size = messageinfo.message().speech_message().file_contents().size();
res = Put_File("",
file_size,
messageinfo.message().speech_message().file_contents(),
file_id);
break;
}
default:
{
LOG_ERROR("消息队列内消息 {} 类型错误!", messageinfo.message_id());
return;
}
if (res == false)
{
LOG_ERROR("文本/图片/文件/语音消息 {} 往ES上传失败", messageinfo.message_id());
return;
}
}
// 3.提取原信息存储到mysql数据库中
Message message;
message.Set_sender_id(messageinfo.sender().user_id());
message.Set_content(content);
message.Set_file_id(file_id);
message.Set_create_time(boost::posix_time::from_time_t(messageinfo.timestamp()));
message.Set_file_name(file_name);
message.Set_file_size(file_size);
message.Set_message_id(messageinfo.message_id());
message.Set_message_type(messageinfo.message().message_type());
message.Set_session_id(messageinfo.chat_session_id());
res = _messagetable->Insert(message);
if (res == false)
{
LOG_ERROR("文本/图片/文件/语音消息 {} 往mysql上传失败", messageinfo.message_id());
return;
}
LOG_INFO("结束Rabbitmqclient_Cb");
}
亮点:
1.从RabbitMQ获取消息并根据消息类型将文本消息存储入mysql进行持久化存储以及ES中方便后续消息搜索的高效,非文本消息则上传到文件管理子服务中进行消息的持久化保存。
用户管理子服务
负责业务:
负责用户注册、用户登录、用户换绑邮箱、用户邮箱登录、用户昵称设置、用户头像设置等诸多关于用户的操作内容。
user.proto
service UserService {
rpc UserRegister(UserRegisterReq) returns (UserRegisterRsp);
rpc UserLogin(UserLoginReq) returns (UserLoginRsp);
rpc GetPhoneVerifyCode(PhoneVerifyCodeReq) returns
(PhoneVerifyCodeRsp);
rpc PhoneRegister(PhoneRegisterReq) returns
(PhoneRegisterRsp);
rpc PhoneLogin(PhoneLoginReq) returns (PhoneLoginRsp);
rpc GetUserInfo(GetUserInfoReq) returns (GetUserInfoRsp);
rpc GetMultiUserInfo(GetMultiUserInfoReq) returns
(GetMultiUserInfoRsp);
rpc SetUserAvatar(SetUserAvatarReq) returns
(SetUserAvatarRsp);
rpc SetUserNickname(SetUserNicknameReq) returns
(SetUserNicknameRsp);
rpc SetUserDescription(SetUserDescriptionReq) returns
(SetUserDescriptionRsp);
rpc SetUserPhoneNumber(SetUserPhoneNumberReq) returns
(SetUserPhoneNumberRsp);
}
//用户名登录
message UserLoginReq {
string request_id = 1;
string nickname = 2;
string password = 3;
optional string verify_code_id = 4;//目前客户端实现了本地验证功能,该字段废弃
optional string verify_code = 5;//目前客户端实现了本地验证功能,该字段废弃
}
message UserLoginRsp {
string request_id = 1;
bool success = 2;
string errmsg = 3;
string login_session_id = 4;
}
//结构化字段代码太多,这里不一一列举,文章末尾会有项目代码地址奉上。
user_server.hpp
// 对用户登录请求进行处理
virtual void UserLogin(google::protobuf::RpcController *controller,
const ::myim::UserLoginReq *request,
::myim::UserLoginRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
auto error_func = [response](const std::string &rid, const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
// 1. 从请求中取出昵称和密码
std::string nickname = request->nickname();
std::string password = request->password();
// 2. 通过昵称获取用户信息,进行密码是否一致的判断
auto user = _mysql_user->Select_By_Nickname(nickname);
if (!user || user->password() != password)
{
if (!user)
{
LOG_ERROR("用户不存在");
}
if (user->password() != password)
{
LOG_ERROR("密码不对");
LOG_ERROR("{}", password);
LOG_ERROR("{}", user->password());
}
LOG_ERROR("用户登录请求: {} , 用户名或密码错误", request->request_id());
error_func(request->request_id(), "用户名或密码错误");
return;
}
// 3. 根据 redis 中的登录标记信息是否存在判断用户是否已经登录。
if (_status_redis->Exist(user->user_id()))
{
LOG_ERROR("用户登录请求: {} , 用户已在别处登录", request->request_id());
error_func(request->request_id(), "用户已在别处登录");
return;
}
// 4. 构造会话 ID,生成会话键值对,向 redis 中添加会话信息以及登录标记信息
std::string sid = uuid();
_status_redis->Append(user->user_id());
_session_redis->Append(sid, user->user_id());
// 5. 组织响应,返回生成的会话 ID
response->set_request_id(request->request_id());
response->set_success(true);
response->set_login_session_id(sid);
}
// 设置用户昵称
virtual void SetUserNickname(google::protobuf::RpcController *controller,
const ::myim::SetUserNicknameReq *request,
::myim::SetUserNicknameRsp *response,
::google::protobuf::Closure *done)
{
brpc::ClosureGuard done_closure(done);
auto error_func = [response](const std::string &rid, const std::string &errmsg)
{
response->set_request_id(rid);
response->set_errmsg(errmsg);
response->set_success(false);
};
// 1. 从请求中取出用户 ID 与新的昵称
std::string uid = request->user_id();
std::string nickname_new = request->nickname();
// 2. 判断昵称格式是否正确
if (Legal_Nickname(nickname_new) == false)
{
LOG_ERROR("设置用户昵称请求: {}, 昵称不合法: {}", request->request_id(), nickname_new);
error_func(request->request_id(), "昵称不合法");
return;
}
// 3. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在
auto user = _mysql_user->Select_By_User_id(uid);
if (!user)
{
LOG_ERROR("设置用户昵称信息请求: {} , 用户信息未找到: {}", request->request_id(), uid);
error_func(request->request_id(), "用户信息未找到");
return;
}
// 4. 将新的昵称更新到数据库中
user->set_nickname(nickname_new);
bool res = _mysql_user->Update(user);
if (!res)
{
LOG_ERROR("设置用户昵称信息请求: {} , mysql数据库更新用户信息失败: {}", request->request_id(), uid);
error_func(request->request_id(), "mysql数据库更新用户信息失败");
return;
}
// 5. 更新 ES 服务器中用户信息
res = _user_es->Append_User(user->user_id(), user->nickname(), user->description(), user->phone(), user->avatar_id());
if (!res)
{
LOG_ERROR("设置用户昵称信息请求: {} , ES搜索引擎更新用户信息失败: {}", request->request_id(), uid);
error_func(request->request_id(), "ES搜索引擎更新用户信息失败");
return;
}
// 6. 组织响应,返回更新成功与否
response->set_request_id(request->request_id());
response->set_success(true);
}
亮点:
1.始终使用ES对用户昵称、用户签名等信息进行维护,保证后续进行用户搜索操作时的高效性与正确性。
2.将用户信息存储进mysql数据库中,保证重要信息的安全性。
3.使用redis进行用户登录状态的快速判断,防止账号多方登录,保障安全性。
4.多登陆方式的支持(昵称密码登录/绑定邮箱验证码登录)
语音识别子服务
负责业务:
负责将语音转换为文字。
speech.proto
service SpeechService {
rpc SpeechRecognition(SpeechRecognitionReq) returns
(SpeechRecognitionRsp);
}
speech_server.hpp
virtual void SpeechRecognition(google::protobuf::RpcController *cntl_base,
const myim::SpeechRecognitionReq *request,
myim::SpeechRecognitionRsp *response,
::google::protobuf::Closure *done)
{
response->set_request_id(request->request_id());
brpc::ClosureGuard done_closure(done);
std::string errs;
std::string ret = _client->GetData(request->speech_content(), errs);
LOG_DEBUG("当前执行GetData函数完毕");
if (ret.empty())
{
LOG_ERROR("{} 语音识别失败: {}", request->request_id(), errs.c_str());
response->set_success(false);
response->set_errmsg("语音识别失败: " + errs);
return;
}
response->set_success(true);
response->set_recognition_result(ret);
}
亮点:
1.使用百度语音服务,高效快捷、简单易懂,同时支持多种语言方便后续功能扩展。
子服务的总体实现流程:
1.进行参数/配置文件的解析(gflags),获取程序运行参数。
2.初始化各个功能子模块的句柄(如redis、mysql、ES、registry、discovery)。
3.搭建rpc服务器。
4.进行服务注册。
5.不同子服务可以搭建在不同的服务器上进行协作,做到了解耦。
CMake
为了提高开发效率,使用了CMake来对项目代码进行统一编译。
Friend目录下的CMakeLists.txt
# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(friend_server)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")
# 3. 检测并生成file框架代码
# 1. 添加所需的file映射代码文件名称
set(target "friend_server")
#------------------------------客户端所需------------------------------#
set(friend_client "friend_client")
#------------------------------客户端所需------------------------------#
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto) #仅路径无文件。指明.proto文件所在的目录路径,用于指定Protobuf编译器的输入文件路径
set(proto_files "") #仅文件无路径。存储需要处理的.proto文件列表
list(APPEND proto_files base.proto)
list(APPEND proto_files message.proto)
list(APPEND proto_files friend.proto)
list(APPEND proto_files user.proto)
# 检测并生成proto框架代码
set(proto_ccs "") #仅文件无路径。存储所有由Protobuf编译器生成的.cc源文件名称
set(proto_cc "") #仅文件无路径。用于临时存储单个proto文件生成的.cc文件名称(在循环中使用)
# 2. 检测框架代码文件是否已经生成
foreach(proto_files ${proto_files}) #filefile依次存储filefiles中的.proto文件
string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_files})
# 3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码
if(NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
add_custom_command(
PRE_BUILD
COMMAND protoc
ARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I ${proto_path} --experimental_allow_proto3_optional ${proto_path}/${proto_files}
DEPENDS ${proto_path}/${proto_files}
OUTPUT ${proto_cc}
COMMENT "生成proto框架代码文件: " ${proto_cc}
)
endif()
# 4. 将所有生成的proto框架源码文件名称保存起来
list(APPEND proto_ccs ${proto_cc})
endforeach()
# 检测并生成ODB框架代码
set(odbpaths ${CMAKE_CURRENT_SOURCE_DIR}/../odb) #仅路径无文件
set(odbfiles "") #仅文件无路径
list(APPEND odbfiles chat_session.hxx)
list(APPEND odbfiles chat_session_member.hxx)
list(APPEND odbfiles friend_apply.hxx)
list(APPEND odbfiles relation.hxx)
set(odbsrcs "") #仅文件无路径
set(odb_cxx "") #仅文件无路径
# 检测框架代码文件是否已经生成
foreach(odbfile ${odbfiles})
string(REPLACE ".hxx" "-odb.cxx" odb_cxx ${odbfile})
# 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码
if(NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})
add_custom_command(
PRE_BUILD
COMMAND odb
ARGS -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time ${odbpaths}/${odbfile}
DEPENDS ${odbpaths}/${odbfile}
OUTPUT ${odb_cxx}
COMMENT "生成odb框架代码文件: " ${odb_cxx}
)
endif()
# 3.24 将所有生成的odb框架源码文件名称保存起来
list(APPEND odbsrcs ${odb_cxx})
endforeach()
# 4. 获取源代码目录下的所有源码文件
set(srcfiles_cc "") #有路径有文件。srcfiles_cc表示的是C相关的源码文件
list(APPEND srcfiles_cc ${CMAKE_CURRENT_SOURCE_DIR}/source/friend_server.cc)
# 5. 声明目标及依赖
add_executable(${target} ${srcfiles_cc} ${CMAKE_CURRENT_BINARY_DIR}/${proto_ccs} ${CMAKE_CURRENT_BINARY_DIR}/${odbsrcs})
#------------------------------客户端所需------------------------------#
#friend_client
set(friend_cc_clients "") #有路径有文件。transmite_cc_clients表示的是C相关的源码文件
list(APPEND friend_cc_clients ${CMAKE_CURRENT_SOURCE_DIR}/test/friend_client.cc)
add_executable(${friend_client} ${friend_cc_clients} ${CMAKE_CURRENT_BINARY_DIR}/${proto_ccs})
#------------------------------客户端所需------------------------------#
# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../odb)
include_directories(${CMAKE_CURRENT_BINARY_DIR}) #这是为了Protobuf生成的头文件(file.pb.h)会输出到构建目录中,需要让编译器能够找到这些文件
include_directories(${proto_path})
# 7. 设置需要连接的库
target_link_libraries(${target} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -lcpprest -lcurl
/usr/local/lib/libjsoncpp.so.19 -lodb-mysql -lodb -lodb-boost -lcpr -lpthread -lamqpcpp -lev -letcd-cpp-api -lelasticlient)
#------------------------------客户端所需------------------------------#
target_link_libraries(${friend_client} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -lcpprest -lcurl
/usr/local/lib/libjsoncpp.so.19 -lodb-mysql -lodb -lodb-boost -lcpr -lpthread -lamqpcpp -lev -letcd-cpp-api -lgtest)
#------------------------------客户端所需------------------------------#
INSTALL(TARGETS ${target} ${friend_client} RUNTIME DESTINATION bin) #设置安装路径,将${target}安装到/usr/local/bin/目录下
统一编译各个子服务模块
# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(im_server)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/file)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/message)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/speech)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/transmite)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/user)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/friend)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/gateway)
set(CMAKE_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR})
Shell脚本
为了在项目部署环节中对程序依赖文件的统一载入 以及 配合docker-compose.yml进行镜像+容器的统一部署,我使用shell脚本。
depends.sh
#!/bin/bash
declare depends
Get_Depends()
{
depends=$(ldd $1 | awk '{if (match($3,"/")){print $3}}')
mkdir $2
cp -Lr ${depends} $2
}
Get_Depends ./gateway/build/gateway_server ./gateway/depends
Get_Depends ./file/build/file_server ./file/depends
Get_Depends ./user/build/user_server ./user/depends
Get_Depends ./friend/build/friend_server ./friend/depends
Get_Depends ./message/build/message_server ./message/depends
Get_Depends ./speech/build/speech_server ./speech/depends
Get_Depends ./transmite/build/transmite_server ./transmite/depends
Get_Depends ./user/build/user_server ./user/depends
#拷贝可执行程序的依赖文件进入子服务目录下
cp /bin/nc ./gateway
cp /bin/nc ./file
cp /bin/nc ./user
cp /bin/nc ./friend
cp /bin/nc ./message
cp /bin/nc ./speech
cp /bin/nc ./transmite
Get_Depends /bin/nc ./gateway/depends
Get_Depends /bin/nc ./file/depends
Get_Depends /bin/nc ./user/depends
Get_Depends /bin/nc ./friend/depends
Get_Depends /bin/nc ./message/depends
Get_Depends /bin/nc ./speech/depends
Get_Depends /bin/nc ./transmite/depends
Get_Depends /bin/nc ./user/depends
#拷贝nc的依赖文件进入子服务目录下
entrypoint.sh
#本脚本作为容器启动的入口点
#!/bin/bash
#./entrypoint.sh -h 127.0.0.1 -p 3306,2379,6379 -c '/im/bin/friend_server -flagfile=./friend_server .conf'
# 1.端口连接不上则循环等待,保证子服务启动时所需端口一定就绪
Wait_For()
{
while ! nc -z $1 $2;
do
echo "wait for $1: $2";
sleep 1;
done
}
declare DEPENDS_PORT
declare DEPENDS_HOST
declare CMD
# 2.对运行脚本的参数进行解析,获取到ip, port, command
while getopts "h:p:c:" arg
do
case $arg in
h)
DEPENDS_HOST=$OPTARG ;;
p)
DEPENDS_PORT=$OPTARG ;;
c)
CMD=$OPTARG ;
esac
done
# 3.通过执行脚本进行端口检测
for port in ${DEPENDS_PORT//,/ }
do
Wait_For $DEPENDS_HOST $port
done
# 4.执行command
eval $CMD
Docker
为了统一打包服务及其依赖并生成镜像启动容器,我使用了Docker。
friend子服务目录下的Dockfile
#声明基础镜像来源
FROM ubuntu:22.04
#声明工作路径
WORKDIR /im
RUN mkdir -p /im/logs &&\
mkdir -p /im/data &&\
mkdir -p /im/conf &&\
mkdir -p /im/bin
#将可执行程序拷贝进镜像中
COPY ./build/friend_server /im/bin
#将依赖文件拷贝进镜像中
COPY ./depends/ /lib/x86_64-linux-gnu/
COPY ./nc/ /bin/
#设置容器的默认启动操作
CMD /im/bin/friend_server -flagfile=/im/conf/friend_server.conf
docker-compose.yml
#统一生成生成镜像+部署
services:
etcd:
image: quay.io/coreos/etcd:v3.3.25
container_name: etcd-service
environment:
#定义 etcd 节点的唯一名称
- ETCD_NAME=etcd-s1
#指定 etcd 的数据持久化目录
- ETCD_DATA_DIR=/var/lib/etcd
#指定 etcd 监听客户端请求的地址和端口
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
#对外 “公告” 的客户端访问地址
- ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379
volumes:
# 1. 使容器内的程序能够访问宿主机上的文件
# 2. 使容器内程序运行所产生的数据文件能映射到宿主机上
- ./middle/data/etcd:/var/lib/etcd:rw
ports:
- 2379:2379
restart: always
mysql:
image: mysql:8.0.44
container_name: mysql-service
environment:
MYSQL_ROOT_PASSWORD: 111111
volumes:
# 1. 使容器内的程序能够访问宿主机上的文件
# 2. 使容器内程序运行所产生的数据文件能映射到宿主机上
- ./middle/data/mysql:/var/lib/mysql:rw
- ./sql:/docker-entrypoint-initdb.d/:rw
ports:
- 3306:3306
restart: always
…………
file_server:
build: ./file
container_name: file_server-service
volumes:
# 1. 使容器内的程序能够访问宿主机上的文件
# 2. 使容器内程序运行所产生的数据文件能映射到宿主机上
- ./conf/file_server.conf:/im/conf/file_server.conf
- ./middle/data/logs:/im/logs:rw
- ./middle/data/data:/im/data:rw
- ./entrypoint.sh:/im/bin/entrypoint.sh
ports:
- 8081:8081
restart: always
entrypoint:
/im/bin/entrypoint.sh -h 192.168.0.185 -p 2379 -c "/im/bin/file_server -flagfile=/im/conf/file_server.conf"
depends_on:
- etcd
friend_server:
build: ./friend
container_name: friend_server-service
volumes:
# 1. 使容器内的程序能够访问宿主机上的文件
# 2. 使容器内程序运行所产生的数据文件能映射到宿主机上
- ./conf/friend_server.conf:/im/conf/friend_server.conf
- ./middle/data/logs:/im/logs:rw
- ./middle/data/data:/im/data:rw
- ./entrypoint.sh:/im/bin/entrypoint.sh
ports:
- 8085:8085
restart: always
entrypoint:
/im/bin/entrypoint.sh -h 192.168.0.185 -p 2379,3306,9200 -c "/im/bin/friend_server -flagfile=/im/conf/friend_server.conf"
depends_on:
- etcd
- mysql
- elas
而后输入docker compose up即可直接完成项目的镜像生成+容器的一键启动。
项目文档:
由于本项目体量于我而言并不小,故使用项目文档将其中细节进行清晰标注:
客户端和etcd服务器通信的端口 2379
客户端和ES服务器通信的端口 9200
客户端和Kibana服务器通信的端口 5601
客户端和redis服务器通信的端口 6379
客户端和MySQL服务器通信的端口 3306
客户端和MQrabbit服务器通信的端口 15672
语音识别子服务部署在8080端口
文件管理子服务部署在8081端口
用户管理子服务部署在8082端口
消息转发子服务部署在8083端口
消息存储子服务部署在8084端口
好友管理子服务部署在8085端口
网关控制子服务部署在8086端口
websocket_port使用的是10000端口
http_port使用的是10001端口
子服务名称前缀统一为 "/service"
语音识别子服务名称为 /service/speech/speech_service
文件管理子服务名称为 /service/file/file_service
用户管理子服务名称为 /service/user/user_service
消息转发子服务名称为 /service/transmite/transmite_service
消息存储子服务名称为 /service/message/message_service
好友管理子服务名称为 /service/friend/friend_service
关于脚本的作用及管理:
/root/xianzun/microservice_project_server/depends.sh:
将nc指令与对应子服务可执行程序的依赖文件拷贝进入对应子服务文件夹中以便于对应子服务文件夹中的dockfile能够将该子服务可执行程序的依赖文件拷贝进入镜像中。
/root/xianzun/microservice_project_server/entrypoint.sh:
因为docker compose up后,depends依赖文件并不会一定比对应子服务先开,故而需要本脚本确保对应端口可以连接(使用nc指令进行端口的检测)
关于docker对于项目部署的维护:
/root/xianzun/microservice_project_server/docker-compose.yml:
将指定的镜像生成容器、指定容器名、设置环境变量(完成内部的初始化)、完成文件的映射(使得容器内部可以直接操作宿主机上的文件)、端口的映射(注释1)、\
设置 容器内程序退出后总是重启 。
对7个子服务的额外安排:build可以指定目录下进行镜像的重新构造、entrypoint可以设置容器启动时执行的命令、depends_on设置了其依赖的服务(使得其依赖的服务\
先启动而本服务后启动)
注释1:Docker 容器的核心特性之一是网络隔离:每个容器都会被分配一个独立的网络命名空间,拥有自己专属的网卡、IP 地址和端口号,与宿主机、其他容器的\
网络环境完全隔离开,因此需要将宿主机上的端口映射到容器内部,而后外界才能通过访问宿主机的映射端口从而访问到该容器的对应端口。
注:
Docker 容器默认情况下能访问宿主机的端口,但绝对不能用 localhost/127.0.0.1 访问,而是需要通过宿主机的实际可访问 IP / 专属 DNS 名称,\
才能访问到宿主机上的开放端口。(本项目使用通过ifconfig获取的宿主机eth0对应的 IP 地址用来让容器访问宿主机端口)
更多推荐
所有评论(0)