本文着重讨论skynet框架中,一个服务的启动流程;
本文以一个编写好的service.lua服务作为示例,代码如下:
-- 每个服务独立, 都需要引入skynetlocal skynet = require "skynet"require "skynet.manager" -- 引入 skynet.registerlocal db = {}local command = {}function command.get(key) print("comman.get:"..key) return db[key]endfunction command.set(key, value) print("comman.set:key="..key..",value:"..value) db[key] = value local last = db[key] return lastendskynet.start(function() print("==========Service Start=========") skynet.dispatch("lua", function(session, address, cmd, ...) print("==========Service dispatch============"..cmd) local f = command[cmd] if f then -- 回应一个消息可以使用 skynet.ret(message, size) 。 -- 它会将 message size 对应的消息附上当前消息的 session ,以及 skynet.PTYPE_RESPONSE 这个类别,发送给当前消息的来源 source skynet.ret(skynet.pack(f(...))) --回应消息 else error(string.format("Unknown command %s", tostring(cmd))) end end) --可以为自己注册一个别名。(别名必须在 32 个字符以内) skynet.register "SERVICE"end)
以上服务实现一个简单的存、取功能;main.lua中代码如下:
local skynet = require "skynet"-- 启动服务(启动函数)skynet.start(function() -- 启动函数里调用Skynet API开发各种服务 local svr = skynet.newservice("service"); skynet.call(svr, "lua", "set", "key", "val1111111"); local kv = skynet.call(svr, "lua", "get", "key"); print(kv); -- 退出当前的服务 -- skynet.exit 之后的代码都不会被运行。而且,当前服务被阻塞住的 coroutine 也会立刻中断退出。 skynet.exit()end)
启动一个服务的代码:
skynet.newservice("service");看看的newservice函数实现:
function skynet.newservice(name, ...) return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)end
再看看call函数的实现代码如下:
function skynet.call(addr, typename, ...) local p = proto[typename] local session = c.send(addr, p.id , nil , p.pack(...)) if session == nil then error("call to invalid address " .. skynet.address(addr)) end return p.unpack(yield_call(addr, session))end
proto:
这个表在skynet.lua中被定义,
初始化是在如下函数中进行:
以上步骤中,索引name和id都是存的注册协议表,后面通过name或者id就能直接找到对应的协议表,获取打包函数和解包函数;
在以下地方调用了该函数:
以上动作完成了proto表初始化,进行协议的注册;
这里为什么要进行协议注册呢?
c.send(addr, p.id , nil , p.pack(...)):
send函数由c库提供,实现代码如下:
我们看到,该函数接着调用了send_message(L, 0, 2),该函数实现如下:
static int
send_message(lua_State *L, int source, int idx_type) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
uint32_t dest = (uint32_t)lua_tointeger(L, 1);
const char * dest_string = NULL;
if (dest == 0) {
if (lua_type(L,1) == LUA_TNUMBER) {
return luaL_error(L, "Invalid service address 0");
}
dest_string = get_dest_string(L, 1);
}
int type = luaL_checkinteger(L, idx_type+0);
int session = 0;
if (lua_isnil(L,idx_type+1)) {
type |= PTYPE_TAG_ALLOCSESSION;
} else {
session = luaL_checkinteger(L,idx_type+1);
}
int mtype = lua_type(L,idx_type+2);
switch (mtype) {
case LUA_TSTRING: {
size_t len = 0;
void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
if (len == 0) {
msg = NULL;
}
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
if (session < 0) {
// send to invalid address
// todo: maybe throw an error would be better
return 0;
}
lua_pushinteger(L,session);
return 1;
}
要理解上述代码的逻辑,我们需要先清除的理解Lua调用C扩展库的原理;
先来看看lua-skynet.c这个库的代码结构,注册给lua层的函数直接略过,都具有统一参数结构,直接看关键代码:
LUAMOD_API int
luaopen_skynet_core(lua_State *L) {
luaL_checkversion(L);
luaL_Reg l[] = {
{ "send" , lsend },
{ "genid", lgenid },
{ "redirect", lredirect },
{ "command" , lcommand },
{ "intcommand", lintcommand },
{ "error", lerror },
{ "tostring", ltostring },
{ "harbor", lharbor },
{ "pack", luaseri_pack },
{ "unpack", luaseri_unpack },
{ "packstring", lpackstring },
{ "trash" , ltrash },
{ "callback", lcallback },
{ "now", lnow },
{ NULL, NULL },
};
luaL_newlibtable(L, l);
/*在LUA_REGISTRYINDEX*/
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
struct skynet_context *ctx = lua_touserdata(L,-1);
if (ctx == NULL) {
return luaL_error(L, "Init skynet context first");
}
luaL_setfuncs(L,l,1);
return 1;
}
上面的代码是为lua编写C库的标准写法,luaopen_skynet_core函数的命名方式直接关系到在lua中怎么引用这个库;
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
上面这行代码取全局表registry中skynet_context字段的值,将值压入栈中,至于为什么是registry,请参考:
那么这个skynet_context是何时被放入全局表中的呢?
查看service_lua.c中的代码,关键代码如下:
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
lua_State *L = l->L;
l->ctx = ctx;
lua_gc(L, LUA_GCSTOP, 0);
lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */
lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");
luaL_openlibs(L);
lua_pushlightuserdata(L, ctx);
lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
luaL_requiref(L, "skynet.codecache", codecache , 0);
lua_pop(L,1);
const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
lua_pushstring(L, path);
lua_setglobal(L, "LUA_PATH");
const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
lua_pushstring(L, cpath);
lua_setglobal(L, "LUA_CPATH");
const char *service = optstring(ctx, "luaservice", "./service/?.lua");
lua_pushstring(L, service);
lua_setglobal(L, "LUA_SERVICE");
const char *preload = skynet_command(ctx, "GETENV", "preload");
lua_pushstring(L, preload);
lua_setglobal(L, "LUA_PRELOAD");
lua_pushcfunction(L, traceback);
assert(lua_gettop(L) == 1);
const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
int r = luaL_loadfile(L,loader);
if (r != LUA_OK) {
skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_pushlstring(L, args, sz);
r = lua_pcall(L,1,0,1);
if (r != LUA_OK) {
skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_settop(L,0);
if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
size_t limit = lua_tointeger(L, -1);
l->mem_limit = limit;
skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
lua_pushnil(L);
lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
}
lua_pop(L, 1);
lua_gc(L, LUA_GCRESTART, 0);
return 0;
}
加粗的那两行代码将一个skynet_context *ctx放入了全局表;
struct skynet_context *ctx = lua_touserdata(L,-1);
上面的代码行获取上一步从全局表中取出并入栈的skynet_context指针;
此处只是做了指针存在性校验,并未使用;
有了以上的分析,我们继续回到send_message()函数的流程来看,该函数的第一行代码:
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
上面已经将skynet_context *入栈,这里直接取出来就好了;
关于lua_upvalueindex(1)的解释:upvalue简单的来说就是类似于c语言中的static变量,即共享的全局变量,通过lua_upvalueindex()函数来生成假的索引可以
访问registry全局表中的元素值;
uint32_t dest = (uint32_t)lua_tointeger(L, 1);const char * dest_string = NULL;if (dest == 0) { if (lua_type(L,1) == LUA_TNUMBER) { return luaL_error(L, "Invalid service address 0"); } dest_string = get_dest_string(L, 1);}
以上代码是获取目标服务的地址,这里地址可以使一个整数也可以是一个字符串,我们在lua中调用时,第一个参数为".launcher";
这里还涉及到一个问题,lua中调用C函数库中函数时,lua中参数顺序是按照从左到右依次入栈的,这里取栈底元素,即lua参数表第一个元素;
int type = luaL_checkinteger(L, idx_type+0);//该参数在lua中传入为PTYPE_LUA = 10int session = 0;if (lua_isnil(L,idx_type+1)) {//在lua中,我们传入的nil type |= PTYPE_TAG_ALLOCSESSION;} else { session = luaL_checkinteger(L,idx_type+1);}
以上代码用来获取消息类型,我们在lua中调用时,传入的值为PTYPE_LUA = 10
int mtype = lua_type(L,idx_type+2);
以上代码行获取第四个参数,我们再来看看lua中是怎么传参的:
local session = c.send(addr, p.id , nil , p.pack(...))
第四个参数是调用pack函数,而pack函数实现在这里,
skynet.pack = assert(c.pack),使用的C库中的pack,该函数对应luaseri_pack,实现如下:
LUAMOD_API intluaseri_pack(lua_State *L) { struct block temp; temp.next = NULL; struct write_block wb; wb_init(&wb, &temp); pack_from(L,&wb,0); assert(wb.head == &temp); seri(L, &temp, wb.len); wb_free(&wb); return 2;}该函数的细节暂不展开讨论,该函数的作用是将lua中函数的变参打包成一个usedata数据,并返回数据长度,并同时将这两个元素入栈,
在send_message函数中,我们打印栈信息显示如下:
==========stack top==========
idx type value
5 number 21
4 userdata (null)
3 nil (null)
2 number 10
1 string .launcher
==========stack button==========
这个显示结果和我们上面分析的步骤得到的结果是符合的;
那么int mtype = lua_type(L,idx_type+2);返回的元素类型肯定是一个userdata类型;
int mtype = lua_type(L,idx_type+2); switch (mtype) { case LUA_TSTRING: { size_t len = 0; void * msg = (void *)lua_tolstring(L,idx_type+2,&len); if (len == 0) { msg = NULL; } if (dest_string) { session = skynet_sendname(context, source, dest_string, type, session , msg, len); } else { session = skynet_send(context, source, dest, type, session , msg, len); } break; } case LUA_TLIGHTUSERDATA: { void * msg = lua_touserdata(L,idx_type+2); int size = luaL_checkinteger(L,idx_type+3); if (dest_string) { session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size); } else { session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size); } break; } default: luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2))); } if (session < 0) { // send to invalid address // todo: maybe throw an error would be better return 0; } lua_pushinteger(L,session); return 1;
有了上面的分析,上面的代码流程就很好理解了,启动一个服务的代码肯定会执行到加粗部分标注的代码分支里面;
最后从栈中取出userdata数据和其长度后,调用了skynet_sendname函数来发送一个消息,具体来看看这个函数:
intskynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) { if (source == 0) { source = context->handle;//这里其实就是snlua的handle } uint32_t des = 0; if (addr[0] == ':') { des = strtoul(addr+1, NULL, 16); } else if (addr[0] == '.') { des = skynet_handle_findname(addr + 1); if (des == 0) { if (type & PTYPE_TAG_DONTCOPY) { skynet_free(data); } return -1; } } else { //单节点模式用不到 _filter_args(context, type, &session, (void **)&data, &sz); struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); copy_name(rmsg->destination.name, addr); rmsg->destination.handle = 0; rmsg->message = data; rmsg->sz = sz; skynet_harbor_send(rmsg, source, session); return session; } return skynet_send(context, source, des, type, session, data, sz);}
代码比较简单,我们传进来的addr=" .launcher", skynet_handle_findname函数会将字符串地址转换为整数地址,函数实现代码如下:
uint32_t skynet_handle_findname(const char * name) { struct handle_storage *s = H; rwlock_rlock(&s->lock); uint32_t handle = 0; int begin = 0; int end = s->name_count - 1; while (begin<=end) { int mid = (begin+end)/2; struct handle_name *n = &s->name[mid]; int c = strcmp(n->name, name); if (c==0) { handle = n->handle; break; } if (c<0) { begin = mid + 1; } else { end = mid - 1; } } rwlock_runlock(&s->lock); return handle;}
上面传进来的name="launcher",通过使用二分查找的方式,寻找name对应的handle,而我们知道,skynet框架启动时,启动的第一个服务就是luancher服务;
解释如下:
我们查看config.lua配置文件,发现有如下配置行:
bootstrap = "snlua bootstrap" -- The service for bootstrap
该配置行会在以下启动函数中被读取并执行:
void skynet_start(struct skynet_config * config) { …… bootstrap(ctx, config->bootstrap); …… start(config->thread);}
框架启动时,首先就会执行bootstrap.lua中的代码,代码如下:
local skynet = require "skynet"local harbor = require "skynet.harbor"require "skynet.manager" -- import skynet.launch, ...local memory = require "skynet.memory"skynet.start(function() local sharestring = tonumber(skynet.getenv "sharestring" or 4096) memory.ssexpand(sharestring) local standalone = skynet.getenv "standalone" local launcher = assert(skynet.launch("snlua","launcher")) skynet.name(".launcher", launcher) local harbor_id = tonumber(skynet.getenv "harbor" or 0) if harbor_id == 0 then assert(standalone == nil) standalone = true skynet.setenv("standalone", "true") local ok, slave = pcall(skynet.newservice, "cdummy") if not ok then skynet.abort() end skynet.name(".cslave", slave) else if standalone then if not pcall(skynet.newservice,"cmaster") then skynet.abort() end end local ok, slave = pcall(skynet.newservice, "cslave") if not ok then skynet.abort() end skynet.name(".cslave", slave) end if standalone then local datacenter = skynet.newservice "datacenterd" skynet.name("DATACENTER", datacenter) end skynet.newservice "service_mgr" pcall(skynet.newservice,skynet.getenv "start" or "main") skynet.exit()end)
如上粗体部分标注,框架第一个启动的服务就是launcher;
那么,找到launcher服务的地址后,接着就调用了skynet_send函数,函数实现代码如下:
intskynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) { if ((sz & MESSAGE_TYPE_MASK) != sz) { skynet_error(context, "The message to %x is too large", destination); if (type & PTYPE_TAG_DONTCOPY) { skynet_free(data); } return -1; } _filter_args(context, type, &session, (void **)&data, &sz); if (source == 0) { source = context->handle; } if (destination == 0) { return session; } if (skynet_harbor_message_isremote(destination)) { struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); rmsg->destination.handle = destination; rmsg->message = data; rmsg->sz = sz; skynet_harbor_send(rmsg, source, session); } else { struct skynet_message smsg; smsg.source = source; smsg.session = session; smsg.data = data; smsg.sz = sz; if (skynet_context_push(destination, &smsg)) { skynet_free(data); return -1; } } return session;}
里面有一个函数_filter_args(context, type, &session, (void **)&data, &sz);比较有意思,函数实现如下:
static void_filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) { int needcopy = !(type & PTYPE_TAG_DONTCOPY); int allocsession = type & PTYPE_TAG_ALLOCSESSION;//栈中传入的第三个参数为nil,就会打上PTYPE_TAG_ALLOCSESSION这个标记 type &= 0xff; if (allocsession) { assert(*session == 0); *session = skynet_context_newsession(context);//这里就为新服务创建了一个session } if (needcopy && *data) { //是否为消息保存副本 char * msg = skynet_malloc(*sz+1); memcpy(msg, *data, *sz); msg[*sz] = '\0'; *data = msg; } *sz |= (size_t)type << MESSAGE_TYPE_SHIFT;}
消息处理完毕后,上面函数将初始化一个struct skynet_message smsg消息,并将消息放入launcher服务的消息队列中,
接下来的流程就是launcher服务读取消息队列中消息,并处理消息了,launcher服务启动说明请参考:http://www.cnblogs.com/skiing886/p/7749307.html