refact code
This commit is contained in:
parent
efb429ebd3
commit
207d69e61e
|
@ -554,43 +554,44 @@ _err:
|
||||||
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
|
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SBlockData *pBlockData = &pCommitter->dWriter.bData;
|
SBlockData *pBlockData = &pCommitter->dWriter.bData;
|
||||||
SDataBlk block;
|
SDataBlk dataBlk;
|
||||||
|
|
||||||
ASSERT(pBlockData->nRow > 0);
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
|
||||||
tDataBlkReset(&block);
|
tDataBlkReset(&dataBlk);
|
||||||
|
|
||||||
// info
|
// info
|
||||||
block.nRow += pBlockData->nRow;
|
dataBlk.nRow += pBlockData->nRow;
|
||||||
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
||||||
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
|
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
|
||||||
|
|
||||||
if (iRow == 0) {
|
if (iRow == 0) {
|
||||||
if (tsdbKeyCmprFn(&block.minKey, &key) > 0) {
|
if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
|
||||||
block.minKey = key;
|
dataBlk.minKey = key;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
|
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
|
||||||
block.hasDup = 1;
|
dataBlk.hasDup = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&block.maxKey, &key) < 0) {
|
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
|
||||||
block.maxKey = key;
|
dataBlk.maxKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
block.minVer = TMIN(block.minVer, key.version);
|
dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
|
||||||
block.maxVer = TMAX(block.maxVer, key.version);
|
dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
// write
|
// write
|
||||||
block.nSubBlock++;
|
dataBlk.nSubBlock++;
|
||||||
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1],
|
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
|
||||||
((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0);
|
((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL,
|
||||||
|
pCommitter->cmprAlg, 0);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// put SDataBlk
|
// put SDataBlk
|
||||||
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk);
|
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &dataBlk, tPutDataBlk);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// clear
|
// clear
|
||||||
|
@ -605,33 +606,33 @@ _err:
|
||||||
|
|
||||||
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
|
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSttBlk blockL;
|
SSttBlk sstBlk;
|
||||||
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
|
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
|
||||||
|
|
||||||
ASSERT(pBlockData->nRow > 0);
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
|
||||||
// info
|
// info
|
||||||
blockL.suid = pBlockData->suid;
|
sstBlk.suid = pBlockData->suid;
|
||||||
blockL.nRow = pBlockData->nRow;
|
sstBlk.nRow = pBlockData->nRow;
|
||||||
blockL.minKey = TSKEY_MAX;
|
sstBlk.minKey = TSKEY_MAX;
|
||||||
blockL.maxKey = TSKEY_MIN;
|
sstBlk.maxKey = TSKEY_MIN;
|
||||||
blockL.minVer = VERSION_MAX;
|
sstBlk.minVer = VERSION_MAX;
|
||||||
blockL.maxVer = VERSION_MIN;
|
sstBlk.maxVer = VERSION_MIN;
|
||||||
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
||||||
blockL.minKey = TMIN(blockL.minKey, pBlockData->aTSKEY[iRow]);
|
sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
|
||||||
blockL.maxKey = TMAX(blockL.maxKey, pBlockData->aTSKEY[iRow]);
|
sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
|
||||||
blockL.minVer = TMIN(blockL.minVer, pBlockData->aVersion[iRow]);
|
sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
|
||||||
blockL.maxVer = TMAX(blockL.maxVer, pBlockData->aVersion[iRow]);
|
sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
|
||||||
}
|
}
|
||||||
blockL.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
|
sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
|
||||||
blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
|
sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
|
||||||
|
|
||||||
// write
|
// write
|
||||||
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1);
|
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &sstBlk.bInfo, NULL, pCommitter->cmprAlg, 1);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// push SSttBlk
|
// push SSttBlk
|
||||||
if (taosArrayPush(pCommitter->dWriter.aSttBlk, &blockL) == NULL) {
|
if (taosArrayPush(pCommitter->dWriter.aSttBlk, &sstBlk) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue