more snapshot
This commit is contained in:
parent
538f130ac0
commit
fc0745825e
|
@ -28,8 +28,10 @@ struct STsdbSnapReader {
|
|||
SDataFReader* pDataFReader;
|
||||
SArray* aBlockIdx; // SArray<SBlockIdx>
|
||||
SArray* aBlockL; // SArray<SBlockL>
|
||||
int32_t iBlockIdx;
|
||||
SBlockIdx* pBlockIdx;
|
||||
SBlockL* pBlockL;
|
||||
|
||||
int32_t iBlockIdx;
|
||||
int32_t iBlockL;
|
||||
SMapData mBlock; // SMapData<SBlock>
|
||||
int32_t iBlock;
|
||||
|
@ -75,96 +77,42 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
}
|
||||
|
||||
while (true) {
|
||||
#if 0
|
||||
if (pReader->pBlockIdx == NULL) {
|
||||
if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
|
||||
if (pReader->pBlockIdx && pReader->pBlockL) {
|
||||
} else if (pReader->pBlockIdx) {
|
||||
// may have problem (todo)
|
||||
while (pReader->iBlock < pReader->mBlock.nItem) {
|
||||
SBlock block;
|
||||
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetBlock);
|
||||
|
||||
if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) {
|
||||
// load data (todo)
|
||||
}
|
||||
|
||||
// next
|
||||
pReader->iBlock++;
|
||||
|
||||
if (*ppData) goto _exit;
|
||||
}
|
||||
} else if (pReader->pBlockL) {
|
||||
while (pReader->pBlockL) {
|
||||
if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) {
|
||||
// load data (todo)
|
||||
}
|
||||
|
||||
// next
|
||||
pReader->iBlockL++;
|
||||
if (pReader->iBlockL < taosArrayGetSize(pReader->aBlockL)) {
|
||||
pReader->pBlockL = (SBlockL*)taosArrayGetSize(pReader->aBlockL);
|
||||
} else {
|
||||
pReader->pBlockL = NULL;
|
||||
}
|
||||
|
||||
if (*ppData) goto _exit;
|
||||
}
|
||||
} else {
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
break;
|
||||
}
|
||||
|
||||
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
|
||||
pReader->iBlockIdx++;
|
||||
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
pReader->iBlock = 0;
|
||||
}
|
||||
|
||||
SBlock block;
|
||||
SBlock* pBlock = █
|
||||
while (true) {
|
||||
if (pReader->iBlock >= pReader->mBlock.nItem) {
|
||||
pReader->pBlockIdx = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
|
||||
pReader->iBlock++;
|
||||
|
||||
if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
// filter
|
||||
tBlockDataReset(&pReader->nBlockData);
|
||||
for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) {
|
||||
SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData);
|
||||
SColData* pColDataN = NULL;
|
||||
|
||||
code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN);
|
||||
if (code) goto _err;
|
||||
|
||||
tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn);
|
||||
}
|
||||
|
||||
for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) {
|
||||
TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
|
||||
int64_t version = TSDBROW_VERSION(&row);
|
||||
|
||||
tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")",
|
||||
TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever);
|
||||
|
||||
if (version < pReader->sver || version > pReader->ever) continue;
|
||||
|
||||
code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (pReader->nBlockData.nRow <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// org data
|
||||
// compress data (todo)
|
||||
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||
pHdr->type = pReader->type;
|
||||
pHdr->size = size;
|
||||
|
||||
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
||||
pId->suid = pReader->pBlockIdx->suid;
|
||||
pId->uid = pReader->pBlockIdx->uid;
|
||||
|
||||
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
|
||||
|
||||
// tsdbInfo("vgId:%d, vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
|
||||
// " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
|
||||
// TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
|
||||
// pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
|
||||
// size);
|
||||
|
||||
goto _exit;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue