一个c++RPC实现
一个c++RPC实现简述1.线程池2.RPC2.1RpcServer2.2RpcClient3.测试简述 1.本文主要由三个部分,第一个是线程池的实现,第二个是RPC的实现,第三个是一些测试的代码。 2.ROC网络的部分使用我另一篇文章的网络服务实现,所以这里的RPC实现完全没有一行关于网络的代码。 3.因为使用的是现成的网络轮子,很多地方的接口也不得不受限于其提供的接口。1.线程池 这个
一个c++RPC实现
一个内存泄露问题(已修改)
①昨天发现程序里大概8000次登陆操作会泄露2M的内存,然后看了一下,注释掉一行代码就没有泄露了,该行代码是把阻塞的RPC调用放到线程池里执行,这样服务器就不会阻塞。
②执行2W次阻塞的RPC调用后,内存并没有增长,问题来到了线程池这边。
③调用MfEnqueue把任务放进线程池会导致其返回一个future对象,由该future对象异步取得任务的执行结果,因为在代码中没有对这个future操作,尝试对该future对象调用get(),内存没有增长
③但是问题也来了,有些情况下程序里确实不需要用future取得执行结果,只是丢进去执行而已
④然后我写了一个MfEnqueue_NoneReturnValue,这个函数里不使用packaged_task,也就不会生成future对象,只是把函数包裹成std::function<void()>入队,但是我使用这个函数后,不论是windows还是linux是,内存泄露还是存在。
⑤折磨了几小时后,实在没有办法搜了一下std::queue内存泄露,发现确实有queue.pop之后不释放内存的现象,解决方案是使用list或者重新生成一个queue来swap,就可以释放内存
⑥增加了一个计数器,每执行2W次任务,就用这个swap操作来释放内存。
⑦在windows上,swap确实解决了泄露的问题,但是Linux上用top观察,执行2W次后并没有降低内存,后来看到linux的进程不会归还内存,而是保留下来供以后用,既然是操作系统的问题就不纠结了。
⑧最后,③中为什么future调用get操作后会导致内存降下来我也没搞清楚,代码就那么几行,future也都是临时对象,也没有哪里存储啊
已修改,两处使用strncpy复制结构体,这里的测试是一个int类型,遇到strnpc复制4字节,就停下了,但是当复制一个复杂的结构体,而这个结构体中有char*时,遇到\0就停下了,更换为memcpy解决。
另一个问题是关于future的使用,原来使用了valid成员函数来判断结果是否可用,而valid返回的是一个结果是否被关联到了future对象,并不代表结果是否可用,这里使用了wait_for来等待结果可用,不使用wait的原因是不想阻塞。
还有另一个没有解决的疑惑,在另一个项目中,我可以使用future::Is_Ready成员函数来直接判断结果是否可用,但是我在当前项目里,怎么也无法找到这个函数,所以使用wait_for来替代。我使用vs分别打开了这两个项目,然后转到future的定义,这个两个vs窗口都转到了<future>文件,但是他们的定义居然不一样:
在future::Is_Ready能用的那个项目里,future的定义是850多行,
不能用的那个项目里,也就是本页这个,future的定义是761行,
尝试比对这两处,他们都不一样,我也查看了项目标准,都是c++17,不明白为什么
简述
1.本文主要由三个部分,第一个是线程池的实现,第二个是RPC的实现,第三个是一些测试的代码。
2.ROC网络的部分使用我另一篇文章的网络服务实现,所以这里的RPC实现完全没有一行关于网络的代码。
3.因为使用的是现成的网络轮子,很多地方的接口也不得不受限于其提供的接口。
1.线程池
这个线程池是我直接拿来别人的改改,来自link.
threadpool.h文件
#pragma once
#include <vector>
#include <thread>
#include <queue>
#include <condition_variable>
#include <functional>
#include <future>
#include <list>
class CThreadPool
{
private:
std::vector<std::thread> MdPool; // 线程池
std::queue<std::function<void()>> MdTasks; // 提交的任务队列
std::mutex MdQueueMutex; // 队列的锁
std::condition_variable MdQueueCondition; // 队列的条件变量
std::atomic<bool> MdIsStop; // 队列停止时使用
int MdCount = 0;
std::mutex MdCountMutex;
public:
CThreadPool();
~CThreadPool();
void MfStart(size_t threads = 5);
bool MfIsStop() { return MdIsStop; };
private:
void MfTheadFun();
public:
template<class F, class... Args>
auto MfEnqueue(F&& f, Args&&... args)->std::future<typename std::result_of<F(Args...)>::type>;
};
// 后置返回类型,提取出参数F的返回值类型
// 模板成员需要写在,h中
template<class F, class... Args>
auto CThreadPool::MfEnqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(MdQueueMutex);
MdTasks.emplace([task]() { (*task)(); });
}
MdQueueCondition.notify_one();
return res;
}
threadpool.cpp文件
#include "ThreadPool.h"
CThreadPool::CThreadPool() :MdIsStop(false) {}
CThreadPool::~CThreadPool()
{
MdIsStop = true;
MdQueueCondition.notify_all();
for (std::thread& worker : MdPool)
worker.join();
}
void CThreadPool::MfStart(size_t threads) // 线程不应该在构造函数中启动,因为这些线程使用了数据成员
{
for (size_t i = 0; i < threads; ++i)
MdPool.push_back(std::thread(&CThreadPool::MfTheadFun, this));
}
void CThreadPool::MfTheadFun()
{
while (1)
{
std::function<void()> task; // 要执行的任务
{
std::unique_lock<std::mutex> lock(MdQueueMutex);
MdQueueCondition.wait(lock, [this] { return this->MdIsStop || !this->MdTasks.empty(); });
if (this->MdIsStop && this->MdTasks.empty())
return;
task = this->MdTasks.front();
this->MdTasks.pop();
{
std::unique_lock<std::mutex> lock2(MdCountMutex);
if (++MdCount > 20000)
{
MdCount = 0;
std::queue<std::function<void()>>(MdTasks).swap(MdTasks);
}
}
}
task();
}
}
2.RPC
RPC的server和client是写在同一个文件里的,但是这里分开来说明。
这里是server和client公用的一些东西。
过程的函数签名应该像RemoteProc那样,比如:std::shared_ptr<char[]> testf2(std::shared_ptr<char[]> a)
RpcMsg的定义就像unp卷二里描述的那样,只是一个指针,这个地址上的数据有多长,如何解释,完全自定义。
RpcCS.h
// 所有远程调用的函数应当遵循此接口
// 返回值是一个智能指针,指向一个char[]
// 参数是一个智能指针,指向一个char[]
// 使用智能指针而不是直接使用char*,是因为很多地方都是跨线程传递的,方便在调用中的内存管理
// 不论是返回值还是参数,都在void的基础上自定义结构解析
typedef std::function
<
std::shared_ptr<char[]> (std::shared_ptr<char[]>)
> RemoteProc;
enum RpcNetMsgCmd
{
RpcProc_NOON = 0 // 若CNetMsgHead::MdCmd为该值,
// 其余自定义的过程号都应该大于0
};
struct CNetMsgHead // 这个结构是搬过来说明的,他的定义不放在这里
{
int MdLen; // 该包总长度
int MdCmd; // 该包执行的操作
CNetMsgHead()
{
MdLen = sizeof(CNetMsgHead);
MdCmd = -1; // 该值为-1时默认为心跳包
}
};
struct RpcMsg :public CNetMsgHead
{
// CNetMsgHead::MdLen 该成员依旧代表整个包的长度
// CNetMsgHead::MdCmd 该成员不再代表某个操作,而是直接代表要调用的那个过程号CallNo
void* MdData; // 数据指针
// 在server中,收到该结构MdData表示参数,发送该结构MdData表示返回值
// 在client中,收到该结构MdData表示返回值,发送该结构MdData表示参数
// 发送时不设置该成员,而是直接写数据到缓冲区
// 从缓冲区取出时使用这个成员
RpcMsg()
{
MdData = nullptr;
MdCmd = 0;
}
};
2.1RpcServer
RpcCS.h
MdCallList
:服务器要执行的过程调用,都存放在这里,存储了:
①过程号
②对应的过程函数地址
③该过程返回值的长度
④该过程参数的长度
MdCallListMutex
:因为主线程和复写的网络消息处理线程都会访问该列表,这里使用了读写锁。
MdProcRet
:每当有一个过程调用被放进线程池执行,代表其结果的对象就会放进这个表,存储了:
①对应客户端连接的CSocketObj*对象
②代表执行过程的过程号
③可以异步取得执行结果的std::future<std::shared_ptr<char[]>>
MdProcRetMutex
:MdProcRet是会被异步访问的见,MfCallRetrun
MfStart
:该函数先启动一个线程池,从客户端递送过来的远程调用会被送进该线程池执行
MfCallRetrun
:这是一个线程,它会在MftSart中被丢到线程池中执行,轮询MdProcRet中是否有结果可用,并把结果写回客户端,相当于MdProcRet的消费者。
MfVNetMsgDisposeFun
:是基类提供的网络消息处理函数,当收到消息,会调用这个虚函数来处理,另一个身份是MdProcRet的生产者。
class CRpcServer :private CServiceNoBlock
{
private:
CThreadPool MdThreadPool; // 执行过程时的线程池
std::map<int, std::tuple<RemoteProc, int, int>> MdCallList; // 注册的过程表,分别描述过程号、过程函数,参数的长度、返回值的长度
std::shared_mutex MdCallListMutex; // 该表的互斥元
std::map<CSocketObj*, std::tuple<int, std::future<std::shared_ptr<char[]>>>> MdProcRet; // 过程放入线程池执行时会返回一个std::future<void*>,以便后续异步取得该过程的返回值
// 分别描述客户端连接、执行的过程号、可以取得返回值的future对象
std::shared_mutex MdProcRetMutex; // 过程结果集的锁
public:
CRpcServer(int HeartBeatTime = 300, int ServiceMaxPeoples = 100, int DisposeThreadNums = 1);
virtual ~CRpcServer();
void MfStart(const char* ip, unsigned short port, int threadNums = 3); // 启动收发线程和线程池,threadNums代表线程池线程数量
int MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen); // 注册一个过程
int MfRemoveCall(int CallNo); // 移除一个过程
private:
void MfCallRetrun();
virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};
RpcCS.cpp
这里主要说明MfVNetMsgDisposeFun
的思路:
①从msg参数中取得要调用的过程号,到MdCallList中寻找,如果找不到对应过程,就给客户端发回一个过程号为RpcProc_NOON(0)的包,表示该过程不存在,然后就没事了。
②把可以异步取得结果的future对象放入队列,生产者。
CRpcServer::CRpcServer(int HeartBeatTime, int ServiceMaxPeoples, int DisposeThreadNums) :
CServiceNoBlock(HeartBeatTime, ServiceMaxPeoples, DisposeThreadNums)
{
}
CRpcServer::~CRpcServer()
{
}
void CRpcServer::MfStart(const char* ip, unsigned short port, int threadNums)
{
MdThreadPool.MfStart(threadNums+1); // 线程池启动
MdThreadPool.MfEnqueue(&CRpcServer::MfCallRetrun, this);// 给对应客户端写回结果的线程丢尽线程池执行
return CServiceNoBlock::Mf_NoBlock_Start(ip, port); // 网络收发处理启动
}
int CRpcServer::MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen)
{
{
std::lock_guard<std::shared_mutex> write_lock(MdCallListMutex);
if (MdCallList.find(CallNo) != MdCallList.end())
{
printf("RempteProcReg CallNo <%d> already existed!\n", CallNo);
LogFormatMsgAndSubmit(std::this_thread::get_id(), ERROR_FairySun, "RempteProcReg CallNo <%d> already existed!\n", CallNo);
return -1;
}
MdCallList[CallNo] = std::make_tuple(Call, ArgLen, RetLen);
}
return 0;
}
int CRpcServer::MfRemoveCall(int CallNo)
{
{
std::lock_guard<std::shared_mutex> write_lock(MdCallListMutex);
auto it = MdCallList.find(CallNo);
if (it != MdCallList.end())
MdCallList.erase(it);
}
return 0;
}
void CRpcServer::MfCallRetrun()
{
while (!MdProcRet.empty() || !MdThreadPool.MfIsStop())
{
RpcMsg ret;
// for循环中 执行完成的远程调用 的CSocketObj*会被加入该队列,结束后统一从MdProcRet中移除
std::vector<CSocketObj*> removelist;
// 遍历MdProcRet,如果有一个过程可以取得结果,就把结果发回对应的socket
for (auto it = MdProcRet.begin(); it != MdProcRet.end(); ++it)
{
if (std::future_status::ready == std::get<1>(it->second).wait_for(std::chrono::milliseconds(1)))
//if (std::get<1>(it->second).valid()) // 如果结果可用
{
int procNo = std::get<0>(it->second);
int procRetSize = std::get<2>(MdCallList[procNo]);
ret.MdLen = sizeof(CNetMsgHead) + procRetSize;
ret.MdCmd = procNo;
std::shared_ptr<char[]> data = std::get<1>(it->second).get();
it->first->MfDataToBuffer((char*)&ret, sizeof(CNetMsgHead)); // 先写包头
it->first->MfDataToBuffer(data.get(), procRetSize); // 再写数据
removelist.push_back(it->first);
}
}
// 将执行完成的远程调用统一移除
{
std::unique_lock<std::shared_mutex> write_lock(MdProcRetMutex);
for (auto it = removelist.begin(); it != removelist.end(); ++it)
{
if (MdProcRet.find(*it) != MdProcRet.end())
MdProcRet.erase(*it);
}
}
std::this_thread::sleep_for(std::chrono::seconds(1)); // 暂停一秒防止执行过快
}
}
void CRpcServer::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
// 注册列表中没有找到对应的过程号,发回一个为0的消息,表示找不到对应过程
int flag = false;
RpcMsg ret;
{
std::shared_lock<std::shared_mutex> read_lock(MdCallListMutex);
if (MdCallList.find(msg->MdCmd) != MdCallList.end())
flag = true;
}
if (flag == false)
{
ret.MdLen = sizeof(RpcMsg);
ret.MdCmd = RpcProc_NOON;
cli->MfDataToBuffer((char*)&ret, ret.MdLen);
}
else // 否则就是找到了对应的过程,保存客户端对象和对应的future,放进结果表
{
int arglen = std::get<1>(MdCallList[msg->MdCmd]);
std::shared_ptr<char[]> buf(new char[arglen]);
//strncpy(buf.get(), ((char*)msg) + sizeof(CNetMsgHead), arglen);
memcpy(buf.get(), ((char*)msg) + sizeof(CNetMsgHead), arglen);
{
std::unique_lock<std::shared_mutex> write_lock(MdProcRetMutex);
MdProcRet[cli] = std::make_tuple
(
msg->MdCmd,
MdThreadPool.MfEnqueue(std::get<0>(MdCallList[msg->MdCmd]), buf)
);
}
}
}
2.2RpcClient
RpcCs.h
没有任何新的数据成员,只是简单封装了下CClientLinkManage
的接口
MfVNetMsgDisposeFun
:基类提供的网络消息处理函数,每收到一条消息就会调用一次,这里虽然复写了它,但是复写成了空函数,它什么都不做,因为客户端的远程过程调用应当是阻塞的。
MfRemote:
使用该函数来发起一个远程过程调用,第二三四参数分别指明了调用过程号、过程需要的数据地址、数据的长度。几乎全部的逻辑都集中在该函数中:
①声明一个RpcMsg结构,填写包头结构,然后把包头和数据分别写到对应的套接字里。
②循环检查是否有服务端的数据发回。
③有数据发回,检查服务器返回的过程号是否正确,不正确和0都会导致该函数返回空指针来表示失败。
④正确就从套接字中取出数据,然后写到一个智能指针标识的空间中返回。
class CRpcClient :private CClientLinkManage
{
private:
public:
CRpcClient();
virtual ~CRpcClient();
void MfStart(); // 启动收发线程
int MfConnectRpcServer(std::string Linkname, const char* ip, unsigned short port); // 连接Rpc服务
void MfCloseRpclink(std::string Linkname); // 关闭和Rpc的连接
std::shared_ptr<char[]> MfRemote(std::string Linkname, int CallNo, void* data, int DataSize); // 远程过程调用,阻塞等待
private:
virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};
RpcCs.cpp
CRpcClient::CRpcClient():
CClientLinkManage()
{
}
CRpcClient::~CRpcClient()
{
}
void CRpcClient::MfStart()
{
CClientLinkManage::MfStart();
}
int CRpcClient::MfConnectRpcServer(std::string Linkname, const char* ip, unsigned short port)
{
return CClientLinkManage::MfCreateAddLink(Linkname, ip, port);
}
void CRpcClient::MfCloseRpclink(std::string Linkname)
{
CClientLinkManage::MfCloseLink(Linkname);
}
std::shared_ptr<char[]> CRpcClient::MfRemote(std::string Linkname, int CallNo, void* data, int DataSize)
{
RpcMsg msg;
msg.MdLen = sizeof(CNetMsgHead) + DataSize;
msg.MdCmd = CallNo;
CClientLinkManage::MfSendData(Linkname, (char*)&msg, sizeof(CNetMsgHead)); // 先写包头
CClientLinkManage::MfSendData(Linkname, (char*)data, DataSize); // 再写数据
while (1)
{
if (!CClientLinkManage::MfHasMsg(Linkname)) // 如果没有数据
{
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
else
{
const char * buff = CClientLinkManage::MfGetRecvBufferP(Linkname);
if ( ((RpcMsg*)buff)->MdCmd == RpcProc_NOON ) // 收到了为0的callno
return nullptr;
else if ( ((RpcMsg*)buff)->MdCmd != CallNo ) // 收到的callno和发出去的callno不一样
return nullptr;
else
{
int retsize = ((RpcMsg*)buff)->MdLen - sizeof(CNetMsgHead); // 计算返回的数据长度
char* ret = new char[retsize]; // 申请空间
//strncpy(ret, ((char*)buff) + sizeof(CNetMsgHead) , retsize); // 复制到新申请的空间
memcpy(ret, ((char*)buff) + sizeof(CNetMsgHead) , retsize);
CClientLinkManage::MfPopFrontMsg(Linkname); // 缓冲区的消息弹出
return std::shared_ptr<char[]>(ret); // 转换成智能指针返回
}
}
}
}
void CRpcClient::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
}
3.测试
client.cpp
int main()
{
FairySunOfNetBaseStart(); // https://blog.csdn.net/qq_43082206/article/details/110383165
int i = 5;
CRpcClient c;
c.MfStart();
c.MfConnectRpcServer("rpc", "118.31.75.171", 4567);
for(int i = 0; i < 10; ++i)
printf("remote ret:%d\n", *(int*)c.MfRemote("rpc", 2, (void*)&i, sizeof(int)).get());
for (int i = 0; i < 3; ++i)
printf("remote ret:%d\n", *(int*)c.MfRemote("rpc", 1, (void*)&i, sizeof(int)).get());
getchar();
FairySunOfNetBaseOver();
return 0;
}
server.cpp
std::shared_ptr<char[]> testf1(std::shared_ptr<char[]> a)
{
printf("test11111\n");
char* ret = new char[sizeof(int)];
int s = *((int*)a.get()) + 1;
*(int*)ret = s;
std::this_thread::sleep_for(std::chrono::seconds(5));
return std::shared_ptr<char[]>(ret);
}
std::shared_ptr<char[]> testf2(std::shared_ptr<char[]> a)
{
printf("test22222\n");
char* ret = new char[sizeof(int)];
*(int*)ret = *(int*)a.get() + 2;
return std::shared_ptr<char[]>(ret);
}
int main()
{
FairySunOfNetBaseStart(); // https://blog.csdn.net/qq_43082206/article/details/110383165
CRpcServer s;
s.MfStart(0, 4567);
s.MfRegCall(1, testf1, sizeof(int), sizeof(int));
s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
//s.MfRemoveCall(1);
//s.MfRemoveCall(2);
getchar();
FairySunOfNetBaseOver();
return 0;
}
2021年11月16日16:19:38
更多推荐
所有评论(0)