跳至主要內容

nanomsg源码阅读(三)

abstiger大约 5 分钟learningnanomsgC

上一篇 nanomsg源码阅读(二) 介绍了nanomsg的实用工具类utils模块,这篇将试图解析下nanomsg最核心的aio(即用线程池模拟带状态机的异步IO)模块,当然这里的IO是广义上的IO,包含了用户自定义事件的输入输出,后面的分析中我们会看到。

定时任务集合:timerset.h timerset.c

采用有序链表方式实现,新增定时任务时,根据到期timeout时间,排序插入,最近到期置于链表头,通过nn_timerset_timeout函数获取最近到期时间与当前时间的差,作为参数传入nn_poller_wait

IO多路分离器:poller.h poller.c poller_kqueue.h poller_kqueue.inc poller_epoll.h poller_epoll.inc poller_poll.h poller_poll.inc

前文提到常用的IO多路分离器,网上对他们各自的对比也有很多,这里不再细述了,值得注意的是nanomsg打开任意的fd都会设置CLOEXEC标志(出于安全方面的考虑)。

上下文环境:ctx.h ctx.c

每个nn_socket对应一个nn_ctx,调用关系为:nn_socket–>nn_sock_init–>nn_ctx_init,即程序中创建了多少个nn_socket就创建了多少个nn_ctx。

struct nn_ctx {
    struct nn_mutex sync;            //互斥量
    struct nn_pool *pool;            //线程池外部初始化后,init时赋值
    struct nn_queue events;          //本上下文状态机事件队列
    struct nn_queue eventsto;        //外部状态机事件队列
    nn_ctx_onleave onleave;          //通知函数,在nn_ctx_leave时调用
}; 

//上下文初始化与回收
void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool, 
                  nn_ctx_onleave onleave);
void nn_ctx_term (struct nn_ctx *self);

//对上下文上锁
void nn_ctx_enter (struct nn_ctx *self);

/* 先处理完内部的events,调用onleave函数通知owner,
 * 再copy一份eventsto队列后,对ctx解锁,之后处理外部上下文队列数据
 */
void nn_ctx_leave (struct nn_ctx *self);

//对nn_pool_choose_worker的包装,从线程池中选取一个thread作为工作线程
struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);

//往内部状态机事件队列中添加事件
void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);

//往外部状态机时间队列中添加事件,仅用在线程间通讯中
void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event);

线程池:pool.h pool.c

好吧,pool.c上赫然的TODO告诉我们:这个线程池里面目前其实只有一个线程。

/* TODO: The dummy implementation of a thread pool. As for now there's only one worker thread created. */

这个线程池需要在外部进行初始化,然后在aio contex初始化时,赋上对它的引用,通过nn_ctx_choose_worker函数从线程池中选取一个thread作为该上下文的工作线程worker。

工作线程:worker.h worker.c worker_win.h worker_win.inc worker_posix.h worker_posix.inc

这里我主要查看的是posix的实现:worker_posix.inc,windows的实现没有细看了。

struct nn_worker {
    struct nn_mutex sync;              //对下面自定义任务队列tasks操作的锁
    struct nn_queue tasks;             //本工作线程的待处理自定义任务队列
    struct nn_queue_item stop;         //worker停止通知任务,收到此任务后,线程将终止
    struct nn_efd efd;                 //用于处理自定义任务task事件的通知描述符
    struct nn_poller poller;           //多路分离器,处理定时timer、IO的fd、任务通知efd
    struct nn_poller_hndl efd_hndl;    //用于标识此事件为自定义任务task
    struct nn_timerset timerset;       //定时任务列表
    struct nn_thread thread;           //工作线程
};

工作线程有三类工作要做,由多路分离器poller轮询获取处理(具体处理流程在nn_worker_routine函数里有较为清晰的注释),他们分别为:

//定时任务
struct nn_worker_timer {
    struct nn_fsm *owner;
    struct nn_timerset_hndl hndl;
};

//文件描述符的IO操作
struct nn_worker_fd {
    int src;
    struct nn_fsm *owner;
    struct nn_poller_hndl hndl;
};

//自定义任务
struct nn_worker_task {
    int src;
    struct nn_fsm *owner;
    struct nn_queue_item item;
};

值得注意的是:处理工作前,都会对工作处理状态机nn_fsm所属的上下文nn_ctx上锁并处理(调用nn_ctx_enter/nn_ctx_leave)。

即worker中通过nn_poller_wait轮询获取工作,然后每一个工作都有对应的状态机nn_fsm做流程处理。

状态机:fsm.h fsm.c

上面看到worker接到工作以后,进入工作区域,按照工作流程来处理,如果这是一个顺序执行的大步骤,便相当于同步流程了。引入状态和状态机以后,将工作进一步切分为几个步骤,通过状态机状态及动作的方式,不仅让流程相对清晰,更通过这样对任务的进一步细分,将很多同步的操作内部转变为异步。

传统的状态机一般由状态state+动作action构成,这里的状态机增加了一个来源src:即动作发起者

struct nn_fsm_event {
    struct nn_fsm *fsm;          //事件所属状态机
    int src;                     //事件发起者
    void *srcptr;                //事件发起者附属信息
    int type;                    //动作action,因为包含文件的IO,这里取名为type
    struct nn_queue_item item;   //事件会加入到nn_ctx的events队列中
};

struct nn_fsm {
    nn_fsm_fn fn;                //状态机处理主函数,大switch所在
    nn_fsm_fn shutdown_fn;       //状态机关闭处理函数
    int state;                   //状态
    int src;                     //本状态机的源(动作发起者)
    void *srcptr;                //来源附属信息
    struct nn_fsm *owner;        //父指针
    struct nn_ctx *ctx;          //状态机所属上下文
    struct nn_fsm_event stopped; //状态机停止事件
};

基础的fsm内部有个默认的src:NN_FSM_ACTION,以及与此相对应的两个action:NN_FSM_START/NN_FSM_STOP,而后用户可以在此基础上扩展,只要定义的src和action不冲突即可(默认为负,只要自定义为正即可)。

定时处理:timer.h timer.c

使用状态机具体实现

socket包装:usock.h usock.c usock_posix.h usock_posix.inc usock_win.h usock_win.inc

使用状态机具体实现

总结

aio应该是nanomsg最核心的模块了,中间最复杂的当属作者将将状态机引入到事件驱动的编程中。

martin用了连续两篇博客 《The Callback Hell》open in new window《Event-driven architecture, state machines et al.》open in new window介绍了传统的callback的恶心,最主要的是不易扩展,而网络层的scalable正式nanomsg追求的目标也是与zeromq最大的区别,用户可以在其核心构建基础上,实现自己的网络拓扑协议。martin在文章里有说过有空要博客详细介绍下nanomsg的状态机机制,可惜到目前为止,他还没有写……。

欢迎大家探讨指正!