From 69b818610b93ab5df2ec123f5c13526bb70ef509 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 14 Apr 2020 11:49:31 +0800 Subject: [PATCH] TD-100 --- src/common/src/dataformat.c | 7 +- src/vnode/tsdb/src/tsdbMain.c | 8 +- src/vnode/tsdb/src/tsdbRWHelper.c | 156 +++++++++++++++++------------- 3 files changed, 96 insertions(+), 75 deletions(-) diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index fd7dd5b91e..a17ceff54b 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -433,18 +433,13 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { // TODO: merge the pTemp part int rowsLeft = pTarget->numOfPoints - iter1; if (rowsLeft > 0) { - } break; } - - - - } return 0; - _err: +_err: return -1; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 6934392a0c..33991aec52 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -755,6 +755,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { + ASSERT(maxRowsToRead > 0); if (pIter == NULL) return 0; int numOfRows = 0; @@ -914,14 +915,19 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ASSERT(rowsRead >= 0); if (pDataCols->numOfPoints == 0) break; + ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); + ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); + int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); if (rowsWritten < 0) goto _err; - assert(rowsWritten <= pDataCols->numOfPoints); + ASSERT(rowsWritten <= pDataCols->numOfPoints); tdPopDataColsPoints(pDataCols, rowsWritten); maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints; } + ASSERT(pDataCols->numOfPoints == 0); + // Move the last block to the new .l file if neccessary if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err; diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index de9d1008c1..2f879bb37d 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -244,7 +244,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); if (rowsToWrite < 0) goto _err; } else { // Save as a super block in the middle - int rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); + rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); ASSERT(rowsToWrite > 0); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err; if (tsdbInsertSuperBlock(pHelper, pCompBlock, pCompBlock - pHelper->pCompInfo->blocks) < 0) goto _err; @@ -440,22 +440,75 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, return 0; } -// Load the whole block data -int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { - int16_t *colIds = (int16_t *)calloc(pDataCols->numOfCols, sizeof(int16_t)); - if (colIds == NULL) goto _err; +/** + * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1) + */ +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { + ASSERT(pCompBlock->numOfSubBlocks <= 1); - for (int i = 0; i < pDataCols->numOfCols; i++) { - colIds[i] = pDataCols->cols[i].colId; + SCompData *pCompData = (SCompData *)malloc(pCompBlock->len); + if (pCompData == NULL) return -1; + + int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; + + { // TODO : check the correctness of the part } - if (tsdbLoadBlockDataCols(pHelper, pDataCols, blkIdx, colIds, pDataCols->numOfCols) < 0) goto _err; + ASSERT(pCompBlock->numOfCols == pCompData->numOfCols); + + pDataCols->numOfPoints = pCompBlock->numOfPoints; + + size_t tlen = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; + int ccol = 0, dcol = 0; + while (true) { + if (ccol >= pDataCols->numOfCols) { + // TODO: Fill rest NULL + break; + } + if (dcol >= pCompData->numOfCols) break; + + SCompCol *pCompCol = &(pCompData->cols[ccol]); + SDataCol *pDataCol = &(pDataCols->cols[dcol]); + + if (pCompCol->colId == pDataCol->colId) { + // TODO: uncompress + memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tlen + pCompCol->offset), pCompCol->len); + ccol++; + dcol++; + } else if (pCompCol->colId > pDataCol->colId) { + // TODO: Fill NULL + dcol++; + } else { + ccol++; + } + } + + tfree(pCompData); + return 0; + +_err: + tfree(pCompData); + return -1; +} + +// Load the whole block data +int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + int numOfSubBlock = pCompBlock->numOfSubBlocks; + if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset); + + if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err; + for (int i = 1; i < numOfSubBlock; i++) { + pCompBlock++; + if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err; + if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; + } - tfree(colIds); return 0; _err: - tfree(colIds); return -1; } @@ -671,71 +724,38 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ASSERT(keyFirst >= pCompBlock->keyFirst); ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); - // Start here - TSKEY keyLimit = - (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyLast - 1; + if (keyFirst > pCompBlock->keyLast) { // Merge the last block by append + ASSERT(pCompBlock->last && pCompBlock->numOfPoints < pHelper->config.minRowsPerFileBlock); + int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface - int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast); - int maxRowsCanMerge = - MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, 0, keyLimit)); - - if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) { - // Need to load the block and split as two super block - } else { - } - - if (rowsMustMerge + pCompBlock->numOfPoints > pHelper->config.maxRowsPerFileBlock) { - // Load the block and merge as two super block - } - - if (rowsMustMerge > maxRowsCanMerge) { - ASSERT(pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock); - - } else { - - } - - - int rowsToMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols); - if (rowsToMerge < 0) goto _err; - - ASSERT(rowsToMerge > 0); - - if (pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS && - ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0 && - pCompBlock->numOfPoints + rowsToMerge < pHelper->config.minRowsPerFileBlock))) { - - SFile *pFile = NULL; - - if ((!pCompBlock->last) || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) { - pFile = &(pHelper->files.dataF); + rowsWritten = MIN((defaultRowsToWrite - pCompBlock->numOfPoints), pDataCols->numOfPoints); + if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) { + // Need to write to .data file + if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + // tdMergeDataCols(); + // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, NULL, rowsWritten + pCompBlock->numOfPoints, &compBlock, + // false, true) < 0) + // goto _err; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } else { - pFile = &(pHelper->files.lastF); - } - - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsToMerge, &compBlock, pCompBlock->last, false) < 0) goto _err; - - if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsToMerge) < 0) goto _err; - } else { - // Read-Merge-Write as a super block - if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; - tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsToMerge); - - int isLast = 0; - SFile *pFile = NULL; - if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) { - pFile = &(pHelper->files.dataF); - } else { - isLast = 1; + // Need still write the .last or .l file if (pHelper->files.nLastF.fd > 0) { - pFile = &(pHelper->files.nLastF); + if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + tdMergeDataCols(NULL, pDataCols, rowsWritten); + if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, NULL, rowsWritten + pCompBlock->numOfPoints, + &compBlock, false, true) < 0) + goto _err; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } else { - pFile = &(pHelper->files.lastF); + // Write to .last file and append as a sub-block + if (tsdbWriteBlockToFile(pHelper, &pHelper->files.lastF, pDataCols, rowsWritten, &compBlock, true, false) < 0) + goto _err; + if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } } - if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsToMerge, &compBlock, isLast, true) < 0) goto _err; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; + } else { // Must merge with the block + } return rowsWritten;