snapshot read code
This commit is contained in:
parent
d8e6fe25eb
commit
56985c5217
|
@ -650,6 +650,12 @@ typedef struct SMergeTree {
|
|||
SLDataIter *pIter;
|
||||
} SMergeTree;
|
||||
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
int64_t uid;
|
||||
STSchema *pTSchema;
|
||||
} SSkmInfo;
|
||||
|
||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange);
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||
|
|
|
@ -14,13 +14,8 @@
|
|||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
int64_t uid;
|
||||
STSchema *pTSchema;
|
||||
} SSkmInfo;
|
||||
|
||||
typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT;
|
||||
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
|
||||
|
||||
typedef struct {
|
||||
SRBTreeNode n;
|
||||
|
@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter);
|
|||
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
|
||||
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);
|
||||
|
||||
static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
|
||||
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
|
||||
SRowInfo *pInfo1 = (SRowInfo *)p1;
|
||||
SRowInfo *pInfo2 = (SRowInfo *)p2;
|
||||
|
||||
|
@ -325,22 +320,22 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) {
|
||||
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (suid) {
|
||||
if (pCommitter->skmTable.suid == suid) {
|
||||
pCommitter->skmTable.uid = uid;
|
||||
if (pSkmInfo->suid == suid) {
|
||||
pSkmInfo->uid = uid;
|
||||
goto _exit;
|
||||
}
|
||||
} else {
|
||||
if (pCommitter->skmTable.uid == uid) goto _exit;
|
||||
if (pSkmInfo->uid == uid) goto _exit;
|
||||
}
|
||||
|
||||
pCommitter->skmTable.suid = suid;
|
||||
pCommitter->skmTable.uid = uid;
|
||||
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
|
||||
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pCommitter->skmTable.pTSchema);
|
||||
pSkmInfo->suid = suid;
|
||||
pSkmInfo->uid = uid;
|
||||
tTSchemaDestroy(pSkmInfo->pTSchema);
|
||||
code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
|
||||
if (code) goto _exit;
|
||||
|
||||
_exit:
|
||||
|
@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
|||
int8_t iIter = 0;
|
||||
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
|
||||
pIter = &pCommitter->aDataIter[iIter];
|
||||
pIter->type = LAST_DATA_ITER;
|
||||
pIter->type = STT_DATA_ITER;
|
||||
pIter->iStt = iStt;
|
||||
|
||||
code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
|
||||
|
@ -1046,7 +1041,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
} else if (pCommitter->pIter->type == LAST_DATA_ITER) { // last file
|
||||
} else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file
|
||||
pIter->iRow++;
|
||||
if (pIter->iRow < pIter->bData.nRow) {
|
||||
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
||||
|
@ -1437,7 +1432,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
|||
tMapDataReset(&pCommitter->dWriter.mBlock);
|
||||
|
||||
// impl
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid);
|
||||
code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
|
||||
if (code) goto _err;
|
||||
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
|
||||
if (code) goto _err;
|
||||
|
|
|
@ -1053,6 +1053,29 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0];
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
// decmpr
|
||||
code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]);
|
||||
if (code) goto _err;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
|
|
@ -16,6 +16,29 @@
|
|||
#include "tsdb.h"
|
||||
|
||||
// STsdbSnapReader ========================================
|
||||
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
|
||||
typedef struct {
|
||||
SRBTreeNode n;
|
||||
SRowInfo rInfo;
|
||||
EFIterT type;
|
||||
union {
|
||||
struct {
|
||||
SArray* aBlockIdx;
|
||||
int32_t iBlockIdx;
|
||||
SBlockIdx* pBlockIdx;
|
||||
SMapData mBlock;
|
||||
int32_t iBlock;
|
||||
}; // .data file
|
||||
struct {
|
||||
int32_t iStt;
|
||||
SArray* aSttBlk;
|
||||
int32_t iSttBlk;
|
||||
}; // .stt file
|
||||
};
|
||||
SBlockData bData;
|
||||
int32_t iRow;
|
||||
} SFDataIter;
|
||||
|
||||
struct STsdbSnapReader {
|
||||
STsdb* pTsdb;
|
||||
int64_t sver;
|
||||
|
@ -26,146 +49,301 @@ struct STsdbSnapReader {
|
|||
int8_t dataDone;
|
||||
int32_t fid;
|
||||
SDataFReader* pDataFReader;
|
||||
SArray* aBlockIdx; // SArray<SBlockIdx>
|
||||
SArray* aSstBlk; // SArray<SSttBlk>
|
||||
SBlockIdx* pBlockIdx;
|
||||
SSttBlk* pSstBlk;
|
||||
|
||||
int32_t iBlockIdx;
|
||||
int32_t iBlockL;
|
||||
SMapData mBlock; // SMapData<SDataBlk>
|
||||
int32_t iBlock;
|
||||
SBlockData oBlockData;
|
||||
SBlockData nBlockData;
|
||||
SFDataIter* pIter;
|
||||
SRBTree rbt;
|
||||
SFDataIter aFDataIter[TSDB_MAX_STT_FILE + 1];
|
||||
SBlockData bData;
|
||||
SSkmInfo skmTable;
|
||||
// for del file
|
||||
int8_t delDone;
|
||||
SDelFReader* pDelFReader;
|
||||
SArray* aDelIdx; // SArray<SDelIdx>
|
||||
int32_t iDelIdx;
|
||||
SArray* aDelData; // SArray<SDelData>
|
||||
uint8_t* aBuf[5];
|
||||
};
|
||||
|
||||
extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
|
||||
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
|
||||
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
|
||||
|
||||
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
|
||||
int32_t code = 0;
|
||||
|
||||
SDFileSet dFileSet = {.fid = pReader->fid};
|
||||
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
|
||||
if (pSet == NULL) return code;
|
||||
|
||||
pReader->fid = pSet->fid;
|
||||
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
|
||||
if (code) goto _err;
|
||||
|
||||
pReader->pIter = NULL;
|
||||
tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn);
|
||||
|
||||
// .data file
|
||||
SFDataIter* pIter = &pReader->aFDataIter[0];
|
||||
pIter->type = SNAP_DATA_FILE_ITER;
|
||||
|
||||
code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
|
||||
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
|
||||
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
|
||||
|
||||
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
|
||||
ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);
|
||||
|
||||
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.suid = pIter->pBlockIdx->suid;
|
||||
pIter->rInfo.uid = pIter->pBlockIdx->uid;
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _add_iter_and_break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
|
||||
_add_iter_and_break:
|
||||
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
|
||||
break;
|
||||
}
|
||||
|
||||
// .stt file
|
||||
pIter = &pReader->aFDataIter[1];
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
pIter->type = SNAP_STT_FILE_ITER;
|
||||
pIter->iStt = iStt;
|
||||
|
||||
code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
|
||||
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||
|
||||
if (pSttBlk->minVer > pReader->ever) continue;
|
||||
if (pSttBlk->maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.suid = pIter->bData.suid;
|
||||
pIter->rInfo.uid = pIter->bData.uid;
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _add_iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
|
||||
_add_iter:
|
||||
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
|
||||
pIter++;
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode),
|
||||
pReader->pTsdb->path, pReader->fid);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode),
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { return pReader->pIter ? &pReader->pIter->rInfo : NULL; }
|
||||
|
||||
static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pReader->pIter) {
|
||||
SFDataIter* pIter = pReader->pIter;
|
||||
|
||||
while (true) {
|
||||
_find_row:
|
||||
for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _out;
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->type == SNAP_DATA_FILE_ITER) {
|
||||
while (true) {
|
||||
for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
|
||||
|
||||
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pIter->iRow = -1;
|
||||
goto _find_row;
|
||||
}
|
||||
|
||||
pIter->iBlockIdx++;
|
||||
if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;
|
||||
|
||||
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
if (code) goto _err;
|
||||
pIter->iBlock = -1;
|
||||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
} else if (pIter->type == SNAP_STT_FILE_ITER) {
|
||||
for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
|
||||
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||
|
||||
if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pIter->iRow = -1;
|
||||
goto _find_row;
|
||||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
_out:
|
||||
pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
|
||||
if (pReader->pIter && pIter) {
|
||||
int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo);
|
||||
if (c > 0) {
|
||||
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
|
||||
pReader->pIter = NULL;
|
||||
} else {
|
||||
ASSERT(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pReader->pIter == NULL) {
|
||||
pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
|
||||
if (pReader->pIter) {
|
||||
tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(pReader->bData.nRow);
|
||||
|
||||
int32_t aBufN[5] = {0};
|
||||
code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
|
||||
if (code) goto _exit;
|
||||
|
||||
int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
|
||||
pHdr->type = SNAP_DATA_TSDB;
|
||||
pHdr->size = size;
|
||||
|
||||
memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
|
||||
memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
|
||||
if (aBufN[1]) {
|
||||
memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
|
||||
}
|
||||
if (aBufN[0]) {
|
||||
memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
|
||||
while (true) {
|
||||
if (pReader->pDataFReader == NULL) {
|
||||
// next
|
||||
SDFileSet dFileSet = {.fid = pReader->fid};
|
||||
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
|
||||
if (pSet == NULL) goto _exit;
|
||||
pReader->fid = pSet->fid;
|
||||
|
||||
// load
|
||||
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pTsdb, pSet);
|
||||
code = tsdbSnapReadOpenFile(pReader);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
// init
|
||||
pReader->iBlockIdx = 0;
|
||||
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) {
|
||||
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
|
||||
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
pReader->iBlock = 0;
|
||||
} else {
|
||||
pReader->pBlockIdx = NULL;
|
||||
}
|
||||
|
||||
pReader->iBlockL = 0;
|
||||
while (true) {
|
||||
if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) {
|
||||
pReader->pSstBlk = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL);
|
||||
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
|
||||
pReader->iBlockL++;
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||
pReader->fid);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (pReader->pBlockIdx && pReader->pSstBlk) {
|
||||
TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid};
|
||||
if (pReader->pDataFReader == NULL) break;
|
||||
|
||||
ASSERT(0);
|
||||
SRowInfo* pRowInfo = tsdbSnapGetRow(pReader);
|
||||
if (pRowInfo == NULL) {
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
continue;
|
||||
}
|
||||
|
||||
// if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) {
|
||||
// // TODO
|
||||
// } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) {
|
||||
// // TODO
|
||||
// } else {
|
||||
// // TODO
|
||||
// }
|
||||
} else if (pReader->pBlockIdx) {
|
||||
while (pReader->iBlock < pReader->mBlock.nItem) {
|
||||
SDataBlk block;
|
||||
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk);
|
||||
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
|
||||
SBlockData* pBlockData = &pReader->bData;
|
||||
|
||||
if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) {
|
||||
// load data (todo)
|
||||
}
|
||||
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
|
||||
if (code) goto _err;
|
||||
|
||||
// next
|
||||
pReader->iBlock++;
|
||||
if (*ppData) break;
|
||||
}
|
||||
code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pReader->iBlock >= pReader->mBlock.nItem) {
|
||||
pReader->iBlockIdx++;
|
||||
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) {
|
||||
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
|
||||
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
|
||||
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
|
||||
if (code) goto _err;
|
||||
code = tsdbSnapNextRow(pReader);
|
||||
if (code) goto _err;
|
||||
|
||||
pReader->iBlock = 0;
|
||||
} else {
|
||||
pReader->pBlockIdx = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (*ppData) goto _exit;
|
||||
} else if (pReader->pSstBlk) {
|
||||
while (pReader->pSstBlk) {
|
||||
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) {
|
||||
// load data (todo)
|
||||
}
|
||||
|
||||
// next
|
||||
pReader->iBlockL++;
|
||||
if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) {
|
||||
pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk);
|
||||
} else {
|
||||
pReader->pSstBlk = NULL;
|
||||
}
|
||||
|
||||
if (*ppData) goto _exit;
|
||||
}
|
||||
} else {
|
||||
pRowInfo = tsdbSnapGetRow(pReader);
|
||||
if (pRowInfo == NULL) {
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBlockData->nRow >= 4096) break;
|
||||
}
|
||||
|
||||
code = tsdbSnapCmprData(pReader, ppData);
|
||||
if (code) goto _err;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
size += tPutDelData(NULL, pDelData);
|
||||
}
|
||||
}
|
||||
|
||||
if (size == 0) continue;
|
||||
|
||||
// org data
|
||||
|
@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// data
|
||||
pReader->fid = INT32_MIN;
|
||||
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pReader->aBlockIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
|
||||
SFDataIter* pIter = &pReader->aFDataIter[iIter];
|
||||
|
||||
if (iIter == 0) {
|
||||
pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pIter->aBlockIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (pIter->aSttBlk == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
code = tBlockDataCreate(&pIter->bData);
|
||||
if (code) goto _err;
|
||||
}
|
||||
pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (pReader->aSstBlk == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pReader->mBlock = tMapDataInit();
|
||||
code = tBlockDataCreate(&pReader->oBlockData);
|
||||
if (code) goto _err;
|
||||
code = tBlockDataCreate(&pReader->nBlockData);
|
||||
|
||||
code = tBlockDataCreate(&pReader->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
// del
|
||||
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
|
||||
if (pReader->aDelIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
|||
int32_t code = 0;
|
||||
STsdbSnapReader* pReader = *ppReader;
|
||||
|
||||
if (pReader->pDataFReader) {
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
}
|
||||
taosArrayDestroy(pReader->aSstBlk);
|
||||
taosArrayDestroy(pReader->aBlockIdx);
|
||||
tMapDataClear(&pReader->mBlock);
|
||||
tBlockDataDestroy(&pReader->oBlockData, 1);
|
||||
tBlockDataDestroy(&pReader->nBlockData, 1);
|
||||
// data
|
||||
if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
|
||||
SFDataIter* pIter = &pReader->aFDataIter[iIter];
|
||||
|
||||
if (pReader->pDelFReader) {
|
||||
tsdbDelFReaderClose(&pReader->pDelFReader);
|
||||
if (iIter == 0) {
|
||||
taosArrayDestroy(pIter->aBlockIdx);
|
||||
tMapDataClear(&pIter->mBlock);
|
||||
} else {
|
||||
taosArrayDestroy(pIter->aSttBlk);
|
||||
}
|
||||
|
||||
tBlockDataDestroy(&pIter->bData, 1);
|
||||
}
|
||||
|
||||
tBlockDataDestroy(&pReader->bData, 1);
|
||||
tTSchemaDestroy(pReader->skmTable.pTSchema);
|
||||
|
||||
// del
|
||||
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
|
||||
taosArrayDestroy(pReader->aDelIdx);
|
||||
taosArrayDestroy(pReader->aDelData);
|
||||
|
||||
|
@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
|||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
|
||||
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
|
||||
tFree(pReader->aBuf[iBuf]);
|
||||
}
|
||||
|
||||
taosMemoryFree(pReader);
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue