From 338f6eea718ad6fea4114b599fcbd0b3cab788b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Jun 2022 00:47:03 +0800 Subject: [PATCH] fix(query): fix invalid free in exchange. --- source/libs/executor/src/executorimpl.c | 43 +++++++++++-------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0cfeecb28c..1a73e9f3ac 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2418,6 +2418,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); + + ASSERT(pSourceDataInfo->pRsp != NULL); + qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; } @@ -2466,6 +2469,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); + ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); @@ -2643,7 +2648,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx int32_t completed = 0; for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; continue; @@ -2653,6 +2657,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx continue; } + if (pDataInfo->code != TSDB_CODE_SUCCESS) { + code = pDataInfo->code; + goto _error; + } + SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); @@ -2690,6 +2699,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx pLoadInfo->totalSize); } + taosMemoryFreeClear(pDataInfo->pRsp); + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); @@ -2699,7 +2710,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } } - taosMemoryFreeClear(pDataInfo->pRsp); return pExchangeInfo->pResult; } @@ -2719,26 +2729,9 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + } else { + ASSERT(0); } - - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - int64_t startTs = taosGetTimestampUs(); - - // Asynchronously send all fetch requests to all sources. - for (int32_t i = 0; i < totalSources; ++i) { - int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - } - - int64_t endTs = taosGetTimestampUs(); - qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), - totalSources, endTs - startTs); - - tsem_wait(&pExchangeInfo->ready); - pOperator->status = OP_RES_TO_RETURN; - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { @@ -2758,10 +2751,11 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { } int64_t endTs = taosGetTimestampUs(); - qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), + qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); tsem_wait(&pExchangeInfo->ready); + pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; return TSDB_CODE_SUCCESS; @@ -2875,7 +2869,8 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { if (pExchangeInfo->seqLoadData) { return seqLoadRemoteData(pOperator); } else { - return concurrentlyLoadRemoteData(pOperator); + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); +// return concurrentlyLoadRemoteData(pOperator); } } @@ -2927,7 +2922,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p } pInfo->pResult = pBlock; - pInfo->seqLoadData = true; + pInfo->seqLoadData = false; tsem_init(&pInfo->ready, 0, 0);