diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b98421cb76..701cd55d9b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -173,8 +173,8 @@ static void cliSendCb(uv_write_t* req, int status); // callback after conn to server static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -static void cliIdleCb(uv_idle_t* handle); -static void cliPrepareCb(uv_prepare_t* handle); +// static void cliIdleCb(uv_idle_t* handle); +// static void cliPrepareCb(uv_prepare_t* handle); static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); static void cliSendBatchCb(uv_write_t* req, int status); @@ -185,7 +185,7 @@ static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); -static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); +static int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); void cliResetConnTimer(SCliConn* conn); static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); @@ -200,10 +200,11 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs static void cliDestroyBatch(SCliBatch* pBatch); // cli util func -static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); -static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); +static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); +static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); +static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); @@ -216,7 +217,7 @@ static void cliHandleExcept(SCliConn* conn, int32_t code); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); +static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -240,7 +241,7 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); -static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); +static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); // thread obj static int32_t createThrdObj(void* trans, SCliThrd** pThrd); @@ -260,11 +261,11 @@ typedef struct { } SHeap; int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); -int transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); +int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); void transHeapDestroy(SHeap* heap); -int transHeapGet(SHeap* heap, SCliConn** p); -int transHeapInsert(SHeap* heap, SCliConn* p); -int transHeapDelete(SHeap* heap, SCliConn* p); +int32_t transHeapGet(SHeap* heap, SCliConn** p); +int32_t transHeapInsert(SHeap* heap, SCliConn* p); +int32_t transHeapDelete(SHeap* heap, SCliConn* p); #define CLI_RELEASE_UV(loop) \ do { \ @@ -437,7 +438,7 @@ bool cliShouldAddConnToPool(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; bool empty = transQueueEmpty(&conn->cliMsgs); if (empty) { - delConnFromHeapCache(pThrd->connHeapCache, conn); + (void)delConnFromHeapCache(pThrd->connHeapCache, conn); } return empty; @@ -478,7 +479,7 @@ void cliHandleResp_shareConn(SCliConn* conn) { transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; STraceId* trace = &transMsg.info.traceId; - int32_t ret = cliAppCb(conn, &transMsg, pMsg); + int32_t ret = cliNotifyCb(conn, &transMsg, pMsg); if (ret != 0) { return; } else { @@ -577,7 +578,7 @@ void cliHandleResp(SCliConn* conn) { } if (pMsg == NULL || (pMsg && pMsg->type != Release)) { - if (cliAppCb(conn, &transMsg, pMsg) != 0) { + if (cliNotifyCb(conn, &transMsg, pMsg) != 0) { return; } } @@ -675,7 +676,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (pMsg == NULL || (pMsg && pMsg->type != Release)) { int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); cliDestroyMsgInExhandle(refId); - if (cliAppCb(pConn, &transMsg, pMsg) != 0) { + if (cliNotifyCb(pConn, &transMsg, pMsg) != 0) { return; } } @@ -735,7 +736,7 @@ void* destroyConnPool(SCliThrd* pThrd) { transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); pMsg->ctx->task = NULL; - doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); } taosMemoryFree(msglist); @@ -817,14 +818,14 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pMsg)->msg.msgType))) { tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pMsg)->msg.msgType), tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); - doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); *pMsg = NULL; return NULL; } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } @@ -834,7 +835,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } @@ -847,7 +848,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } @@ -857,7 +858,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); - doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); *pMsg = NULL; return NULL; } @@ -1255,7 +1256,7 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { transMsg.info.ahandle = pCtx->ahandle; pMsg->seqNum = 0; - code = cliAppCb(conn, &transMsg, pMsg); + code = cliNotifyCb(conn, &transMsg, pMsg); if (code != 0) { continue; } else { @@ -1752,7 +1753,7 @@ void cliConnCb(uv_connect_t* req, int status) { return cliSend(pConn); } -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { +static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { STransConnCtx* pCtx = pMsg->ctx; STrans* pInst = pThrd->pInst; @@ -1891,14 +1892,22 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) } return conn; } -FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { +FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { if (pCvtAddr->cvt == false) { - return; + if (EPSET_IS_VALID(pEpSet)) { + return 0; + } else { + return TSDB_CODE_RPC_FQDN_ERROR; + } } if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN); memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); } + if (EPSET_IS_VALID(pEpSet)) { + return 0; + } + return TSDB_CODE_RPC_FQDN_ERROR; } FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { @@ -1906,11 +1915,10 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; } + FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; - // memset(pResp, 0, sizeof(STransMsg)); - if (pResp->code == 0) { pResp->code = TSDB_CODE_RPC_BROKEN_LINK; } @@ -1921,6 +1929,20 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { return 0; } +FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code) { + STrans* pInst = pThrd->pInst; + + STransMsg resp = {.code = code}; + code = cliBuildExceptResp(pMsg, &resp); + if (code != 0) { + return code; + } + resp.info.cliVer = pInst->compatibilityVer; + pInst->cfp(pInst->parent, &resp, NULL); + + return 0; +} + static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { int32_t code = 0; uint32_t addr = 0; @@ -1987,7 +2009,7 @@ static void doFreeTimeoutMsg(void* param) { STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s msg %s cannot get available conn after timeout", pInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); taosMemoryFree(arg); } @@ -2044,28 +2066,33 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { } return transHeapInsert(p, pConn); } -static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { + +static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr)); if (p == NULL) { tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr); - return; + return 0; } - int32_t code = transHeapDelete(p, pConn); + int32_t code = transHeapDelete(p, pConn); if (code != 0) { tDebug("failed to delete conn %p from heap cache since %s", pConn, tstrerror(code)); } + return code; } + void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { int32_t code = 0; STraceId* trace = &pMsg->msg.info.traceId; STrans* pInst = pThrd->pInst; - cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); - if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { + code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (code != 0) { + // notifyCb destroyCmsg(pMsg); return; } + char addr[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); @@ -2093,18 +2120,14 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { transQueuePush(&pConn->cliMsgs, pMsg); return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); } -void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { - int32_t code = 0; + +void cliHandleReq__noShareConn(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code; STrans* pInst = pThrd->pInst; - - if (pInst->shareConn == 1) { - return cliHandleReq__shareConn(pMsg, pThrd); - } - - cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); - if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { + code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (code != 0) { + // notifyCb destroyCmsg(pMsg); - return; } char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); @@ -2117,12 +2140,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { if (ignore == true) { // persist conn already release by server STransMsg resp = {0}; - (void)cliBuildExceptResp(pMsg, &resp); - // refactorr later - resp.info.cliVer = pInst->compatibilityVer; - if (pMsg->type != Release) { - pInst->cfp(pInst->parent, &resp, NULL); + (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, 0); } destroyCmsg(pMsg); return; @@ -2140,13 +2159,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { code = cliCreateConn(pThrd, &conn); if (code != 0) { tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); - STransMsg resp = {.code = code}; - (void)cliBuildExceptResp(pMsg, &resp); - - resp.info.cliVer = pInst->compatibilityVer; - if (pMsg->type != Release) { - pInst->cfp(pInst->parent, &resp, NULL); - } + (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, code); destroyCmsg(pMsg); return; } @@ -2207,6 +2220,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { tGTrace("%s conn %p ready", pInst->label, conn); } +void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { + STrans* pInst = pThrd->pInst; + if (pInst->shareConn == 1) { + return cliHandleReq__shareConn(pMsg, pThrd); + } else { + return cliHandleReq__noShareConn(pMsg, pThrd); + } +} + static void cliDealReq(queue* wq, SCliThrd* pThrd) { int count = 0; @@ -2243,6 +2265,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) { STrans* pInst = pThrd->pInst; STransConnCtx* pCtx = pMsg->ctx; + return; } static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); @@ -2316,11 +2339,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - // if (pMsg->type == Normal) { - // cliBuildBatch(pMsg, h, pThrd); - // continue; - // // count++; - // } if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { cliBuildBatch(pMsg, h, pThrd); continue; @@ -2342,6 +2360,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { destroyCmsg(pMsg); continue; } + pBatchList->batchLenLimit = pInst->batchSize; SCliBatch* pBatch = NULL; @@ -2416,33 +2435,6 @@ static void cliAsyncCb(uv_async_t* handle) { if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } -static void cliPrepareCb(uv_prepare_t* handle) { - SCliThrd* thrd = handle->data; - tTrace("prepare work start"); - - SAsyncPool* pool = thrd->asyncPool; - for (int i = 0; i < pool->nAsync; i++) { - uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; - - queue wq; - (void)taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); - (void)taosThreadMutexUnlock(&item->mtx); - - int count = 0; - while (!QUEUE_IS_EMPTY(&wq)) { - queue* h = QUEUE_HEAD(&wq); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - (*cliAsyncHandle[pMsg->type])(pMsg, thrd); - count++; - } - } - tTrace("prepare work end"); - if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); -} void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { transCtxCleanup(&conn->ctx); @@ -2464,12 +2456,9 @@ void cliConnFreeMsgs(SCliConn* conn) { continue; } - STransMsg resp = {0}; - if (-1 == cliBuildExceptResp(cmsg, &resp)) { + if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) { continue; } - resp.info.cliVer = pInst->compatibilityVer; - pInst->cfp(pInst->parent, &resp, NULL); cmsg->ctx->ahandle = NULL; } @@ -2637,19 +2626,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(code, NULL, _end); } - pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - if (pThrd->prepare == NULL) { - tError("failed to create prepre since:%s", tstrerror(code)); - TAOS_CHECK_GOTO(code, NULL, _end); - } - - code = uv_prepare_init(pThrd->loop, pThrd->prepare); - if (code != 0) { - tError("failed to create prepre since:%s", uv_err_name(code)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); - } - pThrd->prepare->data = pThrd; - int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); if (pThrd->timerList == NULL) { @@ -2716,7 +2692,6 @@ _end: if (pThrd) { (void)uv_loop_close(pThrd->loop); taosMemoryFree(pThrd->loop); - taosMemoryFree(pThrd->prepare); (void)taosThreadMutexDestroy(&pThrd->msgMtx); transAsyncPoolDestroy(pThrd->asyncPool); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { @@ -2840,7 +2815,7 @@ static FORCE_INLINE void doDelayTask(void* param) { taosMemoryFree(arg); } -static void doCloseIdleConn(void* param) { +static FORCE_INLINE void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); @@ -2848,7 +2823,7 @@ static void doCloseIdleConn(void* param) { cliDestroyConn(conn, true); taosMemoryFree(arg); } -static void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) { +static FORCE_INLINE void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) { if (!(rpcDebugFlag & DEBUG_DEBUG)) { return; } @@ -2856,21 +2831,30 @@ static void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) { STraceId* trace = &pMsg->msg.info.traceId; char tbuf[512] = {0}; (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; } -static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { +static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pInst = pThrd->pInst; STransConnCtx* pCtx = pMsg->ctx; cliPerfLog_schedMsg(pMsg, transLabel(pThrd->pInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } arg->param1 = pMsg; arg->param2 = pThrd; - (void)transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); + SDelayTask* pTask = transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); + if (pTask == NULL) { + taosMemoryFree(arg); + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; } FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { @@ -2963,18 +2947,17 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { } return noDelay; } -bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - STransConnCtx* pCtx = pMsg->ctx; - int32_t code = pResp->code; - - bool retry = pInst->retry != NULL ? pInst->retry(code, pResp->msgType - 1) : false; +int8_t cliRetryShouldRetry(STrans* pInst, STransMsg* pResp) { + bool retry = pInst->retry != NULL ? pInst->retry(pResp->code, pResp->msgType - 1) : false; if (retry == false) { - return false; + return 0; } + return 1; +} +void cliRetryMayInitCtx(STrans* pInst, SCliMsg* pMsg) { + STransConnCtx* pCtx = pMsg->ctx; if (!pCtx->retryInit) { pCtx->retryMinInterval = pInst->retryMinInterval; pCtx->retryMaxInterval = pInst->retryMaxInterval; @@ -2985,15 +2968,32 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pCtx->retryStep = 0; pCtx->retryInit = true; pCtx->retryCode = TSDB_CODE_SUCCESS; - - // already retry, not use handle specified by app; pMsg->msg.info.handle = 0; } +} +int32_t cliRetryIsTimeout(STrans* pInst, SCliMsg* pMsg) { + STransConnCtx* pCtx = pMsg->ctx; + if (pCtx->retryMaxTimeout != -1 && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { + return 1; + } + return 0; +} +bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; - if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { + STransConnCtx* pCtx = pMsg->ctx; + int32_t code = pResp->code; + + cliRetryMayInitCtx(pInst, pMsg); + + if (!cliRetryShouldRetry(pInst, pResp)) { return false; } + if (cliRetryIsTimeout(pInst, pMsg)) { + return false; + } // code, msgType // A: epset,leader, not self @@ -3045,7 +3045,12 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } pMsg->sent = 0; - cliSchedMsgToNextNode(pMsg, pThrd); + code = cliSchedMsgToNextNode(pMsg, pThrd); + if (code != 0) { + pResp->code = code; + tError("failed to sched msg to next node, reason:%s", tstrerror(code)); + return false; + } return true; } void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { @@ -3067,7 +3072,7 @@ void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { } } } -int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { +int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; @@ -3277,7 +3282,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S int64_t handle = (int64_t)pReq->info.handle; SCliThrd* pThrd = transGetWorkThrd(pInst, handle); if (pThrd == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception;); + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception); } if (handle != 0) { @@ -3702,6 +3707,7 @@ _exception: return code; } +// conn heap int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node); @@ -3724,7 +3730,7 @@ void transHeapDestroy(SHeap* heap) { heapDestroy(heap->heap); } } -int transHeapGet(SHeap* heap, SCliConn** p) { +int32_t transHeapGet(SHeap* heap, SCliConn** p) { if (heapSize(heap->heap) == 0) { *p = NULL; return -1; @@ -3737,7 +3743,7 @@ int transHeapGet(SHeap* heap, SCliConn** p) { *p = container_of(minNode, SCliConn, node); return 0; } -int transHeapInsert(SHeap* heap, SCliConn* p) { +int32_t transHeapInsert(SHeap* heap, SCliConn* p) { // impl later if (p->inHeap == 1) { return TSDB_CODE_DUP_KEY; @@ -3747,7 +3753,7 @@ int transHeapInsert(SHeap* heap, SCliConn* p) { p->inHeap = 1; return 0; } -int transHeapDelete(SHeap* heap, SCliConn* p) { +int32_t transHeapDelete(SHeap* heap, SCliConn* p) { // impl later if (p->inHeap == 0) { return TSDB_CODE_INVALID_PARA; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b940c494d8..d3f8da5ae4 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -330,6 +330,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { (void)taosThreadMutexLock(&item->mtx); QUEUE_PUSH(&item->qmsg, q); (void)taosThreadMutexUnlock(&item->mtx); + int ret = uv_async_send(async); if (ret != 0) { tError("failed to send async,reason:%s", uv_err_name(ret));