diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5b860cc23a..486a5e35c3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,7 +125,8 @@ typedef struct SRpcInit { int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; - int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait + int8_t shareConn; // 0: no share, 1. share + int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 986bbc4ac8..1861144bf0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -386,6 +386,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; + rpcInit.shareConn = 1; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.notWaitAvaliableConn = 0; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 83e55791d2..0c39919beb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3413,6 +3413,9 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); *pVLen = valueDecode((void*)valStr, vlen, NULL, (char**)pVal); + if (*pVLen < 0) { + return -1; + } } *pKey = pKtmp->key; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 820075787f..ad3ee0bfb1 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -186,6 +186,7 @@ typedef struct { uint32_t code; // del later uint32_t msgType; int32_t msgLen; + int32_t seqNum; uint8_t content[0]; // message body starts from here } STransMsgHead; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 703a4dde3e..f56c435cba 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -71,11 +71,13 @@ typedef struct { int8_t connLimitLock; // 0: no lock. 1. lock int8_t supportBatch; // 0: no batch, 1: support batch int32_t batchSize; + int8_t optBatchFetch; int32_t timeToGetConn; int index; void* parent; void* tcphandle; // returned handle from TCP initialization int64_t refId; + int8_t shareConn; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 8b99443a84..6b0b557f1c 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -115,6 +115,8 @@ void* rpcOpen(const SRpcInit* pInit) { int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); (void)transAcquireExHandle(transGetInstMgt(), refId); pRpc->refId = refId; + + pRpc->shareConn = pInit->shareConn; return (void*)refId; _end: taosMemoryFree(pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6d6336425d..1f2b1fd88f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -13,8 +13,10 @@ */ // clang-format off +#include "taoserror.h" #include "transComm.h" #include "tmisce.h" +#include "transLog.h" // clang-format on typedef struct { @@ -76,6 +78,8 @@ typedef struct SCliConn { SDelayTask* task; + HeapNode node; // for heap + int8_t inHeap; uint32_t clientIp; uint32_t serverIp; @@ -84,6 +88,8 @@ typedef struct SCliConn { char dst[32]; int64_t refId; + int32_t seq; + int32_t shareCnt; } SCliConn; typedef struct SCliMsg { @@ -96,6 +102,7 @@ typedef struct SCliMsg { uint64_t st; int sent; //(0: no send, 1: alread sent) queue seqq; + int32_t seqNum; } SCliMsg; typedef struct SCliThrd { @@ -114,15 +121,15 @@ typedef struct SCliThrd { SDelayQueue* timeoutQueue; SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout - STrans* pTransInst; // + STrans* pInst; // - int connCount; void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; SHashObj* failFastCache; SHashObj* batchCache; + SHashObj* connHeapCache; SCliMsg* stopMsg; bool quit; @@ -166,8 +173,8 @@ static void cliSendCb(uv_write_t* req, int status); // callback after conn to server static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -static void cliIdleCb(uv_idle_t* handle); -static void cliPrepareCb(uv_prepare_t* handle); +// static void cliIdleCb(uv_idle_t* handle); +// static void cliPrepareCb(uv_prepare_t* handle); static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); static void cliSendBatchCb(uv_write_t* req, int status); @@ -178,7 +185,8 @@ static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); -static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); +static int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); +void cliResetConnTimer(SCliConn* conn); static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); @@ -192,10 +200,11 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs static void cliDestroyBatch(SCliBatch* pBatch); // cli util func -static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); -static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); +static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); +static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); +static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); @@ -208,36 +217,61 @@ static void cliHandleExcept(SCliConn* conn, int32_t code); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); +static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); + +static void cliDealReq(queue* h, SCliThrd* pThrd); +static void cliBatchDealReq(queue* h, SCliThrd* pThrd); +static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq}; static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd); static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate, cliHandleFreeById}; -/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, -/// NULL,cliHandleUpdate}; static FORCE_INLINE void destroyCmsg(void* cmsg); static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param); static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); -static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); +static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); +static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); +static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); +static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); + // thread obj static int32_t createThrdObj(void* trans, SCliThrd** pThrd); static void destroyThrdObj(SCliThrd* pThrd); static void cliWalkCb(uv_handle_t* handle, void* arg); -#define CLI_RELEASE_UV(loop) \ - do { \ - (void)uv_walk(loop, cliWalkCb, NULL); \ - (void)uv_run(loop, UV_RUN_DEFAULT); \ - (void)uv_loop_close(loop); \ +static void cliWalkCb(uv_handle_t* handle, void* arg); + +typedef struct { + void* p; + HeapNode node; +} SHeapNode; +typedef struct { + // void* p; + Heap* heap; + int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b); +} SHeap; + +int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); +int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); +void transHeapDestroy(SHeap* heap); +int32_t transHeapGet(SHeap* heap, SCliConn** p); +int32_t transHeapInsert(SHeap* heap, SCliConn* p); +int32_t transHeapDelete(SHeap* heap, SCliConn* p); + +#define CLI_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, cliWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ } while (0); // snprintf may cause performance problem @@ -251,7 +285,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } while (0) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pInst))->label) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -372,10 +406,8 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { return false; } -void cliHandleResp(SCliConn* conn) { +void cliResetConnTimer(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - if (conn->timer) { if (uv_is_active((uv_handle_t*)conn->timer)) { tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); @@ -385,6 +417,83 @@ void cliHandleResp(SCliConn* conn) { conn->timer->data = NULL; conn->timer = NULL; } +} +void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } + +SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) { + SCliMsg* pMsg = NULL; + for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { + pMsg = transQueueGet(&conn->cliMsgs, i); + if (pMsg->seqNum == seqNum) { + transQueueRm(&conn->cliMsgs, i); + break; + } + } + if (pMsg == NULL) { + ASSERT(0); + } + return pMsg; +} +bool cliShouldAddConnToPool(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + bool empty = transQueueEmpty(&conn->cliMsgs); + if (empty) { + (void)delConnFromHeapCache(pThrd->connHeapCache, conn); + } + + return empty; +} +void cliHandleResp_shareConn(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; + cliResetConnTimer(conn); + + STransMsgHead* pHead = NULL; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 1); + + if (msgLen <= 0) { + taosMemoryFree(pHead); + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { + tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); + } + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + + STransMsg transMsg = {0}; + transMsg.contLen = transContLenFromMsg(pHead->msgLen); + transMsg.pCont = transContFromHead((char*)pHead); + transMsg.code = pHead->code; + transMsg.msgType = pHead->msgType; + transMsg.info.ahandle = NULL; + transMsg.info.traceId = pHead->traceId; + transMsg.info.hasEpSet = pHead->hasEpSet; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); + + SCliMsg* pMsg = cliFindMsgBySeqnum(conn, pHead->seqNum); + pMsg->seqNum = 0; + + STransConnCtx* pCtx = pMsg->ctx; + transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; + STraceId* trace = &transMsg.info.traceId; + + int32_t ret = cliNotifyCb(conn, &transMsg, pMsg); + if (ret != 0) { + return; + } else { + destroyCmsg(pMsg); + } +} +void cliHandleResp(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; + + if (pInst->shareConn) { + return cliHandleResp_shareConn(conn); + } + cliResetConnTimer(conn); STransMsgHead* pHead = NULL; @@ -397,7 +506,7 @@ void cliHandleResp(SCliConn* conn) { } if (resetBuf == 0) { - tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn); + tTrace("%s conn %p not reset read buf", transLabel(pInst), conn); } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { @@ -469,7 +578,7 @@ void cliHandleResp(SCliConn* conn) { } if (pMsg == NULL || (pMsg && pMsg->type != Release)) { - if (cliAppCb(conn, &transMsg, pMsg) != 0) { + if (cliNotifyCb(conn, &transMsg, pMsg) != 0) { return; } } @@ -517,7 +626,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } } SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; bool once = false; do { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); @@ -537,7 +646,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; - transMsg.info.cliVer = pTransInst->compatibilityVer; + transMsg.info.cliVer = pInst->compatibilityVer; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); @@ -567,7 +676,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (pMsg == NULL || (pMsg && pMsg->type != Release)) { int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); cliDestroyMsgInExhandle(refId); - if (cliAppCb(pConn, &transMsg, pMsg) != 0) { + if (cliNotifyCb(pConn, &transMsg, pMsg) != 0) { return; } } @@ -591,11 +700,7 @@ void cliConnTimeout(uv_timer_t* handle) { tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); - (void)uv_timer_stop(handle); - handle->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, UV_ECANCELED); } @@ -631,7 +736,7 @@ void* destroyConnPool(SCliThrd* pThrd) { transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); pMsg->ctx->task = NULL; - doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); } taosMemoryFree(msglist); @@ -644,7 +749,7 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; - STrans* pTranInst = pThrd->pTransInst; + STrans* pTranInst = pThrd->pInst; size_t klen = strlen(key); SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { @@ -682,12 +787,13 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); conn->task = NULL; } + conn->seq++; return conn; } static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; size_t klen = strlen(key); SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { @@ -697,7 +803,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); if (nList == NULL) { - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + // doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } @@ -712,35 +818,34 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { // no avaliable conn in pool if (QUEUE_IS_EMPTY(&plist->conns)) { SMsgList* list = plist->list; - if ((list)->numOfConn >= pTransInst->connLimitNum) { + if ((list)->numOfConn >= pInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; - if (pTransInst->notWaitAvaliableConn || - (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) { - tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), + if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pMsg)->msg.msgType))) { + tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pMsg)->msg.msgType), tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); - doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); *pMsg = NULL; return NULL; } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } arg->param1 = *pMsg; arg->param2 = pThrd; - SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } (*pMsg)->ctx->task = task; - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pMsg)->msg.msgType)); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; } else { @@ -748,24 +853,23 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } arg->param1 = *pMsg; arg->param2 = pThrd; - SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } (*pMsg)->ctx->task = task; - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, - TMSG_INFO((*pMsg)->msg.msgType)); + tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pMsg)->msg.msgType)); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); queue* h = QUEUE_HEAD(&(list)->msgQ); @@ -775,12 +879,12 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { *pMsg = ans; trace = &(*pMsg)->msg.info.traceId; - tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pMsg)->msg.msgType)); transDQCancel(pThrd->waitConnQueue, ans->ctx->task); } list->numOfConn++; } - tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key); + tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key); return NULL; } @@ -810,17 +914,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { } SCliThrd* thrd = conn->hostThrd; - if (conn->timer != NULL) { - (void)uv_timer_stop(conn->timer); - (void)taosArrayPush(thrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } + cliResetConnTimer(conn); if (T_REF_VAL_GET(conn) > 1) { transUnrefCliHandle(conn); } cliDestroyConnMsgs(conn, false); + conn->seq = 0; if (conn->list == NULL && conn->dstAddr != NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); @@ -856,8 +956,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { arg->param1 = conn; arg->param2 = thrd; - STrans* pTransInst = thrd->pTransInst; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime)); + STrans* pInst = thrd->pInst; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pInst->idleTime)); } } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -897,6 +997,7 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { } static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { + if (handle == 0) return -1; if (update) { (void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -1018,8 +1119,20 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { timer->data = conn; conn->timer = timer; + conn->connReq.data = conn; + transReqQueueInit(&conn->wreqQueue); - (void)atomic_add_fetch_32(&pThrd->connCount, 1); + TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); + + TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); + + QUEUE_INIT(&conn->q); + conn->hostThrd = pThrd; + conn->status = ConnNormal; + conn->broken = false; + transRefCliHandle(conn); + conn->seq = 0; + // allocConnRef(conn, false); TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); @@ -1067,12 +1180,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transDQCancel(pThrd->timeoutQueue, conn->task); conn->task = NULL; } - if (conn->timer != NULL) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - } + cliResetConnTimer(conn); if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { @@ -1087,14 +1195,7 @@ static void cliDestroy(uv_handle_t* handle) { } SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; - if (conn->timer != NULL) { - (void)uv_timer_stop(conn->timer); - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } - - (void)atomic_sub_fetch_32(&pThrd->connCount, 1); + cliResetConnTimer(conn); if (conn->refId > 0) { (void)transReleaseExHandle(transGetRefMgt(), conn->refId); @@ -1165,10 +1266,126 @@ static void cliSendCb(uv_write_t* req, int status) { } (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } + +static void cliHandleBatch_shareConnExcept(SCliConn* conn) { + int32_t code = -1; + SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; + while (!transQueueEmpty(&conn->cliMsgs)) { + SCliMsg* pMsg = transQueuePop(&conn->cliMsgs); + ASSERT(pMsg->type != Release); + ASSERT(REQUEST_NO_RESP(&pMsg->msg) == 0); + + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; + + STransMsg transMsg = {0}; + transMsg.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; + transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + transMsg.info.ahandle = NULL; + transMsg.info.cliVer = pInst->compatibilityVer; + transMsg.info.ahandle = pCtx->ahandle; + + pMsg->seqNum = 0; + code = cliNotifyCb(conn, &transMsg, pMsg); + if (code != 0) { + continue; + } else { + // already notify user + destroyCmsg(pMsg); + } + } + + if (T_REF_VAL_GET(conn) > 1) transUnrefCliHandle(conn); + transUnrefCliHandle(conn); +} +static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { + SCliConn* conn = req->data; + conn->shareCnt -= 1; + if (status != 0) { + tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); + if (!uv_is_closing((uv_handle_t*)&conn->stream)) { + cliHandleBatch_shareConnExcept(conn); + } + return; + } + uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + taosMemoryFree(req); +} +void cliSendBatch_shareConn(SCliConn* pConn) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + int32_t size = transQueueSize(&pConn->cliMsgs); + + int32_t totalLen = 0; + if (size == 0) { + tError("%s conn %p not msg to send", pInst->label, pConn); + ASSERT(0); + // cliHandleExcept(pConn); + return; + } + uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); + + int j = 0; + for (int i = 0; i < size; i++) { + SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i); + if (pCliMsg->sent == 1) { + continue; + } + STransConnCtx* pCtx = pCliMsg->ctx; + pConn->seq++; + + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + + int msgLen = transMsgLenFromCont(pMsg->contLen); + + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + if (pHead->comp == 0) { + pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; + memcpy(pHead->user, pInst->user, strlen(pInst->user)); + pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pInst->compatibilityVer); + } + pHead->timestamp = taosHton64(taosGetTimestampUs()); + pHead->seqNum = pConn->seq; + + if (pHead->comp == 0) { + if (pInst->compressSize != -1 && pInst->compressSize < pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + } else { + msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + } + wb[j++] = uv_buf_init((char*)pHead, msgLen); + totalLen += msgLen; + + pCliMsg->sent = 1; + pCliMsg->seqNum = pHead->seqNum; + } + uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + req->data = pConn; + pConn->shareCnt += 1; + tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, + totalLen); + uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); + taosMemoryFree(wb); +} void cliSendBatch(SCliConn* pConn) { int32_t code = 0; SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; SCliBatch* pBatch = pConn->pBatch; int32_t wLen = pBatch->wLen; @@ -1210,16 +1427,16 @@ void cliSendBatch(SCliConn* pConn) { pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; - memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + memcpy(pHead->user, pInst->user, strlen(pInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; - pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); + pHead->compatibilityVer = htonl(pInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { - if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + if (pInst->compressSize != -1 && pInst->compressSize < pMsg->contLen) { msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } @@ -1256,10 +1473,10 @@ _exception: } void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; if (transQueueEmpty(&pConn->cliMsgs)) { - tError("%s conn %p not msg to send", pTransInst->label, pConn); + tError("%s conn %p not msg to send", pInst->label, pConn); cliHandleExcept(pConn, -1); return; } @@ -1287,11 +1504,11 @@ void cliSend(SCliConn* pConn) { pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; - memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + memcpy(pHead->user, pInst->user, strlen(pInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; - pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); + pHead->compatibilityVer = htonl(pInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); @@ -1301,22 +1518,19 @@ void cliSend(SCliConn* pConn) { STraceId* trace = &pMsg->info.traceId; - // if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { - // uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : - // NULL; if (timer == NULL) { - // timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - // tDebug("no available timer, create a timer %p", timer); - // (void)uv_timer_init(pThrd->loop, timer); - // } - // timer->data = pConn; - // pConn->timer = timer; - - // tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType)); - // (void)uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); - // } + if (pInst->startTimer != NULL && pInst->startTimer(0, pMsg->msgType)) { + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; + if (timer == NULL) { + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + tDebug("no available timer, create a timer %p", timer); + (void)uv_timer_init(pThrd->loop, timer); + } + timer->data = pConn; + pConn->timer = timer; + } if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { - if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + if (pInst->compressSize != -1 && pInst->compressSize < pMsg->contLen) { msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } @@ -1360,98 +1574,103 @@ static void cliDestroyBatch(SCliBatch* pBatch) { p->sending -= 1; taosMemoryFree(pBatch); } + +static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { + STrans* pInst = pThrd->pInst; + uint32_t ipaddr; + int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr); + if (code != 0) { + cliResetConnTimer(conn); + if (conn->pBatch != NULL) { + cliHandleFastFail(conn, -1); + } else { + cliHandleBatch_shareConnExcept(conn); + } + + return; + } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ipaddr; + addr.sin_port = (uint16_t)htons(port); + + tTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr); + + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); + if (fd == -1) { + tError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleFastFail(conn, -1); + return; + } + + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + if (ret != 0) { + tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); + return; + } + ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); + if (ret != 0) { + tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); + return; + } + + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + if (ret != 0) { + cliResetConnTimer(conn); + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + cliHandleFastFail(conn, -1); + return; + } + + ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + if (ret != 0) { + tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + cliResetConnTimer(conn); + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + cliHandleFastFail(conn, -1); + return; + } + return; +} static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { + return; + } + int32_t code = 0; if (pThrd->quit == true) { cliDestroyBatch(pBatch); return; } - if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { - return; - } - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; SCliBatchList* pList = pBatch->pList; - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - bool exceed = false; - SCliConn* conn = getConnFromPool(pThrd, key, &exceed); + SCliConn* conn = getConnFromPool(pThrd, pList->dst, &exceed); if (conn == NULL && exceed) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen, - pBatch->batchSize, pTransInst->connLimitNum); + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pInst->label, pBatch->wLen, + pBatch->batchSize, pInst->connLimitNum); cliDestroyBatch(pBatch); return; } if (conn == NULL) { code = cliCreateConn(pThrd, &conn); if (code != 0) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pTransInst->label, - pBatch->wLen, pBatch->batchSize, pTransInst->connLimitNum, tstrerror(code)); + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label, + pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code)); cliDestroyBatch(pBatch); return; } conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); - if (conn->dstAddr == NULL) { - tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - uv_close((uv_handle_t*)conn->stream, cliDestroy); - return; - } - - uint32_t ipaddr = 0; - if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - cliHandleFastFail(conn, code); - return; - } - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons(pList->port); - - tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); - if (fd == -1) { - tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - cliHandleFastFail(conn, -1); - return; - } - int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); - if (ret != 0) { - tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); - cliHandleFastFail(conn, -1); - return; - } - ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); - if (ret != 0) { - tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); - cliHandleFastFail(conn, -1); - return; - } - - ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); - if (ret != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, -1); - return; - } - (void)uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); - return; + return cliDoConn(pThrd, conn, pList->ip, pList->port); } conn->pBatch = pBatch; @@ -1462,13 +1681,11 @@ static void cliSendBatchCb(uv_write_t* req, int status) { SCliConn* conn = req->data; SCliThrd* thrd = conn->hostThrd; SCliBatch* p = conn->pBatch; - - SCliBatchList* pBatchList = p->pList; - SCliBatch* nxtBatch = cliGetHeadFromList(pBatchList); - pBatchList->connCnt -= 1; - conn->pBatch = NULL; + SCliBatch* nxtBatch = cliGetHeadFromList(p->pList); + p->pList->connCnt -= 1; + if (status != 0) { tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize, uv_err_name(status)); @@ -1488,48 +1705,36 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } } else { cliDestroyBatch(nxtBatch); - // conn release by other callback } } cliDestroyBatch(p); taosMemoryFree(req); } -static void cliHandleFastFail(SCliConn* pConn, int status) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; +static void cliHandleFastFail_resp(SCliConn* pConn, int status) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + + STraceId* trace = &pMsg->msg.info.traceId; + tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); +} + +static void cliHandleFastFail_noresp(SCliConn* pConn, int status) { + tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn, + pConn->dstAddr, uv_strerror(status)); + cliDestroyBatch(pConn->pBatch); + pConn->pBatch = NULL; +} +static void cliHandleFastFail(SCliConn* pConn, int status) { if (status == -1) status = UV_EADDRNOTAVAIL; if (pConn->pBatch == NULL) { - SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - - STraceId* trace = &pMsg->msg.info.traceId; - tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); - - if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && - (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr)); - int64_t cTimestamp = taosGetTimestampMs(); - if (item != NULL) { - int32_t elapse = cTimestamp - item->timestamp; - if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { - item->count++; - } else { - item->count = 1; - item->timestamp = cTimestamp; - } - } else { - SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - (void)taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); - } - } + cliHandleFastFail_resp(pConn, status); } else { - tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - pConn, pConn->dstAddr, uv_strerror(status)); - cliDestroyBatch(pConn->pBatch); - pConn->pBatch = NULL; + cliHandleFastFail_noresp(pConn, status); } cliHandleExcept(pConn, status); } @@ -1542,10 +1747,7 @@ void cliConnCb(uv_connect_t* req, int status) { if (pConn->timer == NULL) { timeout = true; } else { - (void)uv_timer_stop(pConn->timer); - pConn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &pConn->timer); - pConn->timer = NULL; + cliResetConnTimer(pConn); } STUB_RAND_NETWORK_ERR(status); @@ -1577,15 +1779,18 @@ void cliConnCb(uv_connect_t* req, int status) { tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); if (pConn->pBatch != NULL) { - cliSendBatch(pConn); - } else { - cliSend(pConn); + return cliSendBatch(pConn); } + if (pConn->inHeap) { + return cliSendBatch_shareConn(pConn); + } + + return cliSend(pConn); } -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { +static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { STransConnCtx* pCtx = pMsg->ctx; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; STransMsg transMsg = {0}; transMsg.contLen = 0; @@ -1595,14 +1800,14 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.hasEpSet = false; - transMsg.info.cliVer = pTransInst->compatibilityVer; + transMsg.info.cliVer = pInst->compatibilityVer; if (pCtx->pSem != NULL) { if (pCtx->pRsp == NULL) { } else { memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); } } else { - pTransInst->cfp(pTransInst->parent, &transMsg, NULL); + pInst->cfp(pInst->parent, &transMsg, NULL); } destroyCmsg(pMsg); @@ -1718,26 +1923,34 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { - tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr); + tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pInst)->label, pThrd->pool, addr); } return conn; } -FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { +FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { if (pCvtAddr->cvt == false) { - return; + if (EPSET_IS_VALID(pEpSet)) { + return 0; + } else { + return TSDB_CODE_RPC_FQDN_ERROR; + } } if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN); memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); } + if (EPSET_IS_VALID(pEpSet)) { + return 0; + } + return TSDB_CODE_RPC_FQDN_ERROR; } FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { if (code != 0) return false; - // if (pCtx->retryCnt == 0) return false; - if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; - return true; + + return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; } + FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; @@ -1751,6 +1964,20 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { return 0; } +FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code) { + STrans* pInst = pThrd->pInst; + + STransMsg resp = {.code = code}; + code = cliBuildExceptResp(pMsg, &resp); + if (code != 0) { + return code; + } + resp.info.cliVer = pInst->compatibilityVer; + pInst->cfp(pInst->parent, &resp, NULL); + + return 0; +} + static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { int32_t code = 0; uint32_t addr = 0; @@ -1811,25 +2038,133 @@ static void doFreeTimeoutMsg(void* param) { STaskArg* arg = param; SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; - STrans* pTransInst = pThrd->pTransInst; - int32_t code = TSDB_CODE_RPC_MAX_SESSIONS; + STrans* pInst = pThrd->pInst; + QUEUE_REMOVE(&pMsg->q); STraceId* trace = &pMsg->msg.info.traceId; - tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyApp(pMsg, pThrd, code); + + tGTrace("%s msg %s cannot get available conn after timeout", pInst->label, TMSG_INFO(pMsg->msg.msgType)); + doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + taosMemoryFree(arg); } -void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { +static int32_t getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) { int32_t code = 0; - STrans* pTransInst = pThrd->pTransInst; + size_t klen = strlen(key); - cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); - if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { + SHeap* p = taosHashGet(pConnHeapCache, key, klen); + if (p == NULL) { + SHeap heap = {0}; + code = transHeapInit(&heap, compareHeapNode); + if (code != 0) { + tError("failed to init heap cache for key:%s, reason: %s", key, tstrerror(code)); + return code; + } + + code = taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap)); + if (code != 0) { + transHeapDestroy(&heap); + tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(code)); + } + p = taosHashGet(pConnHeapCache, key, klen); + if (p == NULL) { + code = TSDB_CODE_INVALID_PARA; + } + } + *pHeap = p; + return code; +} + +static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { + int code = 0; + SHeap* pHeap = NULL; + SCliConn* pConn = NULL; + code = getOrCreateHeapIfNotExist(pConnHeapCache, key, &pHeap); + if (code != 0) { + tDebug("failed to get conn heap from cache for key:%s", key); + return NULL; + } + code = transHeapGet(pHeap, &pConn); + if (code != 0) { + tDebug("failed to get conn from heap cache for key:%s", key); + return NULL; + } + return pConn; +} +static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { + SHeap* p = NULL; + + int32_t code = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr, &p); + if (code != 0) { + return code; + } + return transHeapInsert(p, pConn); +} + +static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { + SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr)); + if (p == NULL) { + tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr); + return 0; + } + int32_t code = transHeapDelete(p, pConn); + if (code != 0) { + tDebug("failed to delete conn %p from heap cache since %s", pConn, tstrerror(code)); + } + return code; +} + +void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code = 0; + + STraceId* trace = &pMsg->msg.info.traceId; + STrans* pInst = pThrd->pInst; + + code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (code != 0) { + // notifyCb destroyCmsg(pMsg); return; } + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); + + SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); + if (pConn == NULL) { + tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + bool ignore = false; + pConn = getConnFromPool(pThrd, addr, &ignore); + if (pConn != NULL) { + addConnToHeapCache(pThrd->connHeapCache, pConn); + transQueuePush(&pConn->cliMsgs, pMsg); + return cliSendBatch_shareConn(pConn); + } + } else { + tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + transQueuePush(&pConn->cliMsgs, pMsg); + cliSendBatch_shareConn(pConn); + return; + } + + code = cliCreateConn(pThrd, &pConn); + pConn->dstAddr = taosStrdup(addr); + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + + transQueuePush(&pConn->cliMsgs, pMsg); + return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); +} + +void cliHandleReq__noShareConn(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code; + STrans* pInst = pThrd->pInst; + code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (code != 0) { + // notifyCb + destroyCmsg(pMsg); + } + char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; @@ -1840,12 +2175,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { if (ignore == true) { // persist conn already release by server STransMsg resp = {0}; - (void)cliBuildExceptResp(pMsg, &resp); - // refactorr later - resp.info.cliVer = pTransInst->compatibilityVer; - if (pMsg->type != Release) { - pTransInst->cfp(pTransInst->parent, &resp, NULL); + (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, 0); } destroyCmsg(pMsg); return; @@ -1862,27 +2193,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { } else { code = cliCreateConn(pThrd, &conn); if (code != 0) { - tError("%s failed to create conn, reason:%s", pTransInst->label, tstrerror(code)); - STransMsg resp = {.code = code}; - (void)cliBuildExceptResp(pMsg, &resp); - - resp.info.cliVer = pTransInst->compatibilityVer; - if (pMsg->type != Release) { - pTransInst->cfp(pTransInst->parent, &resp, NULL); - } + tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); + (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, code); destroyCmsg(pMsg); return; } - int64_t refId = (int64_t)pMsg->msg.info.handle; - if (refId != 0) specifyConnRef(conn, true, refId); + specifyConnRef(conn, true, (int64_t)pMsg->msg.info.handle); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); (void)transQueuePush(&conn->cliMsgs, pMsg); conn->dstAddr = taosStrdup(addr); if (conn->dstAddr == NULL) { - tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, + tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pInst), conn, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); cliHandleFastFail(conn, -1); return; @@ -1891,11 +2215,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); if (code != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliHandleExcept(conn, code); return; } @@ -1905,10 +2225,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_addr.s_addr = ipaddr; addr.sin_port = (uint16_t)htons(port); - tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); + tGTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { - tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tGError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), conn, tstrerror(TAOS_SYSTEM_ERROR(errno))); cliHandleExcept(conn, -1); errno = 0; @@ -1917,35 +2237,40 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { - tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + tGError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); cliHandleExcept(conn, -1); return; } ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); if (ret != 0) { - tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); cliHandleExcept(conn, -1); return; } ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, ret); return; } (void)uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } - tGTrace("%s conn %p ready", pTransInst->label, conn); + tGTrace("%s conn %p ready", pInst->label, conn); } -static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { +void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { + STrans* pInst = pThrd->pInst; + if (pInst->shareConn == 1) { + return cliHandleReq__shareConn(pMsg, pThrd); + } else { + return cliHandleReq__noShareConn(pMsg, pThrd); + } +} + +static void cliDealReq(queue* wq, SCliThrd* pThrd) { int count = 0; while (!QUEUE_IS_EMPTY(wq)) { @@ -1959,7 +2284,6 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { continue; } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); - count++; } if (count >= 2) { @@ -1979,7 +2303,11 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); return batch; } - +static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) { + STrans* pInst = pThrd->pInst; + STransConnCtx* pCtx = pMsg->ctx; + return; +} static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); if (pBatchList == NULL) { @@ -2042,7 +2370,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* p return 0; } static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { - STrans* pInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; int32_t code = 0; int count = 0; @@ -2052,8 +2380,8 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - if (pMsg->type == Quit) { - pThrd->stopMsg = pMsg; + if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { + cliBuildBatch(pMsg, h, pThrd); continue; } @@ -2073,6 +2401,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { destroyCmsg(pMsg); continue; } + pBatchList->batchLenLimit = pInst->batchSize; SCliBatch* pBatch = NULL; @@ -2135,7 +2464,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SCliThrd* pThrd = item->pThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; // batch process to avoid to lock/unlock frequently queue wq; @@ -2143,42 +2472,10 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); (void)taosThreadMutexUnlock(&item->mtx); - int8_t supportBatch = pTransInst->supportBatch; - if (supportBatch == 0) { - cliNoBatchDealReq(&wq, pThrd); - } else if (supportBatch == 1) { - cliBatchDealReq(&wq, pThrd); - } + cliDealFunc[pInst->supportBatch](&wq, pThrd); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } -static void cliPrepareCb(uv_prepare_t* handle) { - SCliThrd* thrd = handle->data; - tTrace("prepare work start"); - - SAsyncPool* pool = thrd->asyncPool; - for (int i = 0; i < pool->nAsync; i++) { - uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; - - queue wq; - (void)taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); - (void)taosThreadMutexUnlock(&item->mtx); - - int count = 0; - while (!QUEUE_IS_EMPTY(&wq)) { - queue* h = QUEUE_HEAD(&wq); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - (*cliAsyncHandle[pMsg->type])(pMsg, thrd); - count++; - } - } - tTrace("prepare work end"); - if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); -} void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { transCtxCleanup(&conn->ctx); @@ -2192,7 +2489,7 @@ void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { void cliConnFreeMsgs(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pInst = pThrd->pInst; for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i); @@ -2200,12 +2497,9 @@ void cliConnFreeMsgs(SCliConn* conn) { continue; } - STransMsg resp = {0}; - if (-1 == cliBuildExceptResp(cmsg, &resp)) { + if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) { continue; } - resp.info.cliVer = pTransInst->compatibilityVer; - pTransInst->cfp(pTransInst->parent, &resp, NULL); cmsg->ctx->ahandle = NULL; } @@ -2249,7 +2543,7 @@ static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); tsEnableRandErr = true; - (void)strtolower(threadName, pThrd->pTransInst->label); + (void)strtolower(threadName, pThrd->pInst->label); setThreadName(threadName); (void)uv_run(pThrd->loop, UV_RUN_DEFAULT); @@ -2265,7 +2559,7 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); } - STrans* pTransInst = shandle; + STrans* pInst = shandle; memcpy(cli->label, label, TSDB_LABEL_LEN); cli->numOfThreads = numOfThreads; @@ -2345,7 +2639,7 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { int32_t code = 0; - STrans* pTransInst = trans; + STrans* pInst = trans; SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); if (pThrd == NULL) { @@ -2366,26 +2660,13 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); } - int32_t nSync = pTransInst->supportBatch ? 4 : 8; + int32_t nSync = pInst->supportBatch ? 4 : 8; code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool); if (code != 0) { tError("failed to init async pool since:%s", tstrerror(code)); TAOS_CHECK_GOTO(code, NULL, _end); } - pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - if (pThrd->prepare == NULL) { - tError("failed to create prepre since:%s", tstrerror(code)); - TAOS_CHECK_GOTO(code, NULL, _end); - } - - code = uv_prepare_init(pThrd->loop, pThrd->prepare); - if (code != 0) { - tError("failed to create prepre since:%s", uv_err_name(code)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); - } - pThrd->prepare->data = pThrd; - int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); if (pThrd->timerList == NULL) { @@ -2419,11 +2700,13 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(code, NULL, _end); } - pThrd->destroyAhandleFp = pTransInst->destroyFp; - pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->destroyAhandleFp = pInst->destroyFp; + + pThrd->fqdn2ipCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->fqdn2ipCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->failFastCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); @@ -2434,8 +2717,13 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - pThrd->pTransInst = trans; + pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->connHeapCache == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pInst->idleTime); + pThrd->pInst = trans; pThrd->quit = false; *ppThrd = pThrd; @@ -2445,7 +2733,6 @@ _end: if (pThrd) { (void)uv_loop_close(pThrd->loop); taosMemoryFree(pThrd->loop); - taosMemoryFree(pThrd->prepare); (void)taosThreadMutexDestroy(&pThrd->msgMtx); transAsyncPoolDestroy(pThrd->asyncPool); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { @@ -2486,7 +2773,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); - taosHashCleanup(pThrd->failFastCache); void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { @@ -2505,6 +2791,15 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); + + void** pIter2 = taosHashIterate(pThrd->connHeapCache, NULL); + while (pIter2 != NULL) { + SHeap* heap = (SHeap*)(*pIter2); + transHeapDestroy(heap); + pIter2 = (void**)taosHashIterate(pThrd->connHeapCache, pIter2); + } + taosHashCleanup(pThrd->connHeapCache); + taosMemoryFree(pThrd); } @@ -2542,18 +2837,18 @@ void cliWalkCb(uv_handle_t* handle, void* arg) { } } -FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) { - int32_t index = pTransInst->index; - if (pTransInst->numOfThreads == 0) { +FORCE_INLINE int cliRBChoseIdx(STrans* pInst) { + int32_t index = pInst->index; + if (pInst->numOfThreads == 0) { return -1; } /* - * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000; + * no lock, and to avoid CPU load imbalance, set limit pInst->numOfThreads * 2000; */ - if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) { - pTransInst->index = 0; + if (pInst->index++ >= pInst->numOfThreads * 2000) { + pInst->index = 0; } - return index % pTransInst->numOfThreads; + return index % pInst->numOfThreads; } static FORCE_INLINE void doDelayTask(void* param) { STaskArg* arg = param; @@ -2561,7 +2856,7 @@ static FORCE_INLINE void doDelayTask(void* param) { taosMemoryFree(arg); } -static void doCloseIdleConn(void* param) { +static FORCE_INLINE void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); @@ -2569,7 +2864,7 @@ static void doCloseIdleConn(void* param) { cliDestroyConn(conn, true); taosMemoryFree(arg); } -static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { +static FORCE_INLINE void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) { if (!(rpcDebugFlag & DEBUG_DEBUG)) { return; } @@ -2577,21 +2872,21 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { STraceId* trace = &pMsg->msg.info.traceId; char tbuf[512] = {0}; (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; } -static int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; +static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { + STrans* pInst = pThrd->pInst; STransConnCtx* pCtx = pMsg->ctx; - cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); + cliPerfLog_schedMsg(pMsg, transLabel(pThrd->pInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - arg->param1 = pMsg; arg->param2 = pThrd; @@ -2693,42 +2988,58 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { } return noDelay; } -bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - STransConnCtx* pCtx = pMsg->ctx; - int32_t code = pResp->code; - - bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false; +int8_t cliRetryShouldRetry(STrans* pInst, STransMsg* pResp) { + bool retry = pInst->retry != NULL ? pInst->retry(pResp->code, pResp->msgType - 1) : false; if (retry == false) { - return false; + return 0; } + return 1; +} +void cliRetryMayInitCtx(STrans* pInst, SCliMsg* pMsg) { + STransConnCtx* pCtx = pMsg->ctx; if (!pCtx->retryInit) { - pCtx->retryMinInterval = pTransInst->retryMinInterval; - pCtx->retryMaxInterval = pTransInst->retryMaxInterval; - pCtx->retryStepFactor = pTransInst->retryStepFactor; - pCtx->retryMaxTimeout = pTransInst->retryMaxTimeout; + pCtx->retryMinInterval = pInst->retryMinInterval; + pCtx->retryMaxInterval = pInst->retryMaxInterval; + pCtx->retryStepFactor = pInst->retryStepFactor; + pCtx->retryMaxTimeout = pInst->retryMaxTimeout; pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryStep = 0; pCtx->retryInit = true; pCtx->retryCode = TSDB_CODE_SUCCESS; - - // already retry, not use handle specified by app; pMsg->msg.info.handle = 0; } +} +int32_t cliRetryIsTimeout(STrans* pInst, SCliMsg* pMsg) { + STransConnCtx* pCtx = pMsg->ctx; + if (pCtx->retryMaxTimeout != -1 && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { + return 1; + } + return 0; +} +bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; - if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { + STransConnCtx* pCtx = pMsg->ctx; + int32_t code = pResp->code; + + cliRetryMayInitCtx(pInst, pMsg); + + if (!cliRetryShouldRetry(pInst, pResp)) { return false; } + if (cliRetryIsTimeout(pInst, pMsg)) { + return false; + } // code, msgType - // A: epset, leader, not self - // B: epset, not know leader - // C: no epset, leader but not serivce + // A: epset,leader, not self + // B: epset,not know leader + // C: noepset,leader but not serivce bool noDelay = false; if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { @@ -2783,23 +3094,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } return true; } -int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - - if (pMsg == NULL || pMsg->ctx == NULL) { - tTrace("%s conn %p handle resp", pTransInst->label, pConn); - pTransInst->cfp(pTransInst->parent, pResp, NULL); - return 0; - } - - STransConnCtx* pCtx = pMsg->ctx; - - bool retry = cliGenRetryRule(pConn, pResp, pMsg); - if (retry == true) { - return -1; - } - +void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { if (pCtx->retryCode != TSDB_CODE_SUCCESS) { int32_t code = pResp->code; // return internal code app @@ -2817,6 +3112,24 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK; } } +} +int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + + if (pMsg == NULL || pMsg->ctx == NULL) { + tTrace("%s conn %p handle resp", pInst->label, pConn); + pInst->cfp(pInst->parent, pResp, NULL); + return 0; + } + + bool retry = cliGenRetryRule(pConn, pResp, pMsg); + if (retry == true) { + return -1; + } + + STransConnCtx* pCtx = pMsg->ctx; + cliMayReSetRespCode(pCtx, pResp); STraceId* trace = &pResp->info.traceId; bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); @@ -2854,12 +3167,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } else { tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (retry == false && hasEpSet == true) { - pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet); + pInst->cfp(pInst->parent, pResp, &pCtx->epSet); } else { if (!cliIsEpsetUpdated(pResp->code, pCtx)) { - pTransInst->cfp(pTransInst->parent, pResp, NULL); + pInst->cfp(pInst->parent, pResp, NULL); } else { - pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet); + pInst->cfp(pInst->parent, pResp, &pCtx->epSet); } } } @@ -2967,6 +3280,7 @@ int32_t transReleaseCliHandle(void* handle) { static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); if (pCtx == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2999,17 +3313,17 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq } int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pInst == NULL) { transFreeMsg(pReq->pCont); pReq->pCont = NULL; return TSDB_CODE_RPC_MODULE_QUIT; } int32_t code = 0; int64_t handle = (int64_t)pReq->info.handle; - SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); + SCliThrd* pThrd = transGetWorkThrd(pInst, handle); if (pThrd == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception;); + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception); } if (handle != 0) { @@ -3042,7 +3356,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; - tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyCmsg(pCliMsg); @@ -3065,14 +3379,14 @@ int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* p } int32_t code = 0; - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pInst == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception); } TAOS_CHECK_GOTO(transAllocHandle(transpointId), NULL, _exception); - SCliThrd* pThrd = transGetWorkThrd(pTransInst, *transpointId); + SCliThrd* pThrd = transGetWorkThrd(pInst, *transpointId); if (pThrd == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception); } @@ -3088,7 +3402,7 @@ int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* p TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; - tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyCmsg(pCliMsg); @@ -3106,8 +3420,8 @@ _exception: } int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pInst == NULL) { transFreeMsg(pReq->pCont); pReq->pCont = NULL; return TSDB_CODE_RPC_MODULE_QUIT; @@ -3119,7 +3433,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); } - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + SCliThrd* pThrd = transGetWorkThrd(pInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN1); } @@ -3166,7 +3480,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; - tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); @@ -3230,8 +3544,8 @@ _EXIT: int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { int32_t code = 0; - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pInst == NULL) { transFreeMsg(pReq->pCont); pReq->pCont = NULL; return TSDB_CODE_RPC_MODULE_QUIT; @@ -3242,7 +3556,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); } - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + SCliThrd* pThrd = transGetWorkThrd(pInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN2); } @@ -3284,7 +3598,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; - tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); @@ -3325,8 +3639,8 @@ _RETURN2: int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { if (ip == NULL || fqdn == NULL) return TSDB_CODE_INVALID_PARA; - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pInst == NULL) { return TSDB_CODE_RPC_MODULE_QUIT; } @@ -3336,7 +3650,7 @@ int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { cvtAddr.cvt = true; int32_t code = 0; - for (int8_t i = 0; i < pTransInst->numOfThreads; i++) { + for (int8_t i = 0; i < pInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); if (pCtx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -3356,8 +3670,8 @@ int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { cliMsg->type = Update; cliMsg->refId = (int64_t)shandle; - SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; - tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); + SCliThrd* thrd = ((SCliObj*)pInst->tcphandle)->pThreadObj[i]; + tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid); if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) { destroyCmsg(cliMsg); @@ -3398,8 +3712,8 @@ int32_t transAllocHandle(int64_t* refId) { } int32_t transFreeConnById(void* shandle, int64_t transpointId) { int32_t code = 0; - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pInst == NULL) { return TSDB_CODE_RPC_MODULE_QUIT; } if (transpointId == 0) { @@ -3407,7 +3721,7 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) { TAOS_CHECK_GOTO(0, NULL, _exception); } - SCliThrd* pThrd = transGetWorkThrdFromHandle(pTransInst, transpointId); + SCliThrd* pThrd = transGetWorkThrdFromHandle(pInst, transpointId); if (pThrd == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); } @@ -3433,3 +3747,58 @@ _exception: transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return code; } + +// conn heap +int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { + SCliConn* args1 = container_of(a, SCliConn, node); + SCliConn* args2 = container_of(b, SCliConn, node); + if (transQueueSize(&args1->cliMsgs) > transQueueSize(&args2->cliMsgs)) { + return 0; + } + return 1; +} +int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) { + heap->heap = heapCreate(cmpFunc); + if (heap->heap == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + heap->cmpFunc = cmpFunc; + return 0; +} +void transHeapDestroy(SHeap* heap) { + if (heap != NULL) { + heapDestroy(heap->heap); + } +} +int32_t transHeapGet(SHeap* heap, SCliConn** p) { + if (heapSize(heap->heap) == 0) { + *p = NULL; + return -1; + } + HeapNode* minNode = heapMin(heap->heap); + if (minNode == NULL) { + *p = NULL; + return -1; + } + *p = container_of(minNode, SCliConn, node); + return 0; +} +int32_t transHeapInsert(SHeap* heap, SCliConn* p) { + // impl later + if (p->inHeap == 1) { + return TSDB_CODE_DUP_KEY; + } + + heapInsert(heap->heap, &p->node); + p->inHeap = 1; + return 0; +} +int32_t transHeapDelete(SHeap* heap, SCliConn* p) { + // impl later + if (p->inHeap == 0) { + return TSDB_CODE_INVALID_PARA; + } + heapRemove(heap->heap, &p->node); + return 0; +} diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 85d7470871..5dc375da24 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -330,6 +330,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { (void)taosThreadMutexLock(&item->mtx); QUEUE_PUSH(&item->qmsg, q); (void)taosThreadMutexUnlock(&item->mtx); + int ret = uv_async_send(async); if (ret != 0) { tError("failed to send async,reason:%s", uv_err_name(ret)); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d4c98591d7..9c942c6d00 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -33,8 +33,8 @@ typedef struct SSvrConn { queue queue; SConnBuffer readBuf; // read buf, int inType; - void* pTransInst; // rpc init - void* ahandle; // + void* pInst; // rpc init + void* ahandle; // void* hostThrd; STransQueue srvMsgs; @@ -92,7 +92,7 @@ typedef struct SWorkThrd { queue msg; queue conn; - void* pTransInst; + void* pInst; bool quit; SIpWhiteListTab* pWhiteList; @@ -369,8 +369,66 @@ void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) { pConn->whiteListVer = pWhite->ver; } +static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) { + if (!(rpcDebugFlag & DEBUG_DEBUG)) { + return; + } + + STrans* pInst = pConn->pInst; + STraceId* trace = &pHead->traceId; + + int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp); + static int64_t EXCEPTION_LIMIT_US = 100 * 1000; + + if (pConn->status == ConnNormal && pHead->noResp == 0) { + // transRefSrvHandle(pConn); + if (cost >= EXCEPTION_LIMIT_US) { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pInst), + pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost); + } else { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pInst), pConn, + TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost); + } + } else { + if (cost >= EXCEPTION_LIMIT_US) { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + pHead->noResp, pTransMsg->code, (int)(cost)); + } else { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pInst), + pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, + pTransMsg->code, (int)(cost)); + } + } + tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn, + pConn->refId); +} + +static int8_t uvValidConn(SSvrConn* pConn) { + STrans* pInst = pConn->pInst; + SWorkThrd* pThrd = pConn->hostThrd; + int8_t forbiddenIp = 0; + if (pThrd->enableIpWhiteList) { + forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0; + if (forbiddenIp == 0) { + uvWhiteListSetConnVer(pThrd->pWhiteList, pConn); + } + } + return forbiddenIp; +} +static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) { + if (pConn->status == ConnNormal) { + if (pHead->persist == 1) { + pConn->status = ConnAcquire; + transRefSrvHandle(pConn); + tDebug("conn %p acquired by server app", pConn); + } else if (pHead->noResp == 0) { + transRefSrvHandle(pConn); + } + } +} static bool uvHandleReq(SSvrConn* pConn) { - STrans* pTransInst = pConn->pTransInst; + STrans* pInst = pConn->pInst; SWorkThrd* pThrd = pConn->hostThrd; STransMsgHead* pHead = NULL; @@ -378,15 +436,15 @@ static bool uvHandleReq(SSvrConn* pConn) { int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { - tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + tError("%s conn %p read invalid packet", transLabel(pInst), pConn); return false; } if (resetBuf == 0) { - tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn); + tTrace("%s conn %p not reset read buf", transLabel(pInst), pConn); } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { - tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); + tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn); return false; } pHead->code = htonl(pHead->code); @@ -407,75 +465,32 @@ static bool uvHandleReq(SSvrConn* pConn) { return true; } - // TODO(dengyihao): time-consuming task throwed into BG Thread - // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); - // wreq->data = pConn; - // uv_read_stop((uv_stream_t*)pConn->pTcp); - // transRefSrvHandle(pConn); - // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - - STransMsg transMsg; - memset(&transMsg, 0, sizeof(transMsg)); + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - if (pConn->status == ConnNormal) { - if (pHead->persist == 1) { - pConn->status = ConnAcquire; - transRefSrvHandle(pConn); - tDebug("conn %p acquired by server app", pConn); - } + if (pHead->seqNum != 0) { + ASSERT(0); } - STraceId* trace = &pHead->traceId; - - int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp); - static int64_t EXCEPTION_LIMIT_US = 100 * 1000; - - if (pConn->status == ConnNormal && pHead->noResp == 0) { - transRefSrvHandle(pConn); - if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); - } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); - } - } else { - if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); - } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); - } - } - // pHead->noResp = 1, // 1. server application should not send resp on handle // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); - transMsg.info.refId = pConn->refId; + ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg"); + + transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId; transMsg.info.traceId = pHead->traceId; transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.forbiddenIp = forbiddenIp; transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0; - tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, - pConn->refId); - ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg"); - if (transMsg.info.handle == NULL) { - return false; - } + uvMaySetConnAcquired(pConn, pHead); - if (pHead->noResp == 1) { - transMsg.info.refId = -1; - } + uvPerfLog_receive(pConn, pHead, &transMsg); // set up conn info SRpcConnInfo* pConnInfo = &(transMsg.info.conn); @@ -485,7 +500,7 @@ static bool uvHandleReq(SSvrConn* pConn) { (void)transReleaseExHandle(transGetRefMgt(), pConn->refId); - (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + (*pInst->cfp)(pInst->parent, &transMsg, NULL); return true; } @@ -500,7 +515,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { destroyConn(conn, true); return; } - STrans* pTransInst = conn->pTransInst; + STrans* pInst = conn->pInst; SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { @@ -508,16 +523,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { if (true == pBuf->invalid || false == uvHandleReq(conn)) { - tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pTransInst), conn, - conn->dst, conn->src); + tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pInst), conn, conn->dst, + conn->src); destroyConn(conn, true); return; } } return; } else { - tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pTransInst), - conn, conn->dst, conn->src); + tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pInst), conn, + conn->dst, conn->src); destroyConn(conn, true); return; } @@ -526,14 +541,14 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } - tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread)); + tDebug("%s conn %p read error:%s", transLabel(pInst), conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; if (conn->status == ConnAcquire) { if (conn->regArg.init) { - tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn); - STrans* pTransInst = conn->pTransInst; - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + tTrace("%s conn %p broken, notify server app", transLabel(pInst), conn); + STrans* pInst = conn->pInst; + (*pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); } } @@ -579,8 +594,8 @@ void uvOnSendCb(uv_write_t* req, int status) { conn->regArg.init = 1; conn->regArg.msg = msg->msg; if (conn->broken) { - STrans* pTransInst = conn->pTransInst; - (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + STrans* pInst = conn->pInst; + (pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); } (void)transQueuePop(&conn->srvMsgs); @@ -635,7 +650,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer); + pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer); pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 @@ -667,16 +682,16 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - STrans* pTransInst = pConn->pTransInst; - if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pTransInst->compressSize != -1 && - pTransInst->compressSize < pMsg->contLen) { + STrans* pInst = pConn->pInst; + if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pInst->compressSize != -1 && + pInst->compressSize < pMsg->contLen) { len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)len); } STraceId* trace = &pMsg->info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn, - TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len); + tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType), + pConn->dst, pConn->src, len); wb->base = (char*)pHead; wb->len = len; @@ -841,8 +856,8 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { } if (pConn->regArg.init) { tTrace("conn %p release, notify server app", pConn); - STrans* pTransInst = pConn->pTransInst; - (*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL); + STrans* pInst = pConn->pInst; + (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL); memset(&pConn->regArg, 0, sizeof(pConn->regArg)); } uvStartSendRespImpl(srvMsg); @@ -998,6 +1013,16 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } + // pConn->pInst = pThrd->pInst; + // /* init conn timer*/ + // // uv_timer_init(pThrd->loop, &pConn->pTimer); + // // pConn->pTimer.data = pConn; + // pConn->hostThrd = pThrd; + // // init client handle + // pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + // uv_tcp_init(pThrd->loop, pConn->pTcp); + // pConn->pTcp->data = pConn; + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; (void)uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); @@ -1181,7 +1206,6 @@ void* transWorkerThread(void* arg) { static FORCE_INLINE SSvrConn* createConn(void* hThrd) { int32_t code = 0; SWorkThrd* pThrd = hThrd; - STrans* pTransInst = pThrd->pTransInst; int32_t lino; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); @@ -1223,12 +1247,17 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); } + STrans* pInst = pThrd->pInst; pConn->refId = exh->refId; + QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); - tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); + tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId); - pConn->pTransInst = pThrd->pTransInst; + pConn->pInst = pThrd->pInst; + /* init conn timer*/ + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; pConn->hostThrd = pThrd; // init client handle pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); @@ -1238,8 +1267,8 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { code = uv_tcp_init(pThrd->loop, pConn->pTcp); if (code != 0) { - tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _end); + tError("%s failed to create conn since %s" PRId64, transLabel(pInst), uv_strerror(code)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); } pConn->pTcp->data = pConn; QUEUE_PUSH(&pThrd->conn, &pConn->queue); @@ -1253,7 +1282,7 @@ _end: taosMemoryFree(pConn); pConn = NULL; } - tError("%s failed to create conn since %s, lino:%d" PRId64, transLabel(pTransInst), tstrerror(code), lino); + tError("%s failed to create conn since %s" PRId64, transLabel(pInst), tstrerror(code)); return NULL; } @@ -1317,8 +1346,8 @@ static void uvDestroyConn(uv_handle_t* handle) { (void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetRefMgt(), conn->refId); - STrans* pTransInst = thrd->pTransInst; - tDebug("%s conn %p destroy", transLabel(pTransInst), conn); + STrans* pInst = thrd->pInst; + tDebug("%s conn %p destroy", transLabel(pInst), conn); for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) { SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); @@ -1434,9 +1463,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); - thrd->pTransInst = shandle; + thrd->pInst = shandle; thrd->quit = false; - thrd->pTransInst = shandle; + thrd->pInst = shandle; thrd->pWhiteList = uvWhiteListCreate(); srv->pThreadObj[i] = thrd; @@ -1465,9 +1494,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } - thrd->pTransInst = shandle; + thrd->pInst = shandle; thrd->quit = false; - thrd->pTransInst = shandle; + thrd->pInst = shandle; thrd->pWhiteList = uvWhiteListCreate(); if (thrd->pWhiteList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1568,18 +1597,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { uvStartSendRespImpl(msg); return; } else if (conn->status == ConnRelease || conn->status == ConnNormal) { - tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn); + tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn); } destroySmsg(msg); } void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { // send msg to client - tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn); + tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn); uvStartSendResp(msg); } void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { SSvrConn* conn = msg->pConn; - tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn); + tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn); if (conn->status == ConnAcquire) { if (!transQueuePush(&conn->srvMsgs, msg)) { return; @@ -1596,8 +1625,8 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { tDebug("conn %p register brokenlink callback succ", conn); if (conn->broken) { - STrans* pTransInst = conn->pTransInst; - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + STrans* pInst = conn->pInst; + (*pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); } taosMemoryFree(msg); @@ -1735,7 +1764,7 @@ int32_t transReleaseSrvHandle(void* handle) { m->msg = tmsg; m->type = Release; - tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); + tDebug("%s conn %p start to release", transLabel(pThrd->pInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); (void)transReleaseExHandle(transGetRefMgt(), refId); @@ -1830,8 +1859,8 @@ int32_t transRegisterMsg(const STransMsg* msg) { m->msg = tmsg; m->type = Register; - STrans* pTransInst = pThrd->pTransInst; - tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); + STrans* pInst = pThrd->pInst; + tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); (void)transReleaseExHandle(transGetRefMgt(), refId); @@ -1853,15 +1882,15 @@ _return2: } int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle); - if (pTransInst == NULL) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle); + if (pInst == NULL) { return TSDB_CODE_RPC_MODULE_QUIT; } int32_t code = 0; tDebug("ip-white-list update on rpc"); - SServerObj* svrObj = pTransInst->tcphandle; + SServerObj* svrObj = pInst->tcphandle; for (int i = 0; i < svrObj->numOfThreads; i++) { SWorkThrd* pThrd = svrObj->pThreadObj[i]; diff --git a/source/util/src/theap.c b/source/util/src/theap.c index 7ee49ff56d..0d7c0e5f97 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -21,6 +21,7 @@ size_t heapSize(Heap* heap) { return heap->nelts; } Heap* heapCreate(HeapCompareFn fn) { Heap* heap = taosMemoryCalloc(1, sizeof(Heap)); if (heap == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -226,9 +227,9 @@ static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ } static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ } static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */ } static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) { - void* tmp = a->data; - a->data = b->data; - b->data = tmp; + void* tmp = a->data; + a->data = b->data; + b->data = tmp; } #define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))