TD-100
This commit is contained in:
parent
bed8890fe8
commit
136f99ed5c
|
@ -188,18 +188,17 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||||
|
|
||||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
|
ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
|
||||||
SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage
|
// SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose
|
||||||
|
|
||||||
// Load the SCompInfo part if neccessary
|
// Load the SCompInfo part if neccessary
|
||||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
|
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
|
||||||
if ((!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) &&
|
if ((!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) &&
|
||||||
((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) &&
|
((pIdx->offset > 0) && (pIdx->hasLast || keyFirst <= pIdx->maxKey))) {
|
||||||
(tsdbLoadCompInfo(pHelper, NULL) < 0))
|
if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
|
||||||
goto _err;
|
}
|
||||||
|
|
||||||
if (!pIdx->hasLast && keyFirst > pIdx->maxKey) {
|
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
|
||||||
// Just need to append as a super block
|
|
||||||
rowsToWrite = pDataCols->numOfPoints;
|
rowsToWrite = pDataCols->numOfPoints;
|
||||||
SFile *pWFile = NULL;
|
SFile *pWFile = NULL;
|
||||||
bool isLast = false;
|
bool isLast = false;
|
||||||
|
@ -218,39 +217,48 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||||
|
|
||||||
pIdx->hasLast = compBlock.last;
|
pIdx->hasLast = compBlock.last;
|
||||||
pIdx->numOfSuperBlocks++;
|
pIdx->numOfSuperBlocks++;
|
||||||
pIdx->maxKey = dataColsKeyLast(pDataCols);
|
pIdx->maxKey = compBlock.keyLast;
|
||||||
// pIdx->len = ??????
|
ASSERT(compBlock.keyLast == dataColsKeyLast(pDataCols));
|
||||||
} else { // (pIdx->hasLast) OR (keyFirst <= pIdx->maxKey)
|
pIdx->len += sizeof(SCompBlock);
|
||||||
if (keyFirst > pIdx->maxKey) {
|
} else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
|
||||||
int blkIdx = pIdx->numOfSuperBlocks - 1;
|
|
||||||
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[blkIdx].last);
|
|
||||||
|
|
||||||
// Need to merge with the last block
|
|
||||||
if (tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols) < 0) goto _err;
|
|
||||||
} else {
|
|
||||||
// Find the first block greater or equal to the block
|
|
||||||
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks),
|
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks),
|
||||||
pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
||||||
if (pCompBlock == NULL) {
|
|
||||||
if (tsdbMergeDataWithBlock(pHelper, pIdx->numOfSuperBlocks-1, pDataCols) < 0) goto _err;
|
|
||||||
} else {
|
|
||||||
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
|
|
||||||
SCompBlock *pNextBlock = NULL;
|
|
||||||
TSKEY keyLimit = (pNextBlock == NULL) ? INT_MAX : (pNextBlock->keyFirst - 1);
|
|
||||||
rowsToWrite =
|
|
||||||
MIN(nRowsLEThan(pDataCols, keyLimit), pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints);
|
|
||||||
|
|
||||||
if (tsdbMergeDataWithBlock(pHelper, pCompBlock-pHelper->pCompInfo->blocks, pDataCols) < 0) goto _err;
|
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfSuperBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
|
||||||
} else {
|
|
||||||
// There options: 1. merge with previous block
|
|
||||||
// 2. commit as one block
|
|
||||||
// 3. merge with current block
|
|
||||||
int nRows1 = INT_MAX;
|
|
||||||
int nRows2 = nRowsLEThan(pDataCols, pCompBlock->keyFirst);
|
|
||||||
int nRows3 = MIN(nRowsLEThan(pDataCols, (pCompBlock + 1)->keyFirst), (pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints));
|
|
||||||
|
|
||||||
// TODO: find the block with max rows can merge
|
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
|
||||||
if (tsdbMergeDataWithBlock(pHelper, pCompBlock, pDataCols) < 0) goto _err;
|
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last);
|
||||||
|
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
||||||
|
if (rowsToWrite < 0) goto _err;
|
||||||
|
} else { // Has key overlap
|
||||||
|
|
||||||
|
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { // Key overlap with the block
|
||||||
|
// TSKEY keyLimit =
|
||||||
|
// (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : (pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1);
|
||||||
|
|
||||||
|
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
||||||
|
if (rowsToWrite < 0) goto _err;
|
||||||
|
|
||||||
|
ASSERT(rowsToWrite == MIN(rows1, rows2));
|
||||||
|
} else { // Either merge with the previous block or save as a super block in the middle
|
||||||
|
SCompBlock *prevBlock = (blkIdx == 0) ? NULL : (pCompBlock - 1);
|
||||||
|
|
||||||
|
int rows1 = nRowsLEThan(pDataCols, pCompBlock->keyFirst); // rows write as a super block in the middle
|
||||||
|
int rows2 = (prevBlock) ? (pHelper->config.maxRowsPerFileBlock - prevBlock->numOfPoints)
|
||||||
|
: rows1; // rows can merge with the previous block
|
||||||
|
if (rows1 >= rows2) {
|
||||||
|
rowsToWrite = tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rows1, &compBlock, false, true);
|
||||||
|
if (rowsToWrite < 0) goto _err;
|
||||||
|
|
||||||
|
ASSERT(rowsToWrite == rows1);
|
||||||
|
|
||||||
|
// Add the super block to it
|
||||||
|
pIdx->len += sizeof(SCompBlock);
|
||||||
|
pIdx->numOfSuperBlocks++;
|
||||||
|
} else {
|
||||||
|
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx-1, pDataCols);
|
||||||
|
if (rowsToWrite < 0) goto _err;
|
||||||
|
ASSERT(rowsToWrite == rows2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -309,7 +317,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||||
if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1;
|
if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1;
|
||||||
|
|
||||||
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len);
|
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len);
|
||||||
if (tread(fd, (void *)(pHelper->pCompInfo), pHelper) < 0) return -1;
|
if (tread(fd, (void *)(pHelper->pCompInfo), pHelper->compIdx.len) < pHelper->compIdx.len) return -1;
|
||||||
// TODO: check the checksum
|
// TODO: check the checksum
|
||||||
|
|
||||||
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
|
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
|
||||||
|
@ -521,8 +529,14 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) {
|
||||||
|
return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2));
|
||||||
|
}
|
||||||
|
|
||||||
static int nRowsLEThan(SDataCols *pDataCols, int maxKey) {
|
static int nRowsLEThan(SDataCols *pDataCols, int maxKey) {
|
||||||
return 0;
|
void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE);
|
||||||
|
if (ptr == NULL) return 0;
|
||||||
|
return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
||||||
|
|
Loading…
Reference in New Issue