From 7f93cb9f1a5f0524dcb917c626d119bdc0f528f1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 13 Dec 2023 11:38:14 +0800 Subject: [PATCH] fix: use pageid, offset, length as row index --- source/libs/executor/inc/executorInt.h | 3 +- source/libs/executor/src/scanoperator.c | 197 +++++++++++++++--------- 2 files changed, 124 insertions(+), 76 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 7a9160b392..35f2e28d36 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -274,8 +274,7 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STmsSortRowIdInfo { - SDiskbasedBuf* pExtSrcBlkBuf; - SSHashObj* pBlkDataHash; // blkId->SSDataBlock* + SDiskbasedBuf* pExtSrcRowsBuf; } STmsSortRowIdInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8914c2aeac..be5cf298a4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3220,6 +3220,80 @@ _error: } // ========================= table merge scan + +static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { + SDiskbasedBuf* pResultBuf = pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf; + int32_t rowBytes = blockDataGetRowSize(pBlock); + + SFilePage* pFilePage = NULL; + + // in the first scan, new space needed for results + int32_t pageId = -1; + SArray* list = getDataBufPagesIdList(pResultBuf); + + if (taosArrayGetSize(list) == 0) { + pFilePage = getNewBufPage(pResultBuf, &pageId); + pFilePage->num = sizeof(SFilePage); + } else { + SPageInfo* pi = getLastPageInfo(list); + pFilePage = getBufPage(pResultBuf, getPageId(pi)); + if (pFilePage == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } + + pageId = getPageId(pi); + + if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) { + // release current page first, and prepare the next one + releaseBufPageInfo(pResultBuf, pi); + + pFilePage = getNewBufPage(pResultBuf, &pageId); + if (pFilePage != NULL) { + pFilePage->num = sizeof(SFilePage); + } + } + } + + if (pFilePage == NULL) { + return -1; + } + *pPageId = pageId; + *pOffset = pFilePage->num; + char* buf = (char*)pFilePage + (*pOffset); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + char* isNull = (char*)buf; + char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (colDataIsNull_s(pCol, rowIdx)) { + isNull[i] = 1; + continue; + } + + isNull[i] = 0; + char* pData = colDataGetData(pCol, rowIdx); + if (pCol->info.type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(pData); + memcpy(pStart, pData, dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pCol->info.type)) { + varDataCopy(pStart, pData); + pStart += varDataTLen(pData); + } else { + int32_t bytes = pCol->info.bytes; + memcpy(pStart, pData, bytes); + pStart += bytes; + } + } + *pLength = (int32_t)(pStart - (char*)buf); + pFilePage->num += (*pLength); + setBufPageDirty(pFilePage, true); + releaseBufPage(pResultBuf, pFilePage); + return 0; +} + static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; @@ -3233,33 +3307,18 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo SColumnInfoData* pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, tsSlotId); colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info); - SColumnInfoData* blkIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1); - SColumnInfoData* rowIdxCol = taosArrayGet(pSortInputBlk->pDataBlock, 2); - - int32_t start = 0; - while (start < pSrcBlock->info.rows) { - int32_t stop = 0; - blockDataSplitRows(pSrcBlock, pSrcBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize); - SSDataBlock* p = blockDataExtractBlock(pSrcBlock, start, stop-start+1); + SColumnInfoData* pageIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1); + SColumnInfoData* offsetCol = taosArrayGet(pSortInputBlk->pDataBlock, 2); + SColumnInfoData* lengthCol = taosArrayGet(pSortInputBlk->pDataBlock, 3); + for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { int32_t pageId = -1; - void* pPage = getNewBufPage(pSortInfo->pExtSrcBlkBuf, &pageId); - - int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); - ASSERT(size <= getBufPageSize(pSortInfo->pExtSrcBlkBuf)); - - blockDataToBuf(pPage, p); - - setBufPageDirty(pPage, true); - releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage); - - blockDataDestroy(p); - for (int32_t i = start; i <= stop; ++i) { - colDataSetInt32(blkIdCol, i, &pageId); - int32_t rowIdx = i - start; - colDataSetInt32(rowIdxCol, i, &rowIdx); - } - start = stop + 1; + int32_t offset = -1; + int32_t length = -1; + saveBlockRowToBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length); + colDataSetInt32(pageIdCol, i, &pageId); + colDataSetInt32(pageIdCol, i, &offset); + colDataSetInt32(pageIdCol, i, &length); } pSortInputBlk->info.rows = nRows; @@ -3267,50 +3326,38 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo return 0; } +void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { + int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); + int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); + int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 2); + void* page = getBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, pageId); -static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock) { STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; - void* pBlkHashVal = tSimpleHashGet(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId)); - if (pBlkHashVal) { - *ppBlock = *(SSDataBlock**)pBlkHashVal; - } - else { - void* pPage = getBufPage(pSortInfo->pExtSrcBlkBuf, blockId); - SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false); - blockDataFromBuf(pBlock, pPage); - releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage); - - *ppBlock = pBlock; - tSimpleHashPut(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId), &pBlock, sizeof(pBlock)); - } - return 0; -} - -void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { - int32_t blkId = *(int32_t*)tsortGetValue(pTupleHandle, 1); - int32_t rowIdx = *(int32_t*)tsortGetValue(pTupleHandle, 2); - SSDataBlock* pSrcBlk = NULL; - retrieveSourceBlock(pInfo, blkId, &pSrcBlk); - - for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + char* buf = (char*)page + offset; + char* isNull = (char*)buf; + char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pSrcColInfo = taosArrayGet(pSrcBlk->pDataBlock, i); - bool isNull = colDataIsNull_s(pSrcColInfo, rowIdx); - if (isNull) { - colDataSetNULL(pColInfo, pBlock->info.rows); - } else { - char* pData = colDataGetData(pSrcColInfo, rowIdx); - if (pData != NULL) { - colDataSetVal(pColInfo, pBlock->info.rows, pData, false); + if (!isNull[i]) { + colDataSetVal(pColInfo, pBlock->info.rows, pStart, false); + if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(pStart); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + pStart += varDataTLen(pStart); + } else { + int32_t bytes = pColInfo->info.bytes; + pStart += bytes; } + } else { + colDataSetNULL(pColInfo, pBlock->info.rows); } } - if (rowIdx == pSrcBlk->info.rows - 1) { - tSimpleHashRemove(pInfo->tmsSortRowIdInfo.pBlkDataHash, &blkId, sizeof(blkId)); - blockDataDestroy(pSrcBlk); - } + releaseBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, page); + pBlock->info.dataLoad = 1; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; pBlock->info.rows += 1; @@ -3462,19 +3509,19 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* int32_t startRowIdSort(STableMergeScanInfo *pInfo) { STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; - createDiskbasedBuf(&pSort->pExtSrcBlkBuf, pInfo->bufPageSize, pInfo->sortBufSize, "tms-ext-src-block", tsTempDir); - dBufSetPrintInfo(pSort->pExtSrcBlkBuf); - pSort->pBlkDataHash = tSimpleHashInit(2048, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock), + taosArrayGetSize(pInfo->pResBlock->pDataBlock)); + int32_t memSize = pageSize * 2048; + createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir); + dBufSetPrintInfo(pSort->pExtSrcRowsBuf); return 0; } int32_t stopRowIdSort(STableMergeScanInfo *pInfo) { STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; - destroyDiskbasedBuf(pSort->pExtSrcBlkBuf); - pSort->pExtSrcBlkBuf = NULL; - tSimpleHashCleanup(pSort->pBlkDataHash); - pSort->pBlkDataHash = NULL; + destroyDiskbasedBuf(pSort->pExtSrcRowsBuf); + pSort->pExtSrcRowsBuf = NULL; return 0; } @@ -3798,10 +3845,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SSDataBlock* pSortInput = createDataBlock(); SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); blockDataAppendColInfo(pSortInput, &tsCol); - SColumnInfoData blkIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); - blockDataAppendColInfo(pSortInput, &blkIdCol); - SColumnInfoData rowIdxCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); - blockDataAppendColInfo(pSortInput, &rowIdxCol); + SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); + blockDataAppendColInfo(pSortInput, &pageIdCol); + SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); + blockDataAppendColInfo(pSortInput, &offsetCol); + SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); + blockDataAppendColInfo(pSortInput, &lengthCol); pInfo->pSortInputBlock = pSortInput; SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); @@ -3823,8 +3872,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN } pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); - int32_t rowSize = pInfo->pResBlock->info.rowSize; - uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock); + int32_t rowSize = pInfo->pSortInputBlock->info.rowSize; + uint32_t nCols = taosArrayGetSize(pInfo->pSortInputBlock->pDataBlock); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); pInfo->filesetDelimited = pTableScanNode->filesetDelimited;