当前位置: 首页 > news >正文

rabbitmq----数据管理模块

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 交换机数据管理
    • 管理的字段
    • 持久化管理类
    • 内存管理类
      • 申明交换机
      • 删除交换机
      • 获取指定交换机
  • 队列数据管理
    • 管理的字段
    • 持久化管理类
    • 内存管理类
      • 申明/删除/获取指定队列
      • 获取所有队列
  • 绑定关系管理
    • 管理的字段
    • 持久化管理类
    • 内存管理类
      • 绑定/解除绑定
      • 删除指定交换机的所有绑定关系
      • 删除队列的所有绑定关系
      • 获取指定的队列的所有绑定关系
  • 队列消息管理
    • 管理的字段
    • 持久化管理类
      • 插入消息数据
      • 删除消息
      • 垃圾回收
    • 队列消息内存管理类
      • 管理的字段
      • 插入消息
      • 删除消息
      • 获取队首消息
  • 总的消息内存管理类
      • 删除一个队列消息管理
      • 新增消息
      • 获取队首消息
      • 确认消息


数据管理模块需要管理四种数据,分别是交换机数据管理/队列数据管理/绑定关系数据管理/队列消息数据管理。

交换机数据管理

管理的字段

需要管理的数据有下面这个5个
交换机名称:交换机的唯一标识
交换机类型:交换机有三种类型,直接交换/广播交换/主题交换。决定了消息的转发方式。
持久化标识:决定了交换机信息是否持久化存储。方便断电后恢复。
剩下的俩个字段不需要关心,是为了以后进行扩展的。

 struct Exchange
{using ptr = std::shared_ptr<Exchange>;std::string name;                                  // 交换机名字ExchangeType type;                                 // 交换机类型bool durable;                                      // 交换机持久化标志位bool auto_delete;                                  // 交换机自动删除标志位 (还未实现该功能)google::protobuf::Map<std::string, std::string> args; // 其他参数 (方便以后扩展)
}

持久化管理类

交换机的信息提供了持久化管理的操作,我们使用sqlite进行存储。
要管理一个sqlite的操作句柄,这个句柄对象也是我们封装了一下sqlite的操作。

在构造函数种需要传入一个文件路径,也就是存储交换机信息的文件。
sqlite是一个本地化的数据库,不需要通过网络客户端服务器的模式来进行通信。本地提供一个文件就可以进行存储。一个文件就相当于一个数据库database,可以在这个数据库种创建多个表。
我们的交换机/队列/绑定关系信息的数据都是存储在这个文件种的.

class ExchangeMapper
{
private:SqliteHelper _sql_helper;	//sqlite句柄
public:ExchangeMapper(const std::string &dbfile): _sql_helper(dbfile){// 创建父级目录const std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);// 创建/打开数据库文件assert(_sql_helper.open());// 创建交换机数据表createTable();}
}

这就是创建的交换机表。

#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"
bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
if (ret == false)
{DLOG("创建交换机数据库表失败!!");abort(); // 直接异常退出程序
}

这个类还提供一个恢复的接口,他会查询交换机表中的所有记录,存放到一个哈希表中。

//返回交换机表中所有数据,用于重启后恢复
std::unordered_map<std::string, Exchange::ptr> recovery(){std::unordered_map<std::string, Exchange::ptr> res;std::string sql = "select name, type, durable, auto_delete, args from exchange_table";_sql_helper.exec(sql, selectCallBack, &res);return res;	
}

内存管理类

在内存管理类中包含了交换机持久化管理类的对象,和一个哈希表,用来管理已经存在交换机信息。

在他的构造函数中调用了持久化管理的数据恢复接口,他会查询数据库表中所有的字段,返回一个哈希表,也就完成了交换机数据恢复。

//交换机数据内存管理类,这个类才是对外提供的
class ExchangeManager
{
private:std::mutex _mutex;  //这个类对象可能被多线程访问,我们要加锁ExchangeMapper _mapper;std::unordered_map<std::string,Exchange::ptr> _Exchanges;   //管理已经存在的交换机ExchangeManager(const std::string &dbfile):_mapper(dbfile){//恢复交换机_Exchanges = _mapper.recovery();}

申明交换机

在rabbitMQ中不叫创建交换机,而是叫做申明交换机,它是一种强断言的思想,代表着存在及ok,不存在就创建。这个操作也很简单。
先看看哈希表中存不存在,存在就返回true,不存在就构建一个交换机对象,插入哈希表

bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);auto it = _Exchanges.find(name);if(it != _Exchanges.end()){//存在直接returnreturn true;}//定义一个Exchange对象auto ecp = std::make_shared<Exchange>(name,type,durable,auto_delete,args);//插入进数据库if(durable == true) {bool ret = _mapper.insert(ecp);if(ret == false) return false;}_Exchanges.insert({name,ecp});return true;}

删除交换机

根据交换机的名称进行一个删除。同时如果持久化存储了,也要删除数据库中的数据。

void deleteExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _Exchanges.find(name);if(it == _Exchanges.end()){return;}//删除数据库中数据if(it->second->durable == true){_mapper.remove(name);}_Exchanges.erase(name);}

获取指定交换机

根据交换机姓名获取指定交换机。

//获取指定交换机Exchange::ptr selectExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _Exchanges.find(name);if(it == _Exchanges.end()){//交换机不存在return Exchange::ptr();}return it->second;}

队列数据管理

队列和交换机管理的思想几乎一致,只不过有些字段不一样

管理的字段

队列名称:队列的唯一标识
持久化标志。
其他字段是扩展字段,暂时不关心。

 struct MsgQueue{using ptr = std::shared_ptr<MsgQueue>;std::string name;                                  // 队列名称bool durable;                                      // 持久化标志位bool exclusive;                                    // 是否独占 (还未实现此功能)bool auto_delete;                                  // 自动删除 (未实现)google::protobuf::Map<std::string, std::string> args; // 其他参数
}

持久化管理类

和交换机一样,这里看一看表结构

void createTable(){std::stringstream sql;sql << "create table if not exists queue_table(";sql << "name varchar(32) primary key, ";sql << "durable int, ";sql << "exclusive int, ";sql << "auto_delete int, ";sql << "args varchar(128));";assert(_sql_helper.exec(sql.str(), nullptr, nullptr));}

内存管理类

都是一个持久化句柄,一个哈希表存储已经存在的队列信息。

class MsgQueueManager{private:std::mutex _mutex;MsgQueueMapper _mapper;std::unordered_map<std::string, MsgQueue::ptr> _msg_queues;public:using ptr = std::shared_ptr<MsgQueueManager>;MsgQueueManager(const std::string &dbfile) : _mapper(dbfile){//从数据库中读取,恢复队列数据_msg_queues = _mapper.recovery();}
}

申明/删除/获取指定队列

这里不过多介绍,都是一样的操作

获取所有队列

但是队列这边提供了一个额外的操作,获取所有队列信息。
这里直接构造一个哈希表返回。
我们的队列消息和消费者都是以队列为单元进行管理的。所以我们需要获取到已经存在队列,用来初始化队列消息和消费者管理。

 //返回所有队列
std::unordered_map<std::string, MsgQueue::ptr> AllQueue(){std::unique_lock<std::mutex> lock(_mutex);return _msg_queues; //这里构造了一个}

绑定关系管理

管理的字段

交换机名称和队列名称,还有一个binding_key。

 struct Binding
{using ptr = std::shared_ptr<Binding>;std::string exchange_name; // 交换机名称std::string msgqueue_name; // 队列名称std::string binding_key;
}

持久化管理类

绑定关系也是需要持久化管理的。当交换机和队列的持久化标志位都为true时,我们才将绑定关系持久化管理。这在虚拟机管理模块进行判断。

void createTable(){std::stringstream sql;sql << "create table if not exists binding_table(";sql << "exchange_name varchar(32), ";sql << "msgqueue_name varchar(32), ";sql << "binding_key varchar(128));";assert(_sql_helper.exec(sql.str(), nullptr, nullptr));}

内存管理类

内存管理类这块也是一个持久化管理句柄。
但是我们这里是交换机和队列的一个信息管理。
因为一个交换机可以绑定多个队列,而队列和绑定关系是一一对应的。
所以我们定义了两个类型,一个是队列和绑定的映射。一个是交换机和队列绑定的映射。我们实际管理的就是这个交换机和队列绑定的对象。

using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
 class BindingManager{private:std::mutex _mutex;BindingMapper _mapper;BindingMap _bindings;using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &dbfile) : _mapper(dbfile){_bindings = _mapper.recovery();}
}

绑定/解除绑定

需要提供交换机名和队列名称以及binding_key和是否持久化。
这里的是否持久化是虚拟机判断后传入的。
这里也是存在及ok,不存在就创建的思想。

bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit != _bindings.end() && (eit->second.find(qname) != eit->second.end())){   //绑定数据已存在return true;}Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);// 当交换机和队列的持久化标志位都为true,绑定数据才进行持久化,这个durable由外界判断后传入if (durable == true){bool ret = _mapper.insert(bp);if (ret == false){return false;}}//存在及获取,不存在及创建auto &MsgQueueMap = _bindings[ename];MsgQueueMap.insert({qname, bp});return true;}

删除绑定关系,一个交换机可以绑定多个队列,这里删除的只是一个交换机和队列的绑定关系。

void unbind(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);//没有交换机信息,直接退出if(eit == _bindings.end()) { return; }auto qit = eit->second.find(qname);//没有队列相关信息,直接退出if(qit == eit->second.end()){ return ;}//判断持久化太麻烦了,所以这里不管持久化标志是否存在,我们直接去数据库中删除_mapper.remove(ename,qname);_bindings[ename].erase(qname);}

删除指定交换机的所有绑定关系

当交换机被删除时,需要删除该交换机的所有绑定关系

//删除指定交换机的所有绑定数据 ---当删除交换机时需要删除交换机相关的所有绑定信息
void removeExchangeBindngs(const std::string &ename){//同样的这里不判断持久化,直接操作_mapper.removeExchangeBindings(ename);auto eit = _bindings.find(ename);if(eit == _bindings.end()){//不存在直接returnreturn;}_bindings.erase(eit);
}

删除队列的所有绑定关系

当队列被删除后,需要删除该队列所有的绑定关系。
删除的方法就是遍历所有的交换机,因为一个队列是可以被多个交换机绑定的。

 void removeMsgQueueBindings(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);//同样的这里不判断持久化,直接操作_mapper.removeMsgQueueBindings(qname);//遍历所有的交换机,因为一个交换机可以绑定多个队列,这个要删除的队列可能绑定了多个交换机for(auto eit = _bindings.begin(); eit != _bindings.end(); eit++){//eit->second是一个MsgQueueMap,这里判断的是这个队列是否存在这个map中,存在就删除auto qit = eit->second.find(qname);if(qit != eit->second.end()){eit->second.erase(qit);}}
}

获取指定的队列的所有绑定关系

这个接口非常重要,当交换机收到信息后,我们需要获取该交换机的所有绑定的队列,用来判断需要将消息转发到哪个队列。

//获取交换机绑定的队列描述信息,当交换机收到消息时,需要将消息转给绑定的队列MsgQueueBindingMap getExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()) {return MsgQueueBindingMap();}return eit->second; }

队列消息管理

管理的字段

这个模块有一些复杂,消息是需要被传输的,因此我们定义在了proto中,我们先看一下消息类型.

消息分为消息属性和消息正文。
消息属性中有三个字段,分别是消息的ID,routing_key和持久化模式。
消息正文就是一个string字段。
还有一个消息是否有效标志位。
也就是消息属性,消息正文,消息是否有效这三个字段是需要持久化存储的。我们把它定义为Payload字段。
其他两个字段是方便服务器进行管理二添加的字段。

//消息属性
message BasicProperties {string id = 1; //消息IDstring routing_key = 2; //与binding_key做匹配DeliveryMode delivery_mode = 3; //持久化模式 1-⾮持久化; 2-持久化
};//消息结构
message Message {message Payload {BasicProperties properties = 1; //消息属性string body = 2; //有效载荷数据string valid = 3; //消息是否有效位};Payload payload = 1; //真正持久化的只有这⼀个字段uint64 offset = 2; //这两个字段⽤于记录消息在持久化⽂件中的位置和⻓度uint64 length = 3; //以便于在加载时可以在指定位置读取指定⻓度的数据获取到消息
};

持久化管理类

同样的消息也是需要进行持久化的,但是消息的持久化我们不是放在数据库中,而是存储在文件中。
因为有些消息会很大,不适合放在数据库。其次我们只是为了备份,不涉及到查询。

另外的我们的消息是以队列为单元进行管理的。

这是消息进行持久化管理的类,他有三个成员,一个是队列名称,另外两个是持久化数据文件的文件名。文件名就是用队列名加上.mqd后缀。
例如qname.mqd,这里还有一个tmpfile,原因是我们垃圾回收不是在源文件直接操作,而是遍历源文件,提取出有效消息,存入临时文件,最后在更改临时文件名称覆盖源文件。

 class MessageManpper{private:std::string _qname;    // 队列名称std::string _datafile; // 保存消息持久化数据文件std::string _tmpfile;  // 垃圾回收时的临时文件}

插入消息数据

当收到一个消息后,如果消息的持久化标志位true,就需要对消息进行持久化存储了。
调用虚拟机的消息发布接口,然后调用到队列消息总的内存管理类。然后调用具体的一个队列消息管理类中的插入接口,进行插入。如果消息的持久化标志位是true,就需要持久化。

bool insert(MessagePtr &msp)
{insert(_datafile, msp);
}

需要传入一个Message对象,这个对象是在队列消息管理的插入接口构造的.
这里的大致流程,将消息结构中的Payload结构序列化,然后获取到文件的大小,也就是我们要要存入的偏移量,在偏移量的位置先写入4字节的消息大小,也就是Payload的大小,然后写入序列化的数据。
最后会把偏移量和消息大小设置到MessagePtr中,这个Ptr会存储到消息链表和持久化哈希表中,也就同步跟新到内存了。
这里的偏移量是跳过4字节的,也就是指向的Payload。

bool insert(const std::string &file, MessagePtr &msp)
{// 将消息数据中的有效载体序列化std::string body = msp->payload().SerializeAsString();FileHelper helper(file);size_t fsize = helper.size();size_t msg_size = body.size();// 往文件写入4字节数据长度bool ret = helper.write((char *)&msg_size, fsize, sizeof(size_t));if (ret == false){DLOG("往队列文件 %s 写入数据长度失败", file.c_str());return false;}// DLOG("往队列文件 %s 写入数据长度:%d",file.c_str(),msg_size);// 往文件写入消息有效载荷ret = helper.write(body.c_str(), fsize + sizeof(size_t), msg_size);if (ret == false){DLOG("往队列文件 %s 写入数据主体失败", file.c_str());return false;}// DLOG("往队列文件 %s 写入数据:%d",file.c_str(),body.c_str());// 更改消息数据的偏移量和长度msp->set_offset(fsize + sizeof(size_t));msp->set_length(msg_size);return true;
}

删除消息

当消息被确认后,就需要从文件中删除消息。而删除不是从文件中真的删除,而是将消息的有效位置0后,覆盖掉原文件中的消息。

这里需要传入一个MessagePtr,这个ptr就是收到了消费客户端的确认应答后,根据消息id在待确认哈希中找到对应的MessaePtr对象.
如果这个对象的持久化标志位1,就需要删除文件中的数据。
删除的流程就是将MessagePtr中的有效位置0,然后根据偏移量和消息长度,进行一个覆盖写入。

 bool remove(MessagePtr &msp)
{// 将消息数据的有效位设置位'0'msp->mutable_payload()->set_valid("0");// 对msg进行序列化std::string body = msp->payload().SerializeAsString();if (body.size() != msp->length()){DLOG("不能修改文件中的数据信息,因为新生成的数据与原数据长度不一致!");return false;}FileHelper helper(_datafile);// 将消息数据覆盖写入到文件位置bool ret = helper.write(body.c_str(), msp->offset(), body.size());if (ret == false){DLOG("向队列数据文件 %s 写入数据失败", _datafile);return false;}return true;
}

垃圾回收

这个垃圾回收会返回一个MessagePtr的链表,他会从文件中循环读取消息。先算出文件的大小,从0偏移量开始读取,先读取4字节消息长度,然后根据长度读取消息payload,盘后反序列化出一个Payload结构,判断有效位是否为1,为1则插入到链表中。循环结束就可以得到有效消息的链表。
然后将有效消息写入到临时文件,在写入到临时文件中消息的偏移量发生了改变,在内存中也存储了消息对象,所以需要同步更新消息的偏移量,所以我们的返回值是一个MessagePtr的链表。队列消息管理在进行了垃圾回收后可以进行跟新偏移量。

std::list<MessagePtr> gc()
{std::list<MessagePtr> result;bool ret = load(result);if (ret == false){DLOG("加载有效数据失败!\n");return result;}// DLOG("加载有效数据结束,数据个数:%d",result.size());// 将有效数据写入临时文件FileHelper::createFile(_tmpfile);   //必须先创建出临时文件,在datafile中没有数据时,不会进这个循环。会导致文件源文件被删除,tmp文件也没了FileHelper tmp_file_helper(_tmpfile);size_t offset = 0;for (auto &msp : result){ret = insert(_tmpfile, msp);if (ret == false){DLOG("向临时文件写入消息数据失败!!");return result;}}// DLOG("像临时文件写入数据结束,临时文件大小:%d",tmp_file_helper.size());ret = FileHelper::removeFile(_datafile);if (ret == false){DLOG("删除源文件失败!");return result;}// 4. 修改临时文件名,为源文件名称ret = FileHelper(_tmpfile).rename(_datafile);if (ret == false){DLOG("修改临时文件名称失败!");return result;}// 5. 返回新的有效数据return result;
}

队列消息内存管理类

队列消息是以队列为单元进行管理的,所以这个类是存在一个队列就要有一个。
和交换机/队列/绑定关系不同,这里复杂一些.

管理的字段

需要有一个持久化句柄,消息需要持久化。
然后一个队列名,该类所代表的队列。
一个有效消息数量和总消息数量。用来进行垃圾回收
一个待推送消息链表,收到消息后插入这个链表中。
一个持久化哈希表,用来垃圾回收后更新内存中消息的偏移量
一个待确认哈希表,当消息推送消费者后,需要把消息从带推送链表中删除,然后插入到待确认哈希表中。

class QueueMessage
{
private:MessageManpper _manpper;    //持久化操作的句柄std::string _qname;  // 队列名size_t _valid_count; // 有效消息数量size_t _total_count; // 消息总数量std::mutex _mutex;std::list<MessagePtr> _msgs;                               // 带推送消息std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息
}

插入消息

插入消息是在服务器收到消息发布请求后,通过虚拟机句柄调用的,我们这里需要构造一个MEssagePTr.用户的请求中可能没有填入消息属性。如果没有填入的话,我们就构造一个属性字段填入,其中消息ID自动生成,持久化标志看队列是否持久化,routing_ket设置为空字符串。

bool insert(const BasicProperties *bp, std::string body, bool queue_is_durable)

然后我们根据持久化标志位,将消息持久化存储,同时更新消息数量,然后插入进带推送消息链表中。

删除消息

是在客户端确认后,删除待确认哈希表中的消息。
如果消息时持久化的,需要删除持久化信息,同同时删除消息数量。
另外进行一次垃圾回收,垃圾回收需要满足总体消息数量达到2000条,且有效消息数量的个数不到总消息的%50;
垃圾回收就是调用持久化管理的gc接口,它会返回一个list< MessagePtr>,通过这个list,跟新内存消息的偏移量(遍历list,find查找持久化哈希表中的Message,进行更新)。同时更新消息数量。

  void gc()
{if (GCCheck() == false)return;// 获取有效消息std::list<MessagePtr> msgs = _manpper.gc();//这里构造了一个list<MessagePtr>,他和我们的_durable_msgs中的MessagePtr是不同的,所以需要单独更改_durable_msgs中MessagePtr的偏移量for (auto &msg : msgs){auto it = _durable_msgs.find(msg->payload().properties().id());if (it == _durable_msgs.end()){// 持久化文件中的消息,在内存中不存在DLOG("垃圾回收后,有一条持久化消息,在内存中没有进行管理");_msgs.push_back(it->second); // 做法:将该消息插入进待推送链表的末尾_durable_msgs.insert({msg->payload().properties().id(), msg});continue;}// 更新每一条消息的实际存储位置it->second->set_offset(msg->offset());it->second->set_length(msg->length());}// 3. 更新当前的有效消息数量 & 总的持久化消息数量_valid_count = _total_count = msgs.size();
}

获取队首消息

当队列收到一个消息,就需要进行推送,删除消息链表的头部消息,然后插入到待确认哈希表中。

 // 获取队首消息
MessagePtr front()
{std::unique_lock<std::mutex> lock(_mutex);if(_msgs.size() == 0){return MessagePtr();}// 从待推送链表中取出一个消息MessagePtr msp = _msgs.front();_msgs.pop_front();// 并将该消息添加进待确认哈希表_waitack_msgs.insert({msp->payload().properties().id(), msp});return msp;
}

总的消息内存管理类

一个文件路径,我们的消息持久化文件的路径。
一个哈希表,队列名称和队列消息管理类的映射。
我们对外提供的就是这个对象。

 class MessageManager
{
private:std::string _basedir; //存储队列信息文件的路径,我们的队列信息是存储在文件中的std::mutex _mutex;std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs; //队列名对应队列信息
}

在他的构造中,我们需要根据已经存在的队列创建出队列消息管理类。然后进行一个持久化消息的恢复。

// 初始化一个队列消息管理,在创建队列的时候调用
void InitQueueMessage(const std::string &qname)
{QueueMessage::ptr tmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it != _queue_msgs.end()){//DLOG("消息管理句柄 %s 已经存在", qname.c_str());return;}// 构建一个队列消息内存管理类对象QueueMessage::ptr qmp = std::make_shared<QueueMessage>(_basedir, qname);tmp = qmp;_queue_msgs.insert(std::make_pair(qname, qmp));}// 恢复历史数据,这个操作是非常耗时的,我们没有放在加锁里。tmp->recovery();
}

这里不想交换机,队列和绑定关系的恢复那么轻量,这里的消息数量会很大,因此我们没有在队列消息的构造函数中进行,而是单独提供了一个接口用来进行恢复。
恢复就是进行一次垃圾回收,然后返回一个链表,遍历链表,插入到内存管理中。

删除一个队列消息管理

当一个队列删除时,他的队列消息管理也就没有意义了,需要删除他的管理,同时删除对应的持久化文件数据。

// 销毁一个队列消息管理,在删除队列的时候调用void DestoryQueueMessage(const std::string &qname){QueueMessage::ptr tmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("消息管理句柄 %s 不存在", qname.c_str());return;}tmp = it->second;_queue_msgs.erase(qname);}tmp->clear();}

新增消息

这里都是对指定队列消息管理进行的操作。接口我们已经实现了,就是从哈希表中找到 指定的对象就行。

// 新增一条消息bool insert(const std::string &qname, BasicProperties *bp, std::string body, bool queue_is_durable)
{QueueMessage::ptr tmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("向队列%s新增消息失败:没有找到消息管理句柄!", qname.c_str());return false;}tmp = it->second;}return tmp->insert(bp, body, queue_is_durable);
}

获取队首消息

 MessagePtr front(const std::string &qname)
{QueueMessage::ptr tmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("向队列%s获取队首消息失败:没有找到消息管理句柄!", qname.c_str());return MessagePtr();}tmp = it->second;}return tmp->front();
}

确认消息

 void ack(const std::string &qname, const std::string &msg_id)
{QueueMessage::ptr tmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("向队列%s确认消息失败:没有找到消息管理句柄!", qname.c_str());return;}tmp = it->second;}tmp->remove(msg_id);return;
}

相关文章:

rabbitmq----数据管理模块

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 交换机数据管理管理的字段持久化管理类内存管理类申明交换机删除交换机获取指定交换机 队列数据管理管理的字段持久化管理类内存管理类申明/删除/获取指定队列获取所…...

【人工智能深度学习应用】妙笔API最佳实践

AI妙笔是一款以文本创作为主、多模态为辅的生成式创作大模型产品&#xff0c;专门为传媒、政务等特定的行业和组织提供行业化的内容创作辅助。它具备深度的行业知识&#xff0c;能够生成高质量的专业内容&#xff0c;能覆盖各行业常见的文体类型&#xff0c;写作文体丰富多样&a…...

SOMEIP_ETS_150: SD_Send_triggerEventUINT8Multicast_Eventgroup_6

测试目的&#xff1a; 验证DUT在Tester订阅事件组后&#xff0c;能够响应Tester触发的triggerEventUINT8Multicast方法&#xff0c;并将TestEventUINT8Multicast事件发送到订阅请求中端点选项指定的IP地址和端口。 描述 本测试用例旨在确保DUT能够正确处理事件组的订阅请求&…...

【EXCEL数据处理】000009 案列 EXCEL单元格数字格式。文本型数字格式和常规型数字格式的区别

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 【EXCEL数据处理】000009 案列 EXCEL单元格数字格式。文本型数字格式和…...

Vxe UI vue vxe-table vxe-text-ellipsis 如何实现单元格多行文本超出、多行文本溢出省略

Vxe UI vue vxe-table 如何实现单元格多行文本超出、多行文本溢出省略 代码 配合 vxe-text-ellipsis 组件实现多行文本溢出省略 <template><div><vxe-grid v-bind"gridOptions"><template #defaultAddress"{ row }"><vxe-te…...

FFmpeg源码:avio_feof函数分析

AVIOContext结构体和其相关的函数分析&#xff1a; FFmpeg源码&#xff1a;avio_r8、avio_rl16、avio_rl24、avio_rl32、avio_rl64函数分析 FFmpeg源码&#xff1a;read_packet_wrapper、fill_buffer函数分析 FFmpeg源码&#xff1a;avio_read函数分析 FFmpeg源码&#xff…...

各省-城镇化率(2001-2022年)

数据收集各省-城镇化率&#xff08;2001-2022年&#xff09;.zip资源-CSDN文库https://download.csdn.net/download/2401_84585615/89465885 相关指标&#xff1a; 包括省份、年份、年末总人口数(万人)、年末城镇人口数(万人)、城镇化率等。 数据集构建&#xff1a; 数据集通…...

飞创龙门双驱XYZ直线模组高精度应用实例

飞创龙门双驱XYZ直线模组集超精密定位、高动态响应和灵活配置于一体&#xff0c;适用于电子制造行业&#xff08;点胶、组装、检测&#xff09;、半导体圆晶加工、芯片封装、激光切割、激光焊接、数控机床、精密检测及科研实验等&#xff0c;满足高精度、高动态的三维定位需求&…...

Prompt 初级版:构建高效对话的基础指南

Prompt 初级版&#xff1a;构建高效对话的基础指南 文章目录 Prompt 初级版&#xff1a;构建高效对话的基础指南一 “标准”提示二 角色提示三 多范例提示四 组合提示五 规范化提示 本文介绍了提示词的基础概念与不同类型&#xff0c;帮助用户更好地理解如何在对话中构建有效的…...

餐饮重点企业在AI领域的布局,看方大的AI实践

大家好&#xff0c;我是Shelly&#xff0c;一个专注于输出AI工具和科技前沿内容的AI应用教练&#xff0c;体验过300款以上的AI应用工具。关注科技及大模型领域对社会的影响10年。关注我一起驾驭AI工具&#xff0c;拥抱AI时代的到来。 AI已经被应用在餐饮餐厨行业的哪些方面&am…...

Axure PR 9 开关切换 设计交互

大家好&#xff0c;我是大明同学。 这期内容&#xff0c;我们来探讨Axure开关按钮设计与交互技巧​。 创建切换开关所需的元件 1.打开一个新的 RP 文件并在画布上打开 Page 1。 2.将“圆形”元件拖到画布上&#xff0c;在样式窗格中将高度和宽度设置为35&#xff0c;线段宽度…...

ruoyi-python 若依python版本部署及新增模块

ruoyi spring版本支持一键导出前后端代码&#xff0c;且b站上有很多教程&#xff0c;但是发现python版本的教程并不多&#xff0c;于是自己尝试创建一个简易的CRUD模块 1.各版本bug 主要尝试了1.1.2版本和vue2的版本&#xff0c;链接如下&#xff1a; v1.1.2 vue2 两个版本…...

【理论】负载均衡

目录 1. 定义2. 主要作用3. 实现方法4. 实现原理 1. 定义 负载均衡&#xff08;Load Balancing&#xff09;将网络流量、请求等输入分发到后端服务器&#xff0c;为后端服务器提供负载均衡&#xff0c;实现高可用和容错。 2. 主要作用 1. 高并发 通过将请求均匀分配到多个服务…...

流行前端框架Vue.js详细学习要点

Vue.js是一款流行的JavaScript前端框架&#xff0c;用于构建用户界面&#xff0c;特别是在构建交互式Web应用程序时表现出色。以下是Vue.js详细学习的一些要点&#xff1a; 1. Vue.js基础 定义与特点&#xff1a;Vue.js是一款渐进式JavaScript框架&#xff0c;提供响应式数据…...

Java.数据结构.TreeMap

一、什么是TreeMap TreeMap是Java集合框架中的一部分&#xff0c;并且基于红黑树数据结构。这说明TreeMap能够高效地执行键值对的存储、检索、排序等操作。 二、TreeMap的特点 有序性&#xff1a;TreeMap会根据键的自然顺序进行排序&#xff0c;当然&#xff0c;你也可以通过…...

什么是托管安全信息和事件管理 SIEM?

什么是 SIEM&#xff1f; 安全信息和事件管理 ( SIEM ) 解决方案最初是一种集中式日志聚合解决方案。SIEM 解决方案会从整个组织网络中的系统收集日志数据&#xff0c;使组织能够从单一集中位置监控其网络。 随着时间的推移&#xff0c;SIEM解决方案已发展成为一个完整的威胁…...

vscode安装及c++配置编译

1、VScode下载 VS Code官网下载地址&#xff1a;Visual Studio Code - Code Editing. Redefined。 2、安装中文插件 搜索chinese&#xff0c;点击install下载安装中文插件。 3、VS Code配置C/C开发环境 3.1、MinGW-w64下载 VS Code是一个高级的编辑器&#xff0c;只能用来写代…...

JavaScript使用渐变来美化对象!

我们的目标是渐变&#xff01;渐变&#xff01; 首先了解&#xff0c;渐变分为线性渐变和径向渐变&#xff0c;线性渐变可以是从左上角到右下角的渐变&#xff0c;径向渐变是从中心向外的渐变。 JavaScript中实现渐变可以使用addColorStop的方法&#xff0c;例如创建一个线性渐…...

Linux之实战命令24:od应用实例(五十八)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…...

【CKA】一、基于角色的访问控制-RBAC

1、基于角色的访问控制-RBAC 1. 考题内容&#xff1a; 2. 答题思路&#xff1a; 这道题就三条命令&#xff0c;建议直接背熟就行。 也可以查看帮助 kubectl create clusterrole -h kubectl create serviceaccount -h kubectl create rolebinding -h 注意&#xff1a; 1、资…...

【华为HCIP实战课程三】动态路由OSPF的NBMA环境建立邻居及排错,网络工程师

一、NBMA环境下的OSPF邻居建立问题 上节我们介绍了NBMA环境下OSPF邻居建立需要手动指定邻居,因为NBMA环境是不支持广播/组播的 上一节AR1的配置: ospf 1 peer 10.1.1.4 //手动指定邻居的接口地址,而不是RID peer 10.1.1.5 area 0.0.0.0 手动指定OSPF邻居后抓包查看OSP…...

初始Kafka

1、Kafka是什么&#xff1f; Kafka是由Scala语言开发的一个多分区、多副本&#xff0c;基于Zookeeper集群协调的系统。 那这个所谓的系统又是什么系统呢&#xff1f; 回答这个问题要从发展的角度来看&#xff1a;起初Kafka的定位是分布式消息系统。但是目前它的定位是一个分布…...

学会使用maven工具看这一篇文章就够了

文章目录 概述一、定义与功能二、核心组件三、主要作用四、仓库管理 settings.xml说明一、文件位置与优先级二、主要配置元素三、配置示例 pom.xml文件说明一、pom.xml的基本结构二、pom.xml的主要元素及其说明三、依赖管理四、常用插件五、其他配置 maven安装配置一、下载Mave…...

如何创建虚拟环境并实现目标检测及验证能否GPU加速

创建虚拟环境&#xff1a; 先创建一个虚拟python环境&#xff0c;敲如下代码 然后再到该虚拟环境里面安装自己想要的包 激活虚拟环境 然后再聚类训练这些 验证GPU加速 阿里源 pip install torch torchvision -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mir…...

<STC32G12K128入门第十三步>驱动W5500进行TCP_Client通信

前言 最近本人接触到了一个消费类产品需要用到以太网,并且需要连接服务器,同时需要发送https协议。本文就是讲解如何运行TCP客户端, 一、W5500讲解? W5500是一款10/100M的以太网转换芯片,内部集成了TCP/IP协议栈。并且支持SPI/I2C协议。我在STC32上面使用的是软件SPI。…...

【Go语言】Ergo:构建分布式系统的现代化 Erlang 框架

Ergo 是一个基于 Go 语言的开源框架&#xff0c;专门用于构建分布式系统。它为开发人员提供了与 Erlang/OTP 类似的编程模型和功能。Ergo 通过将 Erlang 的强大分布式并发编程模型带入 Go 语言的生态中&#xff0c;使得开发者能够轻松创建高度可靠、可扩展的分布式应用程序。 …...

教资备考--高中数学(仅为高中数学梳理)

按照高中学习数学梳理的方案进行整理...

Qt 学习第十一天:QTableWidget 的使用

一、创建QTableWidget对象&#xff0c;设置大小&#xff0c;在窗口的位置 //创建tablewidgetQTableWidget *table new QTableWidget(this);table->resize(550, 300);table->move(100, 100); //移动 二、设置表头 //设置表头QStringList headerList; //定义headerList…...

【Linux】基础指令 1

Linux中各个指令是相互联系的&#xff0c;所以一开始学习Linux时&#xff0c;对指令很陌生是正常的&#xff0c;不用花费大量的时间去刻意的记忆&#xff0c;在我们一次次的使用当中&#xff0c;这些指令自然会烂熟于心。 简单看看各个指令的功能 ls指令 显示当前目录下的文…...

Linux_kernel字符设备驱动12

一、字符设备的编程框架 在Linux_kernel驱动开发11中&#xff0c;我们介绍的系统调用。只是为了做一个实验&#xff0c;在真正开发时&#xff0c;我们并不会直接在内核中添加一个新的系统调用&#xff0c;这样做会导致内核体积变大。 1、字符设备结构体 我们实现一个硬件字符设…...

能利用双股铜芯电话线做网站吗/最新军事头条

题目&#xff1a;有一对兔子&#xff0c;从出生后第3个月起每个月都生一对兔子&#xff0c;小兔子长到第三个月后每个月又生一对兔子&#xff0c;假如兔子都不死&#xff0c;问每个月的兔子总数为多少&#xff1f; 1.程序分析&#xff1a; 兔子的规律为数列1,1,2,3,5,8,13,21..…...

番禺做网站公司哪家好/最好的网站优化公司

Queries for Number of Palindromes 我们可以设 dp[l][r]dp[l][r]dp[l][r] 为 [l,r][l,r][l,r] 中 的答案。那么我们可以找到转移方程&#xff1a; dp[l][r]dp[l1][r]dp[l][r−1]−dp[l1][r−1]check(l,r)&#xff08;判断整个[l,r]是不是回文串&#xff09;dp[l][r] dp[l1][r…...

wordpress数据库设置密码/竞价排名什么意思

前言 期末到了&#xff0c;上课老师讲的云里雾里&#xff0c;很慌某些老师给一些所谓复习题其实是考试题的题&#xff0c;害人害己&#xff0c;白交学费。想学东西找不到途径&#xff1f;MOOC所谓的没有围墙&#xff0c;除了精品课程其他大都在念PPT&#xff0c;并未深入浅出的…...

电子商务网站开发形式有/软文接单平台

在开发 Web 项目的时候&#xff0c;经常需要过滤器来处理一些请求&#xff0c;包括字符集转换什么的&#xff0c;记录请求日志什么的等等。在之前的 Web 开发中&#xff0c;我们习惯把过滤器配置到 web.xml 中&#xff0c;但是在 SpringBoot 中&#xff0c;兵没有这个配置文件&…...

找网页设计师/整站seo服务

数组即是一组相同类型组合在一起&#xff0c;使用一个通用的名称&#xff0c;通过分配的下标访问的数据集合中的元素。数组是具有相同类型的一组数据。当访问数组中的数据时&#xff0c;可以通过下标来指明。c#中数组元素可以为任何数据类型&#xff0c;数组下标从0开始&#x…...

wordpress api post/长春网站建设推广

文章目录1、sys模块简介2、相关函数介绍2.1、argv2.2、modules.keys2.3、exc_info2.4、exit2.5、hexversion2.6、version2.7、maxsize2.8、maxunicode2.9、modules2.10、path2.11、platform2.12、stdout2.13、stdin2.14、stderr2.15、byteorder2.15.1、大端和小端2.16、copyrig…...