feat: prepare for switch between one reader and multi readers

This commit is contained in:
slzhou 2024-01-24 11:11:45 +08:00
parent 622dd10b19
commit 033ebb5fa5
2 changed files with 39 additions and 39 deletions

View File

@ -326,7 +326,6 @@ typedef struct STableMergeScanInfo {
SSDataBlock* pSortInputBlock;
SSDataBlock* pReaderBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
@ -348,7 +347,6 @@ typedef struct STableMergeScanInfo {
bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx;
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo;

View File

@ -3437,7 +3437,7 @@ static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
pInfo->tableEndIndex = i - 1;
}
static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i;
if (pInput->rowIdx == -1) {
@ -3628,34 +3628,31 @@ static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
initSubTableInputs(pOperator, pInfo);
initSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
openSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
return TSDB_CODE_SUCCESS;
}
static int32_t stopSubTablesTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
if (pSubTblsInfo != NULL) {
tMergeTreeDestroy(&pSubTblsInfo->pTree);
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
tMergeTreeDestroy(&pSubTblsInfo->pTree);
blockDataDestroy(pInput->pBlock);
taosArrayDestroy(pInput->aBlockPages);
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
pInput->pReader = NULL;
}
for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
taosMemoryFree(pSubTblsInfo->aInputs);
blockDataDestroy(pInput->pBlock);
taosArrayDestroy(pInput->aBlockPages);
pAPI->tsdReader.tsdReaderClose(pInput->pReader);
taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL;
}
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
taosMemoryFree(pSubTblsInfo->aInputs);
taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL;
return TSDB_CODE_SUCCESS;
}
@ -4136,10 +4133,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
// start one reader variable
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
@ -4150,20 +4145,24 @@ void destroyTableMergeScanOperatorInfo(void* param) {
}
}
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
pTableScanInfo->mTableNumRows = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL;
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
// end one reader variable
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
stopSubTablesTableMergeScan(pTableScanInfo);
taosMemoryFreeClear(param);
}
@ -4237,35 +4236,38 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error;
}
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
pInfo->mergeLimit = -1;
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
if (hasLimit) {
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
pInfo->mSkipTables = NULL;
}
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
//start one reader variable
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
if (!tsExperimental) {
pInfo->filesetDelimited = false;
} else {
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
}
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
// end one reader variable
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;