diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index aa33d6f2fc..980b9aa6bf 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1181,7 +1181,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.queryId = pJob->queryId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; - qMsg.execId = pTask->execId; + qMsg.execId = *(int32_t*)param; msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); if (msgSize < 0) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index d96c01fc76..97c3c7d276 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -371,14 +371,13 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, pCtx->roundTotal = pEpSet->numOfEps; } - if (pCtx->roundTimes >= pCtx->roundTotal) { int64_t nowTs = taosGetTimestampMs(); int64_t lastTime = nowTs - pCtx->startTs; if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - pJob->noMoreRetry = true; + pJob->noMoreRetry = true; SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode)); } @@ -418,7 +417,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; - pTask->childReady = 0; + pTask->childReady = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } @@ -505,11 +504,11 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i pLevel->taskExecDoneNum = 0; pLevel->taskLaunchedNum = 0; } - + SCH_RESET_JOB_LEVEL_IDX(pJob); - + code = schDoTaskRedirect(pJob, pTask, pData, rspCode); - + taosMemoryFreeClear(pData->pData); taosMemoryFreeClear(pData->pEpSet); @@ -627,7 +626,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo pTask->maxRetryTimes); return TSDB_CODE_SUCCESS; } - + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; pTask->maxRetryTimes++; @@ -862,7 +861,9 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { while (nodeInfo) { if (nodeInfo->handle) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL); + void *pExecId = taosHashGetKey(nodeInfo, NULL); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); + SCH_TASK_DLOG("start to drop task's %dth execNode", i); } else { SCH_TASK_DLOG("no need to drop task %dth execNode", i); @@ -901,7 +902,6 @@ int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType return TSDB_CODE_SUCCESS; } - int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; @@ -1269,7 +1269,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t int32_t code = TSDB_CODE_SUCCESS; SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type)); - + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; @@ -1277,7 +1277,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_LOCK_TASK(pTask); code = schNotifyTaskOnExecNode(pJob, pTask, type); SCH_UNLOCK_TASK(pTask); - + if (TSDB_CODE_SUCCESS != code) { break; } @@ -1289,7 +1289,6 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_RET(code); } - int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL)); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c010e31320..da6d71e07b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -19,16 +19,12 @@ extern "C" { #endif #include -#include "os.h" -#include "taoserror.h" #include "theap.h" -#include "tmisce.h" #include "tmsg.h" #include "transLog.h" #include "transportInt.h" #include "trpc.h" #include "ttrace.h" -#include "tutil.h" typedef bool (*FilteFunc)(void* arg); @@ -115,9 +111,12 @@ typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { - void* handle; - int64_t refId; - void* pThrd; + void* handle; + int64_t refId; + void* pThrd; + queue q; + int8_t inited; + SRWLatch latch; } SExHandle; typedef struct { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2d8f4ed3c2..062609baac 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -92,6 +92,7 @@ typedef struct SCliMsg { int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) + queue seqq; } SCliMsg; typedef struct SCliThrd { @@ -121,11 +122,7 @@ typedef struct SCliThrd { SHashObj* batchCache; SCliMsg* stopMsg; - - bool quit; - - int newConnCount; - SHashObj* msgCount; + bool quit; } SCliThrd; typedef struct SCliObj { @@ -262,10 +259,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } \ if (i == sz) { \ pMsg = NULL; \ - tDebug("msg not found, %" PRIu64 "", ahandle); \ } else { \ pMsg = transQueueRm(&conn->cliMsgs, i); \ - tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) @@ -343,6 +338,34 @@ bool cliMaySendCachedMsg(SCliConn* conn) { _RETURN: return false; } +bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { + if (refId == 0) return false; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh == NULL) { + tDebug("release conn %p, refId: %" PRId64 "", conn, refId); + return false; + } + taosWLockLatch(&exh->latch); + if (exh->handle == NULL) exh->handle = conn; + exh->inited = 1; + if (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + taosWUnLockLatch(&exh->latch); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + transCtxMerge(&conn->ctx, &t->ctx->appCtx); + transQueuePush(&conn->cliMsgs, t); + tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + cliSend(conn); + return true; + } + taosWUnLockLatch(&exh->latch); + tDebug("empty conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + return false; +} + void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -439,8 +462,14 @@ void cliHandleResp(SCliConn* conn) { return; } } + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + tDebug("conn %p msg refId: %" PRId64 "", conn, refId); destroyCmsg(pMsg); + if (cliConnSendSeqMsg(refId, conn)) { + return; + } + if (cliMaySendCachedMsg(conn) == true) { return; } @@ -451,6 +480,21 @@ void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } +static void cliDestroyMsgInExhandle(int64_t refId) { + if (refId == 0) return; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh) { + taosWLockLatch(&exh->latch); + while (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + destroyCmsg(t); + } + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); + } +} void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (transQueueEmpty(&pConn->cliMsgs)) { @@ -510,6 +554,8 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } if (pMsg == NULL || (pMsg && pMsg->type != Release)) { + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + cliDestroyMsgInExhandle(refId); if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; } @@ -678,7 +724,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } list->numOfConn++; } - tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum); + tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key); return NULL; } @@ -742,13 +788,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; - if (conn->list->size >= 20) { + if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime)); } } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -761,8 +807,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); - conn->refId = exh->refId; + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); + conn->refId = exh->refId; if (conn->refId == -1) { taosMemoryFree(exh); } @@ -779,9 +827,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { if (exh == NULL) { return -1; } + taosWLockLatch(&exh->latch); exh->handle = conn; exh->pThrd = conn->hostThrd; conn->refId = exh->refId; + taosWUnLockLatch(&exh->latch); transReleaseExHandle(transGetRefMgt(), handle); return 0; @@ -882,7 +932,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } conn->list = NULL; - pThrd->newConnCount--; transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -1190,7 +1239,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(pList->port); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1392,7 +1440,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + taosRLockLatch(&exh->latch); SCliConn* conn = exh->handle; + taosRUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); @@ -1425,7 +1476,9 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) *ignore = true; return NULL; } else { + taosRLockLatch(&exh->latch); conn = exh->handle; + taosRUnLockLatch(&exh->latch); if (conn == NULL) { conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); @@ -1439,7 +1492,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { - tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); + tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr); } return conn; } @@ -1598,7 +1651,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(port); tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1858,9 +1910,10 @@ void cliIteraConnMsgs(SCliConn* conn) { bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; - tDebug("ahandle = %" PRIu64 "", ahandle); SCliMsg* pMsg = NULL; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, + conn->refId); transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); @@ -1869,6 +1922,9 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i); if (cliMsg->type == Release) { ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req"); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, + conn->refId); + cliDestroyConn(conn, true); return true; } } @@ -1984,11 +2040,9 @@ static SCliThrd* createThrdObj(void* trans) { taosMemoryFree(pThrd); return NULL; } - if (pTransInst->supportBatch) { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb); - } else { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); - } + int32_t nSync = pTransInst->supportBatch ? 4 : 8; + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); + if (pThrd->asyncPool == NULL) { tError("failed to init async pool"); uv_loop_close(pThrd->loop); @@ -2029,8 +2083,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->quit = false; - pThrd->newConnCount = 0; - pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); return pThrd; } static void destroyThrdObj(SCliThrd* pThrd) { @@ -2076,7 +2128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); - taosHashCleanup(pThrd->msgCount); taosMemoryFree(pThrd); } @@ -2095,14 +2146,7 @@ void cliSendQuit(SCliThrd* thrd) { void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { if (uv_handle_get_type(handle) == UV_TIMER) { - // SCliConn* pConn = handle->data; - // if (pConn != NULL && pConn->timer != NULL) { - // SCliThrd* pThrd = pConn->hostThrd; - // uv_timer_stop((uv_timer_t*)handle); - // handle->data = NULL; - // taosArrayPush(pThrd->timerList, &pConn->timer); - // pConn->timer = NULL; - // } + // do nothing } else { uv_read_stop((uv_stream_t*)handle); } @@ -2137,18 +2181,23 @@ static void doCloseIdleConn(void* param) { cliDestroyConn(conn, true); taosMemoryFree(arg); } +static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { + if (!(rpcDebugFlag & DEBUG_DEBUG)) { + return; + } + STransConnCtx* pCtx = pMsg->ctx; + STraceId* trace = &pMsg->msg.info.traceId; + char tbuf[512] = {0}; + EPSET_TO_STR(&pCtx->epSet, tbuf); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, + pCtx->retryNextInterval); + return; +} static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; - - if (rpcDebugFlag & DEBUG_DEBUG) { - STraceId* trace = &pMsg->msg.info.traceId; - char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); - tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, - pCtx->retryStep, pCtx->retryNextInterval); - } + cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; @@ -2157,12 +2206,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); } -FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { - if (*val != exp) { - *val = newVal; - } -} - FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; @@ -2504,21 +2547,7 @@ int transReleaseCliHandle(void* handle) { } return 0; } - -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { - transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; - } - - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); - if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; - } - +static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); epsetAssign(&pCtx->epSet, pEpSet); @@ -2535,12 +2564,48 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; + QUEUE_INIT(&cliMsg->seqq); + return cliMsg; +} + +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return TSDB_CODE_RPC_BROKEN_LINK; + } + + int64_t handle = (int64_t)pReq->info.handle; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + if (handle != 0) { + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); + if (exh != NULL) { + taosWLockLatch(&exh->latch); + if (exh->handle == NULL && exh->inited != 0) { + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); + taosWUnLockLatch(&exh->latch); + tDebug("msg refId: %" PRId64 "", handle); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; + } + exh->inited = 1; + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), handle); + } + } + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { - destroyCmsg(cliMsg); + EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); + if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } @@ -2726,6 +2791,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); return exh->refId; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f47a688e6f..21ad5be869 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -761,9 +761,12 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; - pConn->status = ConnRelease; transClearBuffer(&pConn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); + if (pConn->status != ConnAcquire) { + return true; + } + pConn->status = ConnRelease; STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); @@ -1090,6 +1093,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; + QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); return pConn; @@ -1121,6 +1125,7 @@ static int reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); transAcquireExHandle(transGetRefMgt(), exh->refId); conn->refId = exh->refId;