feat: table merge scan with one less disk usage

This commit is contained in:
shenglian zhou 2024-01-23 16:34:14 +08:00
parent e5ec56d811
commit dd70c63dfe
2 changed files with 267 additions and 0 deletions

View File

@ -42,6 +42,7 @@ extern "C" {
// #include "tstream.h" // #include "tstream.h"
// #include "tstreamUpdate.h" // #include "tstreamUpdate.h"
#include "tlrucache.h" #include "tlrucache.h"
#include "tdatablock.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
@ -284,6 +285,34 @@ typedef struct STableScanInfo {
bool needCountEmptyTable; bool needCountEmptyTable;
} STableScanInfo; } STableScanInfo;
typedef enum ESubTableInputType {
SUB_TABLE_MEM_BLOCK,
SUB_TABLE_EXT_PAGES,
} ESubTableInputType;
typedef struct STmsSubTableInput {
STsdbReader* pReader;
ESubTableInputType type;
SSDataBlock* pBlock;
SArray* aBlockPages;
int32_t pageIdx;
int32_t rowIdx;
int64_t* aTs;
} STmsSubTableInput;
typedef struct STmsSubTablesMergeInfo {
SBlockOrderInfo* pOrderInfo;
int32_t numSubTables;
STmsSubTableInput* aInputs;
SMultiwayMergeTreeInfo* pTree;
int32_t numTablesCompleted;
int32_t numTableBlocksInMem;
SDiskbasedBuf* pBlocksBuf;
} STmsSubTablesMergeInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
@ -318,6 +347,9 @@ typedef struct STableMergeScanInfo {
SSDataBlock* nextDurationBlocks[2]; SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks; bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx; int32_t nextDurationBlocksIdx;
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanFilterContext { typedef struct STagScanFilterContext {

View File

@ -3311,6 +3311,241 @@ _error:
return NULL; return NULL;
} }
// table merge scan operator
// table merge scan operator
// TODO: limit / duration optimization
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
// TODO: error processing, memory freeing
// TODO: add log for error and perf
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;
int32_t leftIdx = pInfo->aInputs[left].rowIdx;
int32_t rightIdx = pInfo->aInputs[right].rowIdx;
if (leftIdx == -1) {
return 1;
} else if (rightIdx == -1) {
return -1;
}
int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}
static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pHasNext) {
const STableMergeScanInfo* pInfo = pOperator->info;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
blockDataCleanup(pInput->pBlock);
bool hasNext = false;
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext);
*pHasNext = hasNext;
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numTablesCompleted;
return code;
}
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status);
return code;
}
static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo,
STmsSubTableInput* pInput) {
pInput->aBlockPages = taosArrayInit(4, sizeof(int32_t));
int32_t start = 0;
SSDataBlock* pDataBlock = pInput->pBlock;
while (start < pDataBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize);
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
int32_t pageId = -1;
void* pPage = getNewBufPage(pSubTblsInfo->pBlocksBuf, &pageId);
taosArrayPush(pInput->aBlockPages, &pageId);
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pSubTblsInfo->pBlocksBuf));
blockDataToBuf(pPage, p);
setBufPageDirty(pPage, true);
releaseBufPage(pSubTblsInfo->pBlocksBuf, pPage);
blockDataDestroy(p);
start = stop + 1;
}
blockDataCleanup(pInput->pBlock);
return TSDB_CODE_SUCCESS;
}
static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput) {
int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pInput->pageIdx);
void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
blockDataFromBuf(pInput->pBlock, page);
releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
pInput->rowIdx = 0;
return TSDB_CODE_SUCCESS;
}
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}
static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i;
if (pInput->rowIdx == -1) {
continue;
}
if (pInput->type == SUB_TABLE_EXT_PAGES) {
pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
} else {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
return TSDB_CODE_SUCCESS;
}
static int32_t startSubTablesTms(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
setGroupStartEndIndex(pInfo);
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
int32_t bufPageSize = pInfo->bufPageSize;
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
for (int32_t i = pInfo->tableStartIndex; i <= pInfo->tableEndIndex; ++i) {
int32_t idx = i - pInfo->tableStartIndex;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + idx;
pInput->type = SUB_TABLE_MEM_BLOCK;
pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false);
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
if (idx + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) {
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
pInput->type = SUB_TABLE_EXT_PAGES;
}
}
initSubTablesMergeSort(pSubTblsInfo);
return TSDB_CODE_SUCCESS;
}
static int32_t stopSubTablesTms(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
tMergeTreeDestroy(pSubTblsInfo->pTree);
for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
blockDataDestroy(pInput->pBlock);
taosArrayDestroy(pInput->aBlockPages);
pAPI->tsdReader.tsdReaderClose(pInput->pReader);
}
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
taosMemoryFree(pSubTblsInfo->aInputs);
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
if (pInput->rowIdx < pInput->pBlock->info.rows - 1) {
++pInput->rowIdx;
return TSDB_CODE_SUCCESS;
} else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) {
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
} else if (pInput->type == SUB_TABLE_EXT_PAGES) {
++pInput->pageIdx;
if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) {
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (hasNext) {
saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
pInput->pageIdx = 0;
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
}
} else {
fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
}
}
SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
return TSDB_CODE_SUCCESS;
}
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pInput->pBlock->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pInput->pBlock->info.rows, pInput->rowIdx, NULL);
if (isNull) {
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
} else {
if (!pSrcColInfo->pData) continue;
char* pData = colDataGetData(pSrcColInfo, pInput->rowIdx);
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
}
}
pBlock->info.rows += 1;
return TSDB_CODE_SUCCESS;
}
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0; int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));