From a466e82c7b0cb0893599624cced431d9663daa75 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Sat, 29 Jan 2022 18:12:11 +0800
Subject: [PATCH] transport: add more optimizations

---
 source/libs/transport/inc/transComm.h |  8 ++++++-
 source/libs/transport/src/transCli.c  | 29 +++++++++++++-------------
 source/libs/transport/src/transComm.c | 30 +++++++++++++++++++++++----
 source/libs/transport/src/transSrv.c  | 28 +++++++++++++------------
 4 files changed, 63 insertions(+), 32 deletions(-)

diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index f4fe0b1f79..082c89fed4 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -213,6 +213,12 @@ typedef struct SConnBuffer {
 
 typedef void (*AsyncCB)(uv_async_t* handle);
 
+typedef struct {
+  void*           pThrd;
+  queue           qmsg;
+  pthread_mutex_t mtx;  // protect qmsg;
+} SAsyncItem;
+
 typedef struct {
   int         index;
   int         nAsync;
@@ -221,7 +227,7 @@ typedef struct {
 
 SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
 void        transDestroyAsyncPool(SAsyncPool* pool);
-int         transSendAsync(SAsyncPool* pool);
+int         transSendAsync(SAsyncPool* pool, queue* mq);
 
 int transInitBuffer(SConnBuffer* buf);
 int transClearBuffer(SConnBuffer* buf);
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index 5037de1407..24ff5e956a 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -432,14 +432,15 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
   }
 }
 static void clientAsyncCb(uv_async_t* handle) {
-  SCliThrdObj* pThrd = handle->data;
+  SAsyncItem*  item = handle->data;
+  SCliThrdObj* pThrd = item->pThrd;
   SCliMsg*     pMsg = NULL;
   queue        wq;
 
   // batch process to avoid to lock/unlock frequently
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_MOVE(&pThrd->msg, &wq);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  pthread_mutex_lock(&item->mtx);
+  QUEUE_MOVE(&item->qmsg, &wq);
+  pthread_mutex_unlock(&item->mtx);
 
   int count = 0;
   while (!QUEUE_IS_EMPTY(&wq)) {
@@ -548,11 +549,11 @@ static void clientSendQuit(SCliThrdObj* thrd) {
   SCliMsg* msg = calloc(1, sizeof(SCliMsg));
   msg->ctx = NULL;  //
 
-  pthread_mutex_lock(&thrd->msgMtx);
-  QUEUE_PUSH(&thrd->msg, &msg->q);
-  pthread_mutex_unlock(&thrd->msgMtx);
+  // pthread_mutex_lock(&thrd->msgMtx);
+  // QUEUE_PUSH(&thrd->msg, &msg->q);
+  // pthread_mutex_unlock(&thrd->msgMtx);
 
-  transSendAsync(thrd->asyncPool);
+  transSendAsync(thrd->asyncPool, &msg->q);
   // uv_async_send(thrd->cliAsync);
 }
 
@@ -598,14 +599,14 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
 
   SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
 
-  pthread_mutex_lock(&thrd->msgMtx);
-  QUEUE_PUSH(&thrd->msg, &cliMsg->q);
-  pthread_mutex_unlock(&thrd->msgMtx);
+  // pthread_mutex_lock(&thrd->msgMtx);
+  // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
+  // pthread_mutex_unlock(&thrd->msgMtx);
 
-  int start = taosGetTimestampUs();
-  transSendAsync(thrd->asyncPool);
+  // int start = taosGetTimestampUs();
+  transSendAsync(thrd->asyncPool, &(cliMsg->q));
   // uv_async_send(thrd->cliAsync);
-  int end = taosGetTimestampUs() - start;
+  // int end = taosGetTimestampUs() - start;
   // tError("client sent to rpc, time cost: %d", (int)end);
 }
 #endif
diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c
index 47eabd4320..d0e504a0a1 100644
--- a/source/libs/transport/src/transComm.c
+++ b/source/libs/transport/src/transComm.c
@@ -247,7 +247,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
 }
 
 SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
-  static int sz = 20;
+  static int sz = 10;
 
   SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
   pool->index = 0;
@@ -257,24 +257,46 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
   for (int i = 0; i < pool->nAsync; i++) {
     uv_async_t* async = &(pool->asyncs[i]);
     uv_async_init(loop, async, cb);
-    async->data = arg;
+
+    SAsyncItem* item = calloc(1, sizeof(SAsyncItem));
+    item->pThrd = arg;
+    QUEUE_INIT(&item->qmsg);
+    pthread_mutex_init(&item->mtx, NULL);
+
+    async->data = item;
   }
   return pool;
 }
 void transDestroyAsyncPool(SAsyncPool* pool) {
   for (int i = 0; i < pool->nAsync; i++) {
     uv_async_t* async = &(pool->asyncs[i]);
+
+    SAsyncItem* item = async->data;
+    pthread_mutex_destroy(&item->mtx);
+    free(item);
   }
   free(pool->asyncs);
   free(pool);
 }
-int transSendAsync(SAsyncPool* pool) {
+int transSendAsync(SAsyncPool* pool, queue* q) {
   int idx = pool->index;
   idx = idx % pool->nAsync;
   // no need mutex here
   if (pool->index++ > pool->nAsync) {
     pool->index = 0;
   }
-  return uv_async_send(&(pool->asyncs[idx]));
+  uv_async_t* async = &(pool->asyncs[idx]);
+  SAsyncItem* item = async->data;
+
+  int64_t st = taosGetTimestampUs();
+  pthread_mutex_lock(&item->mtx);
+  QUEUE_PUSH(&item->qmsg, q);
+  pthread_mutex_unlock(&item->mtx);
+  int64_t el = taosGetTimestampUs() - st;
+  if (el > 50) {
+    // tInfo("lock and unlock cost: %d", (int)el);
+  }
+
+  return uv_async_send(async);
 }
 #endif
diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index 826b91dc02..a005b31fe4 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -376,13 +376,15 @@ static void destroySmsg(SSrvMsg* smsg) {
   free(smsg);
 }
 void uvWorkerAsyncCb(uv_async_t* handle) {
-  SWorkThrdObj* pThrd = handle->data;
+  SAsyncItem*   item = handle->data;
+  SWorkThrdObj* pThrd = item->pThrd;
   SSrvConn*     conn = NULL;
   queue         wq;
 
   // batch process to avoid to lock/unlock frequently
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_MOVE(&pThrd->msg, &wq);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  pthread_mutex_lock(&item->mtx);
+  QUEUE_MOVE(&item->qmsg, &wq);
+  pthread_mutex_unlock(&item->mtx);
+  // pthread_mutex_unlock(&mtx);
   while (!QUEUE_IS_EMPTY(&wq)) {
     queue* head = QUEUE_HEAD(&wq);
@@ -539,7 +541,7 @@ static bool addHandleToAcceptloop(void* arg) {
     tError("failed to bind: %s", uv_err_name(err));
     return false;
   }
-  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
+  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
     tError("failed to listen: %s", uv_err_name(err));
     return false;
   }
@@ -671,12 +673,12 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
 
 void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
   SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  // pthread_mutex_lock(&pThrd->msgMtx);
+  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
+  // pthread_mutex_unlock(&pThrd->msgMtx);
 
   tDebug("send quit msg to work thread");
-  transSendAsync(pThrd->asyncPool);
+  transSendAsync(pThrd->asyncPool, &srvMsg->q);
   // uv_async_send(pThrd->workerAsync);
 }
 
@@ -712,12 +714,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
   srvMsg->pConn = pConn;
   srvMsg->msg = *pMsg;
 
-  pthread_mutex_lock(&pThrd->msgMtx);
-  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
-  pthread_mutex_unlock(&pThrd->msgMtx);
+  // pthread_mutex_lock(&pThrd->msgMtx);
+  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
+  // pthread_mutex_unlock(&pThrd->msgMtx);
 
   tDebug("conn %p start to send resp", pConn);
-  transSendAsync(pThrd->asyncPool);
+  transSendAsync(pThrd->asyncPool, &srvMsg->q);
   // uv_async_send(pThrd->workerAsync);
 }
 
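For context, the pattern this patch converges on is: every uv_async_t in the
pool owns its own mutex-protected message queue (SAsyncItem); a sender pushes
one message under that handle's lock and fires uv_async_send; the loop thread
then steals the whole queue in a single lock/unlock pair and drains it as a
batch. Below is a minimal, self-contained sketch of that idea, assuming only
libuv and pthreads. It is illustrative, not the TDengine API: the names (Msg,
AsyncItem, AsyncPool, async_pool_send, drain_cb) are invented for the example,
and a plain singly linked list stands in for the intrusive queue/QUEUE_*
macros (which, unlike this list, preserve FIFO order).

/* build (assuming libuv is installed): cc sketch.c -luv -lpthread */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

typedef struct Msg {
  int         payload;
  struct Msg* next;
} Msg;

typedef struct {
  pthread_mutex_t mtx;   // protects head, as SAsyncItem.mtx protects qmsg
  Msg*            head;  // stand-in for the intrusive queue; LIFO, not FIFO
} AsyncItem;

typedef struct {
  int         index;   // round-robin cursor; unlocked on purpose, as in the patch
  int         nAsync;
  uv_async_t* asyncs;
} AsyncPool;

// Loop-thread callback: move the entire queue out under one lock, then
// process the batch unlocked (the QUEUE_MOVE idea in clientAsyncCb and
// uvWorkerAsyncCb).
static void drain_cb(uv_async_t* handle) {
  AsyncItem* item = handle->data;

  pthread_mutex_lock(&item->mtx);
  Msg* batch = item->head;  // steal the whole list in O(1)
  item->head = NULL;
  pthread_mutex_unlock(&item->mtx);

  while (batch != NULL) {
    Msg* next = batch->next;
    printf("processed msg %d\n", batch->payload);
    free(batch);
    batch = next;
  }
}

static AsyncPool* async_pool_create(uv_loop_t* loop, int sz) {
  AsyncPool* pool = calloc(1, sizeof(AsyncPool));
  pool->nAsync = sz;
  pool->asyncs = calloc(sz, sizeof(uv_async_t));
  for (int i = 0; i < sz; i++) {
    AsyncItem* item = calloc(1, sizeof(AsyncItem));
    pthread_mutex_init(&item->mtx, NULL);
    uv_async_init(loop, &pool->asyncs[i], drain_cb);
    pool->asyncs[i].data = item;  // one private queue per handle
  }
  return pool;
}

// Sender side (any thread): pick a handle round-robin, push under that
// handle's own mutex only, then signal. uv_async_send is thread-safe.
static int async_pool_send(AsyncPool* pool, Msg* msg) {
  int idx = pool->index % pool->nAsync;         // benign data race, as in the patch
  pool->index = (pool->index + 1) % pool->nAsync;
  uv_async_t* async = &pool->asyncs[idx];
  AsyncItem*  item = async->data;

  pthread_mutex_lock(&item->mtx);
  msg->next = item->head;
  item->head = msg;
  pthread_mutex_unlock(&item->mtx);

  return uv_async_send(async);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  AsyncPool* pool = async_pool_create(loop, 4);  // the patch uses sz = 10

  Msg* m = calloc(1, sizeof(Msg));
  m->payload = 42;
  async_pool_send(pool, m);  // in TDengine this runs on worker threads

  uv_run(loop, UV_RUN_NOWAIT);  // one loop turn fires drain_cb
  return 0;                     // teardown omitted for brevity
}

Two properties make this cheaper than the old single shared queue: contention
is spread across nAsync independent mutexes, and the consumer takes one lock
per wakeup instead of one per message. The trade-off is that uv_async_send
coalesces signals, so the callback must always empty whatever queue it finds,
which both this sketch and the patched callbacks do.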