fix: remove pages block using disk
This commit is contained in:
parent
f77b602da9
commit
ba33907776
|
@ -299,8 +299,8 @@ typedef struct STmsSubTableInput {
|
|||
|
||||
SArray* aBlockPages;
|
||||
SSDataBlock* pPageBlock;
|
||||
|
||||
int32_t pageIdx;
|
||||
|
||||
int32_t rowIdx;
|
||||
int64_t* aTs;
|
||||
} STmsSubTableInput;
|
||||
|
|
|
@ -3431,46 +3431,6 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo,
|
||||
STmsSubTableInput* pInput) {
|
||||
taosArrayClear(pInput->aBlockPages);
|
||||
int32_t start = 0;
|
||||
SSDataBlock* pDataBlock = pInput->pReaderBlock;
|
||||
while (start < pDataBlock->info.rows) {
|
||||
int32_t stop = 0;
|
||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize);
|
||||
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
|
||||
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pSubTblsInfo->pBlocksBuf, &pageId);
|
||||
|
||||
taosArrayPush(pInput->aBlockPages, &pageId);
|
||||
|
||||
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
|
||||
ASSERT(size <= getBufPageSize(pSubTblsInfo->pBlocksBuf));
|
||||
|
||||
blockDataToBuf(pPage, p);
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSubTblsInfo->pBlocksBuf, pPage);
|
||||
|
||||
blockDataDestroy(p);
|
||||
start = stop + 1;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) {
|
||||
int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex);
|
||||
void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
|
||||
|
||||
blockDataFromBuf(pInput->pPageBlock, page);
|
||||
|
||||
releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||
pInfo->bGroupProcessed = false;
|
||||
|
||||
|
@ -3491,11 +3451,7 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
|||
if (pInput->rowIdx == -1) {
|
||||
continue;
|
||||
}
|
||||
if (pInput->type == SUB_TABLE_EXT_PAGES) {
|
||||
pInput->pageIdx = 0;
|
||||
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
|
||||
pInput->rowIdx = 0;
|
||||
} else {
|
||||
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||
pInput->rowIdx = 0;
|
||||
pInput->pageIdx = -1;
|
||||
}
|
||||
|
@ -3564,11 +3520,6 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
|
|||
pInput->rowIdx = 0;
|
||||
pInput->pageIdx = -1;
|
||||
}
|
||||
if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) {
|
||||
pInput->aBlockPages = taosArrayInit(32, sizeof(int32_t));
|
||||
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
|
||||
pInput->type = SUB_TABLE_EXT_PAGES;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -3588,29 +3539,6 @@ static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubT
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t adjustSubTableFromExtPages(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||
if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) {
|
||||
bool hasNext = true;
|
||||
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
|
||||
if (!hasNext) {
|
||||
pInput->rowIdx = -1;
|
||||
++pSubTblsInfo->numSubTablesCompleted;
|
||||
} else {
|
||||
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
|
||||
pInput->pageIdx = 0;
|
||||
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
|
||||
pInput->rowIdx = 0;
|
||||
|
||||
}
|
||||
} else {
|
||||
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
|
||||
pInput->rowIdx = 0;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||
|
@ -3621,9 +3549,6 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
|
|||
} else if (pInput->rowIdx == pInputBlock->info.rows -1 ) {
|
||||
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
|
||||
} else if (pInput->type == SUB_TABLE_EXT_PAGES) {
|
||||
++pInput->pageIdx;
|
||||
adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo);
|
||||
}
|
||||
if (pInput->rowIdx != -1) {
|
||||
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||
|
@ -3732,7 +3657,7 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSDataBlock* doTableMergeScanSeqBlocksInMem(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -4320,14 +4245,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo, pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn,
|
||||
pTableScanNode->paraTablesSort ? doTableMergeScanSeqBlocksInMem : doTableMergeScan,
|
||||
NULL,
|
||||
destroyTableMergeScanOperatorInfo,
|
||||
optrDefaultBufFn,
|
||||
getTableMergeScanExplainExecInfo,
|
||||
optrDefaultGetNextExtFn, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(
|
||||
optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL,
|
||||
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
|
||||
NULL);
|
||||
pOperator->cost.openCost = 0;
|
||||
return pOperator;
|
||||
|
||||
|
|
Loading…
Reference in New Issue