From a78a8c12302d04bbc37401e668c500e224a81ad5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 24 Aug 2024 18:03:50 +0800 Subject: [PATCH] refactor transport --- include/util/taoserror.h | 3 +- source/libs/transport/src/transCli.c | 762 ++++++++++++++------------ source/libs/transport/src/transComm.c | 4 - source/util/src/terror.c | 1 + 4 files changed, 406 insertions(+), 364 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b772edbf22..a084f7b2f5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -91,7 +91,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) #define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) #define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026) -#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) +#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) +#define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5218d376ae..12022f495f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -65,7 +65,7 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; - STransQueue reqMsgs; + STransQueue reqs; queue q; SConnList* list; @@ -133,6 +133,10 @@ typedef struct SCliThrd { SCliReq* stopMsg; bool quit; + + int32_t (*initCb)(void* arg, SCliReq* pReq, STransMsg* pResp); + int32_t (*notifyCb)(void* arg, SCliReq* pReq, STransMsg* pResp); + int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp); } SCliThrd; typedef struct SCliObj { @@ -157,6 +161,9 @@ static void* destroyConnPool(SCliThrd* thread); static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); +static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); +static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn); +static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port); // register conn timer static void cliConnTimeout(uv_timer_t* handle); @@ -219,17 +226,17 @@ static void cliHandleFastFail(SCliConn* pConn, int status); static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code); // handle req from app -static void cliHandleReq(SCliReq* pReq, SCliThrd* pThrd); -static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd); -static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd); -static void cliHandleUpdate(SCliReq* pReq, SCliThrd* pThrd); +static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq); +static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq); +static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq); +static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq); +static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq); static void cliDealReq(queue* h, SCliThrd* pThrd); static void cliBatchDealReq(queue* h, SCliThrd* pThrd); static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq}; -static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd); -static void (*cliAsyncHandle[])(SCliReq* pReq, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, +static void (*cliAsyncHandle[])(SCliThrd* pThrd, SCliReq* pReq) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate, cliHandleFreeById}; static FORCE_INLINE void destroyReq(void* cmsg); @@ -289,9 +296,9 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ - int i = 0, sz = transQueueSize(&conn->reqMsgs); \ + int i = 0, sz = transQueueSize(&conn->reqs); \ for (; i < sz; i++) { \ - pReq = transQueueGet(&conn->reqMsgs, i); \ + pReq = transQueueGet(&conn->reqs, i); \ if (pReq->ctx != NULL && (uint64_t)pReq->ctx->ahandle == ahandle) { \ break; \ } \ @@ -299,22 +306,22 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); if (i == sz) { \ pReq = NULL; \ } else { \ - pReq = transQueueRm(&conn->reqMsgs, i); \ + pReq = transQueueRm(&conn->reqs, i); \ } \ } while (0) -#define CONN_GET_NEXT_SENDMSG(conn) \ - do { \ - int i = 0; \ - do { \ - pCliMsg = transQueueGet(&conn->reqMsgs, i++); \ - if (pCliMsg && 0 == pCliMsg->sent) { \ - break; \ - } \ - } while (pCliMsg != NULL); \ - if (pCliMsg == NULL) { \ - goto _RETURN; \ - } \ +#define CONN_GET_NEXT_SENDMSG(conn) \ + do { \ + int i = 0; \ + do { \ + pCliMsg = transQueueGet(&conn->reqs, i++); \ + if (pCliMsg && 0 == pCliMsg->sent) { \ + break; \ + } \ + } while (pCliMsg != NULL); \ + if (pCliMsg == NULL) { \ + goto _RETURN; \ + } \ } while (0) #define CONN_SET_PERSIST_BY_APP(conn) \ @@ -351,8 +358,8 @@ static void* cliWorkThread(void* arg); static void cliReleaseUnfinishedMsg(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - for (int i = 0; i < transQueueSize(&conn->reqMsgs); i++) { - SCliReq* msg = transQueueGet(&conn->reqMsgs, i); + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* msg = transQueueGet(&conn->reqs, i); if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { conn->ctx.freeFunc(msg->ctx->ahandle); @@ -363,11 +370,11 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } destroyReq(msg); } - transQueueClear(&conn->reqMsgs); + transQueueClear(&conn->reqs); memset(&conn->ctx, 0, sizeof(conn->ctx)); } bool cliMaySendCachedMsg(SCliConn* conn) { - if (!transQueueEmpty(&conn->reqMsgs)) { + if (!transQueueEmpty(&conn->reqs)) { SCliReq* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); cliSend(conn); @@ -394,7 +401,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { taosWUnLockLatch(&exh->latch); SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); transCtxMerge(&conn->ctx, &t->ctx->userCtx); - (void)transQueuePush(&conn->reqMsgs, t); + (void)transQueuePush(&conn->reqs, t); tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); (void)transReleaseExHandle(transGetRefMgt(), refId); cliSend(conn); @@ -422,10 +429,10 @@ void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) { SCliReq* pReq = NULL; - for (int i = 0; i < transQueueSize(&conn->reqMsgs); i++) { - pReq = transQueueGet(&conn->reqMsgs, i); + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + pReq = transQueueGet(&conn->reqs, i); if (pReq->seq == seq) { - transQueueRm(&conn->reqMsgs, i); + transQueueRm(&conn->reqs, i); break; } } @@ -436,7 +443,7 @@ SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) { } bool cliShouldAddConnToPool(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - bool empty = transQueueEmpty(&conn->reqMsgs); + bool empty = transQueueEmpty(&conn->reqs); if (empty) { (void)delConnFromHeapCache(pThrd->connHeapCache, conn); } @@ -534,7 +541,7 @@ void cliHandleResp(SCliConn* conn) { SCliReq* pReq = NULL; SReqCtx* pCtx = NULL; if (CONN_NO_PERSIST_BY_APP(conn)) { - pReq = transQueuePop(&conn->reqMsgs); + pReq = transQueuePop(&conn->reqs); pCtx = pReq ? pReq->ctx : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -620,7 +627,7 @@ static void cliDestroyMsgInExhandle(int64_t refId) { } void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { - if (transQueueEmpty(&pConn->reqMsgs)) { + if (transQueueEmpty(&pConn->reqs)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); @@ -632,7 +639,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { STrans* pInst = pThrd->pInst; bool once = false; do { - SCliReq* pReq = transQueuePop(&pConn->reqMsgs); + SCliReq* pReq = transQueuePop(&pConn->reqs); if (pReq == NULL && once) { break; @@ -685,7 +692,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } destroyReq(pReq); tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); - } while (!transQueueEmpty(&pConn->reqMsgs)); + } while (!transQueueEmpty(&pConn->reqs)); if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); transUnrefCliHandle(pConn); } @@ -794,6 +801,93 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { return conn; } +static int32_t getOrCreateMsgList(SCliThrd* pThrd, const char* key, SConnList** ppList) { + int32_t code = 0; + void* pool = pThrd->pool; + size_t klen = strlen(key); + SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); + if (plist == NULL) { + SConnList list = {0}; + code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + if (code != 0) { + return code; + } + + plist = taosHashGet(pool, key, klen); + if (plist == NULL) { + return TSDB_CODE_INVALID_PTR; + } + + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + if (nList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + QUEUE_INIT(&nList->msgQ); + nList->numOfConn++; + + QUEUE_INIT(&plist->conns); + plist->list = nList; + *ppList = plist; + } else { + *ppList = plist; + } + return 0; +} +static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** ppConn) { + int32_t code = 0; + void* pool = pThrd->pool; + STrans* pInst = pThrd->pInst; + // size_t klen = strlen(key); + + SConnList* plist = NULL; + code = getOrCreateMsgList(pThrd, key, &plist); + if (code != 0) { + return code; + } + + if (QUEUE_IS_EMPTY(&plist->conns)) { + if (plist->list->numOfConn >= pInst->connLimitNum) { + return TSDB_CODE_RPC_MAX_SESSIONS; + } + return TSDB_CODE_RPC_NETWORK_BUSY; + } + + queue* h = QUEUE_TAIL(&plist->conns); + plist->size -= 1; + QUEUE_REMOVE(h); + + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); + conn->status = ConnNormal; + QUEUE_INIT(&conn->q); + + if (conn->task != NULL) { + SDelayTask* task = conn->task; + conn->task = NULL; + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, task); + } + + tDebug("conn %p get from pool, pool size:%d, dst:%s", conn, conn->list->size, conn->dstAddr); + + return 0; +} + +// code +static int32_t cliGetConnOrCreate(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { + // impl later + char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); + + int32_t code = cliGetConnFromPool(pThrd, addr, pConn); + if (code == TSDB_CODE_RPC_MAX_SESSIONS) { + return code; + } else if (code == TSDB_CODE_RPC_NETWORK_BUSY) { + code = cliCreateConn2(pThrd, pReq, pConn); + } else { + } + return code; +} static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) { void* pool = pThrd->pool; STrans* pInst = pThrd->pInst; @@ -899,7 +993,6 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) { conn->status = ConnNormal; QUEUE_INIT(&conn->q); tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); - if (conn->task != NULL) { transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); conn->task = NULL; @@ -941,7 +1034,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { pReq->ctx->task = NULL; transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); - (void)transQueuePush(&conn->reqMsgs, pReq); + (void)transQueuePush(&conn->reqs, pReq); conn->status = ConnNormal; cliSend(conn); @@ -1069,15 +1162,59 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } +static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { + SCliConn* pConn = NULL; + + int32_t code = cliCreateConn(pThrd, &pConn); + if (code != 0) { + return code; + } + + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + + pConn->dstAddr = taosStrdup(addr); + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + + transQueuePush(&pConn->reqs, pReq); + return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); +} + static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { int32_t code = 0; - int8_t registed = 0; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); if (conn == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + transReqQueueInit(&conn->wreqQueue); + QUEUE_INIT(&conn->q); + conn->hostThrd = pThrd; + conn->status = ConnNormal; + conn->broken = false; + + TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed); + + TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); + + transRefCliHandle(conn); + + transReqQueueInit(&conn->wreqQueue); + + TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed); + + TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); + + QUEUE_INIT(&conn->q); + conn->hostThrd = pThrd; + conn->status = ConnNormal; + conn->broken = false; + transRefCliHandle(conn); + conn->seq = 0; + + TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); + // read/write stream handle conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); if (conn->stream == NULL) { @@ -1092,66 +1229,18 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { TAOS_CHECK_GOTO(code, NULL, _failed); } - registed = 1; conn->stream->data = conn; - conn->connReq.data = conn; - transReqQueueInit(&conn->wreqQueue); - QUEUE_INIT(&conn->q); - conn->hostThrd = pThrd; - conn->status = ConnNormal; - conn->broken = false; - - TAOS_CHECK_GOTO(transQueueInit(&conn->reqMsgs, NULL), NULL, _failed); - - TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); - - transRefCliHandle(conn); - - uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; - if (timer == NULL) { - timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - if (timer == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _failed); - } - - tDebug("no available timer, create a timer %p", timer); - (void)uv_timer_init(pThrd->loop, timer); - } - timer->data = conn; - - conn->timer = timer; - conn->connReq.data = conn; - transReqQueueInit(&conn->wreqQueue); - - TAOS_CHECK_GOTO(transQueueInit(&conn->reqMsgs, NULL), NULL, _failed); - - TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); - - QUEUE_INIT(&conn->q); - conn->hostThrd = pThrd; - conn->status = ConnNormal; - conn->broken = false; - transRefCliHandle(conn); - conn->seq = 0; - // allocConnRef(conn, false); - - TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); - *pCliConn = conn; return code; _failed: - if (registed == 1) { - uv_close((uv_handle_t*)conn->stream, cliDestroy); - } else { - if (conn) { - taosMemoryFree(conn->stream); - (void)transDestroyBuffer(&conn->readBuf); - transQueueDestroy(&conn->reqMsgs); - } - taosMemoryFree(conn); + if (conn) { + taosMemoryFree(conn->stream); + (void)transDestroyBuffer(&conn->readBuf); + transQueueDestroy(&conn->reqs); } + taosMemoryFree(conn); return code; } static void cliDestroyConn(SCliConn* conn, bool clear) { @@ -1217,10 +1306,10 @@ static void cliDestroy(uv_handle_t* handle) { } static bool cliHandleNoResp(SCliConn* conn) { bool res = false; - if (!transQueueEmpty(&conn->reqMsgs)) { - SCliReq* pReq = transQueueGet(&conn->reqMsgs, 0); + if (!transQueueEmpty(&conn->reqs)) { + SCliReq* pReq = transQueueGet(&conn->reqs, 0); if (REQUEST_NO_RESP(&pReq->msg)) { - (void)transQueuePop(&conn->reqMsgs); + (void)transQueuePop(&conn->reqs); destroyReq(pReq); res = true; } @@ -1242,7 +1331,7 @@ static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = transReqQueueRemove(req); if (pConn == NULL) return; - SCliReq* pReq = transQueueGet(&pConn->reqMsgs, 0); + SCliReq* pReq = transQueueGet(&pConn->reqs, 0); if (pReq != NULL) { int64_t cost = taosGetTimestampUs() - pReq->st; if (cost > 1000 * 50) { @@ -1274,8 +1363,8 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { int32_t code = -1; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - while (!transQueueEmpty(&conn->reqMsgs)) { - SCliReq* pReq = transQueuePop(&conn->reqMsgs); + while (!transQueueEmpty(&conn->reqs)) { + SCliReq* pReq = transQueuePop(&conn->reqs); ASSERT(pReq->type != Release); ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); @@ -1317,7 +1406,7 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { void cliSendBatch_shareConn(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - int32_t size = transQueueSize(&pConn->reqMsgs); + int32_t size = transQueueSize(&pConn->reqs); int32_t totalLen = 0; if (size == 0) { @@ -1330,7 +1419,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int j = 0; for (int i = 0; i < size; i++) { - SCliReq* pCliMsg = transQueueGet(&pConn->reqMsgs, i); + SCliReq* pCliMsg = transQueueGet(&pConn->reqs, i); if (pCliMsg->sent == 1) { continue; } @@ -1478,7 +1567,7 @@ void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - if (transQueueEmpty(&pConn->reqMsgs)) { + if (transQueueEmpty(&pConn->reqs)) { tError("%s conn %p not msg to send", pInst->label, pConn); cliHandleExcept(pConn, -1); return; @@ -1567,19 +1656,13 @@ static void cliDestroyBatch(SCliBatch* pBatch) { taosMemoryFree(pBatch); } -static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { +static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { + int32_t lino = 0; STrans* pInst = pThrd->pInst; uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr); if (code != 0) { - cliResetConnTimer(conn); - if (conn->pBatch != NULL) { - cliHandleFastFail(conn, -1); - } else { - cliHandleBatch_shareConnExcept(conn); - } - - return; + TAOS_CHECK_GOTO(code, &lino, _exception); } struct sockaddr_in addr; @@ -1590,32 +1673,40 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { tTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); - if (fd == -1) { - tError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - cliHandleFastFail(conn, -1); - return; + if (fd < 0) { + TAOS_CHECK_GOTO(terrno, &lino, _exception); } int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - cliHandleFastFail(conn, -1); - return; + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); } ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); if (ret != 0) { tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - cliHandleFastFail(conn, -1); - return; + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); + return code; } + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; + if (timer == NULL) { + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } + + tDebug("no available timer, create a timer %p", timer); + (void)uv_timer_init(pThrd->loop, timer); + } + timer->data = conn; + conn->timer = timer; ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - cliResetConnTimer(conn); + // cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, -1); - return; + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); @@ -1624,9 +1715,13 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, -1); - return; + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } - return; + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; +_exception: + tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + taosMemoryFree(conn); // free conn later + return code; } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { @@ -1662,7 +1757,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); - return cliDoConn(pThrd, conn, pList->ip, pList->port); + (void)cliDoConn(pThrd, conn, pList->ip, pList->port); } conn->pBatch = pBatch; @@ -1707,7 +1802,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { static void cliHandleFastFail_resp(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - SCliReq* pReq = transQueueGet(&pConn->reqMsgs, 0); + SCliReq* pReq = transQueueGet(&pConn->reqs, 0); STraceId* trace = &pReq->msg.info.traceId; tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), @@ -1784,27 +1879,27 @@ static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { SReqCtx* pCtx = pReq->ctx; STrans* pInst = pThrd->pInst; - STransMsg transMsg = {0}; - transMsg.contLen = 0; - transMsg.pCont = NULL; - transMsg.code = code; - transMsg.msgType = pReq->msg.msgType + 1; - transMsg.info.ahandle = pReq->ctx->ahandle; - transMsg.info.traceId = pReq->msg.info.traceId; - transMsg.info.hasEpSet = false; - transMsg.info.cliVer = pInst->compatibilityVer; + STransMsg resp = {0}; + resp.contLen = 0; + resp.pCont = NULL; + resp.code = code; + resp.msgType = pReq->msg.msgType + 1; + resp.info.ahandle = pReq->ctx->ahandle; + resp.info.traceId = pReq->msg.info.traceId; + resp.info.hasEpSet = false; + resp.info.cliVer = pInst->compatibilityVer; if (pCtx->pSem != NULL) { if (pCtx->pRsp == NULL) { } else { - memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); + memcpy((char*)pCtx->pRsp, (char*)&resp, sizeof(resp)); } } else { - pInst->cfp(pInst->parent, &transMsg, NULL); + pInst->cfp(pInst->parent, &resp, NULL); } destroyReq(pReq); } -static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd) { +static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) { if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { pThrd->stopMsg = pReq; return; @@ -1817,7 +1912,7 @@ static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd) { (void)destroyConnPool(pThrd); (void)uv_walk(pThrd->loop, cliWalkCb, NULL); } -static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd) { +static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { int64_t refId = (int64_t)(pReq->msg.info.handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { @@ -1835,7 +1930,7 @@ static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd) { if (T_REF_VAL_GET(conn) == 2) { transUnrefCliHandle(conn); - if (!transQueuePush(&conn->reqMsgs, pReq)) { + if (!transQueuePush(&conn->reqs, pReq)) { return; } cliSend(conn); @@ -1844,12 +1939,12 @@ static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd) { destroyReq(pReq); } } -static void cliHandleUpdate(SCliReq* pReq, SCliThrd* pThrd) { +static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { SReqCtx* pCtx = pReq->ctx; pThrd->cvtAddr = pCtx->cvtAddr; destroyReq(pReq); } -static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd) { +static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq) { int32_t code = 0; int64_t refId = (int64_t)(pReq->msg.info.handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); @@ -1868,7 +1963,7 @@ static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd) { } tDebug("do free conn %p by id %" PRId64 "", conn, refId); - int32_t size = transQueueSize(&conn->reqMsgs); + int32_t size = transQueueSize(&conn->reqs); if (size == 0) { // already recv, and notify upper layer TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); @@ -2041,225 +2136,84 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } -static int32_t getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) { - int32_t code = 0; - size_t klen = strlen(key); +void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) { + int32_t code = 0; + int32_t lino = 0; + STransMsg resp = {0}; - SHeap* p = taosHashGet(pConnHeapCache, key, klen); - if (p == NULL) { - SHeap heap = {0}; - code = transHeapInit(&heap, compareHeapNode); - if (code != 0) { - tError("failed to init heap cache for key:%s, reason: %s", key, tstrerror(code)); - return code; - } + code = (pThrd->initCb)(pThrd, pReq, NULL); + TAOS_CHECK_GOTO(code, &lino, _exception); - code = taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap)); - if (code != 0) { - transHeapDestroy(&heap); - tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(code)); - } - p = taosHashGet(pConnHeapCache, key, klen); - if (p == NULL) { - code = TSDB_CODE_INVALID_PARA; + STraceId* trace = &pReq->msg.info.traceId; + STrans* pInst = pThrd->pInst; + + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + + SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); + if (pConn == NULL) { + tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + bool ignore = false; + pConn = getConnFromPool(pThrd, addr, &ignore); + if (pConn != NULL) { + addConnToHeapCache(pThrd->connHeapCache, pConn); + transQueuePush(&pConn->reqs, pReq); + return cliSendBatch_shareConn(pConn); } + } else { + tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + transQueuePush(&pConn->reqs, pReq); + cliSendBatch_shareConn(pConn); + return; } - *pHeap = p; - return code; + + code = cliCreateConn(pThrd, &pConn); + pConn->dstAddr = taosStrdup(addr); + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + + transQueuePush(&pConn->reqs, pReq); + cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + +_exception: + + resp.code = code; + (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); + return; } -static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { - int code = 0; - SHeap* pHeap = NULL; +void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { + int32_t lino = 0; + STransMsg resp = {0}; + int32_t code = (pThrd->initCb)(pThrd, pReq, NULL); + TAOS_CHECK_GOTO(code, &lino, _exception); + + STrans* pInst = pThrd->pInst; SCliConn* pConn = NULL; - code = getOrCreateHeapIfNotExist(pConnHeapCache, key, &pHeap); - if (code != 0) { - tDebug("failed to get conn heap from cache for key:%s", key); - return NULL; - } - code = transHeapGet(pHeap, &pConn); - if (code != 0) { - tDebug("failed to get conn from heap cache for key:%s", key); - return NULL; - } - return pConn; -} -static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { - SHeap* p = NULL; - int32_t code = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr, &p); - if (code != 0) { - return code; + code = cliGetConnOrCreate(pThrd, pReq, &pConn); + if (code == TSDB_CODE_RPC_MAX_SESSIONS) { + TAOS_CHECK_GOTO(code, &lino, _exception); + } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + // do nothing, notifyCb + return; + } else { } - return transHeapInsert(p, pConn); + + tTrace("%s conn %p ready", pInst->label, pConn); +_exception: + + resp.code = code; + (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); + return; } -static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { - SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr)); - if (p == NULL) { - tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr); - return 0; +void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { + STrans* pInst = pThrd->pInst; + if (pInst->shareConn == 1) { + return cliHandleReq__shareConn(pThrd, pReq); + } else { + return cliHandleReq__noShareConn(pThrd, pReq); } - int32_t code = transHeapDelete(p, pConn); - if (code != 0) { - tDebug("failed to delete conn %p from heap cache since %s", pConn, tstrerror(code)); - } - return code; -} - -void cliHandleReq__shareConn(SCliReq* pReq, SCliThrd* pThrd) { - // int32_t code = 0; - - // STraceId* trace = &pReq->msg.info.traceId; - // STrans* pInst = pThrd->pInst; - - // code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); - // if (code != 0) { - // // TODO: notifyCb - // destroyReq(pReq); - // return; - // } - - // char addr[TSDB_FQDN_LEN + 64] = {0}; - // CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); - - // SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); - // if (pConn == NULL) { - // tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - // bool ignore = false; - // pConn = getConnFromPool(pThrd, addr, &ignore); - // if (pConn != NULL) { - // addConnToHeapCache(pThrd->connHeapCache, pConn); - // transQueuePush(&pConn->reqMsgs, pReq); - // return cliSendBatch_shareConn(pConn); - // } - // } else { - // tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - // transQueuePush(&pConn->reqMsgs, pReq); - // cliSendBatch_shareConn(pConn); - // return; - // } - - // code = cliCreateConn(pThrd, &pConn); - // pConn->dstAddr = taosStrdup(addr); - // code = addConnToHeapCache(pThrd->connHeapCache, pConn); - - // transQueuePush(&pConn->reqMsgs, pReq); - // return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); -} - -void cliHandleReq__noShareConn(SCliReq* pReq, SCliThrd* pThrd) { - // int32_t code; - // STrans* pInst = pThrd->pInst; - // code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); - // if (code != 0) { - // // notifyCb - // destroyReq(pReq); - // } - - // char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - // uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); - // char addr[TSDB_FQDN_LEN + 64] = {0}; - // CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); - - // bool ignore = false; - // SCliConn* conn = cliGetConn(&pReq, pThrd, &ignore, addr); - // if (ignore == true) { - // // persist conn already release by server - // STransMsg resp = {0}; - // if (pReq->type != Release) { - // (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, 0); - // } - // destroyReq(pReq); - // return; - // } - // if (conn == NULL && pReq == NULL) { - // return; - // } - // STraceId* trace = &pReq->msg.info.traceId; - - // if (conn != NULL) { - // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); - // (void)transQueuePush(&conn->reqMsgs, pReq); - // cliSend(conn); - // } else { - // code = cliCreateConn(pThrd, &conn); - // if (code != 0) { - // tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); - // (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, code); - // destroyReq(pReq); - // return; - // } - - // specifyConnRef(conn, true, (int64_t)pReq->msg.info.handle); - - // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); - // (void)transQueuePush(&conn->reqMsgs, pReq); - - // conn->dstAddr = taosStrdup(addr); - // if (conn->dstAddr == NULL) { - // tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pInst), conn, - // tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // cliHandleFastFail(conn, -1); - // return; - // } - - // uint32_t ipaddr; - // int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); - // if (code != 0) { - // cliResetConnTimer(conn); - // cliHandleExcept(conn, code); - // return; - // } - - // struct sockaddr_in addr; - // addr.sin_family = AF_INET; - // addr.sin_addr.s_addr = ipaddr; - // addr.sin_port = (uint16_t)htons(port); - - // tGTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr); - // int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); - // if (fd == -1) { - // tGError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), conn, - // tstrerror(TAOS_SYSTEM_ERROR(errno))); - // cliHandleExcept(conn, -1); - // errno = 0; - // return; - // } - - // int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); - // if (ret != 0) { - // tGError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - // cliHandleExcept(conn, -1); - // return; - // } - - // ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); - // if (ret != 0) { - // tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - // cliHandleExcept(conn, -1); - // return; - // } - - // ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); - // if (ret != 0) { - // cliResetConnTimer(conn); - // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - // cliHandleFastFail(conn, ret); - // return; - // } - // (void)uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); - // } - // tGTrace("%s conn %p ready", pInst->label, conn); -} - -void cliHandleReq(SCliReq* pReq, SCliThrd* pThrd) { - // STrans* pInst = pThrd->pInst; - // if (pInst->shareConn == 1) { - // return cliHandleReq__shareConn(pReq, pThrd); - // } else { - // return cliHandleReq__noShareConn(pReq, pThrd); - // } } static void cliDealReq(queue* wq, SCliThrd* pThrd) { @@ -2275,7 +2229,7 @@ static void cliDealReq(queue* wq, SCliThrd* pThrd) { pThrd->stopMsg = pReq; continue; } - (*cliAsyncHandle[pReq->type])(pReq, pThrd); + (*cliAsyncHandle[pReq->type])(pThrd, pReq); count++; } if (count >= 2) { @@ -2434,7 +2388,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { } continue; } - (*cliAsyncHandle[pReq->type])(pReq, pThrd); + (*cliAsyncHandle[pReq->type])(pThrd, pReq); count++; } @@ -2466,16 +2420,16 @@ static void cliAsyncCb(uv_async_t* handle) { cliDealFunc[pInst->supportBatch](&wq, pThrd); - if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); + if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg); } void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { transCtxCleanup(&conn->ctx); cliReleaseUnfinishedMsg(conn); if (destroy == 1) { - transQueueDestroy(&conn->reqMsgs); + transQueueDestroy(&conn->reqs); } else { - transQueueClear(&conn->reqMsgs); + transQueueClear(&conn->reqs); } } @@ -2483,8 +2437,8 @@ void cliConnFreeMsgs(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - for (int i = 0; i < transQueueSize(&conn->reqMsgs); i++) { - SCliReq* cmsg = transQueueGet(&conn->reqMsgs, i); + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* cmsg = transQueueGet(&conn->reqs, i); if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { continue; } @@ -2507,8 +2461,8 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { (void)transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); - for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqMsgs); i++) { - SCliReq* pReq = transQueueGet(&conn->reqMsgs, i); + for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) { + SCliReq* pReq = transQueueGet(&conn->reqs, i); if (pReq->type == Release) { ASSERTS(pReq == NULL, "trans-cli recv invaid release-req"); tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, @@ -2629,6 +2583,27 @@ _err: return NULL; } +int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { + SCliThrd* pThrd = pThrd; + return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); +} +int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { + STrans* pInst = ((SCliThrd*)thrd)->pInst; + int32_t code = cliBuildExceptResp(pReq, pResp); + + if (code != 0) { + return code; + } + pResp->info.cliVer = pInst->compatibilityVer; + pInst->cfp(pInst->parent, pResp, NULL); + return code; +} + +int32_t notfiyCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { + // impl later + return 0; +} + static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { int32_t code = 0; STrans* pInst = trans; @@ -2714,6 +2689,10 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + pThrd->initCb = initCb; + pThrd->notifyCb = notfiyCb; + pThrd->notifyExceptCb = notifyExceptCb; + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pInst->idleTime); pThrd->pInst = trans; pThrd->quit = false; @@ -2841,7 +2820,7 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pInst) { } static FORCE_INLINE void doDelayTask(void* param) { STaskArg* arg = param; - cliHandleReq((SCliReq*)arg->param1, (SCliThrd*)arg->param2); + cliHandleReq((SCliThrd*)arg->param2, (SCliReq*)arg->param1); taosMemoryFree(arg); } @@ -3771,11 +3750,76 @@ _exception: return code; } +static int32_t getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) { + int32_t code = 0; + size_t klen = strlen(key); + + SHeap* p = taosHashGet(pConnHeapCache, key, klen); + if (p == NULL) { + SHeap heap = {0}; + code = transHeapInit(&heap, compareHeapNode); + if (code != 0) { + tError("failed to init heap cache for key:%s, reason: %s", key, tstrerror(code)); + return code; + } + + code = taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap)); + if (code != 0) { + transHeapDestroy(&heap); + tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(code)); + } + p = taosHashGet(pConnHeapCache, key, klen); + if (p == NULL) { + code = TSDB_CODE_INVALID_PARA; + } + } + *pHeap = p; + return code; +} + +static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { + int code = 0; + SHeap* pHeap = NULL; + SCliConn* pConn = NULL; + code = getOrCreateHeapIfNotExist(pConnHeapCache, key, &pHeap); + if (code != 0) { + tDebug("failed to get conn heap from cache for key:%s", key); + return NULL; + } + code = transHeapGet(pHeap, &pConn); + if (code != 0) { + tDebug("failed to get conn from heap cache for key:%s", key); + return NULL; + } + return pConn; +} +static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { + SHeap* p = NULL; + + int32_t code = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr, &p); + if (code != 0) { + return code; + } + return transHeapInsert(p, pConn); +} + +static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { + SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr)); + if (p == NULL) { + tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr); + return 0; + } + int32_t code = transHeapDelete(p, pConn); + if (code != 0) { + tDebug("failed to delete conn %p from heap cache since %s", pConn, tstrerror(code)); + } + return code; +} // conn heap int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node); - if (transQueueSize(&args1->reqMsgs) > transQueueSize(&args2->reqMsgs)) { + if (transQueueSize(&args1->reqs) > transQueueSize(&args2->reqs)) { return 0; } return 1; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5dc375da24..60058bbbd2 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -376,10 +376,6 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) { STransCtxVal* sVal = (STransCtxVal*)iter; key = taosHashGetKey(sVal, &klen); - // STransCtxVal* dVal = taosHashGet(dst->args, key, klen); - // if (dVal) { - // dst->freeFunc(dVal->val); - // } (void)taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); iter = taosHashIterate(src->args, iter); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b307c4ac4b..e03913d0e7 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -59,6 +59,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")