From fbf67e42943913787b525efcbe3edf020217899e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 3 Jan 2024 14:03:22 +0800 Subject: [PATCH] fix: save data when creating intial sources --- source/libs/executor/inc/executorInt.h | 6 - source/libs/executor/inc/tsort.h | 3 + source/libs/executor/src/scanoperator.c | 270 ++--------------------- source/libs/executor/src/tsort.c | 277 ++++++++++++++++++++++-- 4 files changed, 276 insertions(+), 280 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 1ab27e42a0..5b014bef63 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -273,11 +273,6 @@ typedef struct STableScanInfo { bool filesetDelimited; } STableScanInfo; -typedef struct STmsSortRowIdInfo { - SDiskbasedBuf* pExtSrcRowsBuf; - int32_t srcTsSlotId; -} STmsSortRowIdInfo; - typedef struct STableMergeScanInfo { int32_t tableStartIndex; int32_t tableEndIndex; @@ -312,7 +307,6 @@ typedef struct STableMergeScanInfo { bool rtnNextDurationBlocks; int32_t nextDurationBlocksIdx; bool bSortRowId; - STmsSortRowIdInfo sortRowIdInfo; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 365acf2bff..1ed4d7baa5 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -194,6 +194,9 @@ void tsortSetClosed(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle); void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param); +int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsSize); + +void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle); /** * @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen * @param [in] pSortCols cols to comp and build diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f690edcf07..36e810bd23 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3220,179 +3220,6 @@ _error: // ========================= table merge scan -static int32_t getPageFromExtSrcRowsBuf(SDiskbasedBuf* pResultBuf, int32_t rowBytes, int32_t* pPageId, SFilePage** ppFilePage) { - SFilePage* pFilePage = NULL; - - int32_t pageId = -1; - SArray* list = getDataBufPagesIdList(pResultBuf); - - if (taosArrayGetSize(list) == 0) { - pFilePage = getNewBufPage(pResultBuf, &pageId); - pFilePage->num = sizeof(SFilePage); - } else { - SPageInfo* pi = getLastPageInfo(list); - pFilePage = getBufPage(pResultBuf, getPageId(pi)); - if (pFilePage == NULL) { - qError("failed to get buffer, code:%s", tstrerror(terrno)); - return terrno; - } - - pageId = getPageId(pi); - - if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) { - releaseBufPageInfo(pResultBuf, pi); - - pFilePage = getNewBufPage(pResultBuf, &pageId); - if (pFilePage != NULL) { - pFilePage->num = sizeof(SFilePage); - } - } - } - - if (pFilePage == NULL) { - qError("failed to get buffer, code:%s", tstrerror(terrno)); - return terrno; - } - - *pPageId = pageId; - *ppFilePage = pFilePage; - return TSDB_CODE_SUCCESS; -} - -static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - - char* isNull = (char*)buf; - char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); - if (colDataIsNull_s(pCol, rowIdx)) { - isNull[i] = 1; - continue; - } - - isNull[i] = 0; - char* pData = colDataGetData(pCol, rowIdx); - if (pCol->info.type == TSDB_DATA_TYPE_JSON) { - if (pCol->pData) { - int32_t dataLen = getJsonValueLen(pData); - memcpy(pStart, pData, dataLen); - pStart += dataLen; - } else { - // the column that is pre-allocated has no data and has offset - *pStart = 0; - pStart += 1; - } - } else if (IS_VAR_DATA_TYPE(pCol->info.type)) { - if (pCol->pData) { - varDataCopy(pStart, pData); - pStart += varDataTLen(pData); - } else { - // the column that is pre-allocated has no data and has offset - *(VarDataLenT*)(pStart) = 0; - pStart += VARSTR_HEADER_SIZE; - } - } else { - int32_t bytes = pCol->info.bytes; - memcpy(pStart, pData, bytes); - pStart += bytes; - } - } - *(int32_t*)pStart = (char*)pStart - (char*)buf; - pStart += sizeof(int32_t); - return (int32_t)(pStart - (char*)buf); -} - -static int32_t saveBlockRowToExtRowsBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { - SDiskbasedBuf* pResultBuf = pInfo->sortRowIdInfo.pExtSrcRowsBuf; - int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock) + sizeof(int32_t); - int32_t pageId = -1; - SFilePage* pFilePage = NULL; - int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - *pPageId = pageId; - *pOffset = pFilePage->num; - *pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset)); - - pFilePage->num += (*pLength); - setBufPageDirty(pFilePage, true); - releaseBufPage(pResultBuf, pFilePage); - return 0; -} - -static int32_t fillSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { - STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo; - - int32_t nRows = pSrcBlock->info.rows; - pSortInputBlk->info.window = pSrcBlock->info.window; - pSortInputBlk->info.id = pSrcBlock->info.id; - blockDataEnsureCapacity(pSortInputBlk, nRows); - - SColumnInfoData* tsCol = taosArrayGet(pSortInputBlk->pDataBlock, 0); - SColumnInfoData* pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, pSortInfo->srcTsSlotId); - colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info); - - SColumnInfoData* pageIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1); - SColumnInfoData* offsetCol = taosArrayGet(pSortInputBlk->pDataBlock, 2); - - for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { - int32_t pageId = -1; - int32_t offset = -1; - int32_t length = -1; - saveBlockRowToExtRowsBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length); - colDataSetInt32(pageIdCol, i, &pageId); - colDataSetInt32(offsetCol, i, &offset); - } - - pSortInputBlk->info.rows = nRows; - - return 0; -} - -static void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { - STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo; - - int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); - int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); - void* page = getBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, pageId); - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - char* buf = (char*)page + offset; - char* isNull = (char*)buf; - char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - if (!isNull[i]) { - colDataSetVal(pColInfo, pBlock->info.rows, pStart, false); - if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { - int32_t dataLen = getJsonValueLen(pStart); - pStart += dataLen; - } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { - pStart += varDataTLen(pStart); - } else { - int32_t bytes = pColInfo->info.bytes; - pStart += bytes; - } - } else { - colDataSetNULL(pColInfo, pBlock->info.rows); - } - } - - if (*(int32_t*)pStart != pStart-buf) { - qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart, (int32_t)(pStart-buf)); - }; - - releaseBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, page); - - pBlock->info.dataLoad = 1; - pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; - pBlock->info.rows += 1; -} - static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { int64_t nRows = 0; void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); @@ -3535,18 +3362,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; - SSDataBlock* pSortInputBlk = NULL; - if (pInfo->bSortRowId) { - blockDataCleanup(pInfo->pSortInputBlock); - fillSortInputBlock(pInfo, pBlock, pInfo->pSortInputBlock); - pSortInputBlk = pInfo->pSortInputBlock; - } else { - pSortInputBlk = pBlock; - } - pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - return pSortInputBlk; + return pBlock; } return NULL; @@ -3594,26 +3412,6 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* return; } -int32_t startRowIdSort(STableMergeScanInfo *pInfo) { - STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo; - int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock), - taosArrayGetSize(pInfo->pResBlock->pDataBlock)); - pageSize *= 2; - int numOfTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; - int32_t memSize = TMIN(pageSize * numOfTables, 512 * 1024 * 1024); - int32_t code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir); - dBufSetPrintInfo(pSort->pExtSrcRowsBuf); - return code; -} - -int32_t stopRowIdSort(STableMergeScanInfo *pInfo) { - STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo; - - destroyDiskbasedBuf(pSort->pExtSrcRowsBuf); - pSort->pExtSrcRowsBuf = NULL; - return 0; -} - int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3624,16 +3422,20 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->bNewFilesetEvent = false; pInfo->bNextDurationBlockEvent = false; - code = startRowIdSort(pInfo); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); + if (pInfo->bSortRowId) { + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); + int32_t memSize = 512 * 1024 * 1024; + int32_t rowBytes = blockDataGetRowSize(pInfo->pResBlock) + taosArrayGetSize(pInfo->pResBlock->pDataBlock) + sizeof(int32_t); + int32_t pageSize = TMAX(memSize/numOfTable, rowBytes); + tsortSetSortByRowId(pInfo->pSortHandle, pageSize, memSize); + } else { + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); + } tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); @@ -3670,7 +3472,6 @@ void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { tsortDestroySortHandle(pInfo->pSortHandle); pInfo->pSortHandle = NULL; - stopRowIdSort(pInfo); } int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { @@ -3757,11 +3558,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* if (pTupleHandle == NULL) { break; } - if (!pInfo->bSortRowId) { - appendOneRowToDataBlock(pResBlock, pTupleHandle); - } else { - appendOneRowIdRowToDataBlock(pInfo, pResBlock, pTupleHandle); - } + tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle); if (pResBlock->info.rows >= capacity) { break; } @@ -3847,11 +3644,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - if (pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf != NULL) { - destroyDiskbasedBuf(pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf); - pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf = NULL; - } - int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); @@ -3895,34 +3687,6 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla return TSDB_CODE_SUCCESS; } -static void initRowIdSortputBlock(STableMergeScanInfo* pInfo) { - SSDataBlock* pSortInput = createDataBlock(); - SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); - blockDataAppendColInfo(pSortInput, &tsCol); - SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); - blockDataAppendColInfo(pSortInput, &pageIdCol); - SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); - blockDataAppendColInfo(pSortInput, &offsetCol); - pInfo->pSortInputBlock = pSortInput; - - int32_t srcTsSlotId = 0; - for (int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) { - SColMatchItem* colInfo = taosArrayGet(pInfo->base.matchInfo.pList, i); - if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - srcTsSlotId = colInfo->dstSlotId; - } - } - pInfo->sortRowIdInfo.srcTsSlotId = srcTsSlotId; - SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); - SBlockOrderInfo bi = {0}; - bi.order = pInfo->base.cond.order; - bi.slotId = 0; - bi.nullFirst = NULL_ORDER_FIRST; - taosArrayPush(pList, &bi); - pInfo->pSortInfo = pList; - return; -} - SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); @@ -4000,12 +3764,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->bSortRowId = false; } - if (!pInfo->bSortRowId) { - pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); - pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); - } else { - initRowIdSortputBlock(pInfo); - } + pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); + pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); int32_t rowSize = blockDataGetRowSize(pInfo->pSortInputBlock); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 205cd7d3ef..0b3a7c180e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -75,6 +75,13 @@ struct SSortHandle { bool (*abortCheckFn)(void* param); void* abortCheckParam; + + bool bSortByRowId; + SDiskbasedBuf* pExtRowsBuf; + int32_t extRowsPageSize; + int32_t extRowsMemSize; + int32_t srcTsSlotId; + SBlockOrderInfo extRowsOrderInfo; }; void tsortSetSingleTableMerge(SSortHandle* pHandle) { @@ -199,7 +206,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->type = type; pSortHandle->pageSize = pageSize; pSortHandle->numOfPages = numOfPages; - pSortHandle->pSortInfo = pSortInfo; + pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL); pSortHandle->loops = 0; pSortHandle->pqMaxTupleLength = pqMaxTupleLength; @@ -303,6 +310,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { taosArrayDestroy(pSortHandle->pOrderedSource); taosMemoryFreeClear(pSortHandle); + if (pSortHandle->pExtRowsBuf != NULL) { + destroyDiskbasedBuf(pSortHandle->pExtRowsBuf); + } + taosArrayDestroy(pSortHandle->pSortInfo); } int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) { @@ -848,6 +859,228 @@ static int32_t createPageBuf(SSortHandle* pHandle) { return 0; } +void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { + if (pHandle->bSortByRowId) { + int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); + int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); + void* page = getBufPage(pHandle->pExtRowsBuf, pageId); + + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + char* buf = (char*)page + offset; + char* isNull = (char*)buf; + char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + if (!isNull[i]) { + colDataSetVal(pColInfo, pBlock->info.rows, pStart, false); + if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(pStart); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + pStart += varDataTLen(pStart); + } else { + int32_t bytes = pColInfo->info.bytes; + pStart += bytes; + } + } else { + colDataSetNULL(pColInfo, pBlock->info.rows); + } + } + + if (*(int32_t*)pStart != pStart - buf) { + qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart, + (int32_t)(pStart - buf)); + }; + + releaseBufPage(pHandle->pExtRowsBuf, page); + + pBlock->info.dataLoad = 1; + pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; + pBlock->info.rows += 1; + } else { + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + bool isNull = tsortIsNullVal(pTupleHandle, i); + if (isNull) { + colDataSetNULL(pColInfo, pBlock->info.rows); + } else { + char* pData = tsortGetValue(pTupleHandle, i); + if (pData != NULL) { + colDataSetVal(pColInfo, pBlock->info.rows, pData, false); + } + } + } + + pBlock->info.dataLoad = 1; + pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; + pBlock->info.rows += 1; + } +} + +static int32_t getPageFromExtSrcRowsBuf(SDiskbasedBuf* pResultBuf, int32_t rowBytes, int32_t* pPageId, SFilePage** ppFilePage) { + SFilePage* pFilePage = NULL; + + int32_t pageId = -1; + SArray* list = getDataBufPagesIdList(pResultBuf); + + if (taosArrayGetSize(list) == 0) { + pFilePage = getNewBufPage(pResultBuf, &pageId); + pFilePage->num = sizeof(SFilePage); + } else { + SPageInfo* pi = getLastPageInfo(list); + pFilePage = getBufPage(pResultBuf, getPageId(pi)); + if (pFilePage == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } + + pageId = getPageId(pi); + + if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) { + releaseBufPageInfo(pResultBuf, pi); + + pFilePage = getNewBufPage(pResultBuf, &pageId); + if (pFilePage != NULL) { + pFilePage->num = sizeof(SFilePage); + } + } + } + + if (pFilePage == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } + + *pPageId = pageId; + *ppFilePage = pFilePage; + return TSDB_CODE_SUCCESS; +} + +static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + char* isNull = (char*)buf; + char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (colDataIsNull_s(pCol, rowIdx)) { + isNull[i] = 1; + continue; + } + + isNull[i] = 0; + char* pData = colDataGetData(pCol, rowIdx); + if (pCol->info.type == TSDB_DATA_TYPE_JSON) { + if (pCol->pData) { + int32_t dataLen = getJsonValueLen(pData); + memcpy(pStart, pData, dataLen); + pStart += dataLen; + } else { + // the column that is pre-allocated has no data and has offset + *pStart = 0; + pStart += 1; + } + } else if (IS_VAR_DATA_TYPE(pCol->info.type)) { + if (pCol->pData) { + varDataCopy(pStart, pData); + pStart += varDataTLen(pData); + } else { + // the column that is pre-allocated has no data and has offset + *(VarDataLenT*)(pStart) = 0; + pStart += VARSTR_HEADER_SIZE; + } + } else { + int32_t bytes = pCol->info.bytes; + memcpy(pStart, pData, bytes); + pStart += bytes; + } + } + *(int32_t*)pStart = (char*)pStart - (char*)buf; + pStart += sizeof(int32_t); + return (int32_t)(pStart - (char*)buf); +} + +static int32_t saveBlockRowToExtRowsBuf(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { + SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf; + int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock) + sizeof(int32_t); + int32_t pageId = -1; + SFilePage* pFilePage = NULL; + int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + *pPageId = pageId; + *pOffset = pFilePage->num; + *pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset)); + + pFilePage->num += (*pLength); + setBufPageDirty(pFilePage, true); + releaseBufPage(pResultBuf, pFilePage); + return 0; +} + + +static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) { + int32_t pageId = -1; + int32_t offset = -1; + int32_t length = -1; + saveBlockRowToExtRowsBuf(pHandle, pSource, *rowIndex, &pageId, &offset, &length); + + SSDataBlock* pBlock = pHandle->pDataBlock; + SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0); + char* pData = colDataGetData(pSrcTsCol, *rowIndex); + colDataSetVal(pTsCol, pBlock->info.rows, pData, false); + + SColumnInfoData* pPageIdCol = taosArrayGet(pBlock->pDataBlock, 1); + colDataSetInt32(pPageIdCol, pBlock->info.rows, &pageId); + + SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2); + colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset); + + pBlock->info.rows += 1; + *rowIndex += 1; +} + +static void initRowIdSort(SSortHandle* pHandle) { + blockDataDestroy(pHandle->pDataBlock); + + SSDataBlock* pSortInput = createDataBlock(); + SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); + blockDataAppendColInfo(pSortInput, &tsCol); + SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); + blockDataAppendColInfo(pSortInput, &pageIdCol); + SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); + blockDataAppendColInfo(pSortInput, &offsetCol); + pHandle->pDataBlock = pSortInput; + + SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); + SBlockOrderInfo bi = {0}; + bi.order = pOrder->order; + bi.slotId = 0; + bi.nullFirst = NULL_ORDER_FIRST; + + SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo)); + taosArrayPush(aOrder, &bi); + + taosArrayDestroy(pHandle->pSortInfo); + pHandle->pSortInfo = aOrder; + return; +} + +int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsMemSize) { + int32_t code = createDiskbasedBuf(&pHandle->pExtRowsBuf, extRowsPageSize, extRowsMemSize, "sort-ext-rows", tsTempDir); + pHandle->extRowsPageSize = extRowsPageSize; + pHandle->extRowsMemSize = extRowsMemSize; + SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); + pHandle->extRowsOrderInfo = *pOrder; + initRowIdSort(pHandle); + pHandle->bSortByRowId = true; + return code; +} + typedef struct SBlkMergeSupport { int64_t** aTs; int32_t* aRowIdx; @@ -919,7 +1152,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx return sz; } -static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) { +static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); @@ -927,13 +1160,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO blockDataCleanup(pHandle->pDataBlock); int32_t numBlks = taosArrayGetSize(aBlk); + SBlockOrderInfo* pOrigBlockOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo; + SBlockOrderInfo* pHandleBlockOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlkMergeSupport sup; sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); - sup.order = order->order; + sup.order = pOrigBlockOrder->order; for (int i = 0; i < numBlks; ++i) { SSDataBlock* blk = taosArrayGetP(aBlk, i); - SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId); + SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockOrder->slotId); sup.aTs[i] = (int64_t*)col->pData; sup.aRowIdx[i] = 0; } @@ -958,8 +1193,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO int32_t nMergedRows = 0; bool mergeLimitReached = false; size_t blkPgSz = pgHeaderSz; - int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; - int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; + int64_t lastPageBufTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; + int64_t currTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; while (nRows < totalRows) { int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); @@ -967,7 +1202,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { - SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId); + SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId); lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); nMergedRows += pHandle->pDataBlock->info.rows; @@ -977,15 +1212,19 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { mergeLimitReached = true; - if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) || - (lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) { + if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) || + (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) { pHandle->currMergeLimitTs = lastPageBufTs; } break; } } blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + if (!pHandle->bSortByRowId) { + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + } else { + appendToRowIndexDataBlock(pHandle, minBlk, &minRow); + } blkPgSz += bufInc; ++nRows; @@ -999,14 +1238,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO } if (pHandle->pDataBlock->info.rows > 0) { if (!mergeLimitReached) { - SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId); + SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId); lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); nMergedRows += pHandle->pDataBlock->info.rows; if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { mergeLimitReached = true; - if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) || - (lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) { + if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) || + (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) { pHandle->currMergeLimitTs = lastPageBufTs; } } @@ -1025,7 +1264,6 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { - SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); @@ -1040,7 +1278,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); int32_t szSort = 0; - if (pOrder->order == TSDB_ORDER_ASC) { + SBlockOrderInfo* pOrigOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo; + if (pOrigOrder->order == TSDB_ORDER_ASC) { pHandle->currMergeLimitTs = INT64_MAX; } else { pHandle->currMergeLimitTs = INT64_MIN; @@ -1051,10 +1290,10 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); if (pBlk != NULL) { - SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); + SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || - (pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { + if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + (pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { continue; } } @@ -1076,7 +1315,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { tSimpleHashClear(mUidBlk); int64_t p = taosGetTimestampUs(); - code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); taosArrayDestroy(aBlkSort);