refact tsdb commit
This commit is contained in:
parent
447217670f
commit
b26006d341
|
@ -739,12 +739,6 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
|
|||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
|
||||
|
||||
// Seek file
|
||||
offset = tsdbSeekDFile(pDFile, 0, SEEK_END);
|
||||
if (offset < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Make buffer space
|
||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
|
||||
return -1;
|
||||
|
@ -842,7 +836,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
|
|||
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
|
||||
|
||||
// Write the whole block to file
|
||||
if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize) < lsize) {
|
||||
if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1005,8 +999,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
|
|||
|
||||
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
|
||||
|
||||
if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1076,7 +1069,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
|||
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
|
||||
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
|
||||
|
||||
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, pBlock->numOfSubBlocks + 1) < 0) return -1;
|
||||
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
|
||||
if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1;
|
||||
|
@ -1087,16 +1080,28 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
|||
|
||||
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
|
||||
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||
SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
// SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh));
|
||||
SDFile *pDFile;
|
||||
SBlock block;
|
||||
bool isSameFile;
|
||||
|
||||
if ((pBlock->last && pCommith->isLFileSame) || ((!pBlock->last) && pCommith->isDFileSame)) {
|
||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||
|
||||
if (pBlock->last) {
|
||||
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
|
||||
isSameFile = pCommith->isLFileSame;
|
||||
} else {
|
||||
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
isSameFile = pCommith->isDFileSame;
|
||||
}
|
||||
|
||||
if (isSameFile) {
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1;
|
||||
if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
block = *pBlock;
|
||||
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlk);
|
||||
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk);
|
||||
|
||||
if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
|
||||
pBlock->numOfSubBlocks) < 0) {
|
||||
|
@ -1105,7 +1110,7 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
|
|||
}
|
||||
} else {
|
||||
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
|
||||
if (tsdbWriteBlock(pCommith, pCommitF, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1;
|
||||
if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1;
|
||||
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
|
||||
}
|
||||
|
||||
|
@ -1113,14 +1118,12 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
|
|||
}
|
||||
|
||||
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
|
||||
ASSERT(pSupBlock != NULL);
|
||||
|
||||
if (taosArrayPush(pCommith->aSupBlk, pSupBlock) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pSubBlocks && taosArrayPushBatch(pCommith->aSupBlk, pSubBlocks, nSubBlocks) < 0) {
|
||||
if (pSubBlocks && taosArrayPushBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue