This commit is contained in:
hzcheng 2020-04-14 15:17:14 +08:00
parent 69b818610b
commit 1accde3c04
2 changed files with 144 additions and 58 deletions

View File

@ -437,7 +437,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target);
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target); int tsdbLoadCompInfo(SRWHelper *pHelper, void *target);
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target); int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target);
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds); int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target);
// --------- For write operations // --------- For write operations
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols); int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols);

View File

@ -38,8 +38,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
bool isLast, bool isSuperBlock); bool isLast, bool isSuperBlock);
static int compareKeyBlock(const void *arg1, const void *arg2); static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int nRowsLEThan(SDataCols *pDataCols, int maxKey); // static int nRowsLEThan(SDataCols *pDataCols, int maxKey);
static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); // static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
@ -397,7 +397,6 @@ static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pC
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
SDataCols *pDataCols) { SDataCols *pDataCols) {
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1; if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
void *ptr = NULL; void *ptr = NULL;
@ -421,7 +420,6 @@ static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBloc
// Load specific column data from file // Load specific column data from file
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) { int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
@ -493,7 +491,7 @@ _err:
} }
// Load the whole block data // Load the whole block data
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) {
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
int numOfSubBlock = pCompBlock->numOfSubBlocks; int numOfSubBlock = pCompBlock->numOfSubBlocks;
@ -506,6 +504,8 @@ int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err;
} }
// if (target) TODO
return 0; return 0;
_err: _err:
@ -702,11 +702,11 @@ static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) {
return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2)); return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2));
} }
static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { // static int nRowsLEThan(SDataCols *pDataCols, int maxKey) {
void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE); // void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE);
if (ptr == NULL) return 0; // if (ptr == NULL) return 0;
return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1; // return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1;
} // }
// Merge the data with a block in file // Merge the data with a block in file
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
@ -732,18 +732,18 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) { if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) {
// Need to write to .data file // Need to write to .data file
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
// tdMergeDataCols(); if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, NULL, rowsWritten + pCompBlock->numOfPoints, &compBlock, if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[0],
// false, true) < 0) rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0)
// goto _err; goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} else { } else {
// Need still write the .last or .l file // Need still write the .last or .l file
if (pHelper->files.nLastF.fd > 0) { if (pHelper->files.nLastF.fd > 0) {
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
tdMergeDataCols(NULL, pDataCols, rowsWritten); if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, NULL, rowsWritten + pCompBlock->numOfPoints, if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, pHelper->pDataCols[0],
&compBlock, false, true) < 0) rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0)
goto _err; goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} else { } else {
@ -753,9 +753,95 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} }
} }
} else {
// TODO: key overlap, must merge with the block
ASSERT(keyFirst <= pCompBlock->keyLast);
} else { // Must merge with the block TSKEY keyLimit =
(blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
int rows1 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst,
pCompBlock->keyLast); // number of rows must merge in this block
int rows2 =
pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints; // max nuber of rows the block can have more
int rows3 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst,
keyLimit); // number of rows between this block and the next block
ASSERT(rows3 >= rows1);
if ((rows2 >= rows1) && ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0))) {
rowsWritten = rows1;
bool isLast = false;
SFile *pFile = NULL;
if (pCompBlock->last) {
isLast = true;
pFile = &(pHelper->files.lastF);
} else {
pFile = &(pHelper->files.dataF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else {
// Need to read the data block and merge with pCompDataCol to write as super block
// Read
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
rowsWritten = rows3;
int iter1 = 0; // iter over pHelper->pDataCols[0]
int iter2 = 0; // iter over pDataCols
tdResetDataCols(pHelper->pDataCols[1]);
while (true) {
if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) {
if (pHelper->pDataCols[1]->numOfPoints > 0) {
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0)
goto _err;
// TODO: the blkIdx here is not correct
tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints);
}
}
TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints
? INT64_MAX
: ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
if (key1 < key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
TYPE_BYTES[pDataCol->type]);
}
pHelper->pDataCols[1]->numOfPoints++;
iter1++;
} else if (key1 == key2) {
// TODO: think about duplicate key cases
ASSERT(false);
} else {
for (int i = 0; i < pDataCols->numOfCols; i++) {
SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
((char *)pDataCols->cols[i].pData +
TYPE_BYTES[pDataCol->type] * iter2),
TYPE_BYTES[pDataCol->type]);
}
pHelper->pDataCols[1]->numOfPoints++;
iter2++;
}
if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err;
// TODO: blkIdx here is not correct, fix it
tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
tdResetDataCols(pHelper->pDataCols[1]);
}
}
}
} }
return rowsWritten; return rowsWritten;
@ -767,58 +853,58 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); } static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); }
// Get the number of rows the data can be merged into the block // Get the number of rows the data can be merged into the block
static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { // static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
int rowsCanMerge = 0; // int rowsCanMerge = 0;
TSKEY keyFirst = dataColsKeyFirst(pDataCols); // TSKEY keyFirst = dataColsKeyFirst(pDataCols);
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(blkIdx < pIdx->numOfSuperBlocks); // ASSERT(blkIdx < pIdx->numOfSuperBlocks);
TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey; // TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey;
if (keyFirst > pCompBlock->keyLast) { // if (keyFirst > pCompBlock->keyLast) {
void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), // void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
compTSKEY, TD_LE); // compTSKEY, TD_LE);
ASSERT(ptr != NULL); // ASSERT(ptr != NULL);
rowsCanMerge = // rowsCanMerge =
MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints); // MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints);
} else { // } else {
int32_t colId[1] = {0}; // int32_t colId[1] = {0};
if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err; // if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err;
int iter1 = 0; // For pDataCols // int iter1 = 0; // For pDataCols
int iter2 = 0; // For loaded data cols // int iter2 = 0; // For loaded data cols
while (1) { // while (1) {
if (iter1 >= pDataCols->numOfPoints || iter2 >= pHelper->pDataCols[0]->numOfPoints) break; // if (iter1 >= pDataCols->numOfPoints || iter2 >= pHelper->pDataCols[0]->numOfPoints) break;
if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break; // if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break;
TSKEY key1 = dataColsKeyAt(pDataCols, iter1); // TSKEY key1 = dataColsKeyAt(pDataCols, iter1);
TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2); // TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2);
if (key1 > keyMax) break; // if (key1 > keyMax) break;
if (key1 < key2) { // if (key1 < key2) {
iter1++; // iter1++;
} else if (key1 == key2) { // } else if (key1 == key2) {
iter1++; // iter1++;
iter2++; // iter2++;
} else { // } else {
iter2++; // iter2++;
rowsCanMerge++; // rowsCanMerge++;
} // }
} // }
} // }
return rowsCanMerge; // return rowsCanMerge;
_err: // _err:
return -1; // return -1;
} // }
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) { static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;