Merge pull request #27672 from taosdata/fix/ly_res

enh(query):remove void for operator
This commit is contained in:
Haojun Liao 2024-09-06 09:49:10 +08:00 committed by GitHub
commit 2544f0313e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 88 additions and 32 deletions

View File

@ -51,7 +51,7 @@ typedef struct SAggOperatorInfo {
} SAggOperatorInfo; } SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param); static void destroyAggOperatorInfo(void* param);
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
@ -63,7 +63,7 @@ static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, in
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
@ -184,7 +184,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
if (pBlock) { if (pBlock) {
pAggInfo->pNewGroupBlock = NULL; pAggInfo->pNewGroupBlock = NULL;
tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
QUERY_CHECK_CODE(code, lino, _end);
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -225,12 +226,19 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
break; break;
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end); if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
code = doAggregateImpl(pOperator, pSup->pCtx); code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock); destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -427,20 +435,24 @@ void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
*ppBlock = NULL; *ppBlock = NULL;
} }
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
int32_t code = TSDB_CODE_SUCCESS;
SAggOperatorInfo* pAggInfo = pOperator->info; SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
return; return code;
} }
doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
// record the current active group id // record the current active group id
pAggInfo->groupId = groupId; pAggInfo->groupId = groupId;
return code;
} }
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result. // for simple group by query without interval, all the tables belong to one group result.
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info; SAggOperatorInfo* pAggInfo = pOperator->info;
@ -452,23 +464,27 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
groupId, pTaskInfo, false, &pAggInfo->aggSup, true); groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
if (pResultRow == NULL || pTaskInfo->code != 0) { if (pResultRow == NULL || pTaskInfo->code != 0) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); code = pTaskInfo->code;
lino = __LINE__;
goto _end;
} }
/* /*
* not assign result buffer yet, add new result buffer * not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one * all group belong to one result set, and each group result has different group id so set the id to be one
*/ */
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _end);
T_LONG_JMP(pTaskInfo->env, terrno);
}
} }
int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _end);
T_LONG_JMP(pTaskInfo->env, ret);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} }
return code;
} }
// a new buffer page for each table. Needs to opt this design // a new buffer page for each table. Needs to opt this design

View File

@ -464,7 +464,10 @@ _error:
void destroyExchangeOperatorInfo(void* param) { void destroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param; SExchangeInfo* pExInfo = (SExchangeInfo*)param;
(void)taosRemoveRef(exchangeObjRefPool, pExInfo->self); int32_t code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
} }
void freeBlock(void* pParam) { void freeBlock(void* pParam) {
@ -505,7 +508,10 @@ void doDestroyExchangeOperatorInfo(void* param) {
blockDataDestroy(pExInfo->pDummyBlock); blockDataDestroy(pExInfo->pDummyBlock);
tSimpleHashCleanup(pExInfo->pHashSources); tSimpleHashCleanup(pExInfo->pHashSources);
(void)tsem_destroy(&pExInfo->ready); int32_t code = tsem_destroy(&pExInfo->ready);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
taosMemoryFreeClear(pExInfo->pTaskId); taosMemoryFreeClear(pExInfo->pTaskId);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
@ -561,9 +567,13 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = TAOS_SYSTEM_ERROR(code); code = TAOS_SYSTEM_ERROR(code);
qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
return code;
} }
(void)taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
return code; return code;
} }
@ -1190,7 +1200,14 @@ static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeIn
return pTask->code; return pTask->code;
} }
} }
(void)tsem_wait(&pExchangeInfo->ready);
code = tsem_wait(&pExchangeInfo->ready);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTask->code = code;
return pTask->code;
}
if (pTask->pWorkerCb) { if (pTask->pWorkerCb) {
code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -904,8 +904,14 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
} }
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
if (pExchangeInfo) { if (pExchangeInfo) {
(void)tsem_post(&pExchangeInfo->ready); int32_t code = tsem_post(&pExchangeInfo->ready);
(void)taosReleaseRef(exchangeObjRefPool, pStop->refId); if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = taosReleaseRef(exchangeObjRefPool, pStop->refId);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
} }
} }

View File

@ -671,7 +671,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
val = *pVal; val = *pVal;
(void)taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
qTrace("release LRU cache, res %d", bRes);
} }
qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch, qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
@ -893,7 +894,10 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
if (pInfo->base.pTableListInfo->groupOffset) { if (pInfo->base.pTableListInfo->groupOffset) {
pInfo->countState = TABLE_COUNT_STATE_PROCESSED; pInfo->countState = TABLE_COUNT_STATE_PROCESSED;
} else { } else {
(void)taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
} }
} }
@ -4529,6 +4533,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
QUERY_CHECK_CODE(code, lino, _end);
} }
} }
} else { } else {

View File

@ -2177,7 +2177,12 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
(void)tsem_wait(&pInfo->ready); code = tsem_wait(&pInfo->ready);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pTaskInfo->code) { if (pTaskInfo->code) {
qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo), qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
@ -2327,7 +2332,10 @@ void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNo
void destroySysScanOperator(void* param) { void destroySysScanOperator(void* param) {
SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
(void)tsem_destroy(&pInfo->ready); int32_t code = tsem_destroy(&pInfo->ready);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
if (pInfo->name.type == TSDB_TABLE_NAME_T) { if (pInfo->name.type == TSDB_TABLE_NAME_T) {
@ -2383,7 +2391,10 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
} }
} }
(void)tsem_post(&pScanResInfo->ready); int32_t res = tsem_post(&pScanResInfo->ready);
if (res != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(res));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -1231,7 +1231,7 @@ void destroyIntervalOperatorInfo(void* param) {
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp); cleanupExprSupp(&pInfo->scalarSupp);
(void)tdListFree(pInfo->binfo.resultRowInfo.openWindow); pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
taosArrayDestroy(pInfo->pInterpCols); taosArrayDestroy(pInfo->pInterpCols);
pInfo->pInterpCols = NULL; pInfo->pInterpCols = NULL;
@ -2132,7 +2132,7 @@ typedef struct SGroupTimeWindow {
void destroyMergeIntervalOperatorInfo(void* param) { void destroyMergeIntervalOperatorInfo(void* param) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
(void)tdListFree(miaInfo->groupIntervals); miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
@ -2162,7 +2162,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow* prevWin = &prevGrpWin->window; STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
(void)tdListPopNode(miaInfo->groupIntervals, listNode); SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
taosMemoryFreeClear(tmp);
} }
} }