nanomsg源码阅读(三)

上一篇 nanomsg源码阅读(二),介绍了nanomsg的实用工具类utils模块,这篇将试图解析下nanomsg最核心的aio(即用线程池模拟带状态机的异步IO)模块,当然这里的IO是广义上的IO,包含了用户自定义事件的输入输出,后面的分析中我们会看到。

定时任务集合:timerset.h timerset.c

采用有序链表方式实现,新增定时任务时,根据到期timeout时间,排序插入,最近到期置于链表头,通过nn_timerset_timeout函数获取最近到期时间与当前时间的差,作为参数传入nn_poller_wait

IO多路分离器:poller.h poller.c poller_kqueue.h poller_kqueue.inc poller_epoll.h poller_epoll.inc poller_poll.h poller_poll.inc

我的关于IO的这篇文章提到常用的IO多路分离器,网上对他们各自的对比也有很多,这里不再细述了,值得注意的是nanomsg打开任意的fd都会设置CLOEXEC标志(出于安全方面的考虑)。

上下文环境:ctx.h ctx.c

每个nn_socket对应一个nn_ctx,调用关系为:nn_socket–>nn_sock_init–>nn_ctx_init,即程序中创建了多少个nn_socket就创建了多少个nn_ctx。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct nn_ctx {
struct nn_mutex sync; //互斥量
struct nn_pool *pool; //线程池外部初始化后,init时赋值
struct nn_queue events; //本上下文状态机事件队列
struct nn_queue eventsto; //外部状态机事件队列
nn_ctx_onleave onleave; //通知函数,在nn_ctx_leave时调用
};

//上下文初始化与回收
void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool,
nn_ctx_onleave onleave);
void nn_ctx_term (struct nn_ctx *self);

//对上下文上锁
void nn_ctx_enter (struct nn_ctx *self);

/* 先处理完内部的events,调用onleave函数通知owner,
* 再copy一份eventsto队列后,对ctx解锁,之后处理外部上下文队列数据
*/
void nn_ctx_leave (struct nn_ctx *self);

//对nn_pool_choose_worker的包装,从线程池中选取一个thread作为工作线程
struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);

//往内部状态机事件队列中添加事件
void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);

//往外部状态机时间队列中添加事件,仅用在线程间通讯中
void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event);

线程池:pool.h pool.c

好吧,pool.c上赫然的TODO告诉我们:这个线程池里面目前其实只有一个线程。

/ TODO: The dummy implementation of a thread pool. As for now there’s only one worker thread created. /

这个线程池需要在外部进行初始化,然后在aio contex初始化时,赋上对它的引用,通过nn_ctx_choose_worker函数从线程池中选取一个thread作为该上下文的工作线程worker。

工作线程:worker.h worker.c worker_win.h worker_win.inc worker_posix.h worker_posix.inc

这里我主要查看的是posix的实现:worker_posix.inc,windows的实现没有细看了。

1
2
3
4
5
6
7
8
9
10
struct nn_worker {
struct nn_mutex sync; //对下面自定义任务队列tasks操作的锁
struct nn_queue tasks; //本工作线程的待处理自定义任务队列
struct nn_queue_item stop; //worker停止通知任务,收到此任务后,线程将终止
struct nn_efd efd; //用于处理自定义任务task事件的通知描述符
struct nn_poller poller; //多路分离器,处理定时timer、IO的fd、任务通知efd
struct nn_poller_hndl efd_hndl; //用于标识此事件为自定义任务task
struct nn_timerset timerset; //定时任务列表
struct nn_thread thread; //工作线程
};

工作线程有三类工作要做,由多路分离器poller轮询获取处理(具体处理流程在nn_worker_routine函数里有较为清晰的注释),他们分别为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//定时任务
struct nn_worker_timer {
struct nn_fsm *owner;
struct nn_timerset_hndl hndl;
};

//文件描述符的IO操作
struct nn_worker_fd {
int src;
struct nn_fsm *owner;
struct nn_poller_hndl hndl;
};

//自定义任务
struct nn_worker_task {
int src;
struct nn_fsm *owner;
struct nn_queue_item item;
};

值得注意的是:处理工作前,都会对工作处理状态机nn_fsm所属的上下文nn_ctx上锁并处理(调用nn_ctx_enter/nn_ctx_leave)。

即worker中通过nn_poller_wait轮询获取工作,然后每一个工作都有对应的状态机nn_fsm做流程处理。

状态机:fsm.h fsm.c

上面看到worker接到工作以后,进入工作区域,按照工作流程来处理,如果这是一个顺序执行的大步骤,便相当于同步流程了。引入状态和状态机以后,将工作进一步切分为几个步骤,通过状态机状态及动作的方式,不仅让流程相对清晰,更通过这样对任务的进一步细分,将很多同步的操作内部转变为异步。

传统的状态机一般由状态state+动作action构成,这里的状态机增加了一个来源src:即动作发起者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct nn_fsm_event {
struct nn_fsm *fsm; //事件所属状态机
int src; //事件发起者
void *srcptr; //事件发起者附属信息
int type; //动作action,因为包含文件的IO,这里取名为type
struct nn_queue_item item; //事件会加入到nn_ctx的events队列中
};

struct nn_fsm {
nn_fsm_fn fn; //状态机处理主函数,大switch所在
nn_fsm_fn shutdown_fn; //状态机关闭处理函数
int state; //状态
int src; //本状态机的源(动作发起者)
void *srcptr; //来源附属信息
struct nn_fsm *owner; //父指针
struct nn_ctx *ctx; //状态机所属上下文
struct nn_fsm_event stopped; //状态机停止事件
};

基础的fsm内部有个默认的src:NN_FSM_ACTION,以及与此相对应的两个action:NN_FSM_START/NN_FSM_STOP,而后用户可以在此基础上扩展,只要定义的src和action不冲突即可(默认为负,只要自定义为正即可)。

定时处理:timer.h timer.c

使用状态机具体实现

socket包装:usock.h usock.c usock_posix.h usock_posix.inc usock_win.h usock_win.inc

使用状态机具体实现

总结

aio应该是nanomsg最核心的模块了,中间最复杂的当属作者将将状态机引入到事件驱动的编程中。

martin用了连续两篇博客 《The Callback Hell》《Event-driven architecture, state machines et al.》介绍了传统的callback的恶心,最主要的是不易扩展,而网络层的scalable正式nanomsg追求的目标也是与zeromq最大的区别,用户可以在其核心构建基础上,实现自己的网络拓扑协议。martin在文章里有说过有空要博客详细介绍下nanomsg的状态机机制,可惜到目前为止,他还没有写……。

欢迎大家探讨指正!