diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f4d5fcfe16..b811d192bd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -142,7 +142,7 @@ typedef struct { tmsg_t msgType; // message type int8_t connType; // connection type cli/srv - STransCtx appCtx; // + STransCtx userCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API STransSyncMsg* pSyncMsg; // for syncchronous with timeout API @@ -318,24 +318,24 @@ void transUnrefCliHandle(void* handle); int32_t transReleaseCliHandle(void* handle); int32_t transReleaseSrvHandle(void* handle); -int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx); -int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp); -int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, +int32_t transSendRequest(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx); +int32_t transSendRecv(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp); +int32_t transSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs); -int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId); -int32_t transFreeConnById(void* shandle, int64_t transpointId); +int32_t transSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId); +int32_t transFreeConnById(void* pInit, int64_t transpointId); int32_t transSendResponse(const STransMsg* msg); int32_t transRegisterMsg(const STransMsg* msg); -int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); -int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); +int32_t transSetDefaultAddr(void* pInit, const char* ip, const char* fqdn); +int32_t transSetIpWhiteList(void* pInit, void* arg, FilteFunc* func); int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst); int32_t transAllocHandle(int64_t* refId); -void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit); +void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit); void transCloseClient(void* arg); void transCloseServer(void* arg); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index f56c435cba..8199ee21c9 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -32,8 +32,8 @@ extern "C" { #endif -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit); +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit); void taosCloseServer(void* arg); void taosCloseClient(void* arg); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 6b0b557f1c..ff53d01ca0 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -15,7 +15,7 @@ #include "transComm.h" -void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = { +void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* pInit) = { transInitServer, transInitClient}; void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; @@ -103,7 +103,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->timeToGetConn = 10 * 1000; } pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn; - + pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); @@ -167,29 +167,29 @@ void* rpcReallocCont(void* ptr, int64_t contLen) { return st + TRANS_MSG_OVERHEAD; } -int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - return transSendRequest(shandle, pEpSet, pMsg, NULL); +int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { + return transSendRequest(pInit, pEpSet, pMsg, NULL); } -int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0|| pRid == NULL) { - return transSendRequest(shandle, pEpSet, pMsg, pCtx); +int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) { + return transSendRequest(pInit, pEpSet, pMsg, pCtx); } else { - return transSendRequestWithId(shandle, pEpSet, pMsg, pRid); + return transSendRequestWithId(pInit, pEpSet, pMsg, pRid); } } -int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { - return transSendRequestWithId(shandle, pEpSet, pReq, transpointId); +int32_t rpcSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { + return transSendRequestWithId(pInit, pEpSet, pReq, transpointId); } -int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { - return transSendRecv(shandle, pEpSet, pMsg, pRsp); +int32_t rpcSendRecv(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { + return transSendRecv(pInit, pEpSet, pMsg, pRsp); } -int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, +int32_t rpcSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { - return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs); + return transSendRecvWithTimeout(pInit, pEpSet, pMsg, pRsp, epUpdated, timeoutMs); } -int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); } +int32_t rpcFreeConnById(void* pInit, int64_t connId) { return transFreeConnById(pInit, connId); } int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 537563664e..5218d376ae 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -98,7 +98,7 @@ typedef struct SCliReq { queue q; STransMsgType type; - int64_t refId; + // int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) queue seqq; @@ -232,12 +232,12 @@ static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd); static void (*cliAsyncHandle[])(SCliReq* pReq, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate, cliHandleFreeById}; -static FORCE_INLINE void destroyCmsg(void* cmsg); +static FORCE_INLINE void destroyReq(void* cmsg); -static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param); -static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); +static FORCE_INLINE void destroyReqWrapper(void* arg, void* param); +static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); -static FORCE_INLINE void transDestroyConnCtx(SReqCtx* ctx); +static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); @@ -361,7 +361,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd->destroyAhandleFp(msg->ctx->ahandle); } } - destroyCmsg(msg); + destroyReq(msg); } transQueueClear(&conn->reqMsgs); memset(&conn->ctx, 0, sizeof(conn->ctx)); @@ -393,7 +393,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { QUEUE_REMOVE(h); taosWUnLockLatch(&exh->latch); SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); - transCtxMerge(&conn->ctx, &t->ctx->appCtx); + transCtxMerge(&conn->ctx, &t->ctx->userCtx); (void)transQueuePush(&conn->reqMsgs, t); tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); (void)transReleaseExHandle(transGetRefMgt(), refId); @@ -483,10 +483,11 @@ void cliHandleResp_shareConn(SCliConn* conn) { if (ret != 0) { return; } else { - destroyCmsg(pReq); + destroyReq(pReq); } } void cliHandleResp(SCliConn* conn) { + int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; @@ -502,6 +503,7 @@ void cliHandleResp(SCliConn* conn) { if (msgLen <= 0) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + // TODO: notify cb return; } @@ -509,8 +511,9 @@ void cliHandleResp(SCliConn* conn) { tTrace("%s conn %p not reset read buf", transLabel(pInst), conn); } - if (transDecompressMsg((char**)&pHead, msgLen) < 0) { + if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); + // TODO: notify cb } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -584,7 +587,7 @@ void cliHandleResp(SCliConn* conn) { } int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); tDebug("conn %p msg refId: %" PRId64 "", conn, refId); - destroyCmsg(pReq); + destroyReq(pReq); if (cliConnSendSeqMsg(refId, conn)) { return; @@ -609,7 +612,7 @@ static void cliDestroyMsgInExhandle(int64_t refId) { queue* h = QUEUE_HEAD(&exh->q); QUEUE_REMOVE(h); SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); - destroyCmsg(t); + destroyReq(t); } taosWUnLockLatch(&exh->latch); (void)transReleaseExHandle(transGetRefMgt(), refId); @@ -636,7 +639,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) { - destroyCmsg(pReq); + destroyReq(pReq); break; } @@ -666,7 +669,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (pCtx == NULL || pCtx->pSem == NULL) { if (transMsg.info.ahandle == NULL) { if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) { - destroyCmsg(pReq); + destroyReq(pReq); once = true; continue; } @@ -680,7 +683,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { return; } } - destroyCmsg(pReq); + destroyReq(pReq); tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); } while (!transQueueEmpty(&pConn->reqMsgs)); if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); @@ -937,7 +940,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { transDQCancel(thrd->waitConnQueue, pReq->ctx->task); pReq->ctx->task = NULL; - transCtxMerge(&conn->ctx, &pReq->ctx->appCtx); + transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); (void)transQueuePush(&conn->reqMsgs, pReq); conn->status = ConnNormal; @@ -1218,7 +1221,7 @@ static bool cliHandleNoResp(SCliConn* conn) { SCliReq* pReq = transQueueGet(&conn->reqMsgs, 0); if (REQUEST_NO_RESP(&pReq->msg)) { (void)transQueuePop(&conn->reqMsgs); - destroyCmsg(pReq); + destroyReq(pReq); res = true; } if (res == true) { @@ -1291,7 +1294,7 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { continue; } else { // already notify user - destroyCmsg(pReq); + destroyReq(pReq); } } @@ -1518,17 +1521,6 @@ void cliSend(SCliConn* pConn) { STraceId* trace = &pReq->info.traceId; - if (pInst->startTimer != NULL && pInst->startTimer(0, pReq->msgType)) { - uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; - if (timer == NULL) { - timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - tDebug("no available timer, create a timer %p", timer); - (void)uv_timer_init(pThrd->loop, timer); - } - timer->data = pConn; - pConn->timer = timer; - } - if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); @@ -1568,7 +1560,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { QUEUE_REMOVE(h); SCliReq* p = QUEUE_DATA(h, SCliReq, q); - destroyCmsg(p); + destroyReq(p); } SCliBatchList* p = pBatch->pList; p->sending -= 1; @@ -1810,7 +1802,7 @@ static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { pInst->cfp(pInst->parent, &transMsg, NULL); } - destroyCmsg(pReq); + destroyReq(pReq); } static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd) { if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { @@ -1820,7 +1812,7 @@ static void cliHandleQuit(SCliReq* pReq, SCliThrd* pThrd) { pThrd->stopMsg = NULL; pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); - destroyCmsg(pReq); + destroyReq(pReq); (void)destroyConnPool(pThrd); (void)uv_walk(pThrd->loop, cliWalkCb, NULL); @@ -1830,7 +1822,7 @@ static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { tDebug("%" PRId64 " already released", refId); - destroyCmsg(pReq); + destroyReq(pReq); return; } @@ -1849,13 +1841,13 @@ static void cliHandleRelease(SCliReq* pReq, SCliThrd* pThrd) { cliSend(conn); } else { tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); - destroyCmsg(pReq); + destroyReq(pReq); } } static void cliHandleUpdate(SCliReq* pReq, SCliThrd* pThrd) { SReqCtx* pCtx = pReq->ctx; pThrd->cvtAddr = pCtx->cvtAddr; - destroyCmsg(pReq); + destroyReq(pReq); } static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd) { int32_t code = 0; @@ -1863,7 +1855,7 @@ static void cliHandleFreeById(SCliReq* pReq, SCliThrd* pThrd) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { tDebug("id %" PRId64 " already released", refId); - destroyCmsg(pReq); + destroyReq(pReq); return; } @@ -1892,7 +1884,7 @@ _exception: (void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetRefMgt(), refId); (void)transRemoveExHandle(transGetRefMgt(), refId); - destroyCmsg(pReq); + destroyReq(pReq); } SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) { @@ -2116,158 +2108,158 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { } void cliHandleReq__shareConn(SCliReq* pReq, SCliThrd* pThrd) { - int32_t code = 0; + // int32_t code = 0; - STraceId* trace = &pReq->msg.info.traceId; - STrans* pInst = pThrd->pInst; + // STraceId* trace = &pReq->msg.info.traceId; + // STrans* pInst = pThrd->pInst; - code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); - if (code != 0) { - // notifyCb - destroyCmsg(pReq); - return; - } + // code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); + // if (code != 0) { + // // TODO: notifyCb + // destroyReq(pReq); + // return; + // } - char addr[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + // char addr[TSDB_FQDN_LEN + 64] = {0}; + // CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); - SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); - if (pConn == NULL) { - tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - bool ignore = false; - pConn = getConnFromPool(pThrd, addr, &ignore); - if (pConn != NULL) { - addConnToHeapCache(pThrd->connHeapCache, pConn); - transQueuePush(&pConn->reqMsgs, pReq); - return cliSendBatch_shareConn(pConn); - } - } else { - tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - transQueuePush(&pConn->reqMsgs, pReq); - cliSendBatch_shareConn(pConn); - return; - } + // SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); + // if (pConn == NULL) { + // tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + // bool ignore = false; + // pConn = getConnFromPool(pThrd, addr, &ignore); + // if (pConn != NULL) { + // addConnToHeapCache(pThrd->connHeapCache, pConn); + // transQueuePush(&pConn->reqMsgs, pReq); + // return cliSendBatch_shareConn(pConn); + // } + // } else { + // tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + // transQueuePush(&pConn->reqMsgs, pReq); + // cliSendBatch_shareConn(pConn); + // return; + // } - code = cliCreateConn(pThrd, &pConn); - pConn->dstAddr = taosStrdup(addr); - code = addConnToHeapCache(pThrd->connHeapCache, pConn); + // code = cliCreateConn(pThrd, &pConn); + // pConn->dstAddr = taosStrdup(addr); + // code = addConnToHeapCache(pThrd->connHeapCache, pConn); - transQueuePush(&pConn->reqMsgs, pReq); - return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + // transQueuePush(&pConn->reqMsgs, pReq); + // return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); } void cliHandleReq__noShareConn(SCliReq* pReq, SCliThrd* pThrd) { - int32_t code; - STrans* pInst = pThrd->pInst; - code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); - if (code != 0) { - // notifyCb - destroyCmsg(pReq); - } + // int32_t code; + // STrans* pInst = pThrd->pInst; + // code = cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); + // if (code != 0) { + // // notifyCb + // destroyReq(pReq); + // } - char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); - char addr[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); + // char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + // uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + // char addr[TSDB_FQDN_LEN + 64] = {0}; + // CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); - bool ignore = false; - SCliConn* conn = cliGetConn(&pReq, pThrd, &ignore, addr); - if (ignore == true) { - // persist conn already release by server - STransMsg resp = {0}; - if (pReq->type != Release) { - (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, 0); - } - destroyCmsg(pReq); - return; - } - if (conn == NULL && pReq == NULL) { - return; - } - STraceId* trace = &pReq->msg.info.traceId; + // bool ignore = false; + // SCliConn* conn = cliGetConn(&pReq, pThrd, &ignore, addr); + // if (ignore == true) { + // // persist conn already release by server + // STransMsg resp = {0}; + // if (pReq->type != Release) { + // (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, 0); + // } + // destroyReq(pReq); + // return; + // } + // if (conn == NULL && pReq == NULL) { + // return; + // } + // STraceId* trace = &pReq->msg.info.traceId; - if (conn != NULL) { - transCtxMerge(&conn->ctx, &pReq->ctx->appCtx); - (void)transQueuePush(&conn->reqMsgs, pReq); - cliSend(conn); - } else { - code = cliCreateConn(pThrd, &conn); - if (code != 0) { - tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); - (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, code); - destroyCmsg(pReq); - return; - } + // if (conn != NULL) { + // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); + // (void)transQueuePush(&conn->reqMsgs, pReq); + // cliSend(conn); + // } else { + // code = cliCreateConn(pThrd, &conn); + // if (code != 0) { + // tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); + // (void)cliBuildExceptRespAndNotifyCb(pThrd, pReq, code); + // destroyReq(pReq); + // return; + // } - specifyConnRef(conn, true, (int64_t)pReq->msg.info.handle); + // specifyConnRef(conn, true, (int64_t)pReq->msg.info.handle); - transCtxMerge(&conn->ctx, &pReq->ctx->appCtx); - (void)transQueuePush(&conn->reqMsgs, pReq); + // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); + // (void)transQueuePush(&conn->reqMsgs, pReq); - conn->dstAddr = taosStrdup(addr); - if (conn->dstAddr == NULL) { - tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pInst), conn, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - cliHandleFastFail(conn, -1); - return; - } + // conn->dstAddr = taosStrdup(addr); + // if (conn->dstAddr == NULL) { + // tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pInst), conn, + // tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + // cliHandleFastFail(conn, -1); + // return; + // } - uint32_t ipaddr; - int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); - if (code != 0) { - cliResetConnTimer(conn); - cliHandleExcept(conn, code); - return; - } + // uint32_t ipaddr; + // int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); + // if (code != 0) { + // cliResetConnTimer(conn); + // cliHandleExcept(conn, code); + // return; + // } - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons(port); + // struct sockaddr_in addr; + // addr.sin_family = AF_INET; + // addr.sin_addr.s_addr = ipaddr; + // addr.sin_port = (uint16_t)htons(port); - tGTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr); - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); - if (fd == -1) { - tGError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - cliHandleExcept(conn, -1); - errno = 0; - return; - } + // tGTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr); + // int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); + // if (fd == -1) { + // tGError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), conn, + // tstrerror(TAOS_SYSTEM_ERROR(errno))); + // cliHandleExcept(conn, -1); + // errno = 0; + // return; + // } - int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); - if (ret != 0) { - tGError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - cliHandleExcept(conn, -1); - return; - } + // int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + // if (ret != 0) { + // tGError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + // cliHandleExcept(conn, -1); + // return; + // } - ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); - if (ret != 0) { - tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - cliHandleExcept(conn, -1); - return; - } + // ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); + // if (ret != 0) { + // tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + // cliHandleExcept(conn, -1); + // return; + // } - ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); - if (ret != 0) { - cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, ret); - return; - } - (void)uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); - } - tGTrace("%s conn %p ready", pInst->label, conn); + // ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + // if (ret != 0) { + // cliResetConnTimer(conn); + // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + // cliHandleFastFail(conn, ret); + // return; + // } + // (void)uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + // } + // tGTrace("%s conn %p ready", pInst->label, conn); } void cliHandleReq(SCliReq* pReq, SCliThrd* pThrd) { - STrans* pInst = pThrd->pInst; - if (pInst->shareConn == 1) { - return cliHandleReq__shareConn(pReq, pThrd); - } else { - return cliHandleReq__noShareConn(pReq, pThrd); - } + // STrans* pInst = pThrd->pInst; + // if (pInst->shareConn == 1) { + // return cliHandleReq__shareConn(pReq, pThrd); + // } else { + // return cliHandleReq__noShareConn(pReq, pThrd); + // } } static void cliDealReq(queue* wq, SCliThrd* pThrd) { @@ -2398,7 +2390,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliBatchList* pBatchList = NULL; code = createBatchList(&pBatchList, key, ip, port); if (code != 0) { - destroyCmsg(pReq); + destroyReq(pReq); continue; } @@ -2408,7 +2400,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { code = createBatch(&pBatch, pBatchList, pReq); if (code != 0) { destroyBatchList(pBatchList); - destroyCmsg(pReq); + destroyReq(pReq); continue; } @@ -2421,7 +2413,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliBatch* pBatch = NULL; code = createBatch(&pBatch, *ppBatchList, pReq); if (code != 0) { - destroyCmsg(pReq); + destroyReq(pReq); cliDestroyBatch(pBatch); } } else { @@ -2435,7 +2427,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliBatch* tBatch = NULL; code = createBatch(&tBatch, *ppBatchList, pReq); if (code != 0) { - destroyCmsg(pReq); + destroyReq(pReq); } } } @@ -2529,7 +2521,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { cliConnFreeMsgs(conn); tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); - destroyCmsg(pReq); + destroyReq(pReq); addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); return true; @@ -2537,6 +2529,48 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { return false; } +static FORCE_INLINE void destroyReq(void* arg) { + SCliReq* pReq = arg; + if (pReq == NULL) { + return; + } + tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); + + destroyReqCtx(pReq->ctx); + transFreeMsg(pReq->msg.pCont); + taosMemoryFree(pReq); +} +static FORCE_INLINE void destroyReqWrapper(void* arg, void* param) { + if (arg == NULL) return; + + SCliReq* pReq = arg; + SCliThrd* pThrd = param; + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL) { + if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pReq->msg.info.ahandle); + } + destroyReq(pReq); +} +static FORCE_INLINE void destroyReqAndAhanlde(void* param) { + if (param == NULL) return; + + STaskArg* arg = param; + SCliReq* pReq = arg->param1; + SCliThrd* pThrd = arg->param2; + + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + pThrd->destroyAhandleFp(pReq->ctx->ahandle); + } + + if (pReq->msg.info.handle != 0) { + (void)transReleaseExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); + (void)transRemoveExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); + } + + destroyReqCtx(pReq->ctx); + transFreeMsg(pReq->msg.pCont); + taosMemoryFree(pReq); +} + static void* cliWorkThread(void* arg) { char threadName[TSDB_LABEL_LEN] = {0}; @@ -2552,14 +2586,14 @@ static void* cliWorkThread(void* arg) { return NULL; } -void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { +void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInstRef) { int32_t code = 0; SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj)); if (cli == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); } - STrans* pInst = shandle; + STrans* pInst = pInstRef; memcpy(cli->label, label, TSDB_LABEL_LEN); cli->numOfThreads = numOfThreads; @@ -2570,7 +2604,7 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < cli->numOfThreads; i++) { SCliThrd* pThrd = NULL; - code = createThrdObj(shandle, &pThrd); + code = createThrdObj(pInstRef, &pThrd); if (code != 0) { goto _err; } @@ -2595,48 +2629,6 @@ _err: return NULL; } -static FORCE_INLINE void destroyCmsg(void* arg) { - SCliReq* pReq = arg; - if (pReq == NULL) { - return; - } - tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); - - transDestroyConnCtx(pReq->ctx); - transFreeMsg(pReq->msg.pCont); - taosMemoryFree(pReq); -} -static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { - if (arg == NULL) return; - - SCliReq* pReq = arg; - SCliThrd* pThrd = param; - if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL) { - if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pReq->msg.info.ahandle); - } - destroyCmsg(pReq); -} -static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { - if (param == NULL) return; - - STaskArg* arg = param; - SCliReq* pReq = arg->param1; - SCliThrd* pThrd = arg->param2; - - if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pReq->ctx->ahandle); - } - - if (pReq->msg.info.handle != 0) { - (void)transReleaseExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); - (void)transRemoveExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); - } - - transDestroyConnCtx(pReq->ctx); - transFreeMsg(pReq->msg.pCont); - taosMemoryFree(pReq); -} - static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { int32_t code = 0; STrans* pInst = trans; @@ -2757,10 +2749,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { (void)taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); (void)taosThreadMutexDestroy(&pThrd->msgMtx); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliReq, destroyCmsgWrapper, (void*)pThrd); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliReq, destroyReqWrapper, (void*)pThrd); transAsyncPoolDestroy(pThrd->asyncPool); - transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); + transDQDestroy(pThrd->delayQueue, destroyReqAndAhanlde); transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->waitConnQueue, NULL); @@ -2803,10 +2795,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd); } -static FORCE_INLINE void transDestroyConnCtx(SReqCtx* ctx) { - // - taosMemoryFree(ctx); -} +static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx) { taosMemoryFree(ctx); } int32_t cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully @@ -2910,6 +2899,7 @@ static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliReq* pReq, SCliThrd* pThrd } FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { + int32_t code = 0; SReqCtx* ctx = pReq->ctx; SEpSet* dst = &ctx->epSet; @@ -2918,7 +2908,8 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { } // rebuild resp msg SEpSet epset; - if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) { + if ((code = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset)) < 0) { + tError("failed to deserialize epset, code:%d", code); return false; } int32_t tlen = tSerializeSEpSet(NULL, 0, dst); @@ -2927,6 +2918,7 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { int32_t len = pResp->contLen - tlen; if (len != 0) { buf = rpcMallocCont(len); + // TODO: check buf memcpy(buf, (char*)pResp->pCont + tlen, len); } rpcFreeCont(pResp->pCont); @@ -3031,14 +3023,10 @@ int32_t cliRetryIsTimeout(STrans* pInst, SCliReq* pReq) { int8_t cliRetryShouldRetry(STrans* pInst, STransMsg* pResp) { bool retry = pInst->retry != NULL ? pInst->retry(pResp->code, pResp->msgType - 1) : false; - if (retry == false) { - return 0; - } - - return 1; + return retry == false ? 0 : 1; } -void cliRetryUpdate(SReqCtx* pCtx, int8_t noDelay) { +void cliRetryUpdateRule(SReqCtx* pCtx, int8_t noDelay) { if (noDelay == false) { pCtx->epsetRetryCnt = 1; pCtx->retryStep++; @@ -3055,7 +3043,6 @@ void cliRetryUpdate(SReqCtx* pCtx, int8_t noDelay) { } int32_t cliRetryDoSched(SCliReq* pReq, SCliThrd* pThrd) { - pReq->sent = 0; int32_t code = cliSchedMsgToNextNode(pReq, pThrd); if (code != 0) { tError("failed to sched msg to next node, reason:%s", tstrerror(code)); @@ -3080,8 +3067,8 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { if (cliRetryIsTimeout(pInst, pReq)) { return false; } - // code, msgType + // code, msgType // A: epset,leader, not self // B: epset,not know leader // C: noepset,leader but not serivce @@ -3116,7 +3103,7 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { pCtx->retryCode = code; } - cliRetryUpdate(pCtx, noDelay); + cliRetryUpdateRule(pCtx, noDelay); pReq->sent = 0; @@ -3310,13 +3297,13 @@ int32_t transReleaseCliHandle(void* handle) { tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg); if ((code = transAsyncSend(pThrd->asyncPool, &cmsg->q)) != 0) { - destroyCmsg(cmsg); + destroyReq(cmsg); return code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code; } return code; } -static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliReq** pCliMsg) { +static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliReq** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); @@ -3330,7 +3317,7 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - if (ctx != NULL) pCtx->appCtx = *ctx; + if (ctx != NULL) pCtx->userCtx = *ctx; SCliReq* pCliReq = taosMemoryCalloc(1, sizeof(SCliReq)); if (pReq == NULL) { @@ -3342,7 +3329,6 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq pCliReq->msg = *pReq; pCliReq->st = taosGetTimestampUs(); pCliReq->type = Normal; - pCliReq->refId = (int64_t)shandle; QUEUE_INIT(&pCliReq->seqq); *pCliMsg = pCliReq; @@ -3350,8 +3336,8 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq return 0; } -int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); +int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { transFreeMsg(pReq->pCont); pReq->pCont = NULL; @@ -3370,7 +3356,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S taosWLockLatch(&exh->latch); if (exh->handle == NULL && exh->inited != 0) { SCliReq* pCliMsg = NULL; - code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + code = transInitMsg(pInstRef, pEpSet, pReq, ctx, &pCliMsg); if (code != 0) { taosWUnLockLatch(&exh->latch); (void)transReleaseExHandle(transGetRefMgt(), handle); @@ -3380,7 +3366,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); tDebug("msg refId: %" PRId64 "", handle); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; } else { exh->inited = 1; @@ -3391,33 +3377,33 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S } SCliReq* pCliMsg = NULL; - TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception); + TAOS_CHECK_GOTO(transInitMsg(pInstRef, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { - destroyCmsg(pCliMsg); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + destroyReq(pCliMsg); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; _exception: transFreeMsg(pReq->pCont); pReq->pCont = NULL; - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } -int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { +int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { if (transpointId == NULL) { ASSERT(0); return TSDB_CODE_INVALID_PARA; } int32_t code = 0; - STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception); } @@ -3437,28 +3423,28 @@ int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* p pReq->info.handle = (void*)(*transpointId); SCliReq* pCliMsg = NULL; - TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception); + TAOS_CHECK_GOTO(transInitMsg(pInstRef, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { - destroyCmsg(pCliMsg); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + destroyReq(pCliMsg); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; _exception: transFreeMsg(pReq->pCont); pReq->pCont = NULL; - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } -int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); +int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { transFreeMsg(pReq->pCont); pReq->pCont = NULL; @@ -3515,7 +3501,6 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra pCliReq->msg = *pReq; pCliReq->st = taosGetTimestampUs(); pCliReq->type = Normal; - pCliReq->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, @@ -3523,7 +3508,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra code = transAsyncSend(pThrd->asyncPool, &pCliReq->q); if (code != 0) { - destroyCmsg(pReq); + destroyReq(pReq); TAOS_CHECK_GOTO((code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code), NULL, _RETURN); } (void)tsem_wait(sem); @@ -3533,11 +3518,11 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra _RETURN: tsem_destroy(sem); taosMemoryFree(sem); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); taosMemoryFree(pTransRsp); return code; _RETURN1: - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); taosMemoryFree(pTransRsp); taosMemoryFree(pReq->pCont); pReq->pCont = NULL; @@ -3579,10 +3564,10 @@ _EXIT: taosMemoryFree(pSyncMsg); return code; } -int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, +int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { int32_t code = 0; - STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { transFreeMsg(pReq->pCont); pReq->pCont = NULL; @@ -3633,7 +3618,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, pCliReq->msg = *pReq; pCliReq->st = taosGetTimestampUs(); pCliReq->type = Normal; - pCliReq->refId = (int64_t)shandle; + // pCliReq->refId = (int64_t)pInstRef; STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, @@ -3641,7 +3626,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, code = transAsyncSend(pThrd->asyncPool, &pCliReq->q); if (code != 0) { - destroyCmsg(pReq); + destroyReq(pReq); TAOS_CHECK_GOTO(code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code, NULL, _RETURN); goto _RETURN; } @@ -3660,7 +3645,7 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, code = 0; } _RETURN: - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); (void)taosReleaseRef(transGetSyncMsgMgt(), ref); (void)taosRemoveRef(transGetSyncMsgMgt(), ref); return code; @@ -3668,16 +3653,16 @@ _RETURN2: transFreeMsg(pReq->pCont); pReq->pCont = NULL; taosMemoryFree(pTransMsg); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } /* * **/ -int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { +int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) { if (ip == NULL || fqdn == NULL) return TSDB_CODE_INVALID_PARA; - STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { return TSDB_CODE_RPC_MODULE_QUIT; } @@ -3706,13 +3691,13 @@ int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { pReq->ctx = pCtx; pReq->type = Update; - pReq->refId = (int64_t)shandle; + // pReq->refId = (int64_t)pInstRef; SCliThrd* thrd = ((SCliObj*)pInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid); if ((code = transAsyncSend(thrd->asyncPool, &(pReq->q))) != 0) { - destroyCmsg(pReq); + destroyReq(pReq); if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) { code = TSDB_CODE_RPC_MODULE_QUIT; } @@ -3720,7 +3705,7 @@ int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { } } - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } @@ -3748,9 +3733,9 @@ int32_t transAllocHandle(int64_t* refId) { *refId = exh->refId; return 0; } -int32_t transFreeConnById(void* shandle, int64_t transpointId) { +int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { int32_t code = 0; - STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { return TSDB_CODE_RPC_MODULE_QUIT; } @@ -3782,7 +3767,7 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) { } _exception: - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 9c942c6d00..8acbe9f273 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1399,7 +1399,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { srv->numOfWorkerReady++; } -void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { +void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit) { int32_t code = 0; SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); @@ -1463,9 +1463,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); - thrd->pInst = shandle; + thrd->pInst = pInit; thrd->quit = false; - thrd->pInst = shandle; + thrd->pInst = pInit; thrd->pWhiteList = uvWhiteListCreate(); srv->pThreadObj[i] = thrd; @@ -1494,9 +1494,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } - thrd->pInst = shandle; + thrd->pInst = pInit; thrd->quit = false; - thrd->pInst = shandle; + thrd->pInst = pInit; thrd->pWhiteList = uvWhiteListCreate(); if (thrd->pWhiteList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY;