more vnode snapshot

This commit is contained in:
Hongze Cheng 2022-07-14 09:36:23 +00:00
parent 5823713185
commit b554559889
1 changed files with 118 additions and 0 deletions

View File

@ -489,9 +489,66 @@ _err:
return code;
}
static int32_t tsdbSnapWriteTableDataAhead(STsdbSnapWriter* pWriter, TABLEID id) {
int32_t code = 0;
if (pWriter->pDataFReader == NULL) goto _exit;
while (true) {
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break;
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
if (c >= 0) break;
pWriter->iBlockIdx++;
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
SBlock block;
tMapDataReset(&pWriter->mBlockW);
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
block.last = 1;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
pWriter->cmprAlg);
if (code) goto _err;
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
}
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
if (pWriter->pDataFReader == NULL) {
// no old data
@ -551,6 +608,67 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
}
}
} else {
// has old data
// TABLE ==================================================
// end last table data if id not same (todo)
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
} else if (c > 0) {
ASSERT(0);
}
}
// start new table data if need (todo)
if (pWriter->pBlockIdxW == NULL) {
// commit table data ahead
code = tsdbSnapWriteTableDataAhead(pWriter, id);
if (code) goto _err;
// reader
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id);
if (c) {
pWriter->pBlockIdx = NULL;
}
} else {
pWriter->pBlockIdx = NULL;
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlockW, NULL);
if (code) goto _err;
pWriter->iBlock = 0;
}
// writer
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
}
// BLOCK ==================================================
// write block ahead
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
SBlock block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
if (tsdbKeyCmprFn(&block.maxKey, &keyFirst) >= 0) break;
pWriter->iBlock++;
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
}
}
return code;