diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index cba26c46b5..c103d8eee7 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -273,6 +273,19 @@ typedef struct STableScanInfo {
   bool filesetDelimited;
 } STableScanInfo;
 
+// State of the row-id sort used by table merge scan: source blocks are written to a data file
+// and an index file records, per block id, the (offset, length) of the serialized block.
+typedef struct STmsSortRowIdInfo {
+  int32_t blkId;           // id given to the next saved source block
+  int64_t dataFileOffset;  // offset in dataFile at which the next block is written
+  TdFilePtr idxFile;
+  char idxPath[PATH_MAX];
+  TdFilePtr dataFile;
+  char dataPath[PATH_MAX];
+  SLRUCache* pBlkInfoCache; // blkId->(offset, len)
+  SLRUCache* pBlkDataCache; // blkId->SSDataBlock*
+} STmsSortRowIdInfo;
+
 typedef struct STableMergeScanInfo {
   int32_t tableStartIndex;
   int32_t tableEndIndex;
@@ -301,6 +314,8 @@ typedef struct STableMergeScanInfo {
   bool bNewFileset;
   bool bOnlyRetrieveBlock;
   bool filesetDelimited;
+  bool bSortRowId;
+  STmsSortRowIdInfo tmsSortRowIdInfo;
 } STableMergeScanInfo;
 
 typedef struct STagScanFilterContext {
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index a92770b1f9..d88433c5aa 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -3219,6 +3219,220 @@ _error:
   return NULL;
 }
 
+// ========================= table merge scan
+// Index-file record describing where one serialized source block lives in the data file.
+typedef struct STmsSortBlockInfo {
+  int32_t blkId;
+  int32_t length;
+  int64_t offset;
+} STmsSortBlockInfo;
+
+// Serialize pSrcBlock, write it to the data file at pSortInfo->dataFileOffset and record its
+// (blkId, offset, length) in slot pSortInfo->blkId of the index file.
+// *pSzBlk receives the serialized size. Returns 0 on success, otherwise an error code.
+static int32_t saveSourceBlock(STmsSortRowIdInfo* pSortInfo, const SSDataBlock* pSrcBlock, int32_t *pSzBlk) {
+  int32_t szBlk = blockDataGetSize(pSrcBlock) + sizeof(int32_t) + taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(int32_t);
+  char* buf = taosMemoryMalloc(szBlk);
+  if (buf == NULL) {
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+  blockDataToBuf(buf, pSrcBlock);
+  *pSzBlk = szBlk;
+
+  int32_t code = 0;
+  if (taosLSeekFile(pSortInfo->dataFile, pSortInfo->dataFileOffset, SEEK_SET) < 0 ||
+      taosWriteFile(pSortInfo->dataFile, buf, szBlk) != szBlk) {
+    code = TAOS_SYSTEM_ERROR(errno);
+  }
+  taosMemoryFree(buf);  // the serialized copy is not needed once it has been written
+  if (code != 0) {
+    return code;
+  }
+
+  STmsSortBlockInfo info = {.blkId = pSortInfo->blkId, .offset = pSortInfo->dataFileOffset, .length = szBlk};
+  if (taosLSeekFile(pSortInfo->idxFile, (int64_t)pSortInfo->blkId * sizeof(STmsSortBlockInfo), SEEK_SET) < 0 ||
+      taosWriteFile(pSortInfo->idxFile, &info, sizeof(info)) != (int64_t)sizeof(info)) {
+    return TAOS_SYSTEM_ERROR(errno);
+  }
+
+  return 0;
+}
+
+// Build the sort input for one source block: column 0 is a copy of the source timestamp column,
+// column 1 holds the block id (same value in every row) and column 2 the row index in the block.
+static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo,
+                                  const SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
+  const STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
+
+  int32_t nRows = pSrcBlock->info.rows;
+  // Allocate before copying the block info: the copy overwrites info.capacity with the source's value,
+  // NOTE(review): which blockDataEnsureCapacity presumably trusts to skip allocation - confirm.
+  int32_t code = blockDataEnsureCapacity(pSortInputBlk, nRows);
+  if (code != 0) {
+    return code;
+  }
+  uint32_t capacity = pSortInputBlk->info.capacity;
+  pSortInputBlk->info = pSrcBlock->info;
+  pSortInputBlk->info.capacity = capacity;
+
+  // NOTE(review): slotId is taken from pInfo->pSortInfo, which in row-id mode is built for the
+  // sort input block; confirm it is also the timestamp slot of the source block.
+  int32_t tsSlotId = ((SBlockOrderInfo*)taosArrayGet(pInfo->pSortInfo, 0))->slotId;
+  SColumnInfoData* tsCol = taosArrayGet(pSortInputBlk->pDataBlock, 0);
+  SColumnInfoData* pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, tsSlotId);
+  colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info);
+
+  SColumnInfoData* blkIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1);
+  colDataSetNItems(blkIdCol, 0, (char*)&pSortInfo->blkId, nRows, false);
+
+  SColumnInfoData* rowIdxCol = taosArrayGet(pSortInputBlk->pDataBlock, 2);
+  for (int32_t i = 0; i < nRows; ++i) {
+    colDataSetInt32(rowIdxCol, i, &i);
+  }
+  return 0;
+}
+
+// Save pSrcBlock to the temp files and fill pSortInputBlk with its (ts, blkId, rowIdx) rows,
+// then advance the block id and the 4096-aligned data file offset for the next block.
+static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
+  //TODO: batch save
+  STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
+  int32_t szBlk = 0;
+  int32_t code = saveSourceBlock(pSortInfo, pSrcBlock, &szBlk);
+  if (code != 0) {
+    return code;
+  }
+
+  code = fillSortInputBlock(pInfo, pSrcBlock, pSortInputBlk);
+  if (code != 0) {
+    return code;
+  }
+
+  ++pSortInfo->blkId;
+  // round the end of this block up to a multiple of 4096; the mask must be 4095, not 4096
+  pSortInfo->dataFileOffset = ((pSortInfo->dataFileOffset + szBlk) + 4095) & ~(int64_t)4095;
+
+  return 0;
+}
+
+// LRU deleter for pBlkInfoCache values (heap-allocated STmsSortBlockInfo).
+static void deleteBlockInfoCache(const void *key, size_t keyLen, void *value, void *ud) {
+  taosMemoryFree(value);
+}
+
+// LRU deleter for pBlkDataCache values (SSDataBlock*).
+static void deleteBlockDataCache(const void *key, size_t keyLen, void *value, void *ud) {
+  SSDataBlock* pBlock = value;
+  blockDataDestroy(pBlock);
+}
+
+// Load source block blockId and return it pinned in pBlkDataCache.
+//   ppBlock  - receives the block; the cache owns it, the caller must not destroy it
+//   ppHandle - receives the handle pinning the block (NULL if the insert produced none);
+//              the caller releases it with taosLRUCacheRelease() when done with *ppBlock
+// On a miss, the block's (offset, length) comes from pBlkInfoCache or the index file, and the
+// block itself is read back from the data file. Returns 0 on success, otherwise an error code.
+static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock,
+                                   LRUHandle** ppHandle) {
+  STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
+  *ppBlock = NULL;
+  *ppHandle = NULL;
+
+  LRUHandle* hBlk = taosLRUCacheLookup(pSortInfo->pBlkDataCache, &blockId, sizeof(blockId));
+  if (hBlk) {
+    *ppBlock = taosLRUCacheValue(pSortInfo->pBlkDataCache, hBlk);
+    *ppHandle = hBlk;
+    return 0;
+  }
+
+  // Keep a by-value copy of the index entry so its cache handle can be released right away.
+  STmsSortBlockInfo blkInfo = {0};
+  LRUHandle* hBlkInfo = taosLRUCacheLookup(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId));
+  if (hBlkInfo) {
+    blkInfo = *(STmsSortBlockInfo*)taosLRUCacheValue(pSortInfo->pBlkInfoCache, hBlkInfo);
+  } else {
+    STmsSortBlockInfo* pCached = taosMemoryMalloc(sizeof(STmsSortBlockInfo));
+    if (pCached == NULL) {
+      return TSDB_CODE_OUT_OF_MEMORY;
+    }
+    // read the whole record into the allocation, not into the pointer variable itself
+    if (taosLSeekFile(pSortInfo->idxFile, (int64_t)blockId * sizeof(STmsSortBlockInfo), SEEK_SET) < 0 ||
+        taosReadFile(pSortInfo->idxFile, pCached, sizeof(*pCached)) != (int64_t)sizeof(*pCached)) {
+      taosMemoryFree(pCached);
+      return TAOS_SYSTEM_ERROR(errno);
+    }
+    ASSERT(pCached->blkId == blockId);
+    blkInfo = *pCached;
+    taosLRUCacheInsert(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId), pCached, 1, deleteBlockInfoCache,
+                       &hBlkInfo, TAOS_LRU_PRIORITY_LOW, NULL);
+  }
+  if (hBlkInfo) {
+    taosLRUCacheRelease(pSortInfo->pBlkInfoCache, hBlkInfo, false);
+  }
+
+  char* buf = taosMemoryMalloc(blkInfo.length);
+  if (buf == NULL) {
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+  if (taosLSeekFile(pSortInfo->dataFile, blkInfo.offset, SEEK_SET) < 0 ||
+      taosReadFile(pSortInfo->dataFile, buf, blkInfo.length) != blkInfo.length) {
+    taosMemoryFree(buf);
+    return TAOS_SYSTEM_ERROR(errno);
+  }
+  SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false);
+  if (pBlock == NULL) {
+    taosMemoryFree(buf);
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+  blockDataFromBuf(pBlock, buf);
+  taosMemoryFree(buf);  // blockDataFromBuf copies the payload into pBlock's own columns
+
+  // the block belongs in the data cache; the info cache holds STmsSortBlockInfo values
+  taosLRUCacheInsert(pSortInfo->pBlkDataCache, &blockId, sizeof(blockId), pBlock, 1, deleteBlockDataCache,
+                     &hBlk, TAOS_LRU_PRIORITY_LOW, NULL);
+  *ppBlock = pBlock;
+  *ppHandle = hBlk;
+  return 0;
+}
+
+// Append to pBlock the source row referenced by the sorted tuple: tuple column 1 is the source
+// block id and tuple column 2 the row index within that block.
+void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
+  int32_t blkId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
+  int32_t rowIdx = *(int32_t*)tsortGetValue(pTupleHandle, 2);
+  SSDataBlock* pSrcBlk = NULL;
+  LRUHandle* hSrcBlk = NULL;
+  int32_t code = retrieveSourceBlock(pInfo, blkId, &pSrcBlk, &hSrcBlk);
+  if (code != 0) {
+    // void interface: report through terrno instead of dereferencing a NULL block
+    terrno = code;
+    return;
+  }
+
+  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
+    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
+    SColumnInfoData* pSrcColInfo = taosArrayGet(pSrcBlk->pDataBlock, i);
+
+    bool isNull = colDataIsNull_s(pSrcColInfo, rowIdx);
+    if (isNull) {
+      colDataSetNULL(pColInfo, pBlock->info.rows);
+    } else {
+      char* pData = colDataGetData(pSrcColInfo, rowIdx);  // index by source row, not by column i
+      if (pData != NULL) {
+        colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
+      }
+    }
+  }
+
+  pBlock->info.dataLoad = 1;
+  pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
+  pBlock->info.rows += 1;
+
+  if (hSrcBlk) {
+    taosLRUCacheRelease(pInfo->tmsSortRowIdInfo.pBlkDataCache, hSrcBlk, false);
+  }
+}
+
 static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
   int64_t nRows = 0;
   void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
@@ -3308,11 +3522,17 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
     if (pInfo->mergeLimit != -1) {
       tableMergeScanDoSkipTable(pInfo, pBlock);
     }
-
+    SSDataBlock* pSortInputBlk = NULL;
+    if (pInfo->bSortRowId) {
+      pSortInputBlk = createOneDataBlock(pInfo->pSortInputBlock, false);
+      transformIntoSortInputBlock(pInfo, pBlock, pSortInputBlk);
+    } else {
+      pSortInputBlk = pBlock;
+    }
     pOperator->resultInfo.totalRows += pBlock->info.rows;
     pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
 
-    return pBlock;
+    return pSortInputBlk;
   }
 
   return NULL;
@@ -3353,6 +3573,50 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
   return;
 }
 
+// Set up row-id sort state: reset the block id and data offset, open the temporary index and
+// data files, and create the block-info and block-data caches.
+// Returns 0 on success, otherwise an error code.
+int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
+  STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;
+  // start from a clean slate (blkId = 0, dataFileOffset = 0, no stale pointers on early return)
+  memset(pSort, 0, sizeof(*pSort));
+  taosGetTmpfilePath(tsTempDir, "tms-block-info", pSort->idxPath);
+  pSort->idxFile = taosOpenFile(pSort->idxPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
+  if (pSort->idxFile == NULL) {
+    return TAOS_SYSTEM_ERROR(errno);
+  }
+  taosGetTmpfilePath(tsTempDir, "tms-block-data", pSort->dataPath);
+  pSort->dataFile = taosOpenFile(pSort->dataPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
+  if (pSort->dataFile == NULL) {
+    return TAOS_SYSTEM_ERROR(errno);
+  }
+  pSort->pBlkInfoCache = taosLRUCacheInit(2048, 0, 0.5);
+  pSort->pBlkDataCache = taosLRUCacheInit(2048, 0, 0.5);
+  if (pSort->pBlkInfoCache == NULL || pSort->pBlkDataCache == NULL) {
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+  taosLRUCacheSetStrictCapacity(pSort->pBlkInfoCache, false);
+  // configure the data cache as well; previously the info cache was configured twice
+  taosLRUCacheSetStrictCapacity(pSort->pBlkDataCache, false);
+  return 0;
+}
+
+// Tear down row-id sort state: close and remove both temporary files and destroy both caches.
+// Cache pointers are cleared afterwards so they are not left dangling.
+int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
+  STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;
+  taosCloseFile(&pSort->idxFile);
+  taosRemoveFile(pSort->idxPath);
+  taosCloseFile(&pSort->dataFile);
+  taosRemoveFile(pSort->dataPath);
+
+  taosLRUCacheCleanup(pSort->pBlkInfoCache);
+  pSort->pBlkInfoCache = NULL;
+  taosLRUCacheCleanup(pSort->pBlkDataCache);
+  pSort->pBlkDataCache = NULL;
+  return 0;
+}
+
 int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
   STableMergeScanInfo* pInfo = pOperator->info;
   SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@@ -3361,6 +3625,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
 
   pInfo->bNewFileset = false;
 
+  startRowIdSort(pInfo);
   pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
   int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
   pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
@@ -3399,6 +3664,8 @@ void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
 
   tsortDestroySortHandle(pInfo->pSortHandle);
   pInfo->pSortHandle = NULL;
+
+  stopRowIdSort(pInfo);
 }
 
 int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
@@ -3475,8 +3742,11 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
     if (pTupleHandle == NULL) {
       break;
     }
-
-    appendOneRowToDataBlock(pResBlock, pTupleHandle);
+    if (!pInfo->bSortRowId) {
+      appendOneRowToDataBlock(pResBlock, pTupleHandle);
+    } else {
+      appendOneRowIdRowToDataBlock(pInfo, pResBlock, pTupleHandle);
+    }
     if (pResBlock->info.rows >= capacity) {
       break;
     }
@@ -3659,9 +3929,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
   blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
 
   pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
+  if (!pInfo->bSortRowId) {
+    pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
+    pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
+  } else {
+    SSDataBlock* pSortInput = createDataBlock();
+    SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
+    blockDataAppendColInfo(pSortInput, &tsCol);
+    SColumnInfoData blkIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
+    blockDataAppendColInfo(pSortInput, &blkIdCol);
+    SColumnInfoData rowIdxCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
+    blockDataAppendColInfo(pSortInput, &rowIdxCol);
+    pInfo->pSortInputBlock = pSortInput;
 
-  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
-  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
+    SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
+    SBlockOrderInfo bi = {0};
+    bi.order = pInfo->base.cond.order;
+    bi.slotId = 0;
+    bi.nullFirst = NULL_ORDER_FIRST;
+    taosArrayPush(pList, &bi);
+    pInfo->pSortInfo = pList;
+  }
 
   initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
   pInfo->mTableNumRows = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
@@ -3678,6 +3966,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
   pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
 
   pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
+
   setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                   pInfo, pTaskInfo);
   pOperator->exprSupp.numOfExprs = numOfCols;