From ac209abb50a89e8349599c9f38c3c692bd0d04f7 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 10 Apr 2020 23:12:53 +0800 Subject: [PATCH] TD-100 --- src/vnode/tsdb/inc/tsdbMain.h | 31 ++--- src/vnode/tsdb/src/tsdbRWHelper.c | 212 ++++++++++++++++++++++++++---- 2 files changed, 206 insertions(+), 37 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 949bb9d48b..beb9ba9868 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -382,27 +382,28 @@ typedef struct { // Global configuration SHelperCfg config; + int8_t state; + + // For file set usage SHelperFile files; + SCompIdx * pCompIdx; + size_t compIdxSize; + // For table set usage SHelperTable tableInfo; - SCompIdx compIdx; // SCompIdx of current table + SCompInfo * pCompInfo; + size_t compInfoSize; + bool hasOldLastBlock; - int8_t state; // current loading state - - // Information in .head file - SCompIdx *pCompIdx; - size_t compIdxSize; - - SCompInfo *pCompInfo; - size_t compInfoSize; - int blockIter; // For write purpose - - // Information in .data or .last file + // For block set usage SCompData *pCompData; size_t compDataSize; - SDataCols *pDataCols[2]; + // ------ Perhaps no usage + SCompIdx compIdx; // SCompIdx of current table + int blockIter; // For write purpose + // Compression buffer void * cBuffer; size_t cBufSize; @@ -434,8 +435,8 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); // --------- For read operations int tsdbLoadCompIdx(SRWHelper *pHelper, void *target); int tsdbLoadCompInfo(SRWHelper *pHelper, void *target); -int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target); -int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds); +int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target); +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds); int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); // --------- For write operations diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index dc51b86c29..fb46deb216 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -175,7 +175,12 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema tdInitDataCols(pHelper->pDataCols[0], pSchema); tdInitDataCols(pHelper->pDataCols[1], pSchema); - pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid]; + SCompIdx *pIdx = pHelper->pCompIdx + pHelperTable->tid; + if (pIdx->offset > 0 && pIdx->hasLast) { + pHelper->hasOldLastBlock = true; + } + + // pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid]; helperSetState(pHelper, TSDB_HELPER_TABLE_SET); } @@ -205,6 +210,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { } if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block + ASSERT(pHelper->hasOldLastBlock == false); rowsToWrite = pDataCols->numOfPoints; SFile *pWFile = NULL; bool isLast = false; @@ -231,9 +237,8 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { if (rowsToWrite < 0) goto _err; } else { // Has key overlap - if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { // Key overlap with the block - // TSKEY keyLimit = - // (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : (pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1); + if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { + // Key overlap with the block, must merge with the block rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); if (rowsToWrite < 0) goto _err; @@ -269,17 +274,61 @@ _err: } int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { - // TODO + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompBlock compBlock; + if ((pHelper->files.nHeadF.fd > 0) && (pHelper->hasOldLastBlock)) { + if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; + + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1; + ASSERT(pCompBlock->last); + + if (pCompBlock->numOfSubBlocks > 1) { + if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], + pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) + return -1; + + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfSuperBlocks - 1) < 0) return -1; + + } else { + if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); + if (pCompBlock->offset < 0) return -1; + + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) + return -1; + } + + pHelper->hasOldLastBlock = false; + } + return 0; } int tsdbWriteCompInfo(SRWHelper *pHelper) { - // TODO + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { + if (pIdx->offset > 0) { + pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + if (pIdx->offset < 0) return -1; + + if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; + } + } else { + pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + if (pIdx->offset < 0) return -1; + + if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; + } + return 0; } int tsdbWriteCompIdx(SRWHelper *pHelper) { - // TODO + if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + + if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize) + return -1; return 0; } @@ -326,13 +375,78 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { return 0; } -int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target) { - // TODO +int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { + ASSERT(pCompBlock->numOfSubBlocks <= 1); + int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + + if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; + adjustMem(pHelper->pCompData, pHelper->compDataSize, tsize); + + if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1; + + ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols); + + if (target) memcpy(target, pHelper->pCompData, tsize); + return 0; } -int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds) { - // TODO +static int comparColIdCompCol(const void *arg1, const void *arg2) { + return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId; +} + +static int comparColIdDataCol(const void *arg1, const void *arg2) { + return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId; +} + +static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) { + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; + if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset) < 0) return -1; + if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1; + + return 0; +} + +static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, + SDataCols *pDataCols) { + if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1; + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; + int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + + void *ptr = NULL; + for (int i = 0; i < numOfColIds; i++) { + int16_t colId = colIds[i]; + + ptr = bsearch((void *)&colId, (void *)pHelper->pCompData->cols, pHelper->pCompData->numOfCols, sizeof(SCompCol), comparColIdCompCol); + if (ptr == NULL) continue; + SCompCol *pCompCol = (SCompCol *)ptr; + + ptr = bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol); + ASSERT(ptr != NULL); + SDataCol *pDataCol = (SDataCol *)ptr; + + pDataCol->len = pCompCol->len; + if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1; + } + + return 0; +} + +// Load specific column data from file +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) { + SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block + if (pCompBlock->numOfSubBlocks == 1) { + + } + + + + return 0; } @@ -537,44 +651,74 @@ static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1; } +// Merge the data with a block static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { + // TODO: set pHelper->hasOldBlock int rowsWritten = 0; SCompBlock compBlock = {0}; + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; ASSERT(blkIdx < pIdx->numOfSuperBlocks); SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(pCompBlock->numOfSubBlocks >= 1); + ASSERT(keyFirst >= pCompBlock->keyFirst); - int rowsCanMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols); - if (rowsCanMerge < 0) goto _err; + // Start here + TSKEY keyLimit = + (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyLast - 1; - ASSERT(rowsCanMerge > 0); + int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast); + int maxRowsCanMerge = + MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, keyLimit)); + + if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) { + // Need to load the block and split as two super block + } else { + } + + if (rowsMustMerge + pCompBlock->numOfPoints > pHelper->config.maxRowsPerFileBlock) { + // Load the block and merge as two super block + } + + if (rowsMustMerge > maxRowsCanMerge) { + ASSERT(pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock); + + } else { + + } + + + int rowsToMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols); + if (rowsToMerge < 0) goto _err; + + ASSERT(rowsToMerge > 0); if (pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS && ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0 && - pCompBlock->numOfPoints + rowsCanMerge < pHelper->config.minRowsPerFileBlock))) { + pCompBlock->numOfPoints + rowsToMerge < pHelper->config.minRowsPerFileBlock))) { SFile *pFile = NULL; - if (!pCompBlock->last) { + if ((!pCompBlock->last) || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) { pFile = &(pHelper->files.dataF); } else { pFile = &(pHelper->files.lastF); } - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last, false) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsToMerge, &compBlock, pCompBlock->last, false) < 0) goto _err; - if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsCanMerge) < 0) goto _err; + if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsToMerge) < 0) goto _err; } else { // Read-Merge-Write as a super block if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; - tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsCanMerge); + tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsToMerge); int isLast = 0; SFile *pFile = NULL; - if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.minRowsPerFileBlock)) { + if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) { pFile = &(pHelper->files.dataF); } else { isLast = 1; @@ -585,7 +729,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa } } - if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast, true) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsToMerge, &compBlock, isLast, true) < 0) goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } @@ -619,7 +763,7 @@ static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SData } else { int32_t colId[1] = {0}; - if (tsdbLoadBlockDataCols(pHelper, NULL, colId, 1) < 0) goto _err; + if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err; int iter1 = 0; // For pDataCols int iter2 = 0; // For loaded data cols @@ -808,4 +952,28 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; return 0; +} + +// Get the number of rows in range [minKey, maxKey] +static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) { + if (pDataCols->numOfPoints == 0) return 0; + + ASSERT(minKey <= maxKey); + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + TSKEY keyLast = dataColsKeyLast(pDataCols); + ASSERT(keyFirst <= keyLast); + + if (minKey > keyLast || maxKey < keyFirst) return 0; + + void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + compTSKEY, TD_GE); + ASSERT(ptr1 != NULL); + + void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + compTSKEY, TD_LE); + ASSERT(ptr2 != NULL); + + if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0; + + return (TSKEY *)ptr2 - (TSKEY *)ptr1; } \ No newline at end of file