目录

一,项目介绍

1.1 关于消息队列

1.2 开发环境

1.3 技术栈

二,环境搭建

2.1 更换软件源

2.2 基础工具安装

2.3 protobuf 安装

2.4 muduo库安装

2.5 sqlite+gtest安装

三,第三方框架

3.1 Protobuf介绍和使用

3.1.1 介绍

3.1.2 语法

3.1.3 编译

3.2 muduo库介绍和使用

3.2.2 接口介绍

3.2.3 实际应用

3.3 基于muduo库实现protobuf协议的通信

3.3.0 预备

3.3.1 proto文件

3.3.2 服务端

3.3.3 客户端

3.4 SQLite 介绍和使用

3.4.1 简单介绍

3.4.2 常用API

3.4.3 实战使用

3.5 GTest 介绍和使用

3.5.1 介绍

3.5.2 宏断言

3.5.3 事件机制


一,项目介绍

1.1 关于消息队列

我们以前介绍过“阻塞队列”,是一种常用于实现生产者消费者模型的一种数据结构,可以看下这篇文章:Linux系统编程——生产者消费者模型_linux实现生产者与消费者-CSDN博客

在实际的后端开发中,尤其是中大型的分布式系统里,跨主机之间使用生产者消费者模型也是非常普遍的。 因此,我们通常会把阻塞队列封装成⼀个独立的服务器程序,并且为其添加更多丰富且实用的功能。这样的服务程序我们就称为消息队列(MessageQueue,MQ)。

1.2 开发环境

  • Linux: Ubuntu-22.04 

  • VSCode 远程访问窗口
  • g++/gdb
  • Makefile

1.3 技术栈

  • 开发主语:C++
  • 序列化框架:Protobuf ⼆进制序列化
  • 网络通信:自定义应用层协议 + muduo库:对TCP长连接的封装,并使用 epoll 的事件驱动模式,实现高并发服务器与客户端,关于epoll:计算机网络(十二) —— 高级IO_轮询模式和主备模式-CSDN博客
  • 源数据信息数据库:SQLite3(数据持久化存储,比MySQL轻量,本地,不跨主机)
  • 单元测试框架:Gtest

二,环境搭建

2.1 更换软件源

首先是wget工具,作用是下载一些文件(一般系统默认自带)

sudo apt-get install wget

然后就是更换软件源,部分软件默认是从国外下载,下面改成国内的服务器:

先备份原软件源:

sudo cp /etc/apt/sources.list /etc/apt/sources.list.bak

打开软件源文件,删除原内容:

sudo vim /etc/apt/sources.list

更换源:

deb http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-proposed main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-proposed main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
deb-src http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse
#添加清华源

deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-updates main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-backports main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-security main restricted universe multiverse

最后更新源:

sudo apt-get update

2.2 基础工具安装

首先是 lrzsz,这个是一个文件传输工具,可以实现文件远距离传输:

sudo apt-get install lrzsz
rz --version

然后是编译器安装:

dyk@VM-16-14-ubuntu:~$ sudo apt-get install gcc g++

然后是项目构建工具make:

sudo apt-get install make

安装gdb调试器:

 sudo apt-get install gdb

安装Git

sudo apt-get install git
git --version

安装cmake

sudo apt-get install cmake
cmake --version

2.3 protobuf 安装

安装教程可以参考这篇文章:Protobuf学习(1)—— 初识与安装-CSDN博客

下面是通过代码来验证是否可行:

先创建一个 contacts.proto 文件,添加下列内容

syntax = "proto3";
package contacts;
message PeopleInfo 
{
    string name = 1;            
    int32 age = 2;  
}

再执行下面命令,会根据:我们上面的内容自动生成相关的C++代码:

 protoc --cpp_out=./ contacts.proto

然后是test.cc :

#include <iostream>
#include <string>
#include "contacts.pb.h" //引⼊编译⽣成的头⽂件 
using namespace std;      
int main() 
{       
    string people_str;
    //序列化    
    {
        // .proto⽂件声明的package,通过protoc编译后,会为编译⽣成的C++代码声明同名的命名空间
        // 其范围是在.proto ⽂件中定义的内容
        contacts::PeopleInfo people;
        people.set_age(20);
        people.set_name("张三");
        // 调⽤序列化⽅法,将序列化后的⼆进制序列存⼊string中
        if (!people.SerializeToString(&people_str)) cout << "序列化联系⼈失败." << endl;
        cout << "序列化后的 people_str: " << people_str.size() << endl;
    }
    //反序列化
    {
        contacts::PeopleInfo people;
        // 调⽤反序列化⽅法,读取string中存放的⼆进制序列,并反序列化出对象
        if (!people.ParseFromString(people_str)) cout << "反序列化出联系⼈失败." << endl;
        cout << "Parse age: " << people.age() << endl;
        cout << "Parse name: " << people.name() << endl;
    }
    return 0;
}  

2.4 muduo库安装

安装可以参考这篇文章:muduo网络库下载安装教程(超详细百分百成功)_muduo安装-CSDN博客

2.5 sqlite+gtest安装

安装SQLite3:

sudo apt-get install sqlite3 libsqlite3-dev
sqlite3 --version

安装Gtest:

 sudo apt-get install libgtest-dev

下面是测试代码:

#include <gtest/gtest.h>
int add(int a, int b) { return a + b; }
TEST(testCase, test1) { EXPECT_EQ(add(2, 3), 5); }
int main(int argc, char **argv)
{
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

然后编译运行:

三,第三方框架

3.1 Protobuf介绍和使用

3.1.1 介绍

ProtoBuf(全称 Protocol Buffer )是一个数据结构序列化和反序列化框架,关于序列化和反序列化这篇文章有介绍过:

计算机网络(五) —— 自定义协议简单网络程序-CSDN博客

Protobuf 具有下面特点:

  • 无关语言、平台:可以使用C++、Java、Python 等多种语言,且支持大多数平台
  • 高效:比 XML 更轻量化,速度更快也更简单
  • 扩展性、兼容性强:数据结构可以随时更改,且不影响和破坏原程序逻辑

上面安装 Protobuf 时已经简单使用过,下面总结下使用流程

  1. 编写 .proto 文件,定义结构对象 (message) 和其属性内容
  2. 根据编写的语言使用对应的 protoc 命令编译 .proto 文件,生成新的头文件和源文件
  3. 依赖生成的接口,通过包含头文件的方式实现对 .proto 文件中自定义的字段进行设置和获取,对 message 对象进行序列化和反序列化

3.1.2 语法

就以我们上面的那个 contacts.proto 为例,可以分为三部分:

①指定语法

  • proto3 全称 Protocol Buffers 语言版本3,是 .proto 文件最新的语法版本。简化了使用,可以在更广泛的编程语言中使用。允许使用 Java,C++,Python 等多种语言生成 protocol buffer 代码。
  • 在 .proto 中,要使用 syntax = "proto3"; 来指定文件语法为 proto3,且必须写在除去注释内容的第一行。如果没有指定,编译器会使用 proto2 语法,可能会有意想不到的错误

②package 声明符(可选)

  • package 是⼀个可选的声明符,能表示 .proto 文件的命名空间,在项目中要有唯⼀性,作用是为 了避免我们定义的 message 消息出现冲突。

③定义消息

  • 表示要定义的结构化对象,可以给这个结构化对象中定义其对应的属性内容。
  • 在网络传输中,我们需要为传输双方定制协议,就是定义结构体或者结构化数据,常用的 Tcp,Udp报文就是结构化的。
  • 再比如将数据持久化存储到数据库时,会将一系列元数据统一用对象组织起来,再进行存储。
  • ProtoBuf就是以 message 的方式来支持我们定制协议字段,后期帮助我们成类和方法来使用。

在message中我们可以定义其属性字段,字段定义格式为:字段类型字段名 = 字段唯一编号

  • 字段名称命名规范:全小写字母,多个字母之间用 ‘ _ ’ 连接
  • 字段类型分为:标量数据类型和特殊类型(包括枚举、其他消息类型等)
  • 字段唯⼀编号:用来标识字段,⼀旦开始使用就不能够再改变

3.1.3 编译

编译的命令行格式为:

protoc [--proto_path=IMPORT_PATH] --cpp_out=DST_DIR path/to/file.proto 
  • protoc:是 Protocol Buffer 提供的命令行编译工具
  • --proto_path=IMPORT_PATH:指定被编译的 .proto 文件所在目录,可多次指定,可简写成 -I。如不指定该参数,则在当前目录进行搜索。当某个 .proto 文件 import 其他 .proto 文件或需要编译的 .proto 文件不在当前目录下,这时就要用 -I 来指定搜索目录
  • --cpp_out:指编译后的文件为 C++ 文件
  • OUT_DIR:编译后生成文件的目标路径
  • path/to/file.proto:要编译的 .proto 文件

编译 contacts.proto 文件并将结果输出在当前目录的命令如下:

protoc --cpp_out=. contacts.proto

具体的使用在上面安装环节已经介绍过了

3.2 muduo库介绍和使用

3.2.1 关于muduo库

Muduo由陈硕大佬开发,是一个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。是一款基 于主从Reactor模型的高性能服务器框架,使用的线程模型是 one loop per thread,指的是:

  • 一个线程只能有一个事件循环(EventLoop),用于响应计时器和IO事件
  • 一个文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须归属于某个EventLoop 管理

关于主从Reactor模型:

  • Reactor:基于事件触发的模型(类似基于 epoll 进行IO事件监控)
  • 主从:将IO事件监控进行进一步层次划分,主Reactor只对新建连接事件进行监控,保证新建连接的获取不受IO阻塞影响;从Reactor针对新建连接进行IO事件监控,进行业务处理
  • 所以主从Reactor必然是一个多执行流的并发模式,一个事件监控占据一个线程

3.2.2 接口介绍

后面主要就是通过这几个类实例化的对象来实现服务器和客户端的搭建

①muduo::net::TcpServer 类介绍

typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr &,
                           Buffer *,
                           Timestamp)> MessageCallback;
class InetAddress : public muduo::copyable
{
public:
    InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};

class TcpServer : noncopyable
{
public:
    enum Option
    {
        kNoReusePort,
        kReusePort,
    };
    TcpServer(EventLoop *loop,
              const InetAddress &listenAddr,
              const string &nameArg,
              Option option = kNoReusePort);
    void setThreadNum(int numThreads);
    void start();

    //当⼀个新连接建⽴成功的时候被调⽤
    void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
    //消息的业务处理回调函数-- 这是收到新连接消息的时候被调⽤的函数
    void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
};
  • 构造函数:有4个参数,第一个参数后面介绍;第二个参数表示服务端要监听和绑定的信息;第三个参数表示服务端的名称,字符串形式;第四个参数表示一个操作选项,表示“是否启用端口重用”,Tcp协议主动断开连接的一方会进入一个“TimeWait”状态,在这段时间内这个端口无法被使用,如果启用了这个选项,那么进入这个状态的端口可以立马被使用

  • setThreadNum :用来设置从属Reactor的数量,就是上面图片里的子Reactor的数量
  • start :服务器启动接口,包括启动线程池,开始事件监控等等
  • 后面两个回调函数的用法代码注释已解释,需要我们自己来实现

②muduo::net::EventLoop 类介绍

上面说了一个线程只能有一个事件循环(EventLoop),用于响应计时器和IO事件,简单来说就是每一个事件监控里都有一个EventLoop对象来完成事件监控业务处理等操作

class EventLoop : noncopyable
{
public:
    /// Loops forever.
    /// Must be called in the same thread as creation of the object.
    void loop();
    /// Quits loop.
    /// This is not 100% thread safe, if you call through a raw pointer,
    /// better to call through shared_ptr<EventLoop> for 100% safety.
    void quit();
    TimerId runAt(Timestamp time, TimerCallback cb);
    /// Runs callback after @c delay seconds.
    /// Safe to call from other threads.
    TimerId runAfter(double delay, TimerCallback cb);
    /// Runs callback every @c interval seconds.
    /// Safe to call from other threads.
    TimerId runEvery(double interval, TimerCallback cb);
    /// Cancels the timer.
    /// Safe to call from other threads.
    void cancel(TimerId timerId);

private:
    std::atomic<bool> quit_;
    std::unique_ptr<Poller> poller_;
    mutable MutexLock mutex_;
    std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};

最重要的接口其实就一个 loop,表示开始循环,后面几个都是计时器接口,我们暂时用不上

③muduo::net::TcpConnection 类介绍

在 TcpServer 类里后面的两个回调函数的参数的 MessageCallback 和 ConnectionCallback 都是有格式的:

我们可以在这个回调函数里对这个消息进行处理,然后通过连接对客户端进行一个响应

class TcpConnection : noncopyable,
                      public std::enable_shared_from_this<TcpConnection>
{
public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop *loop,
                const string &name,
                int sockfd,
                const InetAddress &localAddr,
                const InetAddress &peerAddr);
  bool connected() const { return state_ == kConnected; }
  bool disconnected() const { return state_ == kDisconnected; }
  void send(string &&message); // C++11
  void send(const void *message, int len);
  void send(const StringPiece &message);
  // void send(Buffer&& message); // C++11
  void send(Buffer *message); // this one will swap data
  void shutdown();            // NOT thread safe, no simultaneous calling
  void setContext(const boost::any &context)
  {
    context_ = context;
  }
  const boost::any &getContext() const
  {
    return context_;
  }
  boost::any *getMutableContext()
  {
    return &context_;
  }
  void setConnectionCallback(const ConnectionCallback &cb)
  {
    connectionCallback_ = cb;
  }
  void setMessageCallback(const MessageCallback &cb)
  {
    messageCallback_ = cb;
  }

private:
  enum StateE
  {
    kDisconnected,
    kConnecting,
    kConnected,
    kDisconnecting
  };
  EventLoop *loop_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;
  boost::any context_;
};

这个类我们目前能用上的就 send 发送消息和 shutdown 关闭连接两个,还有 connect 接口用来判断连接状态

后面还有 TcpClient类和 Buffer 类,里面的接口后面再介绍

3.2.3 实际应用

下面搜索服务器客户端的一个简单翻译程序(详细注释版):

TestServer.hpp

#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include <iostream>
#include <functional>
#include <unordered_map>
#include <string>
using namespace std;

class TranslateServer
{
public:
    TranslateServer(int port)
        : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "TranslateServer", muduo::net::TcpServer::Option::kReusePort)
    // 给 TcpServer 类的构造函数传四个参数:EvevtLoop 类,监听的ip和端口,服务器名称,是否启用端口复用(上面为启用)
    {
        _server.setConnectionCallback(bind(&TranslateServer::onConnection, this, placeholders::_1));
        // bind 是 C++11 里面的一个函数适配器,对指定的函数进行参数绑定
        // bind 的第一个参数是我们要绑定的函数,第二个是要绑定的参数,这里传入 this 是因为要适配成员函数的第一个隐藏参数this,第三个参数为预留位,预留位数量和绑定函数的参数数量一致
        _server.setMessageCallback(bind(&TranslateServer::onMessage, this, placeholders::_1, placeholders::_2, placeholders::_3));
    }
    void start() // 启动服务器
    {
        _server.start();  // 开始事件监听
        _baseloop.loop(); // 开始事件监控,是一个死循环的阻塞接口
    }

private:
    // 新连接建立成功时要做的事情
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        cout << conn.get() << " CONNECTION " << (conn->connected() ? "UP" : "DOWN");
        // connected 返回连接状态,true 为成功,false 为失败
    }

    // 收到请求时的回调函数
    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp t)
    {
        string msg = buf->retrieveAllAsString(); // 这个表示把buffer里面所有的数据取出来
        if (msg.back() == '\n')
            msg.pop_back();
        string rsp = translate(msg); // 进行翻译
        conn->send(rsp);             // 发回翻译结果
    }
    string translate(const string &msg)
    {
        static unordered_map<string, string> dict_map = {
            {"Hello", "你好"},
            {"你好", "Hello"},
            {"World", "世界"},
            {"世界", "World"},
        };
        auto it = dict_map.find(msg);
        return it == dict_map.end() ? "不知道哦" : it->second;
    }

private:
    muduo::net::TcpServer _server;   // 主要用于设置回调函数
    muduo::net::EventLoop _baseloop; // 是 epoll 的事件监控,进行描述符的事件监控,触发事件后调用具体的回调函数来进行业务处理
};

int main()
{
    TranslateServer server(8085);
    server.start();
    return 0;
}

TestClient.hpp

#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
#include <unordered_map>
using namespace std;

class TranslateClient
{
public:
    TranslateClient(const string &ip, int port)
        : _latch(1) // 设置成1就好,因为coutDown 操作就是将计数 count-- 一次,然后判断 count 是否为0再进行的唤醒
          ,
          _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "TranslateClient") // 通过startLoop来获取eventloop
    {
        // 连接服务器成功时调用 onConnection
        _client.setConnectionCallback(bind(&TranslateClient::onConnection, this, placeholders::_1));
        // 收到服务器发送消息时调用 onMessage
        _client.setMessageCallback(bind(&TranslateClient::onMessage, this, placeholders::_1, placeholders::_2, placeholders::_3));
    }
    void connect()
    {
        _client.connect(); // 连接服务器
                           // 但是这个 connect 是发起连接后会立即返回,但是连接不一定建立成功
        _latch.wait();
        // 这个是 CountDownLatch 类里面的 wait 接口,作用是先阻塞住
        // 当连接建立成功时,onConnection 回调函数会被执行,然后在里面调用 countDown 来唤醒这个阻塞的 wait
    }
    void send(string &msg)
    {
        msg.push_back('\n');
        if (_conn)
            _conn->send(msg);
    }
    void shutdown() { _client.disconnect(); } // 关闭连接

private:
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        if (conn->connected()) // 确保连接状态正常时再发送
        {
            _conn = conn;
            _latch.countDown(); // 唤起主线程中的阻塞
        }
        else
            _conn.reset();
    }
    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp t)
    {
        // 收到的响应就在buffer里,直接取出来打印即可
        cout << buf->retrieveAllAsString() << endl;
    }

private:
    muduo::CountDownLatch _latch;

    // 我们的 Client 也是依赖于这个事件监控来处理主事物的,而这个事件监控是死循环式的阻塞接口
    // 如果直接调用,那么就会把主线程给阻塞住从而无法执行其他的操作,所以我们不能直接在主线程里面调用该 _baseloop.loop() 去死循环监控
    // 而需要创建一个线程去搞这个
    muduo::net::EventLoopThread _loopthread; // 作用是将事件监控和一个线程合并在一起
    muduo::net::TcpClient _client;
    muduo::net::TcpConnectionPtr _conn;
};
int main()
{
    TranslateClient client("127.0.0.1", 8085);
    client.connect();
    while (1)
    {
        string msg;
        cout.flush();
        cin >> msg;
        client.send(msg);
    }
    client.shutdown();
    return 0;
}

makefile

all: server client
server:TestServer.cpp
	g++ -std=c++11 $^ -o $@ -L/use/local/lib -lmuduo_net -lmuduo_base -lpthread
client:TestClient.cpp
	g++ -std=c++11 $^ -o $@ -L/use/local/lib -lmuduo_net -lmuduo_base -lpthread

.PHONY:clean
clean:
	rm -f server client 

3.3 基于muduo库实现protobuf协议的通信

我们上面只是客户都安单纯地发了一个字符串给服务器,然后服务器也是单纯地发回一个字符串给客户端,中间没有任何的协议,也没有考虑粘包等问题,但是到后面我们会自定义网络协议,所以我们将muduo和protobuf结合来搞一个基于muduo库的对于protobuf协议的处理代码,实现一个翻译+加法的服务器与客户端

3.3.0 预备

红色部分中,前面三个文件都是从Muduo解压目录的 "muduo-master/examples/protobuf/codec" 目录下复制粘贴过来的,第四个是从Muduo解压目录的muduo-master/muduo/net/protorpc/google-inl.h复制粘贴过来的,然后就是codec.cc中,部分头文件包含路径需要修改:

然后是makefile文件:

all: server client
server:protobuf_server.cpp ./proto/codec.cc test.pb.cc
	g++ -std=c++11 $^ -o $@ -lmuduo_net -lmuduo_base -lpthread -lprotobuf -lz -g
client:protobuf_client.cpp ./proto/codec.cc test.pb.cc 
	g++ -std=c++11 $^ -o $@ -lmuduo_net -lmuduo_base -lpthread -lprotobuf -lz

.PHONY:clean
clean:
	rm -f server client 
  • muduo的install安装程序不会把头文件和库文件放到系统目录里,如果在前面安装过程没有手动添加,则makefile文件里需要手动加上头文件和库文件路径

3.3.1 proto文件

syntax = "proto3";
package hello;

//翻译请求
message TranslateRequest {
    string msg = 1;
}

//翻译响应
message TranslateResponse {
    string msg = 1;
}

//加法请求
message AddRequest {
    int32 num1 = 1;
    int32 num2 = 2;
}

//加法响应
message AddResponse {
    int32 result = 1;
}

3.3.2 服务端

#include "proto/codec.h" //Proto目录里的这三个文件是从Muduo解压目录的 "muduo-master/examples/protobuf/codec" 目录下复制粘贴过来的
#include "proto/dispatcher.h"
#include "test.pb.h" //我们自己写的 .proto 生成后的

#include "muduo/base/Logging.h" //这几个我们在最开始安装muduo时就已经把muduo目录移到系统目录去了所以能直接找到
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <iostream>
#include <unordered_map>
#include <string>

#define _p1 std::placeholders::_1
#define _p2 std::placeholders::_2
#define _p3 std::placeholders::_3 // 占位符

class Server
{
public:
    typedef std::shared_ptr<hello::TranslateRequest> TranslateRequestPtr;
    typedef std::shared_ptr<hello::TranslateResponse> TranslateResponsePtr;
    typedef std::shared_ptr<hello::AddRequest> AddRequestPtr;
    typedef std::shared_ptr<hello::AddResponse> AddResponsePtr;
    typedef std::shared_ptr<google::protobuf::Message> MessagePtr;

    Server(int port)
        : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),
          _dispatcher(std::bind(&Server::onUnknownMessage, this, _p1, _p2, _p3)),
          _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, _p1, _p2, _p3))
    {
        // 设置请求函数,通过模板参数匹配,当收到TranslateRequest请求时用Server::onTranslate这个函数来出来,这样就完成了收到不同消息调用不同函数的功能
        _dispatcher.registerMessageCallback<hello::TranslateRequest>(std::bind(&Server::onTranslate, this, _p1, _p2, _p3));
        _dispatcher.registerMessageCallback<hello::AddRequest>(std::bind(&Server::onAdd, this, _p1, _p2, _p3));

        // 设置消息回调函数
        _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, _p1, _p2, _p3));
        _server.setConnectionCallback(std::bind(&Server::onConnection, this, _p1));
    }

    void start()
    {
        _server.start();
        _baseloop.loop();
    }

private:
    std::string translate(const std::string &msg)
    {
        static std::unordered_map<std::string, std::string> dict_map = {
            {"Hello", "你好"},
            {"你好", "Hello"},
            {"World", "世界"},
            {"世界", "World"},
        };
        auto it = dict_map.find(msg);
        return it == dict_map.end() ? "不知道哦" : it->second;
    }

    void onTranslate(const muduo::net::TcpConnectionPtr &conn, const TranslateRequestPtr &message, muduo::Timestamp)
    {
        std::string msg = translate(message->msg()); // 提取message并翻译
        hello::TranslateResponse resp;               // protobuf响应
        resp.set_msg(msg);                           // 装填protobuf内容
        _codec.send(conn, resp);
    }
    void onAdd(const muduo::net::TcpConnectionPtr &conn, const AddRequestPtr &message, muduo::Timestamp)
    {
        int num1 = message->num1();
        int num2 = message->num2();
        hello::AddResponse resp;
        resp.set_result(num1 + num2);
        _codec.send(conn, resp);
    }
    // 连接成功时调用
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        if (conn->connected())
            LOG_INFO << "新连接建立成功";
        else
            LOG_INFO << "连接关闭";
    }
    void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp) // 未知消息处理函数
    {
        LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
        conn->shutdown(); // 收到第一次消息直接关闭连接
    }

private:
    muduo::net::TcpServer _server;   // muduo里的服务器对象
    muduo::net::EventLoop _baseloop; // 事件监控,服务器TcpServer第一个参数
    ProtobufDispatcher _dispatcher;  // 请求分发器,负责设置请求处理函数
    ProtobufCodec _codec;            // 对收到的请求数据进行 protobuf 序列化和反序列化处理
};

int main()
{
    Server server(8085);
    server.start();
    return 0;
}

3.3.3 客户端

#include "proto/codec.h"
#include "proto/dispatcher.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"

#include "muduo/base/CountDownLatch.h"
#include "muduo/net/EventLoopThread.h"

#include "test.pb.h"
#include <iostream>

#define _p1 std::placeholders::_1
#define _p2 std::placeholders::_2
#define _p3 std::placeholders::_3 // 占位符

class Client
{
public:
    typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
    typedef std::shared_ptr<hello::AddResponse> AddResponsePtr;
    typedef std::shared_ptr<hello::TranslateResponse> TranslateResponsePtr; // 和服务端是反着来的,客户端是收到响应再进行处理

    Client(const std::string &ip, int port)
        : _latch(1),
          _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "Client"),
          _dispatcher(std::bind(&Client::onUnknownMessage, this, _p1, _p2, _p3)),
          _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, _p1, _p2, _p3))
    {
        // 设置响应处理函数
        _dispatcher.registerMessageCallback<hello::TranslateResponse>(std::bind(&Client::onTranslate, this, _p1, _p2, _p3));
        _dispatcher.registerMessageCallback<hello::AddResponse>(std::bind(&Client::onAdd, this, _p1, _p2, _p3));

        // 设置消息回调函数
        _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, _p1, _p2, _p3));
        _client.setConnectionCallback(std::bind(&Client::onConnection, this, _p1));
    }

    void connect()
    {
        _client.connect(); // 连接服务器
                           // 但是这个 connect 是发起连接后会立即返回,但是连接不一定建立成功
        _latch.wait();
        // 这个是 CountDownLatch 类里面的 wait 接口,作用是先阻塞住
        // 当连接建立成功时,onConnection 回调函数会被执行,然后在里面调用 countDown 来唤醒这个阻塞的 wait
    }

    void Translate(const std::string &msg)
    {
        hello::TranslateRequest request;
        request.set_msg(msg); // 手动构建请求
        send(&request);
    }
    void Add(int n1, int n2)
    {
        hello::AddRequest request;
        request.set_num1(n1);
        request.set_num2(n2);
        send(&request);
    }

private:
    bool send(const google::protobuf::Message *msg) // 父类指针指向子类对象,内部序列化时再调用虚函数进行序列化
    {
        if (_conn->connected())
        {
            _codec.send(_conn, *msg); // 再解引用
            return true;
        }
        return false;
    }

    // 响应的话直接打印结果即可
    void onTranslate(const muduo::net::TcpConnectionPtr &conn, const TranslateResponsePtr &message, muduo::Timestamp)
    {
        std::cout << "翻译结果:" << message->msg() << std::endl;
    }
    void onAdd(const muduo::net::TcpConnectionPtr &conn, const AddResponsePtr &message, muduo::Timestamp)
    {
        std::cout << "加法结果:" << message->result() << std::endl;
    }

    void onConnection(const muduo::net::TcpConnectionPtr &conn) // 连接接口不能和服务端一样了奥
    {
        if (conn->connected()) // 确保连接状态正常时再发送
        {
            _conn = conn;
            _latch.countDown(); // 唤起主线程中的阻塞
        }
        else
            _conn.reset();
    }
    void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp) // 未知消息处理函数
    {
        LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
        conn->shutdown(); // 收到第一次消息直接关闭连接
    }

private:
    // 这里的声明顺序必须和构造函数初始化列表一致,不然会报错
    muduo::CountDownLatch _latch;            // 实现同步的
    muduo::net::EventLoopThread _loopthread; // 异步循环处理线程,作用是将事件监控和一个线程合并在一起
    muduo::net::TcpConnectionPtr _conn;      // 客户端对应的连接

    muduo::net::TcpClient _client;  // muduo里的客户端对象
    ProtobufDispatcher _dispatcher; // 请求分发器,负责设置请求处理函数
    ProtobufCodec _codec;           // 对收到的请求数据进行 protobuf 序列化和反序列化处理
};

int main()
{
    Client client("127.0.0.1", 8085);
    client.connect();
    client.Translate("Hello");
    client.Add(11, 22);
    sleep(2);
    return 0;
}

3.4 SQLite 介绍和使用

官方文档:https://www.sqlite.org/c3ref/funclist.html

3.4.1 简单介绍

SQLite是一个进程内的轻量级数据库,实现了自给自足、无服务器、零配置、事务性的SQL数据库引擎,可以直接按程序需求进行静态或动态连接,直接通过API接口让SQLite直接访问存储文件,这也代表SQLite不会像MySQL那样单独创建进程来做这些事,后面我们会介绍

SQLite优势:

  • 不需要一个单独的服务器进程或操作的系统(比如很多的手机App都是用的SQLite进行快速读取存储本地文件)
  • SQLite不需要配置,安装好API就能直接用
  • SQLite非常小,满血状态时小于400KB
  • SQLite自给自足,不需要任何外部依赖
  • SQLite事物完全兼容事物的ACID,允许多个进程或线程访问
  • SQLite支持大多数标准的数据库查询语言
  • SQLite使用C编写的,提供简单易用的API,这也代表它可以在多个平台上使用

3.4.2 常用API

1,查看当前数据库在编译阶段是否启用了线程安全

int sqlite3_threadsafe();
//返回0表示未启用,返回1表示启用

需要注意sqlite3有三种安全等级:

  • 非线程安全模式
  • 线程安全模式(一个句柄不能用于多线程间,既不同的连接来不同的线程或进程间是安全的)
  • 串行化模式(可以在不同的线程或进程间使用同一个句柄)

2,创建/打开数据库文件,并返回操作句柄

SQLite数据库在操作数据时,数据都是存放在一个文件里的,所以刚开始的时候我们要告诉SQLite我要操作的数据是哪一个文件里的

int sqlite3_open(const char *filename, sqlite3 **ppDb) //如果成功返回SQLITE_OK
int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );

第二个接口表示指定方式打开文件,通过flag参数表示:

  • SQLITE_OPEN_READWRITE:以可读可写方式打开数据库文件
  • SQLITE_OPEN_CREATE:不存在数据库文件则创建
  • SQLITE_OPEN_NOMUTEX:多线程模式,只要不同的线程使用不同的连接即可保证线程安全
  • SQLITE_OPEN_FULLMUTEX:串行化模式

返回 SQLITE_OK 表示成功

3,执行语句

int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)

第一个参数是句柄,第二个参数是SQL语句,第三个参数是一个回调函数,负责处理执行操作后返回的信息,当回调函数被调用时,第四个参数就会传进这个回调函数内,第四个参数指向内存里的一块空间,返回的信息就是暂时存放在这个空间里,然后第五个参数是出错的时候返回错误信息,具体使用后面会展示

这个回调函数有个int返回值,成功处理的情况下必须返回0,如果不返回0会触发ABORT退出程序,/var/include/sqlite3.h 中对sqlite3_exec3 的特殊说明:

4,销毁句柄

int sqlite3_close(sqlite3* db); //成功则返回 SQLITE_OK
int sqlite3_close_v2(sqlite3*); //⽆论如何都会返回 SQLITE_OK
const char *sqlite3_errmsg(sqlite3* db); //返回错误信息

3.4.3 实战使用

在 sqlite.hpp 里面封装一个 SqliteHelper 类,提供简单的sqlit数据库api,完成数据的基础增删查改操作:

  1. 创建/打开数据库文件
  2. 针对打开的数据库文件进行操作:表操作,数据操作
  3. 关闭数据库

sqlite.hpp:

#include <iostream>
#include <string>
#include <vector>
#include <sqlite3.h>

class SqliteHelper
{
public:
    typedef int (*SqliteCallback)(void *, int, char **, char **);
    SqliteHelper(const std::string &dbfile)
        : _dbfile(dbfile), _handler(nullptr)
    {
    }

    bool open(int save_leve = SQLITE_OPEN_FULLMUTEX) // 打开数据库文件(串行化)
    {
        // int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
        int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | save_leve, nullptr); // 可读可写打开,不存在则创建,默认等级为串行化
        if (ret != SQLITE_OK)
        {
            std::cout << "创建或打开SQLite数据库文件失败: ";
            std::cout << sqlite3_errmsg(_handler) << std::endl;
            return false;
        }
        return true;
    }

    bool exec(const std::string &sql, SqliteCallback cb = nullptr, void *arg = nullptr) // arg参数表示查询结果保存位置的指针,作为参数传递给回调函数
    {
        // int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)
        int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
        if (ret != SQLITE_OK)
        {
            std::cout << "--" << sql.c_str() << "--" << std::endl;
            std::cout << "执行语句失败: ";
            std::cout << sqlite3_errmsg(_handler) << std::endl;
            return false;
        }
        return true;
    }

    void close() // 关闭数据库
    {
        // int sqlite3_close_v2(sqlite3*);
        if (_handler)
            sqlite3_close_v2(_handler);
    }

private:
    std::string _dbfile;
    sqlite3 *_handler;
};

main.cc:

#include "sqlite.hpp"
#include <vector>
#include <cassert>

int func(void *arg, int count, char **result, char **name)
{
    std::vector<std::string> *arr = (std::vector<std::string> *)arg;
    arr->push_back(result[0]);
    return 0;
}

int main()
{
    // 1,创建/打开库文件
    SqliteHelper hp("./test.db");
    assert(hp.open());

    // 2,创建表(如果不存在再创建表)学生信息:学号、姓名、年龄
    const char *ct = "create table if not exists student(sn int primary key, name varchar(32), age int);";
    assert(hp.exec(ct, nullptr, nullptr));

    // 3,新增数据、修改、删除、查询
    const char *insert_sql = "insert into student values(1, 'Hello', 18),(2,'Hello',20),(3,'world', 21);"; // 新增
    assert(hp.exec(insert_sql, nullptr, nullptr));

    const char *update_sql = "update student set name='World' where sn = 2"; // 修改
    assert(hp.exec(update_sql, nullptr, nullptr));

    const char *delete_sql = "delete from student where sn=3;"; // 删除
    assert(hp.exec(delete_sql, nullptr, nullptr));

    const char *select = "select name from student;"; // 查询
    std::vector<std::string> arr;
    assert(hp.exec(select, func, &arr));
    for (auto &name : arr)
        std::cout << name << std::endl;

    // 4,关闭数据库
    hp.close();
    return 0;
}

makefile

main:main.cc
	g++ -std=c++11 $^ -o $@ -lsqlite3

.PHONY:clean
clean:
	rm -f main test.db

3.5 GTest 介绍和使用

3.5.1 介绍

GTest是一个跨平台的C++单元测试框架,由google公司发布,是为了在不同平台上为编写C++单 元测试而生成的,提供了丰富的断言、致命和非致命判断、参数化等等

使用分为两个,一个是宏断言,其实也就是assert断言,但是我们可以直接用C的assert,所以GTest的这个就没必要用了,最重要的是事件机制 (全局、单独用例)

3.5.2 宏断言

断言宏可以 分为两类:

  • ASSERT_系列:如果当前点检测失败则退出当前函数
  • EXPECT_系列:如果当前点检测失败则继续往下执行

直接上代码:

#include <iostream>
#include <gtest/gtest.h>

TEST(hello, great_than) // 单元测试宏函数
{
    int age = 20;
    ASSERT_GT(age, 18);
    printf("Hello World!\n");
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    RUN_ALL_TESTS(); // 运行所有单元测试

    return 0;
}

常用的断言宏如下:

1,bool 值检查

  • ASSERT_TRUE(参数):期待结果是 true
  • ASSERT_FALSE(参数):期待结果是 false

2,数值型数据检查(都需要传入两个参数)

  • ASSERT_EQ():equal 传入的是需要比较的两个数
  • ASSERT_NE():not equal 不等于才返回 true
  • ASSERT_LT():less than 小于才返回 true
  • ASSERT_GT():greater than 大于才返回 true
  • ASSERT_LE():less equal 小于等于才返回 true
  • ASSERT_GE():greater equal 大于等于才返回 true

了解即可,因为用这个还不如直接用C的assert,直接就可以用也不用定义啥宏函数,真正有用的还是后面的事件机制

3.5.3 事件机制

先声明下三个概念:

  • 测试程序:一个测试程序只有一个 main 函数,也可以说是一个可执行程序是一个测试程序,该级别的事件机制是在程序的开始和结束执行
  • 测试套件:代表⼀个测试用例的集合体,该级别的事件机制是在整体的测试案例开始和结束执行
  • 测试用例:该级别的事件机制是在每个测试用例开始和结束都执行

测试中可以有多个测试套件,可以理解为一组测试单元(前面宏断言中一个宏函数可以看作是一个测试单元),关于测试套件:

  • 测试套件:可以理解为一个测试环境,可以在单元测试之前进行测试环境初始化,测试完毕后进行测试环境清理
  • 全局测试套件:在整体测试中,只会初始化一次环境,在所有测试用例完毕后才会清理环境
  • 用例测试套件:每次的单元测试中都会重新初始化测试环境,保证每个不同的单元测试都有相同的测试环境,完毕后清理环境

事件机制的最大好处就是能够为我们各个测试用例提前准备好测试环境,并在测试完毕后销毁环 境,这样有个好处就是如果我们有一端代码需要进行多种不同方法的测试,则可以通过测试机制在每个测试用例进行之前初始化测试环境和数据,并在测试完毕后清理测试造成的影响


下面是全局测试套件在代码中的样子:

#include <iostream>
#include <unordered_map>
#include <gtest/gtest.h>

std::unordered_map<std::string, std::string> dict;
// 全局测试套件,其实就是我们用户自己定义的一个测试环境类
class GlobalTest : public testing::Environment
{
public:
    virtual void SetUp() override // 重要接口,在所有单元测试运行前执行的接口,用于初始化测试环境
    {
        std::cout << "测试前初始化环境\n";
        dict.insert(std::make_pair("Hello", "11111"));
        dict.insert(std::make_pair("World", "22222"));
        dict.insert(std::make_pair("ABCDE", "33333"));
    }
    virtual void TearDown() override // 所有单元测试运行结束后执行的接口,完成测试环境清理
    {
        std::cout << "测试后清理环境\n ";
        dict.clear();
    }
};

TEST(GlobalTest, test1)
{
    std::cout << "单元测试1" << std::endl;
    ASSERT_EQ(dict.size(), 3); // 前面初始化插入了三条数据,所有这里应该的大小应该是3
    dict.erase("ABCDE");
}

TEST(GlobalTest, test2)
{
    std::cout << "单元测试2" << std::endl;
    ASSERT_EQ(dict.size(), 2);
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    testing::AddGlobalTestEnvironment(new GlobalTest);
    RUN_ALL_TESTS();
    return 0;
}

执行结果如下:


上面的全局的测试,但有时候全局不测试并不适合某些场景,所以下面是独立测试套件:

#include <iostream>
#include <unordered_map>
#include <gtest/gtest.h>

// 独立测试套件和全局测试套件很类似,但是继承于 testing::Test
class SuitTest : public testing::Test
{
public:
    static void SetUpTestCase()
    {
        std::cout << "测试前初始化环境\n";
    }
    static void TearDownTestCase()
    {
        std::cout << "测试后清理环境\n ";
    }

public:
    std::unordered_map<std::string, std::string> dict; // 独立测试套件可以在类内部定义成员变量,该成员变量每个单元测试独享一份
};

TEST_F(SuitTest, test1) // 最最重要的一点,单元测试的这个宏变成了 TEST_F,而且测试的名称必须与套件环境类名一致,保证单元测试宏函数可以直接访问类成员变量
{
    std::cout << "单元测试1" << std::endl;
    dict.insert(std::make_pair("Hello", "11111"));
    dict.insert(std::make_pair("World", "22222"));
    ASSERT_EQ(dict.size(), 2);
}

TEST_F(SuitTest, test2)
{
    std::cout << "单元测试2" << std::endl;
    dict.insert(std::make_pair("你好", "33333"));
    ASSERT_EQ(dict.size(), 1);
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    RUN_ALL_TESTS();
    return 0;
}

执行结果如下:


全局测试的SetUp和TearDown两个函数在独立测试中依然有效,这也代表着可以在独立测试里嵌套全局测试,这样能使测试场景更加灵活,如下代码:

#include <iostream>
#include <unordered_map>
#include <gtest/gtest.h>

std::unordered_map<std::string, std::string> dict_global;
// 独立测试套件和全局测试套件可以嵌套使用,使应用场景更灵活
class Suit_Global_Test : public testing::Test
{
public:
    // 操作全局数据
    static void SetUpTestCase() // 整个测试执行前执行一次
    {
        // 全局的测试数据容器的数据在这里插入
        std::cout << "全局数据初始化\n";
        dict_global.insert(std::make_pair("Hello", "11111"));
    }
    static void TearDownTestCase() // 整个测试执行完成后执行一次
    {
        // 全局的测试数据容器的数据在这里清理
        std::cout << "清理全局数据\n ";
        dict_global.clear();
    }

    // 操作局部数据
    virtual void SetUp() override // 每个单元测试开始前调用一次
    {
        std::cout << "单元测试前执行SetUp\n";
        dict_suit.insert(std::make_pair("Hello", "11111"));
        dict_suit.insert(std::make_pair("World", "22222"));
    }
    virtual void TearDown() override // 每个单元测试完成后调用一次
    {
        std::cout << "单元测试后执行TearDown\n ";
        dict_suit.clear();
    }

public:
    std::unordered_map<std::string, std::string> dict_suit; // 独立测试套件可以在类内部定义成员变量,该成员变量每个单元测试独自初始化一份
};

TEST_F(Suit_Global_Test, test1) // 最最重要的一点,单元测试的这个宏变成了 TEST_F,而且测试的名称必须与套件环境类名一致,保证单元测试宏函数可以直接访问类成员变量
{
    std::cout << "单元测试1" << std::endl;
    // 测试局部dict
    dict_suit.insert(std::make_pair("你好", "33333"));
    ASSERT_EQ(dict_suit.size(), 3);

    // 测试全局dict
    ASSERT_EQ(dict_global.size(), 1);
    dict_global.insert(std::make_pair("World", "22222"));
    dict_global.insert(std::make_pair("ABCDE", "33333"));
}

TEST_F(Suit_Global_Test, test2)
{
    std::cout << "单元测试2" << std::endl;
    // 局部dict是分开的,所以 test1 的 dict_suit::insert 对test2不起效
    ASSERT_EQ(dict_suit.size(), 2);

    // 测试全局dict
    ASSERT_EQ(dict_global.size(), 3); // test1 对全局dict插入了两条数据,所以现在长度是3
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    RUN_ALL_TESTS();
    return 0;
}

执行结果如下:

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐