diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 668d40dd0b..d295e868e9 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -202,6 +202,7 @@ typedef struct SExchangeInfo { SLimitInfo limitInfo; int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo char* pTaskId; + SArray* pFetchRpcHandles; } SExchangeInfo; typedef struct SScanInfo { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 676c14fee3..5f53b43802 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -346,6 +346,11 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources); return TSDB_CODE_INVALID_PARA; } + pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t)); + (void)taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources); + if (!pInfo->pFetchRpcHandles) { + return terrno; + } pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); if (pInfo->pSources == NULL) { @@ -384,6 +389,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); pInfo->self = taosAddRef(exchangeObjRefPool, pInfo); + return initDataSource(numOfSources, pInfo, id); } @@ -468,6 +474,14 @@ void freeSourceDataInfo(void* p) { void doDestroyExchangeOperatorInfo(void* param) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; + for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) { + int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i); + if (*pRpcHandle > 0) { + SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i); + (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle); + } + } + taosArrayDestroy(pExInfo->pFetchRpcHandles); taosArrayDestroy(pExInfo->pSources); taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo); @@ -495,6 +509,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { } int32_t index = pWrapper->sourceIndex; + int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index); + *pRpcHandle = -1; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); if (!pSourceDataInfo) { return terrno; @@ -668,6 +684,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas int64_t transporterId = 0; code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); QUERY_CHECK_CODE(code, lino, _end); + int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex); + *pRpcHandle = transporterId; } _end: