diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b52f942438..34a70f28e4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3318,6 +3318,7 @@ _error: // TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish // TODO: error processing, memory freeing // TODO: add log for error and perf +// TODO: tsdb reader open/close dynamically static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) { int32_t left = *(int32_t*)pLeft; @@ -3350,11 +3351,14 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu blockDataCleanup(pInput->pBlock); + pInfo->base.dataReader = pInput->pReader; + while (true) { bool hasNext = false; int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader); + pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); } if (!hasNext || isTaskKilled(pTaskInfo)) { @@ -3367,6 +3371,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status); if (code != 0) { + pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); } if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { @@ -3379,8 +3384,10 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu *pSubTableHasBlock = true; pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid); pOperator->resultInfo.totalRows += pInput->pBlock->info.rows; + pInfo->base.dataReader = NULL; return TSDB_CODE_SUCCESS; } + pInfo->base.dataReader = NULL; *pSubTableHasBlock = false; return TSDB_CODE_SUCCESS; } @@ -3488,7 +3495,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInput->type = SUB_TABLE_MEM_BLOCK; pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false); STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); - pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock, + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pBlock, (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); bool hasNext = true; fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); @@ -3638,7 +3645,7 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { if (pSubTblsInfo != NULL) { tMergeTreeDestroy(&pSubTblsInfo->pTree); - for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) { + for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; blockDataDestroy(pInput->pBlock); @@ -4272,7 +4279,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScanSubTables, NULL, destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; return pOperator;