From 1accde3c042b5815060170fbb990a993132c8ec5 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 14 Apr 2020 15:17:14 +0800 Subject: [PATCH] TD-100 --- src/vnode/tsdb/inc/tsdbMain.h | 2 +- src/vnode/tsdb/src/tsdbRWHelper.c | 200 +++++++++++++++++++++--------- 2 files changed, 144 insertions(+), 58 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index beb9ba9868..979e1c2865 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -437,7 +437,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target); int tsdbLoadCompInfo(SRWHelper *pHelper, void *target); int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target); int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds); -int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); +int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target); // --------- For write operations int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols); diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index 2f879bb37d..d67fd03964 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -38,8 +38,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa bool isLast, bool isSuperBlock); static int compareKeyBlock(const void *arg1, const void *arg2); static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); -static int nRowsLEThan(SDataCols *pDataCols, int maxKey); -static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); +// static int nRowsLEThan(SDataCols *pDataCols, int maxKey); +// static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int 
tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); @@ -397,7 +397,6 @@ static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pC static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, SDataCols *pDataCols) { if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1; - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; void *ptr = NULL; @@ -421,7 +420,6 @@ static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBloc // Load specific column data from file int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) { - SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block @@ -493,7 +491,7 @@ _err: } // Load the whole block data -int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { +int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) { SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; int numOfSubBlock = pCompBlock->numOfSubBlocks; @@ -506,6 +504,8 @@ int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; } + // if (target) TODO + return 0; _err: @@ -702,11 +702,11 @@ static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) { return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2)); } -static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { - void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), 
compKeyFunc, TD_LE); - if (ptr == NULL) return 0; - return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1; -} +// static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { +// void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE); +// if (ptr == NULL) return 0; +// return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1; +// } // Merge the data with a block in file static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { @@ -732,18 +732,18 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) { // Need to write to .data file if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; - // tdMergeDataCols(); - // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, NULL, rowsWritten + pCompBlock->numOfPoints, &compBlock, - // false, true) < 0) - // goto _err; + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[0], + rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0) + goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } else { // Need still write the .last or .l file if (pHelper->files.nLastF.fd > 0) { if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; - tdMergeDataCols(NULL, pDataCols, rowsWritten); - if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, NULL, rowsWritten + pCompBlock->numOfPoints, - &compBlock, false, true) < 0) + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, pHelper->pDataCols[0], + rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0) goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } else { @@ 
-753,9 +753,95 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } } + } else { + // TODO: key overlap, must merge with the block + ASSERT(keyFirst <= pCompBlock->keyLast); - } else { // Must merge with the block + TSKEY keyLimit = + (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1; + int rows1 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst, + pCompBlock->keyLast); // number of rows that must be merged into this block + int rows2 = + pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints; // max number of additional rows the block can hold + int rows3 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst, + keyLimit); // number of rows between this block and the next block + + ASSERT(rows3 >= rows1); + + if ((rows2 >= rows1) && ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0))) { + rowsWritten = rows1; + bool isLast = false; + SFile *pFile = NULL; + + if (pCompBlock->last) { + isLast = true; + pFile = &(pHelper->files.lastF); + } else { + pFile = &(pHelper->files.dataF); + } + + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; + if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; + } else { + // Need to read the data block and merge with pCompDataCol to write as super block + + // Read + if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + rowsWritten = rows3; + + int iter1 = 0; // iter over pHelper->pDataCols[0] + int iter2 = 0; // iter over pDataCols + tdResetDataCols(pHelper->pDataCols[1]); + while (true) { + if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { + if (pHelper->pDataCols[1]->numOfPoints > 0) { + if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], + pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + goto _err; + // TODO: the blkIdx here 
is not correct + tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); + } + } + + TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints + ? INT64_MAX + : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; + TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; + + if (key1 < key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; + memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), + TYPE_BYTES[pDataCol->type]); + } + pHelper->pDataCols[1]->numOfPoints++; + iter1++; + } else if (key1 == key2) { + // TODO: think about duplicate key cases + ASSERT(false); + } else { + for (int i = 0; i < pDataCols->numOfCols; i++) { + SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; + memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + ((char *)pDataCols->cols[i].pData + + TYPE_BYTES[pDataCol->type] * iter2), + TYPE_BYTES[pDataCol->type]); + } + pHelper->pDataCols[1]->numOfPoints++; + iter2++; + } + + if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { + if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err; + // TODO: blkIdx here is not correct, fix it + tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); + + tdResetDataCols(pHelper->pDataCols[1]); + } + } + } } return rowsWritten; @@ -767,58 +853,58 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); } // Get the number of rows the data can be merged into the block -static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, 
SDataCols *pDataCols) { - int rowsCanMerge = 0; - TSKEY keyFirst = dataColsKeyFirst(pDataCols); +// static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { +// int rowsCanMerge = 0; +// TSKEY keyFirst = dataColsKeyFirst(pDataCols); - SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; +// SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; +// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(blkIdx < pIdx->numOfSuperBlocks); +// ASSERT(blkIdx < pIdx->numOfSuperBlocks); - TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey; +// TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey; - if (keyFirst > pCompBlock->keyLast) { - void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), - compTSKEY, TD_LE); - ASSERT(ptr != NULL); +// if (keyFirst > pCompBlock->keyLast) { +// void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), +// compTSKEY, TD_LE); +// ASSERT(ptr != NULL); - rowsCanMerge = - MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints); +// rowsCanMerge = +// MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints); - } else { - int32_t colId[1] = {0}; - if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err; +// } else { +// int32_t colId[1] = {0}; +// if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err; - int iter1 = 0; // For pDataCols - int iter2 = 0; // For loaded data cols +// int iter1 = 0; // For pDataCols +// int iter2 = 0; // For loaded data cols - while (1) { - if (iter1 >= pDataCols->numOfPoints || iter2 >= 
pHelper->pDataCols[0]->numOfPoints) break; - if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break; +// while (1) { +// if (iter1 >= pDataCols->numOfPoints || iter2 >= pHelper->pDataCols[0]->numOfPoints) break; +// if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break; - TSKEY key1 = dataColsKeyAt(pDataCols, iter1); - TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2); +// TSKEY key1 = dataColsKeyAt(pDataCols, iter1); +// TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2); - if (key1 > keyMax) break; +// if (key1 > keyMax) break; - if (key1 < key2) { - iter1++; - } else if (key1 == key2) { - iter1++; - iter2++; - } else { - iter2++; - rowsCanMerge++; - } - } - } +// if (key1 < key2) { +// iter1++; +// } else if (key1 == key2) { +// iter1++; +// iter2++; +// } else { +// iter2++; +// rowsCanMerge++; +// } +// } +// } - return rowsCanMerge; +// return rowsCanMerge; -_err: - return -1; -} +// _err: +// return -1; +// } static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;