Merge pull request #27091 from taosdata/fix/3.0/TD-31275-TD-31310

1. fix exchange operator dead lock due to no ret check for add ref
This commit is contained in:
Haojun Liao 2024-08-09 13:56:18 +08:00 committed by GitHub
commit 6532b8a534
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 13 additions and 7 deletions

View File

@ -391,7 +391,11 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
if (pInfo->self < 0) {
int32_t code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
return initDataSource(numOfSources, pInfo, id);
}
@ -480,14 +484,16 @@ void freeSourceDataInfo(void* p) {
void doDestroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
if (*pRpcHandle > 0) {
SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
(void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
if (pExInfo->pFetchRpcHandles) {
for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
if (*pRpcHandle > 0) {
SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
(void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
}
}
taosArrayDestroy(pExInfo->pFetchRpcHandles);
}
taosArrayDestroy(pExInfo->pFetchRpcHandles);
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);