libuv搭建高并发tcp服务器

采用libuv搭建高并发tcp服务器,对于没有使用libuv经验,或者无并发服务器编写经验的朋友还是有一定难度的,通常面对的问题如下(以linux环境为前提):
1,并发服务器采用什么样的服务器模型?epoll?poll?还是select?
2,选择什么样的并发服务器框架?libevent?libev?还是libuv?
3,如果使用libuv作为服务器框架,其基础框架如何搭建
4,libuv对新手并不友好,尤其是处处回调,何时申请内存,何时释放内存?新手经常碰到内存释放不正确导致程序coredump
5,libuv中的async通知机制,即在其他线程写消息到主loop线程,据说会出现消息的合并通知,那如何设计来避免通知合并后,发送和接收端无法做到一一对应?
6,libuv给出的例子中,只单独给了tcp_echo和uv_queue_work的例子,如何把它们组合起来?

对于问题1:如果我们面对的客户端接入量只有几个,几十个,或者几百个,那么用select模型应该就能够满足需求,如果我们要面对几万,甚或者几十万的tcp连接,那么一定要采用epoll机制。对于poll不再讨论
对于问题2:选择什么样的并发框架,现在互联网上对libevent、libev、libuv对比的文章不少,大家可以多搜索多看看。最好是熟悉哪个就使用哪个,这样才能让你少走弯路。当然,对于喜欢折腾的人来说,熟悉其中2个或者三个都去学习学习,还是挺有意思。另外,如果选择libevent,建议去研究下memcached采用的线程模型,从那上面抽取出libevent的使用模型会更好一点
对于问题3-6,不做文字解答,套用linus的话:talk is cheap, show me the code!

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using namespace std;

uv_loop_t *loop;
uv_async_t async_send_msg;

queue<struct task> taskQueue;
pthread_mutex_t mutexTaskQueue = PTHREAD_MUTEX_INITIALIZER;


typedef struct {
	uv_write_t req;
	uv_buf_t buf;
} write_req_t;

struct task {
    task():client(NULL),size(0){
        memset(buffer,0,4097);
    }
    ~task() {}
    uv_work_t req;
    uv_stream_t* client;
    uint8_t buffer[4096+1];
    size_t  size;
};

void free_write_req(uv_write_t *req) {
    write_req_t *wr = (write_req_t*) req;
    free(wr->buf.base);
    free(wr);
}

void echo_write(uv_write_t *req, int status) {
    if (status) {
        fprintf(stderr, "Write error %s\n", uv_strerror(status));
    }
    free_write_req(req);
}

void on_close(uv_handle_t* handle) {
    free(handle);
}

void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    *buf = uv_buf_init((char*)malloc(4097), 4097);
}

void business_task(uv_work_t *req)
{

    struct task* tsk = (struct task*)(req->data);
    /*
    cout << "tsk->size=" << tsk->size << endl;
    for (int i=0;isize;i++) {
        printf("%02X ", tsk->buffer[i]);
    }
    cout << endl;
    */
    
    struct task new_task;
    new_task.size = tsk->size;
    new_task.client = tsk->client;
    memcpy(new_task.buffer, tsk->buffer, new_task.size);
    
    (void)pthread_mutex_lock(&mutexTaskQueue);
    taskQueue.push(new_task);
    (void)pthread_mutex_unlock(&mutexTaskQueue);

    uv_async_send(&async_send_msg);
}

void after_business(uv_work_t *req, int status) {
    /*
    struct task tsk;
    (void)pthread_mutex_lock(&mutexTaskQueue);
    while (!taskQueue.empty()) {
        tsk = taskQueue.front();
        taskQueue.pop();
        if (tsk.client != NULL) {
            write_req_t *req = (write_req_t*)malloc(sizeof(write_req_t));
            req->buf = uv_buf_init((char*)malloc(tsk.size),tsk.size);
            memcpy(req->buf.base, tsk.buffer, tsk.size);
            uv_write((uv_write_t*)req, (uv_stream_t*)tsk.client, &req->buf, 1, echo_write);
        }
    }
    (void)pthread_mutex_unlock(&mutexTaskQueue);
    */
    struct task* tsk1 = (struct task*)req->data;

    if (tsk1 != NULL) {
        delete tsk1;
        tsk1 = NULL;
    }
    
}

void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t *buf)
{
    if (nread > 0) { 
        //StickyPacketHandling(buf->base, (int)nread, client);
        struct task *tsk = new struct task;
        tsk->req.data = (void*)tsk;
        tsk->size = nread;
        tsk->client = client;
        memcpy(tsk->buffer, buf->base, tsk->size);

        uv_queue_work(loop,&tsk->req, business_task, after_business);

        free(buf->base);
        return ;
    }

    if (nread < 0) {
        if (nread != UV_EOF)
            fprintf(stderr, "Read error %s\n", uv_err_name(nread));
        uv_close((uv_handle_t*)client, on_close);
    }
    
    free(buf->base);
}


void on_new_connection(uv_stream_t* server, int status) {  
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        return ;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*)client) == 0) {
        uv_read_start((uv_stream_t*)client, alloc_buffer, echo_read);
    } else {
        uv_close((uv_handle_t*)client, on_close);
    }
}

void write_task(uv_async_t *handle)
{
    struct task tsk;
    (void)pthread_mutex_lock(&mutexTaskQueue);
    while (!taskQueue.empty()) {
        tsk = taskQueue.front();
        taskQueue.pop();
        if (tsk.client != NULL) {
            write_req_t *req = (write_req_t*)malloc(sizeof(write_req_t));
            req->buf = uv_buf_init((char*)malloc(tsk.size),tsk.size);
            memcpy(req->buf.base, tsk.buffer, tsk.size);
            uv_write((uv_write_t*)req, (uv_stream_t*)tsk.client, &req->buf, 1, echo_write);
        }
    }
    (void)pthread_mutex_unlock(&mutexTaskQueue);
}

int main(int argc, char **argv)
{
    loop = uv_default_loop();
    uv_tcp_t terminal_server;
    uv_tcp_init(loop, &terminal_server);

    struct sockaddr_in addr;
    uv_ip4_addr("0.0.0.0", 10000, &addr);
    uv_tcp_bind(&terminal_server, (const struct sockaddr*)&addr, 0);

    int r = uv_listen((uv_stream_t*)&terminal_server, -1, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        exit(1);
    }

    uv_async_init(loop, &async_send_msg, write_task);
    
    uv_run(loop, UV_RUN_DEFAULT);
    
    return 0;
}


以上代码的基本功能就是个回显服务器,框架为1个主loop线程+4个work线程,主loop线程负责连接的建立和数据的收发,work线程负责接收粘包处理后的数据,解析并转码,然后发送给对应的客户端。经过简单测试,5万tcp,每秒1Mb数据量的情况下,主线程cpu大概在10%左右。

你可能感兴趣的