Merge pull request #9967 from taosdata/origin/feature/rpc-refactor
refactor rpc
This commit is contained in:
commit
963ef70e0e
|
@ -17,6 +17,8 @@
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
|
#define CONN_PERSIST_TIME(para) (para * 1000 * 100)
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
uv_stream_t* stream;
|
uv_stream_t* stream;
|
||||||
|
@ -42,7 +44,7 @@ typedef struct SCliThrdObj {
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
uv_async_t* cliAsync; //
|
uv_async_t* cliAsync; //
|
||||||
uv_timer_t* pTimer;
|
uv_timer_t* pTimer;
|
||||||
void* cache; // conn pool
|
void* pool; // conn pool
|
||||||
queue msg;
|
queue msg;
|
||||||
pthread_mutex_t msgMtx;
|
pthread_mutex_t msgMtx;
|
||||||
uint64_t nextTimeout; // next timeout
|
uint64_t nextTimeout; // next timeout
|
||||||
|
@ -63,10 +65,10 @@ typedef struct SConnList {
|
||||||
|
|
||||||
// conn pool
|
// conn pool
|
||||||
// add expire timeout and capacity limit
|
// add expire timeout and capacity limit
|
||||||
static void* connCacheCreate(int size);
|
static void* connPoolCreate(int size);
|
||||||
static void* connCacheDestroy(void* cache);
|
static void* connPoolDestroy(void* pool);
|
||||||
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
|
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||||
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
|
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
|
||||||
|
|
||||||
// register timer in each thread to clear expire conn
|
// register timer in each thread to clear expire conn
|
||||||
static void clientTimeoutCb(uv_timer_t* handle);
|
static void clientTimeoutCb(uv_timer_t* handle);
|
||||||
|
@ -88,8 +90,14 @@ static void clientConnDestroy(SCliConn* pConn);
|
||||||
|
|
||||||
static void clientMsgDestroy(SCliMsg* pMsg);
|
static void clientMsgDestroy(SCliMsg* pMsg);
|
||||||
|
|
||||||
|
// thread obj
|
||||||
|
static SCliThrdObj* createThrdObj();
|
||||||
|
static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
|
// thread
|
||||||
static void* clientThread(void* arg);
|
static void* clientThread(void* arg);
|
||||||
|
|
||||||
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
|
|
||||||
static void clientProcessData(SCliConn* conn) {
|
static void clientProcessData(SCliConn* conn) {
|
||||||
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
||||||
SRpcInfo* pRpc = pCtx->pRpc;
|
SRpcInfo* pRpc = pCtx->pRpc;
|
||||||
|
@ -101,19 +109,22 @@ static void clientProcessData(SCliConn* conn) {
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||||
|
|
||||||
SCliThrdObj* pThrd = conn->hostThrd;
|
SCliThrdObj* pThrd = conn->hostThrd;
|
||||||
addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn);
|
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||||
|
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
|
||||||
|
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
|
}
|
||||||
free(pCtx->ip);
|
free(pCtx->ip);
|
||||||
free(pCtx);
|
free(pCtx);
|
||||||
// impl
|
// impl
|
||||||
}
|
}
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
|
||||||
|
|
||||||
static void clientTimeoutCb(uv_timer_t* handle) {
|
static void clientTimeoutCb(uv_timer_t* handle) {
|
||||||
SCliThrdObj* pThrd = handle->data;
|
SCliThrdObj* pThrd = handle->data;
|
||||||
SRpcInfo* pRpc = pThrd->pTransInst;
|
SRpcInfo* pRpc = pThrd->pTransInst;
|
||||||
int64_t currentTime = pThrd->nextTimeout;
|
int64_t currentTime = pThrd->nextTimeout;
|
||||||
|
tDebug("timeout, try to remove expire conn from conn pool");
|
||||||
|
|
||||||
SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
|
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
|
||||||
while (p != NULL) {
|
while (p != NULL) {
|
||||||
while (!QUEUE_IS_EMPTY(&p->conn)) {
|
while (!QUEUE_IS_EMPTY(&p->conn)) {
|
||||||
queue* h = QUEUE_HEAD(&p->conn);
|
queue* h = QUEUE_HEAD(&p->conn);
|
||||||
|
@ -125,18 +136,18 @@ static void clientTimeoutCb(uv_timer_t* handle) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p = taosHashIterate((SHashObj*)pThrd->cache, p);
|
p = taosHashIterate((SHashObj*)pThrd->pool, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
|
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0);
|
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
}
|
}
|
||||||
static void* connCacheCreate(int size) {
|
static void* connPoolCreate(int size) {
|
||||||
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
return cache;
|
return pool;
|
||||||
}
|
}
|
||||||
static void* connCacheDestroy(void* cache) {
|
static void* connPoolDestroy(void* pool) {
|
||||||
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
|
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
|
||||||
while (connList != NULL) {
|
while (connList != NULL) {
|
||||||
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
||||||
queue* h = QUEUE_HEAD(&connList->conn);
|
queue* h = QUEUE_HEAD(&connList->conn);
|
||||||
|
@ -144,23 +155,22 @@ static void* connCacheDestroy(void* cache) {
|
||||||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||||
clientConnDestroy(c);
|
clientConnDestroy(c);
|
||||||
}
|
}
|
||||||
connList = taosHashIterate((SHashObj*)cache, connList);
|
connList = taosHashIterate((SHashObj*)pool, connList);
|
||||||
}
|
}
|
||||||
taosHashClear(cache);
|
taosHashClear(pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
|
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
char key[128] = {0};
|
char key[128] = {0};
|
||||||
tstrncpy(key, ip, strlen(ip));
|
tstrncpy(key, ip, strlen(ip));
|
||||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||||
|
|
||||||
SHashObj* pCache = cache;
|
SHashObj* pPool = pool;
|
||||||
SConnList* plist = taosHashGet(pCache, key, strlen(key));
|
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list;
|
SConnList list;
|
||||||
plist = &list;
|
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
|
||||||
taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
|
plist = taosHashGet(pPool, key, strlen(key));
|
||||||
plist = taosHashGet(pCache, key, strlen(key));
|
|
||||||
QUEUE_INIT(&plist->conn);
|
QUEUE_INIT(&plist->conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,14 +181,14 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
return QUEUE_DATA(h, SCliConn, conn);
|
return QUEUE_DATA(h, SCliConn, conn);
|
||||||
}
|
}
|
||||||
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
|
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||||
char key[128] = {0};
|
char key[128] = {0};
|
||||||
tstrncpy(key, ip, strlen(ip));
|
tstrncpy(key, ip, strlen(ip));
|
||||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||||
|
|
||||||
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||||
conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
|
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
// list already create before
|
// list already create before
|
||||||
assert(plist != NULL);
|
assert(plist != NULL);
|
||||||
QUEUE_PUSH(&plist->conn, &conn->conn);
|
QUEUE_PUSH(&plist->conn, &conn->conn);
|
||||||
|
@ -322,7 +332,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
et = taosGetTimestampUs();
|
et = taosGetTimestampUs();
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port);
|
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
// impl later
|
// impl later
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
@ -373,15 +383,14 @@ static void clientAsyncCb(uv_async_t* handle) {
|
||||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
clientHandleReq(pMsg, pThrd);
|
clientHandleReq(pMsg, pThrd);
|
||||||
count++;
|
count++;
|
||||||
if (count >= 2) {
|
|
||||||
tError("send batch size: %d", count);
|
|
||||||
}
|
}
|
||||||
|
if (count >= 2) {
|
||||||
|
tDebug("already process batch size: %d", count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* clientThread(void* arg) {
|
static void* clientThread(void* arg) {
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||||
|
|
||||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,6 +403,23 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
|
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
|
||||||
|
|
||||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||||
|
SCliThrdObj* pThrd = createThrdObj();
|
||||||
|
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
|
pThrd->pTransInst = shandle;
|
||||||
|
|
||||||
|
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
|
||||||
|
if (err == 0) {
|
||||||
|
tDebug("sucess to create tranport-client thread %d", i);
|
||||||
|
}
|
||||||
|
cli->pThreadObj[i] = pThrd;
|
||||||
|
}
|
||||||
|
return cli;
|
||||||
|
}
|
||||||
|
static void clientMsgDestroy(SCliMsg* pMsg) {
|
||||||
|
// impl later
|
||||||
|
free(pMsg);
|
||||||
|
}
|
||||||
|
static SCliThrdObj* createThrdObj() {
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
||||||
|
|
||||||
QUEUE_INIT(&pThrd->msg);
|
QUEUE_INIT(&pThrd->msg);
|
||||||
|
@ -409,33 +435,26 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
pThrd->pTimer = malloc(sizeof(uv_timer_t));
|
pThrd->pTimer = malloc(sizeof(uv_timer_t));
|
||||||
uv_timer_init(pThrd->loop, pThrd->pTimer);
|
uv_timer_init(pThrd->loop, pThrd->pTimer);
|
||||||
pThrd->pTimer->data = pThrd;
|
pThrd->pTimer->data = pThrd;
|
||||||
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
|
|
||||||
|
|
||||||
pThrd->cache = connCacheCreate(1);
|
pThrd->pool = connPoolCreate(1);
|
||||||
pThrd->pTransInst = shandle;
|
return pThrd;
|
||||||
|
|
||||||
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
|
|
||||||
if (err == 0) {
|
|
||||||
tDebug("sucess to create tranport-client thread %d", i);
|
|
||||||
}
|
|
||||||
cli->pThreadObj[i] = pThrd;
|
|
||||||
}
|
|
||||||
return cli;
|
|
||||||
}
|
}
|
||||||
static void clientMsgDestroy(SCliMsg* pMsg) {
|
static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||||
// impl later
|
if (pThrd == NULL) {
|
||||||
free(pMsg);
|
return;
|
||||||
}
|
}
|
||||||
void taosCloseClient(void* arg) {
|
|
||||||
// impl later
|
|
||||||
SClientObj* cli = arg;
|
|
||||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
|
||||||
SCliThrdObj* pThrd = cli->pThreadObj[i];
|
|
||||||
pthread_join(pThrd->thread, NULL);
|
pthread_join(pThrd->thread, NULL);
|
||||||
pthread_mutex_destroy(&pThrd->msgMtx);
|
pthread_mutex_destroy(&pThrd->msgMtx);
|
||||||
free(pThrd->cliAsync);
|
free(pThrd->cliAsync);
|
||||||
free(pThrd->loop);
|
free(pThrd->loop);
|
||||||
free(pThrd);
|
free(pThrd);
|
||||||
|
}
|
||||||
|
//
|
||||||
|
void taosCloseClient(void* arg) {
|
||||||
|
// impl later
|
||||||
|
SClientObj* cli = arg;
|
||||||
|
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||||
|
destroyThrdObj(cli->pThreadObj[i]);
|
||||||
}
|
}
|
||||||
free(cli->pThreadObj);
|
free(cli->pThreadObj);
|
||||||
free(cli);
|
free(cli);
|
||||||
|
|
|
@ -91,10 +91,14 @@ static SConn* connCreate();
|
||||||
static void connDestroy(SConn* conn);
|
static void connDestroy(SConn* conn);
|
||||||
static void uvConnDestroy(uv_handle_t* handle);
|
static void uvConnDestroy(uv_handle_t* handle);
|
||||||
|
|
||||||
// server worke thread
|
// server and worker thread
|
||||||
static void* workerThread(void* arg);
|
static void* workerThread(void* arg);
|
||||||
static void* acceptThread(void* arg);
|
static void* acceptThread(void* arg);
|
||||||
|
|
||||||
|
// add handle loop
|
||||||
|
static bool addHandleToWorkloop(void* arg);
|
||||||
|
static bool addHandleToAcceptloop(void* arg);
|
||||||
|
|
||||||
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
/*
|
/*
|
||||||
* formate of data buffer:
|
* formate of data buffer:
|
||||||
|
@ -268,7 +272,7 @@ static void uvProcessData(SConn* pConn) {
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
|
|
||||||
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
||||||
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||||
// auth
|
// auth
|
||||||
// validate msg type
|
// validate msg type
|
||||||
}
|
}
|
||||||
|
@ -451,22 +455,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
void* acceptThread(void* arg) {
|
void* acceptThread(void* arg) {
|
||||||
// opt
|
// opt
|
||||||
SServerObj* srv = (SServerObj*)arg;
|
SServerObj* srv = (SServerObj*)arg;
|
||||||
uv_tcp_init(srv->loop, &srv->server);
|
|
||||||
|
|
||||||
struct sockaddr_in bind_addr;
|
|
||||||
|
|
||||||
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
|
|
||||||
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
|
|
||||||
int err = 0;
|
|
||||||
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
|
|
||||||
tError("Listen error %s\n", uv_err_name(err));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
uv_run(srv->loop, UV_RUN_DEFAULT);
|
uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
static void initWorkThrdObj(SWorkThrdObj* pThrd) {
|
static bool addHandleToWorkloop(void* arg) {
|
||||||
|
SWorkThrdObj* pThrd = arg;
|
||||||
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||||
uv_loop_init(pThrd->loop);
|
if (0 != uv_loop_init(pThrd->loop)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// SRpcInfo* pRpc = pThrd->shandle;
|
// SRpcInfo* pRpc = pThrd->shandle;
|
||||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
||||||
|
@ -482,6 +478,31 @@ static void initWorkThrdObj(SWorkThrdObj* pThrd) {
|
||||||
pThrd->workerAsync->data = pThrd;
|
pThrd->workerAsync->data = pThrd;
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool addHandleToAcceptloop(void* arg) {
|
||||||
|
// impl later
|
||||||
|
SServerObj* srv = arg;
|
||||||
|
|
||||||
|
int err = 0;
|
||||||
|
if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
|
||||||
|
tError("failed to init accept server: %s", uv_err_name(err));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct sockaddr_in bind_addr;
|
||||||
|
|
||||||
|
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
|
||||||
|
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
|
||||||
|
tError("failed to bind: %s", uv_err_name(err));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
|
||||||
|
tError("failed to listen: %s", uv_err_name(err));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
void* workerThread(void* arg) {
|
void* workerThread(void* arg) {
|
||||||
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
|
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
|
||||||
|
@ -546,11 +567,12 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
|
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
|
||||||
|
srv->pThreadObj[i] = thrd;
|
||||||
|
|
||||||
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
||||||
int fds[2];
|
int fds[2];
|
||||||
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
||||||
return NULL;
|
goto End;
|
||||||
}
|
}
|
||||||
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
||||||
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
|
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
|
||||||
|
@ -559,7 +581,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
thrd->fd = fds[0];
|
thrd->fd = fds[0];
|
||||||
thrd->pipe = &(srv->pipe[i][1]); // init read
|
thrd->pipe = &(srv->pipe[i][1]); // init read
|
||||||
|
|
||||||
initWorkThrdObj(thrd);
|
if (false == addHandleToWorkloop(thrd)) {
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
|
int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tDebug("sucess to create worker-thread %d", i);
|
tDebug("sucess to create worker-thread %d", i);
|
||||||
|
@ -568,9 +592,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
// TODO: clear all other resource later
|
// TODO: clear all other resource later
|
||||||
tError("failed to create worker-thread %d", i);
|
tError("failed to create worker-thread %d", i);
|
||||||
}
|
}
|
||||||
srv->pThreadObj[i] = thrd;
|
|
||||||
}
|
}
|
||||||
|
if (false == addHandleToAcceptloop(srv)) {
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
|
int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tDebug("success to create accept-thread");
|
tDebug("success to create accept-thread");
|
||||||
|
@ -579,16 +604,25 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
}
|
}
|
||||||
|
|
||||||
return srv;
|
return srv;
|
||||||
|
End:
|
||||||
|
taosCloseServer(srv);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
|
if (pThrd == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
pthread_join(pThrd->thread, NULL);
|
||||||
|
// free(srv->pipe[i]);
|
||||||
|
free(pThrd->loop);
|
||||||
|
free(pThrd);
|
||||||
}
|
}
|
||||||
void taosCloseServer(void* arg) {
|
void taosCloseServer(void* arg) {
|
||||||
// impl later
|
// impl later
|
||||||
SServerObj* srv = arg;
|
SServerObj* srv = arg;
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
SWorkThrdObj* pThrd = srv->pThreadObj[i];
|
destroyWorkThrd(srv->pThreadObj[i]);
|
||||||
pthread_join(pThrd->thread, NULL);
|
|
||||||
free(srv->pipe[i]);
|
|
||||||
free(pThrd->loop);
|
|
||||||
free(pThrd);
|
|
||||||
}
|
}
|
||||||
free(srv->loop);
|
free(srv->loop);
|
||||||
free(srv->pipe);
|
free(srv->pipe);
|
||||||
|
|
Loading…
Reference in New Issue