From eb45b021b94b89b8938b7a8978d477b30b4adae5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 12 Jul 2020 23:46:46 +0800 Subject: [PATCH] finish more code --- src/tsdb/src/tsdbRWHelper.c | 451 +++++++++++++++++++++--------------- 1 file changed, 263 insertions(+), 188 deletions(-) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 21efb1f60f..b66ab5c446 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -25,35 +25,41 @@ #define TSDB_KEY_COL_OFFSET 0 #define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock)) -static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, - bool isLast, bool isSuperBlock); -static int compareKeyBlock(const void *arg1, const void *arg2); -static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static void tsdbResetHelperFileImpl(SRWHelper *pHelper); -static int tsdbInitHelperFile(SRWHelper *pHelper); -static void tsdbDestroyHelperFile(SRWHelper *pHelper); -static void tsdbResetHelperTableImpl(SRWHelper *pHelper); -static void tsdbResetHelperTable(SRWHelper *pHelper); -static void tsdbInitHelperTable(SRWHelper *pHelper); -static void tsdbDestroyHelperTable(SRWHelper *pHelper); -static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); -static void tsdbResetHelperBlock(SRWHelper *pHelper); -static int tsdbInitHelperBlock(SRWHelper *pHelper); -static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); -static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize); -static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, - int numOfColIds); -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); -static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, + bool isLast, bool isSuperBlock); +static int compareKeyBlock(const void *arg1, const void *arg2); +static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static void tsdbResetHelperFileImpl(SRWHelper *pHelper); +static int tsdbInitHelperFile(SRWHelper *pHelper); +static void tsdbDestroyHelperFile(SRWHelper *pHelper); +static void tsdbResetHelperTableImpl(SRWHelper *pHelper); +static void tsdbResetHelperTable(SRWHelper *pHelper); +static void tsdbInitHelperTable(SRWHelper *pHelper); +static void tsdbDestroyHelperTable(SRWHelper *pHelper); +static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); +static void tsdbResetHelperBlock(SRWHelper *pHelper); +static int tsdbInitHelperBlock(SRWHelper *pHelper); +static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bufferSize); +static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds); +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); +static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); +static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey); static void tsdbDestroyHelperBlock(SRWHelper *pHelper); static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, SDataCol *pDataCol); +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock); +static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, + int *blkIdx); +static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -236,12 +242,7 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]); - STsdbCfg * pCfg = &(pHelper->pRepo->config); int blkIdx = 0; - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SCompBlock compBlock = {0}; - - SSkipListIterator sIter = {0}; ASSERT(TABLE_TID(pCommitIter->pTable) == pHelper->tableInfo.tid); if (TABLE_UID(pCommitIter->pTable) != pIdx->uid) memset((void *)pIdx, 0, sizeof(*pIdx)); @@ -252,166 +253,10 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols if (keyFirst < 0 || keyFirst > maxKey) break; // iter over if (pIdx->offset <= 0 || keyFirst > pIdx->maxKey) { - if (pIdx->hasLast) { - ASSERT(pIdx->offset > 0); - SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); - ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, - defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, NULL, 0); - ASSERT((rowsRead > 0) && (rowsRead == pDataCols->numOfRows)); - if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && - pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) - return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks-1, rowsRead) < 0) return -1; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); - - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + rowsRead); - SFile *pFile = NULL; - bool isLast = false; - if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = &(pHelper->files.dataF); - } else { - isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); - } - - if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], &compBlock, - isLast, true) < 0) - return -1; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; - } - } else { // last block is not in .last file - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); - ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); - - SFile *pFile = NULL; - bool isLast = false; - if (rowsRead >= pCfg->minRowsPerFileBlock) { - pFile = &(pHelper->files.dataF); - } else { - isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); - } - - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, &compBlock, isLast, true) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; - } + if (tsdbProcessAppendCommit(pHelper, pCommitIter, pDataCols, maxKey) < 0) return -1; blkIdx = pIdx->numOfBlocks; } else { - SCompBlock *pCompBlock = taosbsearch(&keyFirst, (void *)blockAtIdx(pHelper, blkIdx), pIdx->numOfBlocks - blkIdx, - sizeof(SCompBlock), compareKeyBlock, TD_GE); - ASSERT(pCompBlock != NULL); - - if (pCompBlock->last) { - ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); - - int16_t colId = 0; - sIter = *(pCommitIter->pIter); - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; - int rows2 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, maxKey, rows1, NULL, - pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows); - if (rows2 == 0) { // all data are filtered - *pCommitIter->pIter = sIter; - } else { - if (rows2 + rows1 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && - !TSDB_NLAST_FILE_OPENED(pHelper)) { - tdResetDataCols(pDataCols); - int rows = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, - pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows); - ASSERT(rows == rows2 && pDataCols->numOfRows == rows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) - return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rows) < 0) return -1; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - // TODO: merge pHelper->pDataCols[0] with pCommitIter->pIter - int round = 0; - // int iter1 = 0; - while (true) { - tdResetDataCols(pDataCols); - int rowsRead = 0; - // tsdbTwoLeveIterMerge(pHelper->pDataCols[0], &iter1, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols); - if (rowsRead == 0) break; - SFile *pFile = NULL; - bool isLast = false; - if (rowsRead >= pCfg->minRowsPerFileBlock) { - pFile = &(pHelper->files.dataF); - } else { - isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); - } - - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, &compBlock, isLast, true) < 0) return -1; - if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks-1) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks-1) < 0) return -1; - } - round++; - } - } - } - } else { - int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); - TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : pCompBlock[1].keyFirst - 1; - if (keyFirst < pCompBlock->keyFirst) { - while (true) { - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); - if (rowsRead == 0) break; - - ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) - return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - tblkIdx++; - } - } else { - ASSERT(keyFirst <= pCompBlock->keyLast); - int16_t colId = 0; - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); - - sIter = *(pCommitIter->pIter); - int rows1 = pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows; - int rows2 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, pCompBlock->keyLast, INT_MAX /*TODO*/, NULL, - pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows); - int rows3 = 0; - if (rows2 == 0) { // All filtered out - *(pCommitIter->pIter) = sIter; - } else { - rows3 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, keyLimit, INT_MAX /* TODO*/, NULL, NULL, 0) + rows2; - ASSERT(rows3 >= rows2); - - if (rows1 >= rows2) { - int rows = (rows1 >= rows3) ? rows3 : rows2; - tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, - pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows); - ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) - return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); - int round = 0; - while (true) { - // TODO - round++; - } - } - } - } - } + if (tsdbProcessMergeCommit(pHelper, pCommitIter, pDataCols, maxKey, &blkIdx) < 0) return -1; } } @@ -1450,3 +1295,233 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { return buf; } + +static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SCompBlock compBlock = {0}; + + ASSERT(pIdx->offset <= 0 || keyFirst > pIdx->maxKey); + if (pIdx->hasLast) { // append to with last block + ASSERT(pIdx->offset > 0); + SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); + ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, + pDataCols, NULL, 0); + ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && + pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + } + } else { + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); + ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + } + + return 0; +} + +static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, + int *blkIdx) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + SCompBlock compBlock = {0}; + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SDataCols *pDataCols0 = pHelper->pDataCols[0]; + + SSkipListIterator slIter = {0}; + + ASSERT(keyFirst <= pIdx->maxKey); + + SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), + pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); + ASSERT(pCompBlock != NULL); + + if (pCompBlock->last) { + ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); + int16_t colId = 0; + slIter = *(pCommitIter->pIter); + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); + + int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); + if (rows2 == 0) { // all data filtered out + *(pCommitIter->pIter) = slIter; + } else { + if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + } + + round++; + } + } + } + *blkIdx = pIdx->numOfBlocks; + } else { + int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); + TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); + if (keyFirst < pCompBlock->keyFirst) { + while (true) { + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); + if (rowsRead == 0) break; + + ASSERT(rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + tblkIdx++; + } + *blkIdx = tblkIdx; + } else { + ASSERT(keyFirst <= pCompBlock->keyLast); + int16_t colId =0; + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + + slIter = *(pCommitIter->pIter); + int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); + + if (rows2 == 0) { // all filtered out + *(pCommitIter->pIter) = slIter; + } else { + int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + ASSERT(rows3 >= rows2); + + if (rows1 >= rows2) { + int rows = (rows1 >= rows3) ? rows3 : rows2; + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; + *blkIdx = tblkIdx + 1; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) + return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, 0 /*TODO*/) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, 0 /*TODO */) < 0) return -1; + } + + round++; + } + } + + } + } + } + + return 0; +} + +static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows) { + int numOfRows = 0; + TSKEY key1 = INT64_MAX; + TSKEY key2 = INT64_MAX; + STSchema *pSchema = NULL; + + ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); + tdResetDataCols(pTarget); + + while (true) { + key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + SDataRow row = tsdbNextIterRow(pCommitIter->pIter); + key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row); + + if (key1 == INT64_MAX && key2 == INT64_MAX) break; + + if (key1 <= key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } + (*iter)++; + if (key1 == key2) tSkipListIterNext(pCommitIter->pIter); + } else { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + tdAppendDataRowToDataCol(row, pSchema, pTarget); + } + tSkipListIterNext(pCommitIter->pIter); + } + + numOfRows++; + if (numOfRows >= maxRows) break; + ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints); + } + + return numOfRows; +} + +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) { + STsdbCfg *pCfg = &(pHelper->pRepo->config); + SFile * pFile = NULL; + bool isLast = false; + + ASSERT(pDataCols->numOfRows > 0); + + if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + pFile = &(pHelper->files.dataF); + } else { + isLast = true; + pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + } + + ASSERT(pFile->fd > 0); + + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; + + return 0; +} \ No newline at end of file