当前位置: 首页 > news >正文

【Skynet 入门实战练习】分布式 ID | 雪花算法 | 缓存设计 | LRU算法 | 数据库

文章目录

  • 前言
    • 雪花算法
    • LRU 算法
    • 缓存模块
    • 数据库
    • 测试逻辑

前言

本节实现了 分布式 ID 生成系统,采用雪花算法实现唯一 ID;实现缓存架构,采用 LRU (最近最少使用)算法。

雪花算法

分布式 ID 生成算法的有很多种,Twitter 的雪花算法(SnowFlake)就是其中经典的一种。

SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图:

在这里插入图片描述

  • 1位,不用。二进制中最高位为1的都是负数,但是我们生成的 id 一般都使用正整数,所以这个最高位固定是0

  • 41位,用来记录时间戳(毫秒)。
    41位可以表示 2 41 − 1 2^{41}-1 2411 个数字,如果只用来表示正整数(计算机中正数包含0),可以表示的数值范围是:0 至 2 41 − 1 2^{41}-1 2411。41位可以表示 2 41 − 1 2^{41}-1 2411个毫秒的值,转化成单位年则是 ( 2 41 − 1 ) / ( 1000 ∗ 60 ∗ 60 ∗ 24 ∗ 365 ) = 69 (2^{41}-1) / (1000 * 60 * 60 * 24 * 365) = 69 (2411)/(1000606024365)=69

  • 10位,用来记录工作机器id。可以部署在 2 10 = 1024 2^{10} = 1024 210=1024 个节点,包括5位 datacenterId 和5位 workerId
    5位(bit)可以表示的最大正整数是 2 5 − 1 = 31 2^{5}-1 = 31 251=31,即可以用0、1、2、3、…31这32个数字,来表示不同的 datecenterId 或 workerId

  • 12位,序列号,用来记录同毫秒内产生的不同 id。12位可以表示的最大正整数是 2 12 − 1 = 4095 2^{12}-1 = 4095 2121=4095,即可以用 0、1、2、3、…4094这4095个数字,来表示同一机器同一时间截(毫秒)内产生的4095个 ID 序号

SnowFlake 算法的优点:

  • 生成 ID 时不依赖于数据库,完全在内存生成,高性能高可用。

  • 容量大,每秒可生成几百万ID。

    • SnowFlake算法在同一毫秒内最多可以生成多少个全局唯一ID呢?同一毫秒的ID数量 = 1024 * 4096 = 4194304
  • 所有生成的id按时间趋势递增,后续插入数据库的索引树的时候,性能较高。

  • 整个分布式系统内不会产生重复id(因为有datacenterId和workerId来做区分)

SnowFlake 算法的缺点:

  • 依赖于系统时钟的一致性。如果某台机器的系统时钟回拨,有可能造成ID冲突,或者ID乱序。

  • 还有,在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。

以上参考:cloudyan/snowflake


在本项目中,与之有异同之处。采用 39 位表示时间戳,12 位表示机器 id,12位表示序列号。

实现后的雪花算法:

  • 4096 个服务
  • 单个服务 10 毫秒内可以生成 4096 个 ID
  • 支持时间跨服 174 年
  1. 通过自定义雪花算法生成 id 的服务,即表示为机器 id,则可以实现至多 2 12 = 4096 2^{12}=4096 212=4096 个服务。
  2. 由于 skynet 内部时钟精度是 10ms,所以在同一时间戳(10ms)内,生成 id 的序列号依此递增,至多 2 12 = 4096 2^{12}=4096 212=4096 个。
  3. 39 位用于表示时间戳, 2 39 / ( 100 ∗ 3600 ∗ 24 ∗ 365 ) = 174 2^{39}/(100*3600*24*365)=174 239/(100360024365)=174 年。

雪花算法服务的配置文件:

-- snowflake conf 
snowflake_begin         = 1 
snowflake_end           = 2
snowflake_start_date    = "2003-01-21"

lualib/snowflake.lua

local skynet = require "skynet"local _M = {}
local snowflake_service = {} -- service: begin - end 
local max_service_id
local cur_service_id = 0-- 获取一个 snowflake 服务
local function get_snowflake_service()cur_service_id = cur_service_id + 1if cur_service_id > max_service_id then cur_service_id = 1end return snowflake_service[cur_service_id]
end -- 对外接口,雪花 id 算法生成
function _M.snowflake()local addr = get_snowflake_service()return skynet.call(addr, "lua", "snowflake")
end skynet.init(function()skynet.uniqueservice("snowflake")local snowflake_begin = tonumber(skynet.getenv("snowflake_begin")) or 1local snowflake_end = tonumber(skynet.getenv("snowflake_end")) or 10assert(snowflake_begin <= snowflake_end, "snowflake_begin or snowflake_end error")local i = 0for id = snowflake_begin, snowflake_end do  i = i + 1local service_name = string.format(".snowflake_%s", id)snowflake_service[i] = skynet.localname(service_name) --  返回同一进程内,用 register 注册的具名服务的地址。end max_service_id = i
end)return _M 

可以看到,服务采用主从架构,通过简单的轮询算法负载均衡。生成的服务数量由 snowflake_begin snowflake_end 配置。

我们再来看 snowflake 服务代码:

service/snowflake.lua

-------- master ---------- 启动主节点服务,创建多个从节点服务
skynet.start(function()local snowflake_begin = tonumber(skynet.getenv("snowflake_begin")) or 1local snowflake_end = tonumber(skynet.getenv("snowflake_end")) or 10assert(snowflake_begin <= snowflake_end, "snowflake_begin or snowflake_end error")for id = snowflake_begin, snowflake_end do skynet.newservice(SERVICE_NAME, "slave", id)end skynet.register(".snowflake")
end)

主节点仅负责启动多个从节点服务,通过 skynet.newservice(SERVICE_NAME, "slave", id)启动并传入参数,参数 id 则用于后续标识机器的 id。

从节点用于提供生成 ID 的雪花算法,并维护当前这个从服务的时间戳,定时每 3s 保存到文件中。

-- 将 2000-01-01 形式日期,转为时间戳
local function parse_date(date)local year, month, day = date:match("(%d+)-(%d+)-(%d+)")return os.time({year = year, month = month, day = day})
end 
local start_date = skynet.getenv("snowflake_start_date") or "2000-01-01"
local START_TIMESTAMP = parse_date(start_date)-- 每一部分占用位数
local TIME_BIT      = 39    -- 时间占用位数
local SEQUENCE_BIT  = 12    -- 序列号占用位数
local MACHINE_BIT   = 12    -- 机器标识占用位数-- 每一部分最大值
local MAX_TIME      = 1 << TIME_BIT     -- 时间最大值      ((1 << 39) / 365 * 24 * 3600 * 100) ==> 174 year
local MAX_SEQUENCE  = 1 << SEQUENCE_BIT -- 序列号最大值     (4096)
local MAX_MACHINE   = 1 << MACHINE_BIT  -- 机器标识最大值   (4096)-- 每一部分向左的偏移
local LEFT_MACHINE  = SEQUENCE_BIT                  -- 12
local LEFT_TIME     = SEQUENCE_BIT + MACHINE_BIT    -- 24-- snowflake 接口
function CMD.snowflake()local cur = get_cur_timestamp()if cur < last_timestamp then error("Clock moved backwards.  Refusing to generate id")end if cur == last_timestamp then -- 相同 10ms 内,序列号自增sequence = (sequence + 1) & MAX_SEQUENCEif sequence == 0 then cur = get_next_timestamp()end else -- 不同 10ms 内,序列号置 0sequence = 0end last_timestamp = curreturn (cur - START_TIMESTAMP) << LEFT_TIME | slave_id << LEFT_MACHINE | sequence
end 

从代码中可以看出,生成 id 的时间戳是相较于配置文件中 snowflake_start_date 起始的。并且在相同 10ms 内,序列号自增,如果序列号超出 12 位的最大值,那么强制变为下一个 10ms 的时间戳。

雪花算法 snowflake,实际返回的 id:(cur - START_TIMESTAMP) << LEFT_TIME | slave_id << LEFT_MACHINE | sequence,即分别将时间戳、机器 id、序列号,向左偏移到二进制对应的位置返回。

-- 10ms
local function get_cur_timestamp()return math.floor(skynet.time() * 100)
end local function get_next_timestamp()local cur = get_cur_timestamp()while cur <= last_timestamp do cur = get_cur_timestamp()end return cur
end 

skynet.time:通过 starttime 和 now 计算出当前 UTC 时间(单位是秒, 精度是ms),get_cur_timestamp 获取当前时间戳函数控制了 10ms 为一个单位。

完整代码:service/snowflake


LRU 算法

缓存模块使用最经典的 LRU 算法实现,淘汰策略是最近最少使用的数据。详细的介绍参考:百度百科

LRU 算法在 leetcode 上也有相应试题,我们参考实现自己的 LRU 算法。

Go 语言版本:

type entry struct {key int value int 
}type LRUCache struct {ll          *list.Listcache       map[int]*list.ElementmaxBytes    int nBytes      int
}func Constructor(capacity int) LRUCache {lru := LRUCache{}lru.ll = list.New()lru.cache = make(map[int]*list.Element)lru.maxBytes = capacitylru.nBytes = 0return lru
}func RemoveOldest(this *LRUCache) {ele := this.ll.Back()if ele != nil {this.ll.Remove(ele)delete(this.cache, ele.Value.(*entry).key)this.nBytes -= 1}
}func (this *LRUCache) Get(key int) int {if ele, ok := this.cache[key]; ok {this.ll.MoveToFront(ele)return ele.Value.(*entry).value}return -1
}func (this *LRUCache) Put(key int, value int)  {if ele, ok := this.cache[key]; ok {this.ll.MoveToFront(ele)ele.Value = &entry{key, value}} else {ele := this.ll.PushFront(&entry{key, value})this.cache[key] = ele this.nBytes += 1 }for this.maxBytes < this.nBytes && this.maxBytes != 0 {RemoveOldest(this)}
}
/*** Your LRUCache object will be instantiated and called as such:* obj := Constructor(capacity);* param_1 := obj.Get(key);* obj.Put(key,value);*/

根据上述 Go 语言实现的 LRU,需要一个双向链表模块,还有一个哈希表。哈希表在 lua 中实际就是 table,那么下面首先实现双向链表结构。
lualib/list.lua

local list = {} 
local mt = { __index = list }-- entry { key, value, next, prev }function list.New()local self = setmetatable({}, mt)self.size = 0self.head = {}self.tail = {} self.head.next = self.tail self.tail.prev = self.head return self 
end function list.Back(self)if self.size ~= 0 then return self.tail.prev end return nil 
end -- insert entry after at; list.size++; return entry
local function insert(self, entry, at)entry.prev = at entry.next = at.next entry.prev.next = entry entry.next.prev = entryself.size = self.size + 1return entry
end function list.PushFront(self, entry)return insert(self, entry, self.head)
end -- move entry after at;
local function move(self, entry, at)if entry == at then return end entry.prev.next = entry.next entry.next.prev = entry.preventry.prev = atentry.next = at.nextentry.prev.next = entryentry.next.prev = entry
end function list.MoveToFront(self, entry)if entry == self.head or self.size <= 1 then return end move(self, entry, self.head)
end function list.Remove(self, entry)if entry == nil then return endentry.prev.next = entry.nextentry.next.prev = entry.preventry.next, entry.prev, entry.key, entry.value = nil, nil, nil, nilentry = nil  self.size = self.size - 1
end return list 

设计的对外接口仅和 Go 语言代码一致,满足后续的 LRU 算法模块实现,这里不再过多赘述双向列表的实现。

下面来看 LRU 模块设计:lru.lua

主要实现了 newsetget 三个方法:

function lru.new(size, on_remove)local self = setmetatable({}, mt)self.list = list.New()self.cache = {} self.capacity = size self.size = 0self.on_remove = on_removereturn self 
end function lru.set(self, key, value, force)local entry = self.cache[key]if entry then entry.value = valueself.list:MoveToFront(entry)else local entry = {key = key,value = value}self.list:PushFront(entry)self.cache[key] = entryself.size = self.size + 1end while true do if self.size > self.capacity and not force thenlru_remove(self)elsebreak end end 
end function lru.get(self, key)local entry = self.cache[key]if entry == nil then return end self.list:MoveToFront(entry)return entry.value
end 

lru 模块,不仅要有 list 双向链表结构,cache 哈希表结构,capacity 缓存容量上限,size 当前数据量,还需要一个 on_remove 回调方法。用于当缓存结构移除数据时,执行的该数据回调操作。由使用者进行注册,并且一个 lru 模块所有数据,共享这一个回调方法,即执行的回调操作是相同的。例如后续实现的缓存模块的 lru 结构回调方法,在删除改数据时后,都会判断一下这个数据是否还有引用,还有则继续插入缓存。

还注意到,set 方法提供了一个额外参数 force。可以强制无视当前 lru 容量,进行插入缓存数据。

完整代码:lualib/lru;


通过 debug_consolelru 模块进行测试:

设置 lru 容量是 2,插入数据 [1, 1],[2, 2],输出 [[2, 2],[1, 1]]。

在这里插入图片描述

获取数据 [1],输出 [[1, 1],[2, 2]]。

在这里插入图片描述

插入数据 [3, 3],输出 [[3, 3],[1, 1]]。

在这里插入图片描述

不做过多演示,测试代码参考:test/test_lru


缓存模块

一般游戏逻辑都不直接操作数据库,而是直接操作内存数据库,也称为数据缓存。游戏可以使用 redis 作为内存数据库,也可以和本项目一样实现一个缓存服务。

缓存模块库:lualib/cache.lua

local skynet = require "skynet"local _M = {}local cached function _M.call_cached(func_name, mod, sub_mod, id, ...)return skynet.call(cached, "lua", "run", func_name, mod, sub_mod, id, ...)
endskynet.init(function()cached = skynet.uniqueservice("cached")
end)return _M 

对外接口方法 call_cached

  • func_name 远程调用的函数名
  • mod 为模块名,一个 cached 负责加载多个模块数据
  • sub_mod 子模块名,一个模块下面会有多个子模块数据
  • id 数据的唯一 ID,例如 user 模块数据,id 对应玩家的 uid
  • ... 变参为函数的其他参数

缓存服务 service/cached.lua,该服务的管理模块 module/cached/mng.lua,其余还有不同逻辑模块,例如用户模块 module/cached/user.lua 等。

service/cached.lua

local skynet = require "skynet"
local mng = require "cached.mng"
local user = require "cached.user"local CMD = {}function CMD.run(func_name, mod, sub_mod, id, ...)local func = mng.get_func(mod, sub_mod, func_name)local cache = mng.load_cache(mod, sub_mod, id)return func(id, cache, ...)
end function CMD.SIGHUP()logger.info(SERVICE_NAME, "SIGHUP to save db. Doing.")mng.do_save_loop()logger.info(SERVICE_NAME, "SIGHUP to save db. Down.")
endskynet.start(function()skynet.dispatch("lua", function(_, _, cmd, ...)local f = assert(CMD[cmd])skynet.ret(skynet.pack(f(...)))end)skynet.register(".cached")mng.init()user.init()
end)

缓存服务 cached 主要提供两个接口,run 用于执行远程函数,SIGHUP 用于接受关服信号,执行一次脏数据落盘。

还记得在日志服务中 log.lua,我们注册了系统消息 PTYPE_SYSTEM

-- 捕捉sighup信号(kill -l) 执行安全关服逻辑
skynet.register_protocol {name = "SYSTEM", id = skynet.PTYPE_SYSTEM, unpack = function(...) return ... end,dispatch = function()-- 执行必要服务的安全退出操作local cached = skynet.localname(".cached")if cached then skynet.call(cached, "lua", "SIGHUP")end skynet.sleep(100)skynet.abort()end 
}

在外部停止服务器时,这里就执行一次关服保存数据操作,通知缓存模块进行脏数据落盘。如何更好的更安全的退出 skynet,参考:https://github.com/cloudwu/skynet/issues/288

服务的另一个接口,run 执行远程函数,首先通过 get_func 函数接受 modsub_modfunc_name 三个参数组成内部的函数名称,对应获取要执行的函数。在通过 load_cache,加载该函数要操作的对象,内部先去查找缓存,缓存未命中则会从数据库加载。缓存表中数据字段以 _key 为索引,数据对象由 modid 构成唯一 _key


下面来看缓存的管理模块,这个模块是缓存操作的核心,管理了所有的缓存相关处理逻辑。

module/cached/mng.lua

local _M = {}
local CMD = {}
local cache_list    -- 缓存列表
local dirty_list    -- 脏数据列表
local load_queue    -- 数据加载队列
local mongo_col     -- 数据库操作对象
local init_cb_list = {} -- 数据加载后的初始化函数列表-- 缓存移除回调函数
local function cache_remove_cb(key, cache)-- 数据脏或仍有引用,继续存入缓存if cache._ref > 0 or dirty_list[cache] then cache_list:set(key, cache, true)end 
endfunction _M.init()init_db()local max_cache_cnt = tonumber(skynet.getenv("cache_max_cnt")) or 10240local save_interval = tonumber(skynet.getenv("cache_save_interval")) or 60cache_list = lru.new(max_cache_cnt, cache_remove_cb)dirty_list = {}load_queue = queue()timer.timeout_repeat(save_interval, _M.do_save_loop)
end

先来看基础变量,和模块的初始化。

  • cache_list 实际上是 lru,用于存储缓存的结构
  • dirty_list 脏数据列表,load_cache 加载数据后就会将数据标记脏数据,do_save_loop 定时保存脏数据就会取消标记
  • load_queue 数据加载队列,使用了 skynet.queue 用于缓存未命中时,从数据库中加载数据使用,防止加载数据函数重入的。因为操作数据库是一个阻塞 API,会挂起当前协程,服务会继续响应其他消息,可能造成时序问题。可以参考官方 wiki:CriticalSection
  • mongo_col 数据库表对象,初始化模块前会先 init_db 初始化数据库

创建 cache_list 对象时,指定了当前缓存结构的数据移除回调函数 cache_remove_cb,数据还有引用或该数据还是脏数据 cache._ref > 0 or dirty_list[cache] ,那么重新加入缓存列表中 cache_list:set(key, cache, true)。这里 lruset 方法第三个参数为 true 表示允许缓存列表临时超出上限,避免死循环执行 cache_remove_cb 回调函数。

在最后,我们启动了一个定时器,save_interval 时间间隔执行一次 do_save_loop 进行脏数据落盘。

-- 缓存同步到数据库
local function cache_save_db(key, cache)local data = {['$set'] = cache}local xpcallok, updateok, err, ret = xpcall(mongo_col.safe_update, debug.traceback, mongo_col, { _key = key }, data, true, false)if not xpcallok or not (updateok and ret and ret.n == 1) then end 
end-- 脏的缓存数据写到数据库
function _M.do_save_loop()for key, _ in pairs(dirty_list) dolocal cache = cache_list:get(key)if cache then cache_save_db(key, cache)enddirty_list[key] = nil  end 
end

实际每轮保存数据就是去遍历当前的 dirty_list 脏数据列表,执行 cache_save_db 将缓存 update 到 Mongodb 数据库。

该模块是缓存管理模块,具体每个模块逻辑,都会新建相应的模块处理,并将对外提供的接口按管理模块指定的方式进行注册。如下述代码:


-- 注册模块执行函数
-- mod_id 组合数据库索引字段 key
local function get_key(mod, id)return string.format("%s_%s", mod, id) 
end -- mod_sub_mod_func_name 组合执行函数名
local function get_func_name(mod, sub_mod, func_name)return string.format("%s_%s_%s", mod, sub_mod, func_name)
end function _M.register_cmd(mod, sub_mod, func_list)for func_name, func in pairs(func_list) do func_name = get_func_name(mod, sub_mod, func_name)CMD[func_name] = funcend 
end -- 注册模块数据初始化函数
function _M.register_init_cb(mod, sub_mod, init_cb)if not init_cb_list[mod] then init_cb_list[mod] = {}end init_cb_list[mod][sub_mod] = init_cb
end

get_key 是对应缓存数据存储在数据库的 _key 字段,由 mod 和 id 拼接而成,在 init_db 中,有创建索引 mongo_col:createIndex({{_key = 1}, unique = true})

get_func_name 是对应管理模块中存储不同模块的对外方法,以 mod、sub_mod、func_name 拼接而成,保证了唯一性。

同时,还提供了两个注册方法,用于注册不同模块的 远程调用函数,数据初始化回调函数。

我们先来简单看一下 module/cached/user.lua 模块,理解一下这里的注册方法。

local mng = require "cached.mng"local _M = {}
local CMD = {}function _M.init()mng.register_cmd("user", "user", CMD)mng.register_init_cb("user", "user", init_cb)
endreturn _M 

cached 服务启动时,会执行不同具体逻辑模块的 init 函数。

对于用户 user 模块,初始化时,调用了两个注册方法,将自己的逻辑方法和本模块相关数据初始化回调方法,都注册到了管理模块中。

从上述我们了解到,之后封装模块进行数据逻辑处理,也是同理实现即可。


下面来看管理模块如何获取远程执行函数:

-- 释放缓存
function _M.release_cache(mod, id, cache)local key = get_key(mod, id)cache._ref = cache._ref - 1if cache._ref < 0 then logger.error(SERVICE_NAME, "cache ref wrong", "key: ", key, "ref: ", ref)end 
end-- 获取执行函数
function _M.get_func(mod, sub_mod, func_name)func_name = get_func_name(mod, sub_mod, func_name)logger.debug(SERVICE_NAME, "Get func_name: ", func_name)local f = assert(CMD[func_name])return function(id, cache, ...)local ret = table.pack(pcall(f, id, cache, ...))_M.release_cache(mod, id, cache)return select(2, table.unpack(ret))end
end 

其他服务调用缓存模块(lualib/cache.lua)时,通过对外提供的 call_cached API 调用缓存服务(service/cache.lua)的 run 方法,首先执行的第一步就是 get_func,从缓存管理模块(module/cached/mng.lua)中获取对应可执行的函数,也就是这里的 get_func 返回的闭包函数。

通过闭包的形式返回,为了保证每次执行完成后相应逻辑后,维护当前数据对象的正确引用。获取到的该函数,是在加载数据 load_cache 之后执行,而 load_cache 中会改变数据对象的引用。下面来看相关代码:

-- 从数据库中加载数据
local function load_db(key, mod, sub_mod, id)local ret = mongo_col:findOne({ _key = key })if not ret then local data = {_key = key,}local ok, err, ret = mongo_col:safe_insert(data)if (ok and ret and ret.n == 1) then run_init_cb(mod, sub_mod, id, data)return key, data elsereturn 0, "New data error: " .. errendelseif not ret._key then return 0, "cannot load data. key: " .. keyend run_init_cb(mod, sub_mod, id, ret)return ret._key, retend 
end-- 从缓存中加载数据
function _M.load_cache(mod, sub_mod, id)local key = get_key(mod, id)local cache = cache_list:get(key)if cache then cache._ref = cache._ref + 1dirty_list[key] = true return cacheend local _key, cache = load_queue(load_db, key, mod, sub_mod, id)assert(_key == key)cache_list:set(key, cache)cache._ref = 1dirty_list[key] = truereturn cache
end 

加载数据实则是先进行缓存加载,未命中则进行数据库加载,数据库若中没有数据,则新建数据插入并返回。

每次从数据库中取出数据后,都会执行相关的数据初始化回调函数。如果是全新数据创建插入数据库,并对该数据进行初始化。如果数据是已经存在的,也会取出进行初始化。所以,在不同具体模块实现模块的数据初始化回调时,要考虑这点,而不是一味的当作新数据的初始化。例如用户模块:

local function init_cb(uid, cache)if not cache.username then cache.username = "New Player"end if not cache.lv thencache.lv = 1endif not cache.exp thencache.exp = 0end
end

这样初始化,保证了只会对不存在字段的赋值,如果数据已经有了,并不会影响。

相关的完整代码参考:module/cached/mng.lua


数据库

客户端登录,由看门狗校验,而后登录逻辑在代理服务中执行。代理的逻辑模块中,对客户端登录的处理是先去数据库查找是否存在当前用户,不存在则进行创建,该用户的账号表在数据库中,设计为如下:

字段描述
uid用户唯一ID
acc用户账号名

用户的账号信息存在 game 数据库的 account 表下。

取出当前用户信息后,还会执行用户游戏信息的加载,通过向缓存模块发起 get_userinfo 消息,获取用户的历史信息。用户的游戏内信息设计如下:

字段描述
uid用户唯一ID
username用户昵称
lv用户等级
exp用户当前经验值

用户的游戏信息存在 cache 数据库的 cached 表下。

在配置文件中,mongodb_db_namecache_db_name 这两个配置字段可以修改上述两张表存在的数据库名。表名则没做配置,写死在了对应模块的初始化数据库代码逻辑中。


这里以用户登录注册的例子来看数据库模块的实现:

module/ws_agent/mng.lua

function _M.login(acc, fd)-- 数据库加载数据local uid = db.find_and_create_user(acc)local user = {fd = fd, acc = acc,}online_users[uid] = user fd2uid[fd] = uid -- 加载玩家信息local userinfo = cache.call_cached("get_userinfo", "user", "user", uid)local res = {pid = "s2c_login",msg = "Login success",uid = userinfo.uid, username = userinfo.username, lv = userinfo.lv, exp = userinfo.exp,}return res
end

登录逻辑同上述说的,这里调用了 db 模块,是代理对应的数据库处理模块。完整代码:module/ws_agent/mng.lua

module/ws_agent/db.lua

local _M = {}local mongo_col -- account 表操作对象-- game.account
function _M.init()
end local function call_create_new_user(acc)local uid = tostring(snowflake.snowflake())local user_data = {uid = uid,acc = acc, }local ok, err, ret = mongo_col:safe_insert(user_data)if (ok and ret and ret.n == 1) then return uid, user_dataelsereturn 0, "New user error: " .. errend 
end local function call_load_user(acc)local ret = mongo_col:findOne({acc = acc})if not ret then return call_create_new_user(acc)else if not ret.uid then return 0, "Load user error, acc: " .. acc end return ret.uid, ret end 
end local loading_user = {}
function _M.find_and_create_user(acc)if loading_user[acc] then return 0, "already loading"end loading_user[acc] = true local ok, uid, data = xpcall(call_load_user, debug.traceback, acc)loading_user[acc] = nil if not ok then local err = uid return 0, err end return uid, data
end return _M 

本模块通过 loading_user 正在加载的用户数据标识表,防止重入。call_load_user 会执行数据库操作,是一个阻塞操作,同之前缓存管理模块中的 skynet.queue 性质相识。不过在这里,我们是保证执行加载数据操作,无需在同一相近时间段内多次加载,而不是用 skynet.queue 来保证这多次加载操作的时序问题。

完整代码:module/ws_agent/db.lua


测试逻辑

设计获取和修改用户名协议:

-- client
{pid = "c2s_get_username"
}-- server
{pid = "s2c_get_username",username = "用户昵称"
}
-- client
{pid = "c2s_set_username",username = "用户昵称"
}-- server
{pid = "s2c_set_username",msg = "是否设置成功消息"
}

客户端:

test/cmds/ws.lua

function RPC.s2c_get_username(ws_id, res)logger.debug(SERVICE_NAME, "s2c_get_username: ", cjson.encode(res))
end function RPC.s2c_set_username(ws_id, res)logger.debug(SERVICE_NAME, "s2c_set_username: ", cjson.encode(res))
end function CMD.get_username(ws_id)local req = {pid = "c2s_get_username",}websocket.write(ws_id, cjson.encode(req))
end function CMD.set_username(ws_id, username)local req = {pid = "c2s_set_username",username = username,}websocket.write(ws_id, cjson.encode(req))
end 

服务端:

module/ws_agent/mng.lua

-- c2s_get_username
function RPC.c2s_get_username(req, fd, uid)local userinfo = cache.call_cached("get_userinfo", "user", "user", uid)local res = {pid = "s2c_get_username",username = userinfo.username}return res 
end-- c2s_set_username
function RPC.c2s_set_username(req, fd, uid)local ok = cache.call_cached("set_username", "user", "user", uid, req.username)local msg = "success set username: " .. req.usernameif not ok thenmsg = "failed set username"end local res = {pid = "s2c_set_username",msg = msg,}return res 
end

module/cached/user.lua

function CMD.get_userinfo(uid, cache)local userinfo = {uid = uid, username = cache.username,lv = cache.lv,exp = cache.exp,}return userinfo
endfunction CMD.set_username(uid, cache, username)if not cache then return false end cache.username = usernamereturn true 
end 

以上便是实现一条新协议,基本要修改的文件。客户端需要添加协议对应处理方法 CMD,添加网络消息接受方法 RPC。 服务端需要在代理模块添加网络上行数据对应的协议处理函数 RPC,由于协议要从缓存获取,所以在缓存的用户模块中也要添加对应协议的处理方法 CMD

测试如下:

在这里插入图片描述

在这里插入图片描述

如上述,数据成功上行到服务端并做相应逻辑处理,成功后返回给了客户端。并且数据库中的数据,也同步成功。


以上便是本章节全部内容,项目源码同步:https://gitee.com/Cauchy_AQ/skynet_practice

相关文章:

【Skynet 入门实战练习】分布式 ID | 雪花算法 | 缓存设计 | LRU算法 | 数据库

文章目录 前言雪花算法LRU 算法缓存模块数据库测试逻辑 前言 本节实现了 分布式 ID 生成系统&#xff0c;采用雪花算法实现唯一 ID&#xff1b;实现缓存架构&#xff0c;采用 LRU &#xff08;最近最少使用&#xff09;算法。 雪花算法 分布式 ID 生成算法的有很多种&#x…...

ArcGIS Pro中怎么设置标注换行

在ArcGIS Pro中进行文字标注的时候&#xff0c;如果标注的字段内容太长&#xff0c;直接标注的话会不美观&#xff0c;而且还会影响旁边的标注显示&#xff0c;这里为大家介绍一下在ArcGIS Pro中设置文字换行的方法&#xff0c;希望能对你有所帮助。 数据来源 本教程所使用的…...

MAX26——快速人物毛发插片工具 Hair cards tool

一提到毛发插件&#xff0c;我们一般想起的就是maya的 xgrn 或者max的ox。但是这些都是我们做影视级数字人用的。比较费性能也比较费面 下面分享一个干货 Hair cards tool 这个插件操作不像xgen与ox那么复杂。基本上0基础上手5分钟不到。就能插片出不错的效果。比较适用于&…...

一天一个设计模式---原型模式

基本概念 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;其主要目的是通过复制现有对象来创建新对象&#xff0c;而不是通过实例化类。原型模式允许在运行时动态创建对象&#xff0c;同时避免了耦合与子类化。 在原型模式中&#xff0…...

<习题集><LeetCode><链表><2/19/21/23/24>

目录 2. 两数相加 19. 删除链表的倒数第 N 个结点 21. 合并两个有序链表 23. 合并 K 个升序链表 24. 两两交换链表中的节点 2. 两数相加 https://leetcode.cn/problems/add-two-numbers/ public ListNode addTwoNumbers(ListNode l1, ListNode l2) {//head是cur链表头节点…...

C++实现DFS、BFS、Kruskal算法和Prim算法、拓扑排序、Dijkstra算法

背景&#xff1a; 实现要求&#xff1a; 根据图的抽象数据类型的定义&#xff0c;请采用邻接矩阵来存储图1&#xff0c;采用邻接表来存储图2&#xff0c;并完成如下操作&#xff1a;对图1无向图进行深度优先遍历和广度优先遍历。对图1无向图采用Kruskal算法和Prim算法得出最小…...

Spring 依赖注入的三种方式优缺点

小王学习录 前言属性注入1. 属性注入的优点2. 属性注入的缺点 Setter注入Setter注入的优点Setter注入的缺点 构造方法注入1. 构造方法的优点 总结补充Aurowired注解和Resource注解的区别 前言 在前面的文章中介绍了基于注解的方式将Bean存储到Spring中, 接下来介绍如何基于注解…...

代理模式介绍(静态代理、jdk动态代理、cglib代理)

一、静态代理 &#xff08;一&#xff09;定义 1、定义 为其他对象提供一种代理以控制对这个对象的访问&#xff1b; 2、涉及到的角色 &#xff08;1&#xff09;抽象主题角色&#xff1a;真实主题和代理主题的共同接口&#xff0c;便于在使用真实主题的地方都可以使用代理…...

设计模式基础——工厂模式剖析(2/2)

目录 一、工厂模式 1.1 工厂模式的定义 1.2 工厂模式的设计意图 1.3 工厂模式主要解决的问题 1.4 工厂模式的缺点 1.5 实际的应用案例 1. 数据库连接池 2. 图形用户界面&#xff08;GUI&#xff09;组件 3. 文件操作 二、各种工厂模式的变形 1.1 简单工厂模式&#…...

spark3.x 读取hudi报错

报错信息如下: Exception in thread "main" org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20231201203145254 at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) at org.apa…...

微信小程序中block和View组件的使用区别

block和View组件都是用于布局的组件: 1. Block组件&#xff1a; Block组件是一个无实际显示效果的组件&#xff0c;它主要用于包裹一组组件&#xff0c;并提供了类似于div的作用。使用Block组件可以将一组组件进行分组&#xff0c;便于样式的管理和控制。Block组件不会在页面…...

代码混淆技术探究与工具选择

代码混淆技术探究与工具选择 引言 在软件开发中&#xff0c;保护程序代码的安全性是至关重要的一环。代码混淆&#xff08;Obfuscated code&#xff09;作为一种常见的保护手段&#xff0c;通过将代码转换成难以理解的形式来提升应用被逆向破解的难度。本文将介绍代码混淆的概…...

selenium 解决 id定位、class定位中,属性值带空格的解决办法

一、前置说明 selenium遇到下面这种元素&#xff1a; <th id"demo id" class"value1 value2 value3 ">1、虽然id一般不会有空格&#xff0c;但是前端错误的这种写法(如下图)&#xff0c;会造成使用id定位不到元素&#xff0c;如&#xff1a; find…...

gma 空间绘图实战(1):绘制多个子图,连接并展示局部放大区域

安装 gma&#xff1a;pip install gma 本文基于&#xff1a;gma 2.0.3&#xff0c;Python 3.10 本文用到的矢量数据为&#xff1a;CTAmap 1.12。来源于 https://www.shengshixian.com/ 。&#xff08;感谢锐多宝&#xff09; 绘图目标 参考代码 import matplotlib.pyplot as p…...

Unity中C#使用协程控制Shader材质变化

文章目录 前言一、协程是什么二、在Unity中使用协程1、我们在 Start 中测试一下协程的执行顺序2、我们实现一个点击按钮实现角色受击效果 三、协程中的动画过渡1、首先&#xff0c;在协程内实现中毒并且消散的效果2、在 OnGUI 内&#xff0c;给一个新按钮使用刚刚定义的协程 四…...

WordPress禁止显示指定类别的文章

使用wordpress禁止输出指定类别的文章可以给get_posts()函数传个数组参数&#xff0c;如下&#xff1a; <div class"widget" id"diary1"> <h3>随机呈现</h3> <ul> <?php $argsarray( numberposts>16, category>-9,-12, …...

C#里面的泛型(T),泛型类,泛型方法,泛型接口等简单解释

https://blog.csdn.net/dap769815768/article/details/81946506 只是比较简单的解释&#xff0c;在实际使用中&#xff0c;如果遇到需要深入研究的场景&#xff0c;再翻阅相关资料深入研究下。 一、泛型T 这个T在实际使用中很常见&#xff0c;比如List<T>。其实我们还…...

C语言——指针(五)

&#x1f4dd;前言&#xff1a; 上篇文章C语言——指针&#xff08;四&#xff09;更加深入的介绍了不同类型指针的特点&#xff0c;这篇文章主要想记录一下函数与指针的结合运用以及const和assert关于指针的用法&#xff1a; 1&#xff0c;函数与指针 2&#xff0c;const 3&am…...

文章解读与仿真程序复现思路——中国电机工程学报EI\CSCD\北大核心《考虑气电联合需求响应的气电综合能源配网系统协调优化运行》

这个标题涉及到一个涉及气体&#xff08;天然气&#xff09;和电力的综合能源配网系统&#xff0c;并且强调了考虑气电联合需求响应的协调优化运行。让我们逐步解读&#xff1a; 气电综合能源配网系统&#xff1a; 这指的是一个结合了气体&#xff08;通常是天然气&#xff09;…...

PostgreSQL 主键和唯一键的区别

主键和唯一键的区别 主键&#xff08;Primary Key&#xff09;&#xff1a; 主键是用于唯一标识表中的每一条记录的键。主键必须是唯一的&#xff0c;不允许为空。一个表只能有一个主键。主键可以由一个或多个字段组成。主键的值在整个表中必须是唯一的&#xff0c;用于确保数据…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者

抖音增长新引擎&#xff1a;品融电商&#xff0c;一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中&#xff0c;品牌如何破浪前行&#xff1f;自建团队成本高、效果难控&#xff1b;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会

在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...

【实施指南】Android客户端HTTPS双向认证实施指南

&#x1f510; 一、所需准备材料 证书文件&#xff08;6类核心文件&#xff09; 类型 格式 作用 Android端要求 CA根证书 .crt/.pem 验证服务器/客户端证书合法性 需预置到Android信任库 服务器证书 .crt 服务器身份证明 客户端需持有以验证服务器 客户端证书 .crt 客户端身份…...