fix: page block and reader block for each child table input
This commit is contained in:
parent
2f6a2923b7
commit
305d4a962a
|
@ -292,8 +292,10 @@ typedef enum ESubTableInputType {
|
||||||
typedef struct STmsSubTableInput {
|
typedef struct STmsSubTableInput {
|
||||||
STsdbReader* pReader;
|
STsdbReader* pReader;
|
||||||
ESubTableInputType type;
|
ESubTableInputType type;
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pReaderBlock;
|
||||||
|
|
||||||
SArray* aBlockPages;
|
SArray* aBlockPages;
|
||||||
|
SSDataBlock* pPageBlock;
|
||||||
|
|
||||||
int32_t pageIdx;
|
int32_t pageIdx;
|
||||||
int32_t rowIdx;
|
int32_t rowIdx;
|
||||||
|
|
|
@ -3319,6 +3319,7 @@ _error:
|
||||||
// TODO: error processing, memory freeing
|
// TODO: error processing, memory freeing
|
||||||
// TODO: add log for error and perf
|
// TODO: add log for error and perf
|
||||||
// TODO: tsdb reader open/close dynamically
|
// TODO: tsdb reader open/close dynamically
|
||||||
|
// TODO: blockdata deep cleanup
|
||||||
|
|
||||||
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
int32_t left = *(int32_t*)pLeft;
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
@ -3349,7 +3350,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
|
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
blockDataCleanup(pInput->pBlock);
|
blockDataCleanup(pInput->pReaderBlock);
|
||||||
|
|
||||||
pInfo->base.dataReader = pInput->pReader;
|
pInfo->base.dataReader = pInput->pReader;
|
||||||
|
|
||||||
|
@ -3369,7 +3370,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status);
|
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -3377,13 +3378,13 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) {
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pSubTableHasBlock = true;
|
*pSubTableHasBlock = true;
|
||||||
pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid);
|
pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid);
|
||||||
pOperator->resultInfo.totalRows += pInput->pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows;
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -3394,9 +3395,9 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
|
|
||||||
static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo,
|
static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo,
|
||||||
STmsSubTableInput* pInput) {
|
STmsSubTableInput* pInput) {
|
||||||
pInput->aBlockPages = taosArrayInit(4, sizeof(int32_t));
|
taosArrayClear(pInput->aBlockPages);
|
||||||
int32_t start = 0;
|
int32_t start = 0;
|
||||||
SSDataBlock* pDataBlock = pInput->pBlock;
|
SSDataBlock* pDataBlock = pInput->pReaderBlock;
|
||||||
while (start < pDataBlock->info.rows) {
|
while (start < pDataBlock->info.rows) {
|
||||||
int32_t stop = 0;
|
int32_t stop = 0;
|
||||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize);
|
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize);
|
||||||
|
@ -3418,14 +3419,16 @@ static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTables
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
start = stop + 1;
|
start = stop + 1;
|
||||||
}
|
}
|
||||||
blockDataCleanup(pInput->pBlock);
|
blockDataCleanup(pInput->pReaderBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) {
|
static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) {
|
||||||
int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex);
|
int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex);
|
||||||
void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
|
void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
|
||||||
blockDataFromBuf(pInput->pBlock, page);
|
|
||||||
|
blockDataFromBuf(pInput->pPageBlock, page);
|
||||||
|
|
||||||
releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
|
releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -3458,7 +3461,8 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
pInput->rowIdx = 0;
|
pInput->rowIdx = 0;
|
||||||
pInput->pageIdx = -1;
|
pInput->pageIdx = -1;
|
||||||
}
|
}
|
||||||
SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||||
pInput->aTs = (int64_t*)col->pData;
|
pInput->aTs = (int64_t*)col->pData;
|
||||||
}
|
}
|
||||||
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
|
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
|
||||||
|
@ -3479,7 +3483,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||||
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
||||||
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
||||||
|
|
||||||
pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables;
|
pSubTblsInfo->numTableBlocksInMem = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3493,9 +3497,10 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
|
||||||
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
pInput->type = SUB_TABLE_MEM_BLOCK;
|
pInput->type = SUB_TABLE_MEM_BLOCK;
|
||||||
pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
||||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pBlock,
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pReaderBlock,
|
||||||
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
bool hasNext = true;
|
bool hasNext = true;
|
||||||
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
|
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
|
||||||
|
@ -3508,6 +3513,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
|
||||||
pInput->pageIdx = -1;
|
pInput->pageIdx = -1;
|
||||||
}
|
}
|
||||||
if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) {
|
if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) {
|
||||||
|
pInput->aBlockPages = taosArrayInit(32, sizeof(int32_t));
|
||||||
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
|
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
|
||||||
pInput->type = SUB_TABLE_EXT_PAGES;
|
pInput->type = SUB_TABLE_EXT_PAGES;
|
||||||
}
|
}
|
||||||
|
@ -3518,7 +3524,6 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
|
||||||
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
|
||||||
bool hasNext = true;
|
bool hasNext = true;
|
||||||
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
|
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
|
||||||
if (!hasNext) {
|
if (!hasNext) {
|
||||||
|
@ -3558,9 +3563,10 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
|
||||||
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
|
||||||
if (pInput->rowIdx < pInput->pBlock->info.rows - 1) {
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
if (pInput->rowIdx < pInputBlock->info.rows - 1) {
|
||||||
++pInput->rowIdx;
|
++pInput->rowIdx;
|
||||||
} else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) {
|
} else if (pInput->rowIdx == pInputBlock->info.rows -1 ) {
|
||||||
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||||
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
|
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
|
||||||
} else if (pInput->type == SUB_TABLE_EXT_PAGES) {
|
} else if (pInput->type == SUB_TABLE_EXT_PAGES) {
|
||||||
|
@ -3568,7 +3574,7 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
|
||||||
adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo);
|
adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo);
|
||||||
}
|
}
|
||||||
if (pInput->rowIdx != -1) {
|
if (pInput->rowIdx != -1) {
|
||||||
SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||||
pInput->aTs = (int64_t*)col->pData;
|
pInput->aTs = (int64_t*)col->pData;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3580,11 +3586,13 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
|
||||||
|
|
||||||
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
|
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
|
||||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
SColumnInfoData* pSrcColInfo = taosArrayGet(pInput->pBlock->pDataBlock, i);
|
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
|
||||||
bool isNull = colDataIsNull(pSrcColInfo, pInput->pBlock->info.rows, pInput->rowIdx, NULL);
|
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
|
||||||
|
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
|
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
|
||||||
|
@ -3648,7 +3656,8 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
|
||||||
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
|
||||||
blockDataDestroy(pInput->pBlock);
|
blockDataDestroy(pInput->pReaderBlock);
|
||||||
|
blockDataDestroy(pInput->pPageBlock);
|
||||||
taosArrayDestroy(pInput->aBlockPages);
|
taosArrayDestroy(pInput->aBlockPages);
|
||||||
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
|
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
|
||||||
pInput->pReader = NULL;
|
pInput->pReader = NULL;
|
||||||
|
|
Loading…
Reference in New Issue