From bc77e3c579c13617c5acefbe82d15a80c00b6e3b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Mar 2022 13:44:24 +0800 Subject: [PATCH] handle except --- source/libs/transport/inc/transComm.h | 47 ++++++++++- source/libs/transport/src/transCli.c | 86 +++++++++----------- source/libs/transport/src/transComm.c | 47 ++++++++++- source/libs/transport/src/transSrv.c | 55 +++++-------- source/libs/transport/test/transportTests.cc | 2 +- 5 files changed, 150 insertions(+), 87 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d4e75dcd84..8cfde8267d 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -228,8 +228,8 @@ typedef struct SConnBuffer { typedef void (*AsyncCB)(uv_async_t* handle); typedef struct { - void* pThrd; - queue qmsg; + void* pThrd; + queue qmsg; TdThreadMutex mtx; // protect qmsg; } SAsyncItem; @@ -273,11 +273,52 @@ void transCloseClient(void* arg); void transCloseServer(void* arg); void transCtxInit(STransCtx* ctx); -void transCtxDestroy(STransCtx* ctx); +void transCtxCleanup(STransCtx* ctx); void transCtxClear(STransCtx* ctx); void transCtxMerge(STransCtx* dst, STransCtx* src); void* transCtxDumpVal(STransCtx* ctx, int32_t key); +// queue sending msgs +typedef struct { + SArray* q; + void (*free)(void* arg); +} STransQueue; + +/* + * init queue + * note: queue'size is small, default 1 + */ +void transQueueInit(STransQueue* queue, void (*free)(void* arg)); + +/* + * put arg into queue + * if queue'size > 1, return false; else return true + */ +bool transQueuePush(STransQueue* queue, void* arg); +/* + * pop head from queue + */ + +void* transQueuePop(STransQueue* queue); +/* + * get head from queue + */ +void* transQueueGet(STransQueue* queue); + +/* + * queue empty or not + */ + +bool transQueueEmpty(STransQueue* queue); +/* + * clear queue + */ +void transQueueClear(STransQueue* queue); +/* + * destroy queue + */ +void transQueueDestroy(STransQueue* queue); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fe5d8bd7f5..c0ee9b9ca5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,13 +25,14 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; void* data; - SArray* cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - bool broken; // link broken or not - STransCtx ctx; + // SArray* cliMsgs; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + int hThrdIdx; + STransCtx ctx; + bool broken; // link broken or not ConnStatus status; // int release; // 1: release // spi configure @@ -56,14 +57,14 @@ typedef struct SCliMsg { } SCliMsg; typedef struct SCliThrdObj { - TdThread thread; + TdThread thread; uv_loop_t* loop; SAsyncPool* asyncPool; uv_timer_t timer; void* pool; // conn pool // msg queue - queue msg; + queue msg; TdThreadMutex msgMtx; uint64_t nextTimeout; // next timeout @@ -181,12 +182,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); static void* cliWorkThread(void* arg); bool cliMaySendCachedMsg(SCliConn* conn) { - if (taosArrayGetSize(conn->cliMsgs) > 0) { + if (!transQueueEmpty(&conn->cliMsgs)) { cliSend(conn); return true; - } else { - return false; } + return false; } void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; @@ -195,6 +195,7 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -204,11 +205,7 @@ void cliHandleResp(SCliConn* conn) { CONN_SHOULD_RELEASE(conn, pHead); - SCliMsg* pMsg = NULL; - if (taosArrayGetSize(conn->cliMsgs) > 0) { - pMsg = taosArrayGetP(conn->cliMsgs, 0); - taosArrayRemove(conn->cliMsgs, 0); - } + SCliMsg* pMsg = transQueuePop(&conn->cliMsgs); STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { @@ -264,7 +261,7 @@ _RETURN: } void cliHandleExcept(SCliConn* pConn) { - if (taosArrayGetSize(pConn->cliMsgs) == 0) { + if (transQueueEmpty(&pConn->cliMsgs)) { if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { transUnrefCliHandle(pConn); return; @@ -274,11 +271,7 @@ void cliHandleExcept(SCliConn* pConn) { STrans* pTransInst = pThrd->pTransInst; do { - SCliMsg* pMsg = NULL; - if (taosArrayGetSize(pConn->cliMsgs) > 0) { - pMsg = taosArrayGetP(pConn->cliMsgs, 0); - taosArrayRemove(pConn->cliMsgs, 0); - } + SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; @@ -303,7 +296,7 @@ void cliHandleExcept(SCliConn* pConn) { } destroyCmsg(pMsg); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); - } while (taosArrayGetSize(pConn->cliMsgs) > 0); + } while (!transQueueEmpty(&pConn->cliMsgs)); transUnrefCliHandle(pConn); } @@ -380,21 +373,20 @@ static void addConnToPool(void* pool, SCliConn* conn) { SCliThrdObj* thrd = conn->hostThrd; CONN_HANDLE_THREAD_QUIT(thrd); - char key[128] = {0}; + STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + transCtxCleanup(&conn->ctx); + transQueueClear(&conn->cliMsgs); + conn->status = ConnNormal; - transCtxDestroy(&conn->ctx); + char key[128] = {0}; tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); - STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; - - conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - conn->status = ConnNormal; // list already create before assert(plist != NULL); - taosArrayClear(conn->cliMsgs); QUEUE_PUSH(&plist->conn, &conn->conn); assert(!QUEUE_IS_EMPTY(&plist->conn)); } @@ -445,7 +437,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; - conn->cliMsgs = taosArrayInit(2, sizeof(void*)); + + transQueueInit(&conn->cliMsgs, NULL); QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -465,18 +458,18 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); - transCtxDestroy(&conn->ctx); - taosArrayDestroy(conn->cliMsgs); + transCtxCleanup(&conn->ctx); + transQueueDestroy(&conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } static bool cliHandleNoResp(SCliConn* conn) { - bool res = false; - SArray* msgs = conn->cliMsgs; - if (taosArrayGetSize(msgs) > 0) { - SCliMsg* pMsg = taosArrayGetP(msgs, 0); + bool res = false; + if (!transQueueEmpty(&conn->cliMsgs)) { + SCliMsg* pMsg = transQueueGet(&conn->cliMsgs); if (REQUEST_NO_RESP(&pMsg->msg)) { - taosArrayRemove(msgs, 0); + transQueuePop(&conn->cliMsgs); + // taosArrayRemove(msgs, 0); destroyCmsg(pMsg); res = true; } @@ -509,8 +502,9 @@ static void cliSendCb(uv_write_t* req, int status) { void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); - assert(taosArrayGetSize(pConn->cliMsgs) > 0); - SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0); + // assert(taosArrayGetSize(pConn->cliMsgs) > 0); + assert(!transQueueEmpty(&pConn->cliMsgs)); + SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs); STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; @@ -600,9 +594,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (T_REF_VAL_GET(conn) == 2) { transUnrefCliHandle(conn); - taosArrayPush(conn->cliMsgs, &pMsg); - if (taosArrayGetSize(conn->cliMsgs) >= 2) { - return; // send one by one + if (!transQueuePush(&conn->cliMsgs, pMsg)) { + return; } cliSend(conn); } else { @@ -643,17 +636,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->hThrdIdx = pCtx->hThrdIdx; transCtxMerge(&conn->ctx, &pCtx->appCtx); - if (taosArrayGetSize(conn->cliMsgs) > 0) { - taosArrayPush(conn->cliMsgs, &pMsg); + if (!transQueuePush(&conn->cliMsgs, pMsg)) { return; } - - taosArrayPush(conn->cliMsgs, &pMsg); transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); - taosArrayPush(conn->cliMsgs, &pMsg); + transQueuePush(&conn->cliMsgs, pMsg); conn->hThrdIdx = pCtx->hThrdIdx; conn->ip = strdup(pMsg->ctx->ip); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 6c16113d46..209475ca05 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -228,7 +228,7 @@ void transCtxInit(STransCtx* ctx) { // init transCtx ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); } -void transCtxDestroy(STransCtx* ctx) { +void transCtxCleanup(STransCtx* ctx) { if (ctx->args == NULL) { return; } @@ -276,4 +276,49 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) { return (void*)ret; } +void transQueueInit(STransQueue* queue, void (*free)(void* arg)) { + queue->q = taosArrayInit(2, sizeof(void*)); + queue->free = free; +} +bool transQueuePush(STransQueue* queue, void* arg) { + taosArrayPush(queue->q, &arg); + if (taosArrayGetSize(queue->q) > 1) { + return false; + } + return true; +} +void* transQueuePop(STransQueue* queue) { + if (taosArrayGetSize(queue->q) == 0) { + return NULL; + } + void* ptr = taosArrayGetP(queue->q, 0); + taosArrayRemove(queue->q, 0); + return ptr; +} + +void* transQueueGet(STransQueue* queue) { + if (taosArrayGetSize(queue->q) == 0) { + return NULL; + } + void* ptr = taosArrayGetP(queue->q, 0); + return ptr; +} +bool transQueueEmpty(STransQueue* queue) { + // + return taosArrayGetSize(queue->q) == 0; +} +void transQueueClear(STransQueue* queue) { + if (queue->free != NULL) { + for (int i = 0; i < taosArrayGetSize(queue->q); i++) { + void* p = taosArrayGetP(queue->q, i); + queue->free(p); + } + } + taosArrayClear(queue->q); +} +void transQueueDestroy(STransQueue* queue) { + transQueueClear(queue); + taosArrayDestroy(queue->q); +} + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 3daac3e6f5..c6032a9569 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -37,7 +37,7 @@ typedef struct SSrvConn { void* pTransInst; // rpc init void* ahandle; // void* hostThrd; - SArray* srvMsgs; + STransQueue srvMsgs; SSrvRegArg regArg; bool broken; // conn broken; @@ -62,12 +62,12 @@ typedef struct SSrvMsg { } SSrvMsg; typedef struct SWorkThrdObj { - TdThread thread; - uv_pipe_t* pipe; - uv_os_fd_t fd; - uv_loop_t* loop; - SAsyncPool* asyncPool; - queue msg; + TdThread thread; + uv_pipe_t* pipe; + uv_os_fd_t fd; + uv_loop_t* loop; + SAsyncPool* asyncPool; + queue msg; TdThreadMutex msgMtx; queue conn; @@ -76,7 +76,7 @@ typedef struct SWorkThrdObj { } SWorkThrdObj; typedef struct SServerObj { - TdThread thread; + TdThread thread; uv_tcp_t server; uv_loop_t* loop; @@ -106,8 +106,7 @@ static const char* notify = "a"; srvMsg->msg = tmsg; \ srvMsg->type = Release; \ srvMsg->pConn = conn; \ - taosArrayPush(conn->srvMsgs, &srvMsg); \ - if (taosArrayGetSize(conn->srvMsgs) > 1) { \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ return; \ } \ uvStartSendRespInternal(srvMsg); \ @@ -271,20 +270,16 @@ void uvOnSendCb(uv_write_t* req, int status) { transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); - if (conn->srvMsgs != NULL) { - assert(taosArrayGetSize(conn->srvMsgs) >= 1); - SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); - tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); - taosArrayRemove(conn->srvMsgs, 0); + if (!transQueueEmpty(&conn->srvMsgs)) { + SSrvMsg* msg = transQueuePop(&conn->srvMsgs); if (msg->type == Release && conn->status != ConnNormal) { conn->status = ConnNormal; transUnrefSrvHandle(conn); } destroySmsg(msg); // send second data, just use for push - if (taosArrayGetSize(conn->srvMsgs) > 0) { - tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); - msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); + if (!transQueueEmpty(&conn->srvMsgs)) { + msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs); if (msg->type == Register && conn->status == ConnAcquire) { conn->regArg.notifyCount = 0; conn->regArg.init = 1; @@ -294,7 +289,7 @@ void uvOnSendCb(uv_write_t* req, int status) { (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); } - taosArrayRemove(conn->srvMsgs, 0); + transQueuePop(&conn->srvMsgs); free(msg); } else { uvStartSendRespInternal(msg); @@ -373,10 +368,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { transUnrefSrvHandle(pConn); } - taosArrayPush(pConn->srvMsgs, &smsg); - if (taosArrayGetSize(pConn->srvMsgs) > 1) { - tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + if (!transQueuePush(&pConn->srvMsgs, smsg)) { return; } uvStartSendRespInternal(smsg); @@ -608,14 +600,15 @@ static SSrvConn* createConn(void* hThrd) { QUEUE_INIT(&pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue); - pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // - tTrace("server conn %p created", pConn); + + transQueueInit(&pConn->srvMsgs, NULL); memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; transRefSrvHandle(pConn); + tTrace("server conn %p created", pConn); return pConn; } @@ -625,11 +618,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { } transDestroyBuffer(&conn->readBuf); - for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { - SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); - destroySmsg(msg); - } - conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); + transQueueDestroy(&conn->srvMsgs); if (clear) { tTrace("server conn %p to be destroyed", conn); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); @@ -724,8 +713,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { // release handle to rpc init SSrvConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - taosArrayPush(conn->srvMsgs, &msg); - if (taosArrayGetSize(conn->srvMsgs) > 1) { + if (!transQueuePush(&conn->srvMsgs, msg)) { return; } uvStartSendRespInternal(msg); @@ -744,8 +732,7 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; tDebug("server conn %p register brokenlink callback", conn); if (conn->status == ConnAcquire) { - if (taosArrayGetSize(conn->srvMsgs) > 0) { - taosArrayPush(conn->srvMsgs, &msg); + if (!transQueuePush(&conn->srvMsgs, msg)) { return; } conn->regArg.notifyCount = 0; diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 1f8c8e8ff2..65d9302994 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -144,7 +144,7 @@ class TransCtxEnv : public ::testing::Test { // TODO } virtual void TearDown() { - transCtxDestroy(ctx); + transCtxCleanup(ctx); // formate } STransCtx *ctx;