feat: refactor code

This commit is contained in:
slzhou 2024-01-24 09:12:27 +08:00
parent 174673470d
commit 3f2b469b5e
1 changed files with 122 additions and 45 deletions

View File

@ -3342,26 +3342,47 @@ static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* p
return ret; return ret;
} }
static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pHasNext) { static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
const SStorageAPI* pAPI= &pTaskInfo->storageAPI; const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
blockDataCleanup(pInput->pBlock); blockDataCleanup(pInput->pBlock);
bool hasNext = false;
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext);
*pHasNext = hasNext;
if (!hasNext) { while (true) {
pInput->rowIdx = -1; bool hasNext = false;
++pSubTblsInfo->numTablesCompleted; int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext);
return code; if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader);
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader);
}
break;
}
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
break;
}
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) {
continue;
}
*pSubTableHasBlock = true;
pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pInput->pBlock->info.rows;
return TSDB_CODE_SUCCESS;
} }
uint32_t status = 0; *pSubTableHasBlock = false;
code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status); return TSDB_CODE_SUCCESS;
return code;
} }
static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo, static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo,
@ -3399,7 +3420,6 @@ static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsIn
void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId); void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
blockDataFromBuf(pInput->pBlock, page); blockDataFromBuf(pInput->pBlock, page);
releaseBufPage(pSubTblsInfo->pBlocksBuf, page); releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
pInput->rowIdx = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3424,6 +3444,7 @@ static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
if (pInput->type == SUB_TABLE_EXT_PAGES) { if (pInput->type == SUB_TABLE_EXT_PAGES) {
pInput->pageIdx = 0; pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
pInput->rowIdx = 0;
} else { } else {
pInput->rowIdx = 0; pInput->rowIdx = 0;
pInput->pageIdx = -1; pInput->pageIdx = -1;
@ -3435,40 +3456,62 @@ static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t startSubTablesTms(SOperatorInfo* pOperator) { static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
STableMergeScanInfo* pInfo = pOperator->info; STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; pInfo->pSubTablesMergeInfo = pSubTblsInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
setGroupStartEndIndex(pInfo); setGroupStartEndIndex(pInfo);
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0); pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput)); pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
int32_t bufPageSize = pInfo->bufPageSize; int32_t bufPageSize = pInfo->bufPageSize;
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
for (int32_t i = pInfo->tableStartIndex; i <= pInfo->tableEndIndex; ++i) {
int32_t idx = i - pInfo->tableStartIndex; pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + idx; return TSDB_CODE_SUCCESS;
}
static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
pInput->type = SUB_TABLE_MEM_BLOCK; pInput->type = SUB_TABLE_MEM_BLOCK;
pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false); pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false);
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock, pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
bool hasNext = true; bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
if (!hasNext) {
if (idx + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) { pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted;
continue;
} else {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) {
saveSubTableBlock(pInfo, pSubTblsInfo, pInput); saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
pInput->type = SUB_TABLE_EXT_PAGES; pInput->type = SUB_TABLE_EXT_PAGES;
} }
} }
return TSDB_CODE_SUCCESS;
}
initSubTablesMergeSort(pSubTblsInfo); static int32_t startSubTablesTms(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
initSubTablesMergeInfo(pInfo);
initSubTableInputs(pOperator, pInfo);
initSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3490,38 +3533,72 @@ static int32_t stopSubTablesTms(SOperatorInfo* pOperator) {
} }
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf); destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
taosMemoryFree(pSubTblsInfo->aInputs); taosMemoryFree(pSubTblsInfo->aInputs);
taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted;
} else {
pInput->rowIdx = 0;
}
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSubTableFromExtPages(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) {
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted;
} else {
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
pInput->rowIdx = 0;
}
} else {
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
pInput->rowIdx = 0;
}
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) { static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) {
STableMergeScanInfo* pInfo = pOperatorInfo->info; STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
if (pInput->rowIdx < pInput->pBlock->info.rows - 1) { if (pInput->rowIdx < pInput->pBlock->info.rows - 1) {
++pInput->rowIdx; ++pInput->rowIdx;
return TSDB_CODE_SUCCESS;
} else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) { } else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) {
if (pInput->type == SUB_TABLE_MEM_BLOCK) { if (pInput->type == SUB_TABLE_MEM_BLOCK) {
bool hasNext = true; adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
} else if (pInput->type == SUB_TABLE_EXT_PAGES) { } else if (pInput->type == SUB_TABLE_EXT_PAGES) {
++pInput->pageIdx; ++pInput->pageIdx;
if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) { adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo);
bool hasNext = true; }
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); if (pInput->rowIdx != -1) {
if (hasNext) { SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
saveSubTableBlock(pInfo, pSubTblsInfo, pInput); pInput->aTs = (int64_t*)col->pData;
pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
}
} else {
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
}
} }
SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
} }
tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree)); tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }