This commit is contained in:
hzcheng 2020-04-14 11:49:31 +08:00
parent 66cf4a5bac
commit 69b818610b
3 changed files with 96 additions and 75 deletions

View File

@ -433,18 +433,13 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
// TODO: merge the pTemp part // TODO: merge the pTemp part
int rowsLeft = pTarget->numOfPoints - iter1; int rowsLeft = pTarget->numOfPoints - iter1;
if (rowsLeft > 0) { if (rowsLeft > 0) {
} }
break; break;
} }
} }
return 0; return 0;
_err: _err:
return -1; return -1;
} }

View File

@ -755,6 +755,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
} }
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
ASSERT(maxRowsToRead > 0);
if (pIter == NULL) return 0; if (pIter == NULL) return 0;
int numOfRows = 0; int numOfRows = 0;
@ -914,14 +915,19 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
ASSERT(rowsRead >= 0); ASSERT(rowsRead >= 0);
if (pDataCols->numOfPoints == 0) break; if (pDataCols->numOfPoints == 0) break;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
if (rowsWritten < 0) goto _err; if (rowsWritten < 0) goto _err;
assert(rowsWritten <= pDataCols->numOfPoints); ASSERT(rowsWritten <= pDataCols->numOfPoints);
tdPopDataColsPoints(pDataCols, rowsWritten); tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints; maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints;
} }
ASSERT(pDataCols->numOfPoints == 0);
// Move the last block to the new .l file if neccessary // Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err; if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err;

View File

@ -244,7 +244,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
if (rowsToWrite < 0) goto _err; if (rowsToWrite < 0) goto _err;
} else { // Save as a super block in the middle } else { // Save as a super block in the middle
int rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1);
ASSERT(rowsToWrite > 0); ASSERT(rowsToWrite > 0);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err;
if (tsdbInsertSuperBlock(pHelper, pCompBlock, pCompBlock - pHelper->pCompInfo->blocks) < 0) goto _err; if (tsdbInsertSuperBlock(pHelper, pCompBlock, pCompBlock - pHelper->pCompInfo->blocks) < 0) goto _err;
@ -440,22 +440,75 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx,
return 0; return 0;
} }
// Load the whole block data /**
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1)
int16_t *colIds = (int16_t *)calloc(pDataCols->numOfCols, sizeof(int16_t)); */
if (colIds == NULL) goto _err; static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1);
for (int i = 0; i < pDataCols->numOfCols; i++) { SCompData *pCompData = (SCompData *)malloc(pCompBlock->len);
colIds[i] = pDataCols->cols[i].colId; if (pCompData == NULL) return -1;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err;
{ // TODO : check the correctness of the part
} }
if (tsdbLoadBlockDataCols(pHelper, pDataCols, blkIdx, colIds, pDataCols->numOfCols) < 0) goto _err; ASSERT(pCompBlock->numOfCols == pCompData->numOfCols);
pDataCols->numOfPoints = pCompBlock->numOfPoints;
size_t tlen = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
int ccol = 0, dcol = 0;
while (true) {
if (ccol >= pDataCols->numOfCols) {
// TODO: Fill rest NULL
break;
}
if (dcol >= pCompData->numOfCols) break;
SCompCol *pCompCol = &(pCompData->cols[ccol]);
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (pCompCol->colId == pDataCol->colId) {
// TODO: uncompress
memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tlen + pCompCol->offset), pCompCol->len);
ccol++;
dcol++;
} else if (pCompCol->colId > pDataCol->colId) {
// TODO: Fill NULL
dcol++;
} else {
ccol++;
}
}
tfree(pCompData);
return 0;
_err:
tfree(pCompData);
return -1;
}
// Load the whole block data
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
int numOfSubBlock = pCompBlock->numOfSubBlocks;
if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err;
for (int i = 1; i < numOfSubBlock; i++) {
pCompBlock++;
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err;
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err;
}
tfree(colIds);
return 0; return 0;
_err: _err:
tfree(colIds);
return -1; return -1;
} }
@ -671,71 +724,38 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
ASSERT(keyFirst >= pCompBlock->keyFirst); ASSERT(keyFirst >= pCompBlock->keyFirst);
ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
// Start here if (keyFirst > pCompBlock->keyLast) { // Merge the last block by append
TSKEY keyLimit = ASSERT(pCompBlock->last && pCompBlock->numOfPoints < pHelper->config.minRowsPerFileBlock);
(blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyLast - 1; int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast); rowsWritten = MIN((defaultRowsToWrite - pCompBlock->numOfPoints), pDataCols->numOfPoints);
int maxRowsCanMerge = if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) {
MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, 0, keyLimit)); // Need to write to .data file
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) { // tdMergeDataCols();
// Need to load the block and split as two super block // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, NULL, rowsWritten + pCompBlock->numOfPoints, &compBlock,
} else { // false, true) < 0)
} // goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
if (rowsMustMerge + pCompBlock->numOfPoints > pHelper->config.maxRowsPerFileBlock) {
// Load the block and merge as two super block
}
if (rowsMustMerge > maxRowsCanMerge) {
ASSERT(pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock);
} else {
}
int rowsToMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols);
if (rowsToMerge < 0) goto _err;
ASSERT(rowsToMerge > 0);
if (pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS &&
((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0 &&
pCompBlock->numOfPoints + rowsToMerge < pHelper->config.minRowsPerFileBlock))) {
SFile *pFile = NULL;
if ((!pCompBlock->last) || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) {
pFile = &(pHelper->files.dataF);
} else { } else {
pFile = &(pHelper->files.lastF); // Need still write the .last or .l file
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsToMerge, &compBlock, pCompBlock->last, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsToMerge) < 0) goto _err;
} else {
// Read-Merge-Write as a super block
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsToMerge);
int isLast = 0;
SFile *pFile = NULL;
if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) {
pFile = &(pHelper->files.dataF);
} else {
isLast = 1;
if (pHelper->files.nLastF.fd > 0) { if (pHelper->files.nLastF.fd > 0) {
pFile = &(pHelper->files.nLastF); if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
tdMergeDataCols(NULL, pDataCols, rowsWritten);
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, NULL, rowsWritten + pCompBlock->numOfPoints,
&compBlock, false, true) < 0)
goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} else { } else {
pFile = &(pHelper->files.lastF); // Write to .last file and append as a sub-block
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.lastF, pDataCols, rowsWritten, &compBlock, true, false) < 0)
goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} }
} }
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsToMerge, &compBlock, isLast, true) < 0) goto _err; } else { // Must merge with the block
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} }
return rowsWritten; return rowsWritten;