diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index ad3ee0bfb1..f4d5fcfe16 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -162,7 +162,7 @@ typedef struct { void* task; int hThrdIdx; -} STransConnCtx; +} SReqCtx; #pragma pack(push, 1) @@ -318,9 +318,9 @@ void transUnrefCliHandle(void* handle); int32_t transReleaseCliHandle(void* handle); int32_t transReleaseSrvHandle(void* handle); -int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, +int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx); +int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp); +int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs); int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId); int32_t transFreeConnById(void* shandle, int64_t transpointId); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1f2b1fd88f..116904e9c7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -65,7 +65,7 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; - STransQueue cliMsgs; + STransQueue reqMsgs; queue q; SConnList* list; @@ -92,18 +92,18 @@ typedef struct SCliConn { int32_t shareCnt; } SCliConn; -typedef struct SCliMsg { - STransConnCtx* ctx; - STransMsg msg; - queue q; - STransMsgType type; +typedef struct SCliReq { + SReqCtx* ctx; + STransMsg msg; + queue q; + STransMsgType type; int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) queue seqq; - int32_t seqNum; -} SCliMsg; + int32_t seq; +} SCliReq; typedef struct SCliThrd { TdThread thread; // tid @@ -131,7 +131,7 @@ typedef struct SCliThrd { SHashObj* batchCache; SHashObj* connHeapCache; - SCliMsg* stopMsg; + SCliReq* stopMsg; bool quit; } SCliThrd; @@ -185,7 +185,7 @@ static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); -static int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); +static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); void cliResetConnTimer(SCliConn* conn); static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); @@ -196,15 +196,15 @@ static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); -static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); static void cliDestroyBatch(SCliBatch* pBatch); // cli util func -static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); +static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx); static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); -static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); -static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code); +static FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* resp); +static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); @@ -217,19 +217,19 @@ static void cliHandleExcept(SCliConn* conn, int32_t code); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); -static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); +static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code); // handle req from app -static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); -static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); -static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); -static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); +static void cliHandleReq(SCliReq* pReq, SCliThrd* pThrd); +static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd); +static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd); +static void cliHandleUpdate(SCliReq* pReq, SCliThrd* pThrd); static void cliDealReq(queue* h, SCliThrd* pThrd); static void cliBatchDealReq(queue* h, SCliThrd* pThrd); static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq}; -static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd); +static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, +static void (*cliAsyncHandle[])(SCliReq* pReq, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate, cliHandleFreeById}; static FORCE_INLINE void destroyCmsg(void* cmsg); @@ -237,7 +237,7 @@ static FORCE_INLINE void destroyCmsg(void* cmsg); static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param); static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); -static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); +static FORCE_INLINE void transDestroyConnCtx(SReqCtx* ctx); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); @@ -289,17 +289,17 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ - int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + int i = 0, sz = transQueueSize(&conn->reqMsgs); \ for (; i < sz; i++) { \ - pMsg = transQueueGet(&conn->cliMsgs, i); \ - if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + pReq = transQueueGet(&conn->reqMsgs, i); \ + if (pReq->ctx != NULL && (uint64_t)pReq->ctx->ahandle == ahandle) { \ break; \ } \ } \ if (i == sz) { \ - pMsg = NULL; \ + pReq = NULL; \ } else { \ - pMsg = transQueueRm(&conn->cliMsgs, i); \ + pReq = transQueueRm(&conn->reqMsgs, i); \ } \ } while (0) @@ -307,7 +307,7 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); do { \ int i = 0; \ do { \ - pCliMsg = transQueueGet(&conn->cliMsgs, i++); \ + pCliMsg = transQueueGet(&conn->reqMsgs, i++); \ if (pCliMsg && 0 == pCliMsg->sent) { \ break; \ } \ @@ -351,8 +351,8 @@ static void* cliWorkThread(void* arg); static void cliReleaseUnfinishedMsg(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { - SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); + for (int i = 0; i < transQueueSize(&conn->reqMsgs); i++) { + SCliReq* msg = transQueueGet(&conn->reqMsgs, i); if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { conn->ctx.freeFunc(msg->ctx->ahandle); @@ -363,12 +363,12 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } destroyCmsg(msg); } - transQueueClear(&conn->cliMsgs); + transQueueClear(&conn->reqMsgs); memset(&conn->ctx, 0, sizeof(conn->ctx)); } bool cliMaySendCachedMsg(SCliConn* conn) { - if (!transQueueEmpty(&conn->cliMsgs)) { - SCliMsg* pCliMsg = NULL; + if (!transQueueEmpty(&conn->reqMsgs)) { + SCliReq* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); cliSend(conn); return true; @@ -392,9 +392,9 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { queue* h = QUEUE_HEAD(&exh->q); QUEUE_REMOVE(h); taosWUnLockLatch(&exh->latch); - SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); transCtxMerge(&conn->ctx, &t->ctx->appCtx); - (void)transQueuePush(&conn->cliMsgs, t); + (void)transQueuePush(&conn->reqMsgs, t); tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); (void)transReleaseExHandle(transGetRefMgt(), refId); cliSend(conn); @@ -420,23 +420,23 @@ void cliResetConnTimer(SCliConn* conn) { } void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } -SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) { - SCliMsg* pMsg = NULL; - for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { - pMsg = transQueueGet(&conn->cliMsgs, i); - if (pMsg->seqNum == seqNum) { - transQueueRm(&conn->cliMsgs, i); +SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) { + SCliReq* pReq = NULL; + for (int i = 0; i < transQueueSize(&conn->reqMsgs); i++) { + pReq = transQueueGet(&conn->reqMsgs, i); + if (pReq->seq == seq) { + transQueueRm(&conn->reqMsgs, i); break; } } - if (pMsg == NULL) { + if (pReq == NULL) { ASSERT(0); } - return pMsg; + return pReq; } bool cliShouldAddConnToPool(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - bool empty = transQueueEmpty(&conn->cliMsgs); + bool empty = transQueueEmpty(&conn->reqMsgs); if (empty) { (void)delConnFromHeapCache(pThrd->connHeapCache, conn); } @@ -472,18 +472,18 @@ void cliHandleResp_shareConn(SCliConn* conn) { transMsg.info.hasEpSet = pHead->hasEpSet; transMsg.info.cliVer = htonl(pHead->compatibilityVer); - SCliMsg* pMsg = cliFindMsgBySeqnum(conn, pHead->seqNum); - pMsg->seqNum = 0; + SCliReq* pReq = cliFindReqBySeq(conn, pHead->seqNum); + pReq->seq = 0; - STransConnCtx* pCtx = pMsg->ctx; + SReqCtx* pCtx = pReq->ctx; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; STraceId* trace = &transMsg.info.traceId; - int32_t ret = cliNotifyCb(conn, &transMsg, pMsg); + int32_t ret = cliNotifyCb(conn, pReq, &transMsg); if (ret != 0) { return; } else { - destroyCmsg(pMsg); + destroyCmsg(pReq); } } void cliHandleResp(SCliConn* conn) { @@ -528,18 +528,18 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.hasEpSet = pHead->hasEpSet; transMsg.info.cliVer = htonl(pHead->compatibilityVer); - SCliMsg* pMsg = NULL; - STransConnCtx* pCtx = NULL; + SCliReq* pReq = NULL; + SReqCtx* pCtx = NULL; if (CONN_NO_PERSIST_BY_APP(conn)) { - pMsg = transQueuePop(&conn->cliMsgs); + pReq = transQueuePop(&conn->reqMsgs); - pCtx = pMsg ? pMsg->ctx : NULL; + pCtx = pReq ? pReq->ctx : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); } else { uint64_t ahandle = (uint64_t)pHead->ahandle; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); - if (pMsg == NULL) { + if (pReq == NULL) { transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); @@ -550,7 +550,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle); } } else { - pCtx = pMsg->ctx; + pCtx = pReq->ctx; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); } @@ -577,14 +577,14 @@ void cliHandleResp(SCliConn* conn) { return; } - if (pMsg == NULL || (pMsg && pMsg->type != Release)) { - if (cliNotifyCb(conn, &transMsg, pMsg) != 0) { + if (pReq == NULL || (pReq && pReq->type != Release)) { + if (cliNotifyCb(conn, pReq, &transMsg) != 0) { return; } } - int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); tDebug("conn %p msg refId: %" PRId64 "", conn, refId); - destroyCmsg(pMsg); + destroyCmsg(pReq); if (cliConnSendSeqMsg(refId, conn)) { return; @@ -608,7 +608,7 @@ static void cliDestroyMsgInExhandle(int64_t refId) { while (!QUEUE_IS_EMPTY(&exh->q)) { queue* h = QUEUE_HEAD(&exh->q); QUEUE_REMOVE(h); - SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); destroyCmsg(t); } taosWUnLockLatch(&exh->latch); @@ -617,7 +617,7 @@ static void cliDestroyMsgInExhandle(int64_t refId) { } void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { - if (transQueueEmpty(&pConn->cliMsgs)) { + if (transQueueEmpty(&pConn->reqMsgs)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); @@ -629,26 +629,26 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { STrans* pInst = pThrd->pInst; bool once = false; do { - SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); + SCliReq* pReq = transQueuePop(&pConn->reqMsgs); - if (pMsg == NULL && once) { + if (pReq == NULL && once) { break; } - if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) { - destroyCmsg(pMsg); + if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) { + destroyCmsg(pReq); break; } - STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; STransMsg transMsg = {0}; transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; - transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; transMsg.info.cliVer = pInst->compatibilityVer; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { + if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); @@ -660,29 +660,29 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { transMsg.info.ahandle); } } else { - transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL; + transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL; } if (pCtx == NULL || pCtx->pSem == NULL) { if (transMsg.info.ahandle == NULL) { - if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) { - destroyCmsg(pMsg); + if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) { + destroyCmsg(pReq); once = true; continue; } } } - if (pMsg == NULL || (pMsg && pMsg->type != Release)) { - int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + if (pReq == NULL || (pReq && pReq->type != Release)) { + int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); cliDestroyMsgInExhandle(refId); - if (cliNotifyCb(pConn, &transMsg, pMsg) != 0) { + if (cliNotifyCb(pConn, pReq, &transMsg) != 0) { return; } } - destroyCmsg(pMsg); + destroyCmsg(pReq); tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); - } while (!transQueueEmpty(&pConn->cliMsgs)); + } while (!transQueueEmpty(&pConn->reqMsgs)); if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); transUnrefCliHandle(pConn); } @@ -731,12 +731,12 @@ void* destroyConnPool(SCliThrd* pThrd) { queue* h = QUEUE_HEAD(&msglist->msgQ); QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); - pMsg->ctx->task = NULL; + transDQCancel(pThrd->waitConnQueue, pReq->ctx->task); + pReq->ctx->task = NULL; - doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + doNotifyCb(pReq, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); } taosMemoryFree(msglist); @@ -791,7 +791,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { return conn; } -static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { +static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) { void* pool = pThrd->pool; STrans* pInst = pThrd->pInst; size_t klen = strlen(key); @@ -803,8 +803,8 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); if (nList == NULL) { - // doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pMsg = NULL; + // doNotifyApp(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pReq = NULL; return NULL; } QUEUE_INIT(&nList->msgQ); @@ -814,72 +814,72 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { plist->list = nList; } - STraceId* trace = &(*pMsg)->msg.info.traceId; + STraceId* trace = &(*pReq)->msg.info.traceId; // no avaliable conn in pool if (QUEUE_IS_EMPTY(&plist->conns)) { SMsgList* list = plist->list; if ((list)->numOfConn >= pInst->connLimitNum) { - STraceId* trace = &(*pMsg)->msg.info.traceId; - if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pMsg)->msg.msgType))) { - tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pMsg)->msg.msgType), + STraceId* trace = &(*pReq)->msg.info.traceId; + if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) { + tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType), tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); - doNotifyCb(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); - *pMsg = NULL; + doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + *pReq = NULL; return NULL; } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { - doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pMsg = NULL; + doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pReq = NULL; return NULL; } - arg->param1 = *pMsg; + arg->param1 = *pReq; arg->param2 = pThrd; SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); - doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pMsg = NULL; + doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pReq = NULL; return NULL; } - (*pMsg)->ctx->task = task; - tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pMsg)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); - *pMsg = NULL; + (*pReq)->ctx->task = task; + tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); + QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); + *pReq = NULL; } else { // send msg in delay queue if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { - doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pMsg = NULL; + doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pReq = NULL; return NULL; } - arg->param1 = *pMsg; + arg->param1 = *pReq; arg->param2 = pThrd; SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); - doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pMsg = NULL; + doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pReq = NULL; return NULL; } - (*pMsg)->ctx->task = task; - tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + (*pReq)->ctx->task = task; + tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); queue* h = QUEUE_HEAD(&(list)->msgQ); QUEUE_REMOVE(h); - SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q); + SCliReq* ans = QUEUE_DATA(h, SCliReq, q); - *pMsg = ans; + *pReq = ans; - trace = &(*pMsg)->msg.info.traceId; - tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + trace = &(*pReq)->msg.info.traceId; + tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); transDQCancel(pThrd->waitConnQueue, ans->ctx->task); } list->numOfConn++; @@ -932,13 +932,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { queue* h = QUEUE_HEAD(&(msgList)->msgQ); QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); - pMsg->ctx->task = NULL; + transDQCancel(thrd->waitConnQueue, pReq->ctx->task); + pReq->ctx->task = NULL; - transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); - (void)transQueuePush(&conn->cliMsgs, pMsg); + transCtxMerge(&conn->ctx, &pReq->ctx->appCtx); + (void)transQueuePush(&conn->reqMsgs, pReq); conn->status = ConnNormal; cliSend(conn); @@ -1100,7 +1100,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { conn->status = ConnNormal; conn->broken = false; - TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); + TAOS_CHECK_GOTO(transQueueInit(&conn->reqMsgs, NULL), NULL, _failed); TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); @@ -1122,7 +1122,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); - TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); + TAOS_CHECK_GOTO(transQueueInit(&conn->reqMsgs, NULL), NULL, _failed); TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); @@ -1145,7 +1145,7 @@ _failed: if (conn) { taosMemoryFree(conn->stream); (void)transDestroyBuffer(&conn->readBuf); - transQueueDestroy(&conn->cliMsgs); + transQueueDestroy(&conn->reqMsgs); } taosMemoryFree(conn); } @@ -1214,11 +1214,11 @@ static void cliDestroy(uv_handle_t* handle) { } static bool cliHandleNoResp(SCliConn* conn) { bool res = false; - if (!transQueueEmpty(&conn->cliMsgs)) { - SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0); - if (REQUEST_NO_RESP(&pMsg->msg)) { - (void)transQueuePop(&conn->cliMsgs); - destroyCmsg(pMsg); + if (!transQueueEmpty(&conn->reqMsgs)) { + SCliReq* pReq = transQueueGet(&conn->reqMsgs, 0); + if (REQUEST_NO_RESP(&pReq->msg)) { + (void)transQueuePop(&conn->reqMsgs); + destroyCmsg(pReq); res = true; } if (res == true) { @@ -1239,16 +1239,16 @@ static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = transReqQueueRemove(req); if (pConn == NULL) return; - SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - if (pMsg != NULL) { - int64_t cost = taosGetTimestampUs() - pMsg->st; + SCliReq* pReq = transQueueGet(&pConn->reqMsgs, 0); + if (pReq != NULL) { + int64_t cost = taosGetTimestampUs() - pReq->st; if (cost > 1000 * 50) { tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); } } - if (pMsg != NULL && pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) { - rpcFreeCont(pMsg->msg.pCont); - pMsg->msg.pCont = 0; + if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) { + rpcFreeCont(pReq->msg.pCont); + pReq->msg.pCont = 0; } if (status == 0) { @@ -1271,27 +1271,27 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { int32_t code = -1; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - while (!transQueueEmpty(&conn->cliMsgs)) { - SCliMsg* pMsg = transQueuePop(&conn->cliMsgs); - ASSERT(pMsg->type != Release); - ASSERT(REQUEST_NO_RESP(&pMsg->msg) == 0); + while (!transQueueEmpty(&conn->reqMsgs)) { + SCliReq* pReq = transQueuePop(&conn->reqMsgs); + ASSERT(pReq->type != Release); + ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); - STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; STransMsg transMsg = {0}; transMsg.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; - transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; transMsg.info.cliVer = pInst->compatibilityVer; transMsg.info.ahandle = pCtx->ahandle; - pMsg->seqNum = 0; - code = cliNotifyCb(conn, &transMsg, pMsg); + pReq->seqNum = 0; + code = cliNotifyCb(conn, pReq, &transMsg); if (code != 0) { continue; } else { // already notify user - destroyCmsg(pMsg); + destroyCmsg(pReq); } } @@ -1314,7 +1314,7 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { void cliSendBatch_shareConn(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - int32_t size = transQueueSize(&pConn->cliMsgs); + int32_t size = transQueueSize(&pConn->reqMsgs); int32_t totalLen = 0; if (size == 0) { @@ -1327,32 +1327,32 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int j = 0; for (int i = 0; i < size; i++) { - SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i); + SCliReq* pCliMsg = transQueueGet(&pConn->reqMsgs, i); if (pCliMsg->sent == 1) { continue; } - STransConnCtx* pCtx = pCliMsg->ctx; + SReqCtx* pCtx = pCliMsg->ctx; pConn->seq++; - STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - if (pMsg->pCont == 0) { - pMsg->pCont = (void*)rpcMallocCont(0); - pMsg->contLen = 0; + STransMsg* pReq = (STransMsg*)(&pCliMsg->msg); + if (pReq->pCont == 0) { + pReq->pCont = (void*)rpcMallocCont(0); + pReq->contLen = 0; } - int msgLen = transMsgLenFromCont(pMsg->contLen); + int msgLen = transMsgLenFromCont(pReq->contLen); - STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + STransMsgHead* pHead = transHeadFromCont(pReq->pCont); if (pHead->comp == 0) { pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; - pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; - pHead->msgType = pMsg->msgType; + pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; + pHead->msgType = pReq->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pInst->user, strlen(pInst->user)); - pHead->traceId = pMsg->info.traceId; + pHead->traceId = pReq->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; pHead->compatibilityVer = htonl(pInst->compatibilityVer); @@ -1361,8 +1361,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pHead->seqNum = pConn->seq; if (pHead->comp == 0) { - if (pInst->compressSize != -1 && pInst->compressSize < pMsg->contLen) { - msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { + msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } } else { @@ -1402,42 +1402,42 @@ void cliSendBatch(SCliConn* pConn) { int i = 0; queue* h = NULL; QUEUE_FOREACH(h, &pBatch->wq) { - SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); + SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q); - STransConnCtx* pCtx = pCliMsg->ctx; + SReqCtx* pCtx = pCliMsg->ctx; - STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - if (pMsg->pCont == 0) { - pMsg->pCont = (void*)rpcMallocCont(0); - if (pMsg->pCont == NULL) { + STransMsg* pReq = (STransMsg*)(&pCliMsg->msg); + if (pReq->pCont == 0) { + pReq->pCont = (void*)rpcMallocCont(0); + if (pReq->pCont == NULL) { code = TSDB_CODE_OUT_OF_BUFFER; tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); goto _exception; } - pMsg->contLen = 0; + pReq->contLen = 0; } - int msgLen = transMsgLenFromCont(pMsg->contLen); - STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + int msgLen = transMsgLenFromCont(pReq->contLen); + STransMsgHead* pHead = transHeadFromCont(pReq->pCont); if (pHead->comp == 0) { pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; - pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; - pHead->msgType = pMsg->msgType; + pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; + pHead->msgType = pReq->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pInst->user, strlen(pInst->user)); - pHead->traceId = pMsg->info.traceId; + pHead->traceId = pReq->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; pHead->compatibilityVer = htonl(pInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); - if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { - if (pInst->compressSize != -1 && pInst->compressSize < pMsg->contLen) { - msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { + if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { + msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } } else { @@ -1475,37 +1475,37 @@ void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - if (transQueueEmpty(&pConn->cliMsgs)) { + if (transQueueEmpty(&pConn->reqMsgs)) { tError("%s conn %p not msg to send", pInst->label, pConn); cliHandleExcept(pConn, -1); return; } - SCliMsg* pCliMsg = NULL; + SCliReq* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(pConn); pCliMsg->sent = 1; - STransConnCtx* pCtx = pCliMsg->ctx; + SReqCtx* pCtx = pCliMsg->ctx; - STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - if (pMsg->pCont == 0) { - pMsg->pCont = (void*)rpcMallocCont(0); - tDebug("malloc memory: %p", pMsg->pCont); - pMsg->contLen = 0; + STransMsg* pReq = (STransMsg*)(&pCliMsg->msg); + if (pReq->pCont == 0) { + pReq->pCont = (void*)rpcMallocCont(0); + tDebug("malloc memory: %p", pReq->pCont); + pReq->contLen = 0; } - int msgLen = transMsgLenFromCont(pMsg->contLen); - STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + int msgLen = transMsgLenFromCont(pReq->contLen); + STransMsgHead* pHead = transHeadFromCont(pReq->pCont); if (pHead->comp == 0) { pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; - pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; - pHead->msgType = pMsg->msgType; + pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; + pHead->msgType = pReq->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pInst->user, strlen(pInst->user)); - pHead->traceId = pMsg->info.traceId; + pHead->traceId = pReq->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; pHead->compatibilityVer = htonl(pInst->compatibilityVer); @@ -1516,9 +1516,9 @@ void cliSend(SCliConn* pConn) { CONN_SET_PERSIST_BY_APP(pConn); } - STraceId* trace = &pMsg->info.traceId; + STraceId* trace = &pReq->info.traceId; - if (pInst->startTimer != NULL && pInst->startTimer(0, pMsg->msgType)) { + if (pInst->startTimer != NULL && pInst->startTimer(0, pReq->msgType)) { uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -1529,9 +1529,9 @@ void cliSend(SCliConn* pConn) { pConn->timer = timer; } - if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { - if (pInst->compressSize != -1 && pInst->compressSize < pMsg->contLen) { - msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { + if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { + msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } } else { @@ -1544,7 +1544,7 @@ void cliSend(SCliConn* pConn) { uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); if (req == NULL) { - tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); cliHandleExcept(pConn, -1); return; @@ -1552,7 +1552,7 @@ void cliSend(SCliConn* pConn) { int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { - tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), uv_err_name(status)); cliHandleExcept(pConn, -1); } @@ -1567,7 +1567,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { queue* h = QUEUE_HEAD(&pBatch->wq); QUEUE_REMOVE(h); - SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); + SCliReq* p = QUEUE_DATA(h, SCliReq, q); destroyCmsg(p); } SCliBatchList* p = pBatch->pList; @@ -1715,11 +1715,11 @@ static void cliSendBatchCb(uv_write_t* req, int status) { static void cliHandleFastFail_resp(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + SCliReq* pReq = transQueueGet(&pConn->reqMsgs, 0); - STraceId* trace = &pMsg->msg.info.traceId; + STraceId* trace = &pReq->msg.info.traceId; tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); + TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); } static void cliHandleFastFail_noresp(SCliConn* pConn, int status) { @@ -1788,17 +1788,17 @@ void cliConnCb(uv_connect_t* req, int status) { return cliSend(pConn); } -static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { - STransConnCtx* pCtx = pMsg->ctx; - STrans* pInst = pThrd->pInst; +static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { + SReqCtx* pCtx = pReq->ctx; + STrans* pInst = pThrd->pInst; STransMsg transMsg = {0}; transMsg.contLen = 0; transMsg.pCont = NULL; transMsg.code = code; - transMsg.msgType = pMsg->msg.msgType + 1; - transMsg.info.ahandle = pMsg->ctx->ahandle; - transMsg.info.traceId = pMsg->msg.info.traceId; + transMsg.msgType = pReq->msg.msgType + 1; + transMsg.info.ahandle = pReq->ctx->ahandle; + transMsg.info.traceId = pReq->msg.info.traceId; transMsg.info.hasEpSet = false; transMsg.info.cliVer = pInst->compatibilityVer; if (pCtx->pSem != NULL) { @@ -1810,27 +1810,27 @@ static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { pInst->cfp(pInst->parent, &transMsg, NULL); } - destroyCmsg(pMsg); + destroyCmsg(pReq); } -static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { +static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd) { if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { - pThrd->stopMsg = pMsg; + pThrd->stopMsg = pReq; return; } pThrd->stopMsg = NULL; pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); - destroyCmsg(pMsg); + destroyCmsg(pReq); (void)destroyConnPool(pThrd); (void)uv_walk(pThrd->loop, cliWalkCb, NULL); } -static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { - int64_t refId = (int64_t)(pMsg->msg.info.handle); +static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd) { + int64_t refId = (int64_t)(pReq->msg.info.handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { tDebug("%" PRId64 " already released", refId); - destroyCmsg(pMsg); + destroyCmsg(pReq); return; } @@ -1843,27 +1843,27 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { if (T_REF_VAL_GET(conn) == 2) { transUnrefCliHandle(conn); - if (!transQueuePush(&conn->cliMsgs, pMsg)) { + if (!transQueuePush(&conn->reqMsgs, pReq)) { return; } cliSend(conn); } else { tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); - destroyCmsg(pMsg); + destroyCmsg(pReq); } } -static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { - STransConnCtx* pCtx = pMsg->ctx; +static void cliHandleUpdate(SCliReq* pReq, SCliThrd* pThrd) { + SReqCtx* pCtx = pReq->ctx; pThrd->cvtAddr = pCtx->cvtAddr; - destroyCmsg(pMsg); + destroyCmsg(pReq); } -static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) { +static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd) { int32_t code = 0; - int64_t refId = (int64_t)(pMsg->msg.info.handle); + int64_t refId = (int64_t)(pReq->msg.info.handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { tDebug("id %" PRId64 " already released", refId); - destroyCmsg(pMsg); + destroyCmsg(pReq); return; } @@ -1876,7 +1876,7 @@ static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) { } tDebug("do free conn %p by id %" PRId64 "", conn, refId); - int32_t size = transQueueSize(&conn->cliMsgs); + int32_t size = transQueueSize(&conn->reqMsgs); if (size == 0) { // already recv, and notify upper layer TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); @@ -1892,14 +1892,14 @@ _exception: (void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetRefMgt(), refId); (void)transRemoveExHandle(transGetRefMgt(), refId); - destroyCmsg(pMsg); + destroyCmsg(pReq); } -SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { - STransConnCtx* pCtx = (*pMsg)->ctx; - SCliConn* conn = NULL; +SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) { + SReqCtx* pCtx = (*pReq)->ctx; + SCliConn* conn = NULL; - int64_t refId = (int64_t)((*pMsg)->msg.info.handle); + int64_t refId = (int64_t)((*pReq)->msg.info.handle); if (refId != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { @@ -1911,7 +1911,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) conn = exh->handle; taosRUnLockLatch(&exh->latch); if (conn == NULL) { - conn = getConnFromPool2(pThrd, addr, pMsg); + conn = getConnFromPool2(pThrd, addr, pReq); if (conn != NULL) specifyConnRef(conn, true, refId); } (void)transReleaseExHandle(transGetRefMgt(), refId); @@ -1919,7 +1919,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) return conn; }; - conn = getConnFromPool2(pThrd, addr, pMsg); + conn = getConnFromPool2(pThrd, addr, pReq); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { @@ -1945,30 +1945,30 @@ FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) return TSDB_CODE_RPC_FQDN_ERROR; } -FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { +FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx) { if (code != 0) return false; return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; } -FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { - if (pMsg == NULL) return -1; +FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* pResp) { + if (pReq == NULL) return -1; if (pResp->code == 0) { pResp->code = TSDB_CODE_RPC_BROKEN_LINK; } - pResp->msgType = pMsg->msg.msgType + 1; - pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL; - pResp->info.traceId = pMsg->msg.info.traceId; + pResp->msgType = pReq->msg.msgType + 1; + pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; + pResp->info.traceId = pReq->msg.info.traceId; return 0; } -FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code) { +FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) { STrans* pInst = pThrd->pInst; STransMsg resp = {.code = code}; - code = cliBuildExceptResp(pMsg, &resp); + code = cliBuildExceptResp(pReq, &resp); if (code != 0) { return code; } @@ -2036,15 +2036,15 @@ static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { static void doFreeTimeoutMsg(void* param) { STaskArg* arg = param; - SCliMsg* pMsg = arg->param1; + SCliReq* pReq = arg->param1; SCliThrd* pThrd = arg->param2; STrans* pInst = pThrd->pInst; - QUEUE_REMOVE(&pMsg->q); - STraceId* trace = &pMsg->msg.info.traceId; + QUEUE_REMOVE(&pReq->q); + STraceId* trace = &pReq->msg.info.traceId; - tGTrace("%s msg %s cannot get available conn after timeout", pInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + tGTrace("%s msg %s cannot get available conn after timeout", pInst->label, TMSG_INFO(pReq->msg.msgType)); + doNotifyCb(pReq, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); taosMemoryFree(arg); } @@ -2115,21 +2115,21 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { return code; } -void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { +void cliHandleReq__shareConn(SCliReq* pReq, SCliThrd* pThrd) { int32_t code = 0; - STraceId* trace = &pMsg->msg.info.traceId; + STraceId* trace = &pReq->msg.info.traceId; STrans* pInst = pThrd->pInst; - code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); if (code != 0) { // notifyCb - destroyCmsg(pMsg); + destroyCmsg(pReq); return; } char addr[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); + CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); if (pConn == NULL) { @@ -2138,12 +2138,12 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { pConn = getConnFromPool(pThrd, addr, &ignore); if (pConn != NULL) { addConnToHeapCache(pThrd->connHeapCache, pConn); - transQueuePush(&pConn->cliMsgs, pMsg); + transQueuePush(&pConn->reqMsgs, pReq); return cliSendBatch_shareConn(pConn); } } else { tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - transQueuePush(&pConn->cliMsgs, pMsg); + transQueuePush(&pConn->reqMsgs, pReq); cliSendBatch_shareConn(pConn); return; } @@ -2152,57 +2152,57 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { pConn->dstAddr = taosStrdup(addr); code = addConnToHeapCache(pThrd->connHeapCache, pConn); - transQueuePush(&pConn->cliMsgs, pMsg); - return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); + transQueuePush(&pConn->reqMsgs, pReq); + return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); } -void cliHandleReq__noShareConn(SCliMsg* pMsg, SCliThrd* pThrd) { +void cliHandleReq__noShareConn(SCliReq* pReq, SCliThrd* pThrd) { int32_t code; STrans* pInst = pThrd->pInst; - code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); if (code != 0) { // notifyCb - destroyCmsg(pMsg); + destroyCmsg(pReq); } - char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); + char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); bool ignore = false; - SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr); + SCliConn* conn = cliGetConn(&pReq, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server STransMsg resp = {0}; - if (pMsg->type != Release) { - (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, 0); + if (pReq->type != Release) { + (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, 0); } - destroyCmsg(pMsg); + destroyCmsg(pReq); return; } - if (conn == NULL && pMsg == NULL) { + if (conn == NULL && pReq == NULL) { return; } - STraceId* trace = &pMsg->msg.info.traceId; + STraceId* trace = &pReq->msg.info.traceId; if (conn != NULL) { - transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); - (void)transQueuePush(&conn->cliMsgs, pMsg); + transCtxMerge(&conn->ctx, &pReq->ctx->appCtx); + (void)transQueuePush(&conn->reqMsgs, pReq); cliSend(conn); } else { code = cliCreateConn(pThrd, &conn); if (code != 0) { tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); - (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, code); - destroyCmsg(pMsg); + (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, code); + destroyCmsg(pReq); return; } - specifyConnRef(conn, true, (int64_t)pMsg->msg.info.handle); + specifyConnRef(conn, true, (int64_t)pReq->msg.info.handle); - transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); - (void)transQueuePush(&conn->cliMsgs, pMsg); + transCtxMerge(&conn->ctx, &pReq->ctx->appCtx); + (void)transQueuePush(&conn->reqMsgs, pReq); conn->dstAddr = taosStrdup(addr); if (conn->dstAddr == NULL) { @@ -2261,12 +2261,12 @@ void cliHandleReq__noShareConn(SCliMsg* pMsg, SCliThrd* pThrd) { tGTrace("%s conn %p ready", pInst->label, conn); } -void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { +void cliHandleReq(SCliReq* pReq, SCliThrd* pThrd) { STrans* pInst = pThrd->pInst; if (pInst->shareConn == 1) { - return cliHandleReq__shareConn(pMsg, pThrd); + return cliHandleReq__shareConn(pReq, pThrd); } else { - return cliHandleReq__noShareConn(pMsg, pThrd); + return cliHandleReq__noShareConn(pReq, pThrd); } } @@ -2277,13 +2277,13 @@ static void cliDealReq(queue* wq, SCliThrd* pThrd) { queue* h = QUEUE_HEAD(wq); QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - if (pMsg->type == Quit) { - pThrd->stopMsg = pMsg; + if (pReq->type == Quit) { + pThrd->stopMsg = pReq; continue; } - (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); + (*cliAsyncHandle[pReq->type])(pReq, pThrd); count++; } if (count >= 2) { @@ -2303,9 +2303,9 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); return batch; } -static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) { - STrans* pInst = pThrd->pInst; - STransConnCtx* pCtx = pMsg->ctx; +static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { + STrans* pInst = pThrd->pInst; + SReqCtx* pCtx = pReq->ctx; return; } static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { @@ -2348,7 +2348,7 @@ static void destroyBatchList(SCliBatchList* pList) { taosMemoryFree(pList->dst); taosMemoryFree(pList); } -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* pMsg) { +static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); if (pBatch == NULL) { tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -2358,9 +2358,9 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* p QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->listq); - QUEUE_PUSH(&pBatch->wq, &pMsg->q); + QUEUE_PUSH(&pBatch->wq, &pReq->q); pBatch->wLen += 1; - pBatch->batchSize = pMsg->msg.contLen; + pBatch->batchSize = pReq->msg.contLen; pBatch->pList = pList; QUEUE_PUSH(&pList->wq, &pBatch->listq); @@ -2378,15 +2378,15 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { queue* h = QUEUE_HEAD(wq); QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { - cliBuildBatch(pMsg, h, pThrd); + if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) { + cliBuildBatch(pReq, h, pThrd); continue; } - if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { - STransConnCtx* pCtx = pMsg->ctx; + if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) { + SReqCtx* pCtx = pReq->ctx; char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); @@ -2398,17 +2398,17 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliBatchList* pBatchList = NULL; code = createBatchList(&pBatchList, key, ip, port); if (code != 0) { - destroyCmsg(pMsg); + destroyCmsg(pReq); continue; } pBatchList->batchLenLimit = pInst->batchSize; SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, pBatchList, pMsg); + code = createBatch(&pBatch, pBatchList, pReq); if (code != 0) { destroyBatchList(pBatchList); - destroyCmsg(pMsg); + destroyCmsg(pReq); continue; } @@ -2419,30 +2419,30 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, *ppBatchList, pMsg); + code = createBatch(&pBatch, *ppBatchList, pReq); if (code != 0) { - destroyCmsg(pMsg); + destroyCmsg(pReq); cliDestroyBatch(pBatch); } } else { queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { + if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { QUEUE_PUSH(&pBatch->wq, h); - pBatch->batchSize += pMsg->msg.contLen; + pBatch->batchSize += pReq->msg.contLen; pBatch->wLen += 1; } else { SCliBatch* tBatch = NULL; - code = createBatch(&tBatch, *ppBatchList, pMsg); + code = createBatch(&tBatch, *ppBatchList, pReq); if (code != 0) { - destroyCmsg(pMsg); + destroyCmsg(pReq); } } } } continue; } - (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); + (*cliAsyncHandle[pReq->type])(pReq, pThrd); count++; } @@ -2481,9 +2481,9 @@ void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { transCtxCleanup(&conn->ctx); cliReleaseUnfinishedMsg(conn); if (destroy == 1) { - transQueueDestroy(&conn->cliMsgs); + transQueueDestroy(&conn->reqMsgs); } else { - transQueueClear(&conn->cliMsgs); + transQueueClear(&conn->reqMsgs); } } @@ -2491,8 +2491,8 @@ void cliConnFreeMsgs(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { - SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i); + for (int i = 0; i < transQueueSize(&conn->reqMsgs); i++) { + SCliReq* cmsg = transQueueGet(&conn->reqMsgs, i); if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { continue; } @@ -2507,7 +2507,7 @@ void cliConnFreeMsgs(SCliConn* conn) { bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; - SCliMsg* pMsg = NULL; + SCliReq* pReq = NULL; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, conn->refId); @@ -2515,10 +2515,10 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { (void)transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); - for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) { - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i); + for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqMsgs); i++) { + SCliReq* cliMsg = transQueueGet(&conn->reqMsgs, i); if (cliMsg->type == Release) { - ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req"); + ASSERTS(pReq == NULL, "trans-cli recv invaid release-req"); tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, conn->refId); cliDestroyConn(conn, true); @@ -2529,7 +2529,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { cliConnFreeMsgs(conn); tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); - destroyCmsg(pMsg); + destroyCmsg(pReq); addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); return true; @@ -2596,45 +2596,45 @@ _err: } static FORCE_INLINE void destroyCmsg(void* arg) { - SCliMsg* pMsg = arg; - if (pMsg == NULL) { + SCliReq* pReq = arg; + if (pReq == NULL) { return; } - tDebug("free memory:%p, free ctx: %p", pMsg, pMsg->ctx); + tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); - transDestroyConnCtx(pMsg->ctx); - transFreeMsg(pMsg->msg.pCont); - taosMemoryFree(pMsg); + transDestroyConnCtx(pReq->ctx); + transFreeMsg(pReq->msg.pCont); + taosMemoryFree(pReq); } static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { if (arg == NULL) return; - SCliMsg* pMsg = arg; + SCliReq* pReq = arg; SCliThrd* pThrd = param; - if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { - if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL) { + if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pReq->msg.info.ahandle); } - destroyCmsg(pMsg); + destroyCmsg(pReq); } static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { if (param == NULL) return; STaskArg* arg = param; - SCliMsg* pMsg = arg->param1; + SCliReq* pReq = arg->param1; SCliThrd* pThrd = arg->param2; - if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pMsg->ctx->ahandle); + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + pThrd->destroyAhandleFp(pReq->ctx->ahandle); } - if (pMsg->msg.info.handle != 0) { - (void)transReleaseExHandle(transGetRefMgt(), (int64_t)pMsg->msg.info.handle); - (void)transRemoveExHandle(transGetRefMgt(), (int64_t)pMsg->msg.info.handle); + if (pReq->msg.info.handle != 0) { + (void)transReleaseExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); + (void)transRemoveExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); } - transDestroyConnCtx(pMsg->ctx); - transFreeMsg(pMsg->msg.pCont); - taosMemoryFree(pMsg); + transDestroyConnCtx(pReq->ctx); + transFreeMsg(pReq->msg.pCont); + taosMemoryFree(pReq); } static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { @@ -2757,7 +2757,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { (void)taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); (void)taosThreadMutexDestroy(&pThrd->msgMtx); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsgWrapper, (void*)pThrd); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliReq, destroyCmsgWrapper, (void*)pThrd); transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); @@ -2803,7 +2803,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd); } -static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) { +static FORCE_INLINE void transDestroyConnCtx(SReqCtx* ctx) { // taosMemoryFree(ctx); } @@ -2811,7 +2811,7 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) { int32_t cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully int32_t code = 0; - SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* msg = taosMemoryCalloc(1, sizeof(SCliReq)); if (msg == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -2852,7 +2852,7 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pInst) { } static FORCE_INLINE void doDelayTask(void* param) { STaskArg* arg = param; - cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2); + cliHandleReq((SCliReq*)arg->param1, (SCliThrd*)arg->param2); taosMemoryFree(arg); } @@ -2864,30 +2864,41 @@ static FORCE_INLINE void doCloseIdleConn(void* param) { cliDestroyConn(conn, true); taosMemoryFree(arg); } -static FORCE_INLINE void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) { +static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) { if (!(rpcDebugFlag & DEBUG_DEBUG)) { return; } - STransConnCtx* pCtx = pMsg->ctx; - STraceId* trace = &pMsg->msg.info.traceId; - char tbuf[512] = {0}; + SReqCtx* pCtx = pReq->ctx; + STraceId* trace = &pReq->msg.info.traceId; + char tbuf[512] = {0}; (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; } +static FORCE_INLINE void cliPerfLog_epset(SCliConn* pConn, SCliReq* pReq) { + if (!(rpcDebugFlag & DEBUG_TRACE)) { + return; + } + SReqCtx* pCtx = pReq->ctx; -static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pInst = pThrd->pInst; - STransConnCtx* pCtx = pMsg->ctx; - cliPerfLog_schedMsg(pMsg, transLabel(pThrd->pInst)); + char tbuf[512] = {0}; + (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + tTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + return; +} + +static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliReq* pReq, SCliThrd* pThrd) { + STrans* pInst = pThrd->pInst; + SReqCtx* pCtx = pReq->ctx; + cliPerfLog_schedMsg(pReq, transLabel(pThrd->pInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); if (arg == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - arg->param1 = pMsg; + arg->param1 = pReq; arg->param2 = pThrd; SDelayTask* pTask = transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); @@ -2898,7 +2909,10 @@ static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd return 0; } -FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { +FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { + SReqCtx* ctx = pReq->ctx; + SEpSet* dst = &ctx->epSet; + if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; } @@ -2920,10 +2934,12 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { pResp->pCont = buf; pResp->contLen = len; + pResp->info.hasEpSet = 1; + epsetAssign(dst, &epset); return true; } -bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { +bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) { bool noDelay = true; if (hasEpSet == false) { if (pResp->contLen == 0) { @@ -2989,16 +3005,8 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { return noDelay; } -int8_t cliRetryShouldRetry(STrans* pInst, STransMsg* pResp) { - bool retry = pInst->retry != NULL ? pInst->retry(pResp->code, pResp->msgType - 1) : false; - if (retry == false) { - return 0; - } - - return 1; -} -void cliRetryMayInitCtx(STrans* pInst, SCliMsg* pMsg) { - STransConnCtx* pCtx = pMsg->ctx; +void cliRetryMayInitCtx(STrans* pInst, SCliReq* pReq) { + SReqCtx* pCtx = pReq->ctx; if (!pCtx->retryInit) { pCtx->retryMinInterval = pInst->retryMinInterval; pCtx->retryMaxInterval = pInst->retryMaxInterval; @@ -3009,30 +3017,67 @@ void cliRetryMayInitCtx(STrans* pInst, SCliMsg* pMsg) { pCtx->retryStep = 0; pCtx->retryInit = true; pCtx->retryCode = TSDB_CODE_SUCCESS; - pMsg->msg.info.handle = 0; + pReq->msg.info.handle = 0; } } -int32_t cliRetryIsTimeout(STrans* pInst, SCliMsg* pMsg) { - STransConnCtx* pCtx = pMsg->ctx; + +int32_t cliRetryIsTimeout(STrans* pInst, SCliReq* pReq) { + SReqCtx* pCtx = pReq->ctx; if (pCtx->retryMaxTimeout != -1 && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { return 1; } return 0; } -bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + +int8_t cliRetryShouldRetry(STrans* pInst, STransMsg* pResp) { + bool retry = pInst->retry != NULL ? pInst->retry(pResp->code, pResp->msgType - 1) : false; + if (retry == false) { + return 0; + } + + return 1; +} + +void cliRetryUpdate(SReqCtx* pCtx, int8_t noDelay) { + if (noDelay == false) { + pCtx->epsetRetryCnt = 1; + pCtx->retryStep++; + + int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1); + pCtx->retryNextInterval = factor * pCtx->retryMinInterval; + if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) { + pCtx->retryNextInterval = pCtx->retryMaxInterval; + } + } else { + pCtx->retryNextInterval = 0; + pCtx->epsetRetryCnt++; + } +} + +int32_t cliRetryDoSched(SCliReq* pReq, SCliThrd* pThrd) { + pReq->sent = 0; + int32_t code = cliSchedMsgToNextNode(pReq, pThrd); + if (code != 0) { + tError("failed to sched msg to next node, reason:%s", tstrerror(code)); + return code; + } + return 0; +} + +bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - STransConnCtx* pCtx = pMsg->ctx; - int32_t code = pResp->code; + SReqCtx* pCtx = pReq->ctx; + int32_t code = pResp->code; - cliRetryMayInitCtx(pInst, pMsg); + cliRetryMayInitCtx(pInst, pReq); if (!cliRetryShouldRetry(pInst, pResp)) { return false; } - if (cliRetryIsTimeout(pInst, pMsg)) { + if (cliRetryIsTimeout(pInst, pReq)) { return false; } // code, msgType @@ -3071,22 +3116,11 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pCtx->retryCode = code; } - if (noDelay == false) { - pCtx->epsetRetryCnt = 1; - pCtx->retryStep++; + cliRetryUpdate(pCtx, noDelay); - int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1); - pCtx->retryNextInterval = factor * pCtx->retryMinInterval; - if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) { - pCtx->retryNextInterval = pCtx->retryMaxInterval; - } - } else { - pCtx->retryNextInterval = 0; - pCtx->epsetRetryCnt++; - } + pReq->sent = 0; - pMsg->sent = 0; - code = cliSchedMsgToNextNode(pMsg, pThrd); + code = cliRetryDoSched(pReq, pThrd); if (code != 0) { pResp->code = code; tError("failed to sched msg to next node, reason:%s", tstrerror(code)); @@ -3094,7 +3128,9 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } return true; } -void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { + +void cliMayResetRespCode(SCliReq* pReq, STransMsg* pResp) { + SReqCtx* pCtx = pReq->ctx; if (pCtx->retryCode != TSDB_CODE_SUCCESS) { int32_t code = pResp->code; // return internal code app @@ -3113,33 +3149,13 @@ void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { } } } -int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + +int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - - if (pMsg == NULL || pMsg->ctx == NULL) { - tTrace("%s conn %p handle resp", pInst->label, pConn); - pInst->cfp(pInst->parent, pResp, NULL); - return 0; - } - - bool retry = cliGenRetryRule(pConn, pResp, pMsg); - if (retry == true) { - return -1; - } - - STransConnCtx* pCtx = pMsg->ctx; - cliMayReSetRespCode(pCtx, pResp); - + SReqCtx* pCtx = pReq->ctx; STraceId* trace = &pResp->info.traceId; - bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); - if (hasEpSet) { - if (rpcDebugFlag & DEBUG_TRACE) { - char tbuf[512] = {0}; - (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); - tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); - } - } + if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pSem) { @@ -3166,7 +3182,7 @@ int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } else { tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); - if (retry == false && hasEpSet == true) { + if (pResp->info.hasEpSet == 1) { pInst->cfp(pInst->parent, pResp, &pCtx->epSet); } else { if (!cliIsEpsetUpdated(pResp->code, pCtx)) { @@ -3178,6 +3194,28 @@ int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } return 0; } +int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + + if (pReq == NULL || pReq->ctx == NULL) { + tTrace("%s conn %p handle resp", pInst->label, pConn); + pInst->cfp(pInst->parent, pResp, NULL); + return 0; + } + + bool retry = cliMayRetry(pConn, pReq, pResp); + if (retry == true) { + return -1; + } + + cliMayResetRespCode(pReq, pResp); + + if (cliTryUpdateEpset(pReq, pResp)) { + cliPerfLog_epset(pConn, pReq); + } + return cliNotifyImplCb(pConn, pReq, pResp); +} void transCloseClient(void* arg) { int32_t code = 0; @@ -3251,14 +3289,14 @@ int32_t transReleaseCliHandle(void* handle) { STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527}; TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); - STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } pCtx->ahandle = tmsg.info.ahandle; - SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* cmsg = taosMemoryCalloc(1, sizeof(SCliReq)); if (cmsg == NULL) { taosMemoryFree(pCtx); return TSDB_CODE_OUT_OF_MEMORY; @@ -3278,10 +3316,10 @@ int32_t transReleaseCliHandle(void* handle) { return code; } -static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) { +static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliReq** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); - STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -3294,7 +3332,7 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq if (ctx != NULL) pCtx->appCtx = *ctx; - SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* cliMsg = taosMemoryCalloc(1, sizeof(SCliReq)); if (cliMsg == NULL) { taosMemoryFree(pCtx); return TSDB_CODE_OUT_OF_MEMORY; @@ -3331,7 +3369,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S if (exh != NULL) { taosWLockLatch(&exh->latch); if (exh->handle == NULL && exh->inited != 0) { - SCliMsg* pCliMsg = NULL; + SCliReq* pCliMsg = NULL; code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); if (code != 0) { taosWUnLockLatch(&exh->latch); @@ -3352,7 +3390,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S } } - SCliMsg* pCliMsg = NULL; + SCliReq* pCliMsg = NULL; TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; @@ -3398,7 +3436,7 @@ int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* p pReq->info.handle = (void*)(*transpointId); - SCliMsg* pCliMsg = NULL; + SCliReq* pCliMsg = NULL; TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; @@ -3451,7 +3489,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); - STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { (void)tsem_destroy(sem); taosMemoryFree(sem); @@ -3465,7 +3503,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra pCtx->pSem = sem; pCtx->pRsp = pTransRsp; - SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* cliMsg = taosMemoryCalloc(1, sizeof(SCliReq)); if (cliMsg == NULL) { (void)tsem_destroy(sem); taosMemoryFree(sem); @@ -3563,7 +3601,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); - STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); } @@ -3585,7 +3623,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _RETURN2); } - SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* cliMsg = taosMemoryCalloc(1, sizeof(SCliReq)); if (cliMsg == NULL) { taosMemoryFree(pCtx); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); @@ -3651,7 +3689,7 @@ int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int32_t code = 0; for (int8_t i = 0; i < pInst->numOfThreads; i++) { - STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; break; @@ -3659,7 +3697,7 @@ int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { pCtx->cvtAddr = cvtAddr; - SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* cliMsg = taosMemoryCalloc(1, sizeof(SCliReq)); if (cliMsg == NULL) { taosMemoryFree(pCtx); code = TSDB_CODE_OUT_OF_MEMORY; @@ -3726,7 +3764,7 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); } - SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg)); + SCliReq* pCli = taosMemoryCalloc(1, sizeof(SCliReq)); if (pCli == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } @@ -3752,7 +3790,7 @@ _exception: int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node); - if (transQueueSize(&args1->cliMsgs) > transQueueSize(&args2->cliMsgs)) { + if (transQueueSize(&args1->reqMsgs) > transQueueSize(&args2->reqMsgs)) { return 0; } return 1;