more snapshor reader

This commit is contained in:
Hongze Cheng 2022-07-05 07:42:56 +00:00
parent 5c7189a433
commit 9512b51ec0
1 changed files with 55 additions and 25 deletions

View File

@ -24,10 +24,11 @@ struct STsdbSnapReader {
int8_t dataDone; int8_t dataDone;
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; SDataFReader* pDataFReader;
int32_t iBlockIdx;
SArray* aBlockIdx; // SArray<SBlockIdx> SArray* aBlockIdx; // SArray<SBlockIdx>
int32_t iBlock; int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SBlock> SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlockData blkData; SBlockData blkData;
// for del file // for del file
int8_t delDone; int8_t delDone;
@ -39,39 +40,68 @@ struct STsdbSnapReader {
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
SBlock block;
SBlock* pBlock = &block;
if (pReader->pDataFReader == NULL) { while (true) {
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, NULL); if (pReader->pDataFReader == NULL) {
if (code) goto _err; SDFileSet* pSet = NULL;
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL); // search the next data file set to read (todo)
if (code) goto _err; if (0 /* TODO */) {
code = TSDB_CODE_VND_READ_END;
goto _exit;
}
pReader->iBlockIdx = 0; // open
} code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
if (code) goto _err;
while (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { // SBlockIdx
SBlockIdx* pBlockIdx = taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
if (code) goto _err;
pReader->iBlockIdx++; pReader->iBlockIdx = 0;
pReader->pBlockIdx = NULL;
}
code = tsdbReadBlock(pReader->pDataFReader, pBlockIdx, &pReader->mBlock, NULL); while (true) {
if (code) goto _err; if (pReader->pBlockIdx == NULL) {
if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
tsdbDataFReaderClose(&pReader->pDataFReader);
break;
}
break; pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
} pReader->iBlockIdx++;
while (pReader->iBlock < pReader->mBlock.nItem) { // SBlock
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock); code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
if (code) goto _err;
pReader->iBlock++; pReader->iBlock = 0;
}
if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) && while (true) {
(pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) { SBlock block;
// block in range, encode and return the data (todo) SBlock* pBlock = &block;
goto _exit;
if (pReader->iBlock >= pReader->mBlock.nItem) {
pReader->pBlockIdx = NULL;
break;
}
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
pReader->iBlock++;
if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) ||
(pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) {
// overlap (todo)
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL);
if (code) goto _err;
goto _exit;
}
}
} }
} }