feat: add operator next function

This commit is contained in:
slzhou 2024-01-24 10:21:17 +08:00
parent 3f2b469b5e
commit 622dd10b19
2 changed files with 143 additions and 45 deletions

View File

@ -42,7 +42,6 @@ extern "C" {
// #include "tstream.h" // #include "tstream.h"
// #include "tstreamUpdate.h" // #include "tstreamUpdate.h"
#include "tlrucache.h" #include "tlrucache.h"
#include "tdatablock.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
@ -301,13 +300,14 @@ typedef struct STmsSubTableInput {
int64_t* aTs; int64_t* aTs;
} STmsSubTableInput; } STmsSubTableInput;
typedef struct SBlockOrderInfo SBlockOrderInfo;
typedef struct STmsSubTablesMergeInfo { typedef struct STmsSubTablesMergeInfo {
SBlockOrderInfo* pOrderInfo; SBlockOrderInfo* pOrderInfo;
int32_t numSubTables; int32_t numSubTables;
STmsSubTableInput* aInputs; STmsSubTableInput* aInputs;
SMultiwayMergeTreeInfo* pTree; SMultiwayMergeTreeInfo* pTree;
int32_t numTablesCompleted; int32_t numSubTablesCompleted;
int32_t numTableBlocksInMem; int32_t numTableBlocksInMem;
SDiskbasedBuf* pBlocksBuf; SDiskbasedBuf* pBlocksBuf;

View File

@ -3375,7 +3375,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) { if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) {
continue; continue;
} }
*pSubTableHasBlock = true; *pSubTableHasBlock = true;
pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid); pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pInput->pBlock->info.rows; pOperator->resultInfo.totalRows += pInput->pBlock->info.rows;
@ -3415,8 +3415,8 @@ static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTables
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput) { static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) {
int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pInput->pageIdx); int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex);
void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId); void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
blockDataFromBuf(pInput->pBlock, page); blockDataFromBuf(pInput->pBlock, page);
releaseBufPage(pSubTblsInfo->pBlocksBuf, page); releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
@ -3424,6 +3424,8 @@ static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsIn
} }
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
pInfo->bGroupProcessed = false;
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1; int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) { for (; i < numOfTables; ++i) {
@ -3443,7 +3445,7 @@ static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
} }
if (pInput->type == SUB_TABLE_EXT_PAGES) { if (pInput->type == SUB_TABLE_EXT_PAGES) {
pInput->pageIdx = 0; pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
pInput->rowIdx = 0; pInput->rowIdx = 0;
} else { } else {
pInput->rowIdx = 0; pInput->rowIdx = 0;
@ -3492,7 +3494,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
if (!hasNext) { if (!hasNext) {
pInput->rowIdx = -1; pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted; ++pSubTblsInfo->numSubTablesCompleted;
continue; continue;
} else { } else {
pInput->rowIdx = 0; pInput->rowIdx = 0;
@ -3506,39 +3508,6 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
 * Set up the sub-table merge state for the operator.
 *
 * Runs the three setup steps in order: initSubTablesMergeInfo(),
 * initSubTableInputs() and initSubTablesMergeSort(). The merge sort is not
 * initialised when preparing the inputs fails.
 *
 * @param pOperator operator whose `info` field is an STableMergeScanInfo.
 * @return TSDB_CODE_SUCCESS, or the first non-success code returned by
 *         initSubTableInputs() / initSubTablesMergeSort().
 */
static int32_t startSubTablesTms(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  // NOTE(review): the return type of initSubTablesMergeInfo() is not visible
  // from here, so its result is still not inspected -- confirm and check it.
  initSubTablesMergeInfo(pInfo);

  int32_t code = initSubTableInputs(pOperator, pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return initSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
}
/**
 * Tear down the sub-table merge state held in pInfo->pSubTablesMergeInfo.
 *
 * Destroys the merge tree, then for every sub-table input destroys its data
 * block and page-id array and closes its reader, and finally destroys the
 * disk-based buffer and frees the input array and the merge-info struct.
 * pInfo->pSubTablesMergeInfo is reset to NULL afterwards.
 *
 * @param pOperator operator whose `info` field is an STableMergeScanInfo.
 * @return always TSDB_CODE_SUCCESS; a NULL merge info is treated as a no-op.
 */
static int32_t stopSubTablesTms(SOperatorInfo* pOperator) {
  STableMergeScanInfo*    pInfo = pOperator->info;
  STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
  if (pSubTblsInfo == NULL) {
    // Nothing was started (or it was already stopped).
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  tMergeTreeDestroy(&pSubTblsInfo->pTree);

  // Valid input slots are [0, numSubTables); the previous `<=` bound also
  // touched one element past the last input.
  for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
    STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
    blockDataDestroy(pInput->pBlock);
    taosArrayDestroy(pInput->aBlockPages);
    pAPI->tsdReader.tsdReaderClose(pInput->pReader);
  }

  destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
  taosMemoryFree(pSubTblsInfo->aInputs);
  taosMemoryFree(pSubTblsInfo);
  pInfo->pSubTablesMergeInfo = NULL;

  return TSDB_CODE_SUCCESS;
}
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) { static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info; STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
@ -3547,7 +3516,7 @@ static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubT
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (!hasNext) { if (!hasNext) {
pInput->rowIdx = -1; pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted; ++pSubTblsInfo->numSubTablesCompleted;
} else { } else {
pInput->rowIdx = 0; pInput->rowIdx = 0;
} }
@ -3563,22 +3532,22 @@ static int32_t adjustSubTableFromExtPages(SOperatorInfo* pOperatorInfo, STmsSubT
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (!hasNext) { if (!hasNext) {
pInput->rowIdx = -1; pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted; ++pSubTblsInfo->numSubTablesCompleted;
} else { } else {
saveSubTableBlock(pInfo, pSubTblsInfo, pInput); saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
pInput->pageIdx = 0; pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
pInput->rowIdx = 0; pInput->rowIdx = 0;
} }
} else { } else {
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
pInput->rowIdx = 0; pInput->rowIdx = 0;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) { static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info; STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
@ -3623,6 +3592,135 @@ static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
 * Fill pResBlock with the next batch of merged rows from the sub-tables.
 *
 * The block is cleaned first. Rows are then produced one at a time: each step
 * calls appendChosenRowToDataBlock() followed by adjustSubTableForNextRow(),
 * and a batch ends when either every sub-table is completed or the block
 * holds at least `capacity` rows. applyLimitOffset() is applied to each batch;
 * if that leaves the block empty while sub-tables remain and the limit has
 * not been reached, another batch is produced.
 *
 * @param pOperator operator whose `info` field is an STableMergeScanInfo.
 * @param pResBlock output block; cleaned and refilled by this call.
 * @param capacity  row count at which a batch is considered full.
 * @return pResBlock when it contains at least one row, otherwise NULL.
 */
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
  STableMergeScanInfo*    pScanInfo = pOperator->info;
  SExecTaskInfo*          pTask = pOperator->pTaskInfo;
  STmsSubTablesMergeInfo* pMerge = pScanInfo->pSubTablesMergeInfo;

  blockDataCleanup(pResBlock);

  for (;;) {
    bool allDrained = false;

    // Produce one batch. The capacity test comes after the append so that at
    // least one row is attempted per batch, exactly as before.
    do {
      if (pMerge->numSubTablesCompleted >= pMerge->numSubTables) {
        allDrained = true;
        break;
      }

      appendChosenRowToDataBlock(pMerge, pResBlock);
      adjustSubTableForNextRow(pOperator, pMerge);
    } while (pResBlock->info.rows < capacity);

    bool limitHit = applyLimitOffset(&pScanInfo->limitInfo, pResBlock, pTask);

    // Keep going only when the batch was entirely discarded and there is
    // still input left to read.
    bool keepGoing = !allDrained && !limitHit && !(pResBlock->info.rows > 0);
    if (!keepGoing) {
      break;
    }
  }

  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }
  return NULL;
}
/**
 * Set up the sub-table merge state for the operator.
 *
 * Runs the three setup steps in order: initSubTablesMergeInfo(),
 * initSubTableInputs() and initSubTablesMergeSort(). The merge sort is not
 * initialised when preparing the inputs fails.
 *
 * @param pOperator operator whose `info` field is an STableMergeScanInfo.
 * @return TSDB_CODE_SUCCESS, or the first non-success code returned by
 *         initSubTableInputs() / initSubTablesMergeSort().
 */
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  // NOTE(review): the return type of initSubTablesMergeInfo() is not visible
  // from here, so its result is still not inspected -- confirm and check it.
  initSubTablesMergeInfo(pInfo);

  int32_t code = initSubTableInputs(pOperator, pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return initSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
}
/**
 * Tear down the sub-table merge state held in pInfo->pSubTablesMergeInfo.
 *
 * Destroys the merge tree, then for every sub-table input destroys its data
 * block and page-id array and closes its reader, and finally destroys the
 * disk-based buffer and frees the input array and the merge-info struct.
 * pInfo->pSubTablesMergeInfo is reset to NULL afterwards.
 *
 * @param pOperator operator whose `info` field is an STableMergeScanInfo.
 * @return always TSDB_CODE_SUCCESS; a NULL merge info is treated as a no-op.
 */
static int32_t stopSubTablesTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo*    pInfo = pOperator->info;
  STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
  if (pSubTblsInfo == NULL) {
    // Nothing was started (or it was already stopped).
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  tMergeTreeDestroy(&pSubTblsInfo->pTree);

  // Valid input slots are [0, numSubTables); the previous `<=` bound also
  // touched one element past the last input.
  for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
    STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
    blockDataDestroy(pInput->pBlock);
    taosArrayDestroy(pInput->aBlockPages);
    pAPI->tsdReader.tsdReaderClose(pInput->pReader);
  }

  destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
  taosMemoryFree(pSubTblsInfo->aInputs);
  taosMemoryFree(pSubTblsInfo);
  pInfo->pSubTablesMergeInfo = NULL;

  return TSDB_CODE_SUCCESS;
}
/**
 * Return the next result block of the sub-table merge scan, group by group.
 *
 * On the first call (pInfo->hasGroupId not yet set) the scan is started for
 * the group of the first table in the table list. Each call then asks
 * getSubTablesSortedBlock() for a block of the current group. When a group
 * yields no block, its merge state is stopped and either the operator is
 * marked completed (the group's end index is the last table) or the scan is
 * restarted on the table following pInfo->tableEndIndex with the limit info
 * reset for the new group.
 *
 * If a group produced no block at all and pInfo->needCountEmptyTable is set,
 * a block from getOneRowResultBlock() is returned for that group instead.
 *
 * Failures of the open function or of starting a group's scan, and a killed
 * task, leave through T_LONG_JMP rather than through the return value.
 *
 * @param pOperator operator whose `info` field is an STableMergeScanInfo.
 * @return the next block tagged with the current group id, or NULL when the
 *         operator is done or the table list is empty.
 */
SSDataBlock* doTableMergeScanSubTables(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (tableListSize == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;

    // A failed start leaves the merge state unusable; bail out the same way
    // an _openFn failure does instead of reading from it.
    code = startSubTablesTableMergeScan(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  SSDataBlock* pBlock = NULL;
  while (pInfo->tableStartIndex < tableListSize) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
    if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
      // The group produced nothing so far: emit the one-row block for it.
      STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
      pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
    }

    if (pBlock != NULL) {
      pBlock->info.id.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      pInfo->bGroupProcessed = true;
      return pBlock;
    }

    // Data of this group are all dumped, let's try the next group.
    stopSubTablesTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= tableListSize - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;

    code = startSubTablesTableMergeScan(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    resetLimitInfoForNextGroup(&pInfo->limitInfo);
  }

  return pBlock;
}
// table merge scan : one reader, save blocks to disk
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0; int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));