博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
skynet服务的本质
阅读量:5060 次
发布时间:2019-06-12

本文共 13875 字,大约阅读时间需要 46 分钟。

本文着重讨论skynet框架中,一个服务的启动流程;

本文以一个编写好的service.lua服务作为示例,代码如下:

-- 每个服务独立, 都需要引入skynet

local skynet = require "skynet"
require "skynet.manager"    --
引入 skynet.register
local db = {}
local command = {}
function command.get(key)
    print("comman.get:"..key)
    return db[key]
end
function command.set(key, value)
    print("comman.set:key="..key..",value:"..value)
    db[key] = value
    local last = db[key]
    return last
end
skynet.start(function()
    print("==========Service Start=========")
    skynet.dispatch("lua", function(session, address, cmd, ...)
        print("==========Service dispatch============"..cmd)
        local f = command[cmd]    
        if f then
            --
回应一个消息可以使用 skynet.ret(message, size)
            --
它会将 message size 对应的消息附上当前消息的 session ,以及 skynet.PTYPE_RESPONSE 这个类别,发送给当前消息的来源 source
            skynet.ret(skynet.pack(f(...))) --
回应消息
        else
            error(string.format("Unknown command %s", tostring(cmd)))
        end
    end)
    --
可以为自己注册一个别名。(别名必须在 32 个字符以内)
    skynet.register "SERVICE"
end)

以上服务实现一个简单的存、取功能;

main.lua中代码如下:

local skynet = require "skynet"

-- 启动服务(启动函数)
skynet.start(function()
    --
启动函数里调用Skynet API开发各种服务
    
    local svr = skynet.newservice("service");
    skynet.call(svr, "lua", "set", "key", "val1111111");
    
    local kv = skynet.call(svr, "lua", "get", "key");
    print(kv);
    --
退出当前的服务
    -- skynet.exit
之后的代码都不会被运行。而且,当前服务被阻塞住的 coroutine 也会立刻中断退出。
    skynet.exit()
end)

启动一个服务的代码:

skynet.newservice("service");

看看的newservice函数实现:

function skynet.newservice(name, ...)

    return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end

再看看call函数的实现代码如下:

function skynet.call(addr, typename, ...)

    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))
    if session == nil then
        error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))
end

proto

这个表在skynet.lua中被定义,

初始化是在如下函数中进行:

以上步骤中,索引name和id都是存的注册协议表,后面通过name或者id就能直接找到对应的协议表,获取打包函数和解包函数;

在以下地方调用了该函数:

以上动作完成了proto表初始化,进行协议的注册;

这里为什么要进行协议注册呢?

c.send(addr, p.id , nil , p.pack(...))

send函数由c库提供,实现代码如下:

我们看到,该函数接着调用了send_message(L, 0, 2),该函数实现如下:

static int

send_message(lua_State *L, int source, int idx_type) {

    struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));

    uint32_t dest = (uint32_t)lua_tointeger(L, 1);

    const char * dest_string = NULL;

    if (dest == 0) {

        if (lua_type(L,1) == LUA_TNUMBER) {

            return luaL_error(L, "Invalid service address 0");

        }

        dest_string = get_dest_string(L, 1);

    }

 

    int type = luaL_checkinteger(L, idx_type+0);

    int session = 0;

    if (lua_isnil(L,idx_type+1)) {

        type |= PTYPE_TAG_ALLOCSESSION;

    } else {

        session = luaL_checkinteger(L,idx_type+1);

    }

 

    int mtype = lua_type(L,idx_type+2);

    switch (mtype) {

    case LUA_TSTRING: {

        size_t len = 0;

        void * msg = (void *)lua_tolstring(L,idx_type+2,&len);

        if (len == 0) {

            msg = NULL;

        }

        if (dest_string) {

            session = skynet_sendname(context, source, dest_string, type, session , msg, len);

        } else {

            session = skynet_send(context, source, dest, type, session , msg, len);

        }

        break;

    }

    case LUA_TLIGHTUSERDATA: {

        void * msg = lua_touserdata(L,idx_type+2);

        int size = luaL_checkinteger(L,idx_type+3);

        if (dest_string) {

            session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);

        } else {

            session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);

        }

        break;

    }

    default:

        luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));

    }

    if (session < 0) {

        // send to invalid address

        // todo: maybe throw an error would be better

        return 0;

    }

    lua_pushinteger(L,session);

    return 1;

}

要理解上述代码的逻辑,我们需要先清除的理解Lua调用C扩展库的原理;

先来看看lua-skynet.c这个库的代码结构,注册给lua层的函数直接略过,都具有统一参数结构,直接看关键代码:

LUAMOD_API int

luaopen_skynet_core(lua_State *L) {

    luaL_checkversion(L);

    luaL_Reg l[] = {

        { "send" , lsend },

        { "genid", lgenid },

        { "redirect", lredirect },

        { "command" , lcommand },

        { "intcommand", lintcommand },

        { "error", lerror },

        { "tostring", ltostring },

        { "harbor", lharbor },

        { "pack", luaseri_pack },

        { "unpack", luaseri_unpack },

        { "packstring", lpackstring },

        { "trash" , ltrash },

        { "callback", lcallback },

        { "now", lnow },

        { NULL, NULL },

    };

 

    luaL_newlibtable(L, l);

 

    /*在LUA_REGISTRYINDEX*/

    lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");

    struct skynet_context *ctx = lua_touserdata(L,-1);

    if (ctx == NULL) {

        return luaL_error(L, "Init skynet context first");

    }

 

    luaL_setfuncs(L,l,1);

 

    return 1;

}

上面的代码是为lua编写C库的标准写法,luaopen_skynet_core函数的命名方式直接关系到在lua中怎么引用这个库;

lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");

上面这行代码取全局表registry中skynet_context字段的值,将值压入栈中,至于为什么是registry,请参考:

那么这个skynet_context是何时被放入全局表中的呢?

查看service_lua.c中的代码,关键代码如下:

static int

init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {

    lua_State *L = l->L;

    l->ctx = ctx;

    lua_gc(L, LUA_GCSTOP, 0);

    lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */

    lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");

    luaL_openlibs(L);

    lua_pushlightuserdata(L, ctx);

    lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");

    luaL_requiref(L, "skynet.codecache", codecache , 0);

    lua_pop(L,1);

 

    const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");

    lua_pushstring(L, path);

    lua_setglobal(L, "LUA_PATH");

    const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");

    lua_pushstring(L, cpath);

    lua_setglobal(L, "LUA_CPATH");

    const char *service = optstring(ctx, "luaservice", "./service/?.lua");

    lua_pushstring(L, service);

    lua_setglobal(L, "LUA_SERVICE");

    const char *preload = skynet_command(ctx, "GETENV", "preload");

    lua_pushstring(L, preload);

    lua_setglobal(L, "LUA_PRELOAD");

 

    lua_pushcfunction(L, traceback);

    assert(lua_gettop(L) == 1);

 

    const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");

 

    int r = luaL_loadfile(L,loader);

    if (r != LUA_OK) {

        skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));

        report_launcher_error(ctx);

        return 1;

    }

    lua_pushlstring(L, args, sz);

    r = lua_pcall(L,1,0,1);

    if (r != LUA_OK) {

        skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));

        report_launcher_error(ctx);

        return 1;

    }

    lua_settop(L,0);

    if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {

        size_t limit = lua_tointeger(L, -1);

        l->mem_limit = limit;

        skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));

        lua_pushnil(L);

        lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");

    }

    lua_pop(L, 1);

 

    lua_gc(L, LUA_GCRESTART, 0);

 

    return 0;

}

加粗的那两行代码将一个skynet_context *ctx放入了全局表;

struct skynet_context *ctx = lua_touserdata(L,-1);

上面的代码行获取上一步从全局表中取出并入栈的skynet_context指针;

此处只是做了指针存在性校验,并未使用;

有了以上的分析,我们继续回到send_message()函数的流程来看,该函数的第一行代码:

struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));

上面已经将skynet_context *入栈,这里直接取出来就好了;

关于lua_upvalueindex(1)的解释:upvalue简单的来说就是类似于c语言中的static变量,即共享的全局变量,通过lua_upvalueindex()函数来生成假的索引可以

访问registry全局表中的元素值;

uint32_t dest = (uint32_t)lua_tointeger(L, 1);

const char * dest_string = NULL;
if (dest == 0) {
    if (lua_type(L,1) == LUA_TNUMBER) {
        return luaL_error(L, "Invalid service address 0");
    }
    dest_string = get_dest_string(L, 1);
}

以上代码是获取目标服务的地址,这里地址可以使一个整数也可以是一个字符串,我们在lua中调用时,第一个参数为".launcher"

这里还涉及到一个问题,lua中调用C函数库中函数时,lua中参数顺序是按照从左到右依次入栈的,这里取栈底元素,即lua参数表第一个元素;

int type = luaL_checkinteger(L, idx_type+0);//该参数在lua中传入为PTYPE_LUA = 10

int session = 0;
if (lua_isnil(L,idx_type+1)) {//在lua中,我们传入的nil
    type |= PTYPE_TAG_ALLOCSESSION;
} else {
    session = luaL_checkinteger(L,idx_type+1);
}

以上代码用来获取消息类型,我们在lua中调用时,传入的值为PTYPE_LUA = 10

 

int mtype = lua_type(L,idx_type+2);

以上代码行获取第四个参数,我们再来看看lua中是怎么传参的:

local session = c.send(addr, p.id , nil , p.pack(...))

第四个参数是调用pack函数,而pack函数实现在这里,

skynet.pack = assert(c.pack),使用的C库中的pack,该函数对应luaseri_pack,实现如下:

LUAMOD_API int

luaseri_pack(lua_State *L) {
    struct block temp;
    temp.next = NULL;
    struct write_block wb;
    wb_init(&wb, &temp);
    pack_from(L,&wb,0);
    assert(wb.head == &temp);
    seri(L, &temp, wb.len);
    wb_free(&wb);
    return 2;
}
该函数的细节暂不展开讨论,该函数的作用是将lua中函数的变参打包成一个usedata数据,并返回数据长度,并同时将这两个元素入栈,

send_message函数中,我们打印栈信息显示如下:

==========stack top==========

idx type value

5 number 21

4 userdata (null)

3 nil (null)

2 number 10

1 string .launcher

==========stack button==========

这个显示结果和我们上面分析的步骤得到的结果是符合的;

那么int mtype = lua_type(L,idx_type+2);返回的元素类型肯定是一个userdata类型;

int mtype = lua_type(L,idx_type+2);

    switch (mtype) {
    case LUA_TSTRING: {
        size_t len = 0;
        void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
        if (len == 0) {
            msg = NULL;
        }
        if (dest_string) {
            session = skynet_sendname(context, source, dest_string, type, session , msg, len);
        } else {
            session = skynet_send(context, source, dest, type, session , msg, len);
        }
        break;
    }
    case LUA_TLIGHTUSERDATA: {
        void * msg = lua_touserdata(L,idx_type+2);
        int size = luaL_checkinteger(L,idx_type+3);
        if (dest_string) {
            session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
        } else {
            session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
        }
        break;
    }
    default:
        luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
    }
    if (session < 0) {
        // send to invalid address
        // todo: maybe throw an error would be better
        return 0;
    }
    lua_pushinteger(L,session);
    return 1;

有了上面的分析,上面的代码流程就很好理解了,启动一个服务的代码肯定会执行到加粗部分标注的代码分支里面;

最后从栈中取出userdata数据和其长度后,调用了skynet_sendname函数来发送一个消息,具体来看看这个函数:

int

skynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) {
    if (source == 0) {
        source = context->handle;//这里其实就是snlua的handle
    }
    uint32_t des = 0;
    if (addr[0] == ':') {
        des = strtoul(addr+1, NULL, 16);
    } else if (addr[0] == '.') {
        des = skynet_handle_findname(addr + 1);
        if (des == 0) {
            if (type & PTYPE_TAG_DONTCOPY) {
                skynet_free(data);
            }
            return -1;
        }
    } else {
//单节点模式用不到
        _filter_args(context, type, &session, (void **)&data, &sz);
        struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
        copy_name(rmsg->destination.name, addr);
        rmsg->destination.handle = 0;
        rmsg->message = data;
        rmsg->sz = sz;
        skynet_harbor_send(rmsg, source, session);
        return session;
    }
    return skynet_send(context, source, des, type, session, data, sz);
}

代码比较简单,我们传进来的addr=" .launcher" skynet_handle_findname函数会将字符串地址转换为整数地址,函数实现代码如下:

uint32_t

skynet_handle_findname(const char * name) {
    struct handle_storage *s = H;
    rwlock_rlock(&s->lock);
    uint32_t handle = 0;
    int begin = 0;
    int end = s->name_count - 1;
    while (begin<=end) {
        int mid = (begin+end)/2;
        struct handle_name *n = &s->name[mid];
        int c = strcmp(n->name, name);
        if (c==0) {
            handle = n->handle;
            break;
        }
        if (c<0) {
            begin = mid + 1;
        } else {
            end = mid - 1;
        }
    }
    rwlock_runlock(&s->lock);
    return handle;
}

上面传进来的name="launcher",通过使用二分查找的方式,寻找name对应的handle,而我们知道,skynet框架启动时,启动的第一个服务就是luancher服务;

解释如下:

我们查看config.lua配置文件,发现有如下配置行:

bootstrap = "snlua bootstrap"    -- The service for bootstrap

该配置行会在以下启动函数中被读取并执行:

void

skynet_start(struct skynet_config * config) {
    
……
    bootstrap(ctx, config->bootstrap);
    
……
    start(config->thread);
}

框架启动时,首先就会执行bootstrap.lua中的代码,代码如下:

local skynet = require "skynet"

local harbor = require "skynet.harbor"
require "skynet.manager"    -- import skynet.launch, ...
local memory = require "skynet.memory"
skynet.start(function()
    local sharestring = tonumber(skynet.getenv "sharestring" or 4096)
    memory.ssexpand(sharestring)
    local standalone = skynet.getenv "standalone"
    
local launcher = assert(skynet.launch("snlua","launcher"))
    skynet.name(".launcher", launcher)
    local harbor_id = tonumber(skynet.getenv "harbor" or 0)
    if harbor_id == 0 then
        assert(standalone == nil)
        standalone = true
        skynet.setenv("standalone", "true")
        local ok, slave = pcall(skynet.newservice, "cdummy")
        if not ok then
            skynet.abort()
        end
        skynet.name(".cslave", slave)
    else
        if standalone then
            if not pcall(skynet.newservice,"cmaster") then
                skynet.abort()
            end
        end
        local ok, slave = pcall(skynet.newservice, "cslave")
        if not ok then
            skynet.abort()
        end
        skynet.name(".cslave", slave)
    end
    if standalone then
        local datacenter = skynet.newservice "datacenterd"
        skynet.name("DATACENTER", datacenter)
    end
    skynet.newservice "service_mgr"
    pcall(skynet.newservice,skynet.getenv "start" or "main")
    skynet.exit()
end)

如上粗体部分标注,框架第一个启动的服务就是launcher

那么,找到launcher服务的地址后,接着就调用了skynet_send函数,函数实现代码如下:

int

skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
    if ((sz & MESSAGE_TYPE_MASK) != sz) {
        skynet_error(context, "The message to %x is too large", destination);
        if (type & PTYPE_TAG_DONTCOPY) {
            skynet_free(data);
        }
        return -1;
    }
    _filter_args(context, type, &session, (void **)&data, &sz);
    if (source == 0) {
        source = context->handle;
    }
    if (destination == 0) {
        return session;
    }
    if (skynet_harbor_message_isremote(destination)) {
        struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
        rmsg->destination.handle = destination;
        rmsg->message = data;
        rmsg->sz = sz;
        skynet_harbor_send(rmsg, source, session);
    } else {
        struct skynet_message smsg;
        smsg.source = source;
        smsg.session = session;
        smsg.data = data;
        smsg.sz = sz;
        if (skynet_context_push(destination, &smsg)) {
            skynet_free(data);
            return -1;
        }
    }
    return session;
}

里面有一个函数_filter_args(context, type, &session, (void **)&data, &sz);比较有意思,函数实现如下:

static void

_filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
    int needcopy = !(type & PTYPE_TAG_DONTCOPY);
    int allocsession = type & PTYPE_TAG_ALLOCSESSION;//栈中传入的第三个参数为nil,就会打上PTYPE_TAG_ALLOCSESSION这个标记
    type &= 0xff;
    if (allocsession) {
        assert(*session == 0);
        *session = skynet_context_newsession(context);//这里就为新服务创建了一个session
    }
    if (needcopy && *data) {
//是否为消息保存副本
        char * msg = skynet_malloc(*sz+1);
        memcpy(msg, *data, *sz);
        msg[*sz] = '\0';
        *data = msg;
    }
    *sz |= (size_t)type << MESSAGE_TYPE_SHIFT;
}

消息处理完毕后,上面函数将初始化一个struct skynet_message smsg消息,并将消息放入launcher服务的消息队列中,

接下来的流程就是launcher服务读取消息队列中消息,并处理消息了,launcher服务启动说明请参考:http://www.cnblogs.com/skiing886/p/7749307.html

 

 

 

 

 

转载于:https://www.cnblogs.com/skiing886/p/7737931.html

你可能感兴趣的文章
spring security 11种过滤器介绍
查看>>
代码实现导航栏分割线
查看>>
大数据学习系列(8)-- WordCount+Block+Split+Shuffle+Map+Reduce技术详解
查看>>
【AS3代码】播放FLV视频流的三步骤!
查看>>
枚举的使用
查看>>
luogu4849 寻找宝藏 (cdq分治+dp)
查看>>
日志框架--(一)基础篇
查看>>
关于源程序到可运行程序的过程
查看>>
转载:mysql数据库密码忘记找回方法
查看>>
scratch少儿编程第一季——06、人在江湖混,没有背景怎么行。
查看>>
【贪心+DFS】D. Field expansion
查看>>
C# Async与Await的使用
查看>>
Mysql性能调优
查看>>
iOS基础-UIKit框架-多控制器管理-实例:qq界面框架
查看>>
IOS-每个程序员的编程之路上都应该看这11本书
查看>>
自定义tabbar(纯代码)
查看>>
小程序底部导航栏
查看>>
ibatis学习笔记
查看>>
18-ES6(1)
查看>>
poj1611 简单并查集
查看>>