feat: use SSnapshotParam as a parameter to vnodeSnapReaderOpen
This commit is contained in:
parent
083dd148be
commit
4d55488495
|
@ -259,7 +259,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
||||||
// SVSnapReader
|
// SVSnapReader
|
||||||
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader);
|
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader);
|
||||||
void vnodeSnapReaderClose(SVSnapReader *pReader);
|
void vnodeSnapReaderClose(SVSnapReader *pReader);
|
||||||
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
|
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
|
||||||
// SVSnapWriter
|
// SVSnapWriter
|
||||||
|
|
|
@ -295,7 +295,8 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
|
||||||
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
||||||
// STsdbSnapReader ========================================
|
// STsdbSnapReader ========================================
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pExclude,
|
int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap);
|
||||||
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
|
||||||
STsdbSnapReader** ppReader);
|
STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
||||||
|
|
|
@ -1252,23 +1252,12 @@ _out:
|
||||||
return Z;
|
return Z;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pExclude,
|
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
|
||||||
TSnapRangeArray **fsrArr) {
|
TSnapRangeArray **fsrArr) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
STSnapRange *fsr1 = NULL;
|
STSnapRange *fsr1 = NULL;
|
||||||
|
|
||||||
TSnapRangeArray *snapF = tsdbFSToSnapRangeArray(fs);
|
|
||||||
if (snapF == NULL) {
|
|
||||||
tsdbError("failed to generate snap ranges from fs since %s.", terrstr());
|
|
||||||
goto _out;
|
|
||||||
}
|
|
||||||
TSnapRangeArray *snapD = tsdbSnapDiff(snapF, pExclude);
|
|
||||||
if (snapD == NULL) {
|
|
||||||
tsdbError("failed to get diff of snap ranges since %s.", terrstr());
|
|
||||||
goto _out;
|
|
||||||
}
|
|
||||||
|
|
||||||
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
||||||
if (fsrArr[0] == NULL) {
|
if (fsrArr[0] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1278,37 +1267,32 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
// TODO: use the same fs fSetArr as get snapDiff. The following treatment is potentially wrong
|
|
||||||
// if the fSetArr are changed.
|
|
||||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
while (i < TARRAY2_SIZE(snapD)) {
|
int64_t sver1 = sver;
|
||||||
STSnapRange *u = TARRAY2_GET(snapD, i);
|
int64_t ever1 = ever;
|
||||||
if (fset->fid < u->fid) {
|
|
||||||
break;
|
while (pRanges && i < TARRAY2_SIZE(pRanges)) {
|
||||||
} else if (fset->fid > u->fid) {
|
STSnapRange *u = TARRAY2_GET(pRanges, i);
|
||||||
ASSERT(false);
|
if (fset->fid > u->fid) {
|
||||||
i++;
|
i++;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
if (fset->fid == u->fid) {
|
||||||
|
sver1 = u->sver;
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
int64_t sver1 = TMAX(sver, u->sver);
|
|
||||||
int64_t ever1 = TMIN(ever, u->ever);
|
|
||||||
if (sver1 > ever1) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
|
|
||||||
|
|
||||||
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
|
|
||||||
if (code) break;
|
|
||||||
|
|
||||||
code = TARRAY2_APPEND(fsrArr[0], fsr1);
|
|
||||||
if (code) break;
|
|
||||||
|
|
||||||
fsr1 = NULL;
|
|
||||||
}
|
}
|
||||||
|
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
|
||||||
|
|
||||||
|
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
|
||||||
if (code) break;
|
if (code) break;
|
||||||
|
|
||||||
|
code = TARRAY2_APPEND(fsrArr[0], fsr1);
|
||||||
|
if (code) break;
|
||||||
|
|
||||||
|
fsr1 = NULL;
|
||||||
}
|
}
|
||||||
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
||||||
|
|
||||||
|
@ -1319,14 +1303,6 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
||||||
}
|
}
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
TSnapRangeArray **ppArrs[2] = {&snapF, &snapD};
|
|
||||||
int len = sizeof(ppArrs) / sizeof(ppArrs[0]);
|
|
||||||
for (int k = 0; k < len; k++) {
|
|
||||||
if (ppArrs[k][0] == NULL) continue;
|
|
||||||
TARRAY2_DESTROY(ppArrs[k][0], tsdbTSnapRangeClear);
|
|
||||||
taosMemoryFree(ppArrs[k][0]);
|
|
||||||
ppArrs[k][0] = NULL;
|
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -412,7 +412,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pExclude,
|
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
|
||||||
STsdbSnapReader** reader) {
|
STsdbSnapReader** reader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -425,7 +425,7 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type,
|
||||||
reader[0]->ever = ever;
|
reader[0]->ever = ever;
|
||||||
reader[0]->type = type;
|
reader[0]->type = type;
|
||||||
|
|
||||||
code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TSnapRangeArray*)pExclude, &reader[0]->fsrArr);
|
code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TSnapRangeArray*)pRanges, &reader[0]->fsrArr);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -1157,3 +1157,8 @@ _exit:
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) {
|
||||||
|
// TODO: get the full and diff info of tsdb Snap
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -518,13 +518,9 @@ void vnodeStop(SVnode *pVnode) {}
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
||||||
|
|
||||||
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
|
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
|
||||||
pSnapshot->lastApplyIndex = pVnode->state.committed;
|
pSnap->lastApplyIndex = pVnode->state.committed;
|
||||||
pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
|
pSnap->lastApplyTerm = pVnode->state.commitTerm;
|
||||||
pSnapshot->lastConfigIndex = -1;
|
pSnap->lastConfigIndex = -1;
|
||||||
|
return tsdbSnapGetInfo(pVnode->pTsdb, pSnap);
|
||||||
if (pSnapshot->typ == TAOS_SYNC_SNAP_INFO_FULL) {
|
|
||||||
// TODO: get full info of snapshots
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ struct SVSnapReader {
|
||||||
SMetaSnapReader *pMetaReader;
|
SMetaSnapReader *pMetaReader;
|
||||||
// tsdb
|
// tsdb
|
||||||
int8_t tsdbDone;
|
int8_t tsdbDone;
|
||||||
TSnapRangeArray *pExclude;
|
TSnapRangeArray *pRanges;
|
||||||
STsdbSnapReader *pTsdbReader;
|
STsdbSnapReader *pTsdbReader;
|
||||||
// tq
|
// tq
|
||||||
int8_t tqHandleDone;
|
int8_t tqHandleDone;
|
||||||
|
@ -48,8 +48,10 @@ struct SVSnapReader {
|
||||||
SRSmaSnapReader *pRsmaReader;
|
SRSmaSnapReader *pRsmaReader;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
|
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int64_t sver = pParam->start;
|
||||||
|
int64_t ever = pParam->end;
|
||||||
SVSnapReader *pReader = NULL;
|
SVSnapReader *pReader = NULL;
|
||||||
|
|
||||||
pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
|
@ -61,7 +63,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
|
||||||
// TODO: pReader->pEx
|
// TODO: decode pParam->data and store the result in pReader->pRanges
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
|
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
|
@ -179,7 +181,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
if (!pReader->tsdbDone) {
|
if (!pReader->tsdbDone) {
|
||||||
// open if not
|
// open if not
|
||||||
if (pReader->pTsdbReader == NULL) {
|
if (pReader->pTsdbReader == NULL) {
|
||||||
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pExclude,
|
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
|
||||||
&pReader->pTsdbReader);
|
&pReader->pTsdbReader);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
|
@ -475,8 +475,7 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
SSnapshotParam *pSnapshotParam = pParam;
|
int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
|
||||||
int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue