fix exchange operator caused rpc conn not freed

This commit is contained in:
wangjiaming0909 2024-08-05 17:39:38 +08:00
parent fce636339e
commit 8dd11f9a09
2 changed files with 19 additions and 0 deletions

View File

@ -202,6 +202,7 @@ typedef struct SExchangeInfo {
SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
char* pTaskId;
SArray* pFetchRpcHandles;
} SExchangeInfo;
typedef struct SScanInfo {

View File

@ -346,6 +346,11 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
return TSDB_CODE_INVALID_PARA;
}
pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
(void)taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
if (!pInfo->pFetchRpcHandles) {
return terrno;
}
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
if (pInfo->pSources == NULL) {
@ -384,6 +389,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
return initDataSource(numOfSources, pInfo, id);
}
@ -468,6 +474,14 @@ void freeSourceDataInfo(void* p) {
void doDestroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
if (*pRpcHandle > 0) {
SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
(void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
}
}
taosArrayDestroy(pExInfo->pFetchRpcHandles);
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
@ -495,6 +509,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t index = pWrapper->sourceIndex;
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
*pRpcHandle = -1;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (!pSourceDataInfo) {
return terrno;
@ -668,6 +684,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
int64_t transporterId = 0;
code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
QUERY_CHECK_CODE(code, lino, _end);
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
*pRpcHandle = transporterId;
}
_end: