enh: exclude a list of snap ranges in tsdbSnapReaderOpen
This commit is contained in:
parent
573c28fd06
commit
1be2835fc1
|
@ -295,7 +295,8 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
|
||||||
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
||||||
// STsdbSnapReader ========================================
|
// STsdbSnapReader ========================================
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pEx,
|
||||||
|
STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapWriter ========================================
|
// STsdbSnapWriter ========================================
|
||||||
|
|
|
@ -48,7 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
|
||||||
// open rsma1/rsma2
|
// open rsma1/rsma2
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pSma->pRSmaTsdb[i]) {
|
if (pSma->pRSmaTsdb[i]) {
|
||||||
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
|
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, (i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2), NULL,
|
||||||
&pReader->pDataReader[i]);
|
&pReader->pDataReader[i]);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -991,25 +991,29 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, TSnapRangeArray **fsrArr) {
|
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pEx,
|
||||||
|
TSnapRangeArray **fsrArr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
STSnapRange *fsr1;
|
STSnapRange *fsr1 = NULL;
|
||||||
|
|
||||||
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
||||||
if (fsrArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (fsrArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, &fsr1);
|
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver, ever, &fsr1);
|
||||||
if (code) break;
|
if (code) break;
|
||||||
|
|
||||||
code = TARRAY2_APPEND(fsrArr[0], fsr1);
|
code = TARRAY2_APPEND(fsrArr[0], fsr1);
|
||||||
if (code) break;
|
if (code) break;
|
||||||
|
|
||||||
|
fsr1 = NULL;
|
||||||
}
|
}
|
||||||
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
tsdbTSnapRangeClear(&fsr1);
|
||||||
TARRAY2_DESTROY(fsrArr[0], tsdbTSnapRangeClear);
|
TARRAY2_DESTROY(fsrArr[0], tsdbTSnapRangeClear);
|
||||||
fsrArr[0] = NULL;
|
fsrArr[0] = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,8 @@ int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
||||||
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||||
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsrArr);
|
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsrArr);
|
||||||
|
|
||||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, TSnapRangeArray **fsrArr);
|
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pEx,
|
||||||
|
TSnapRangeArray **fsrArr);
|
||||||
int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr);
|
int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr);
|
||||||
// txn
|
// txn
|
||||||
int64_t tsdbFSAllocEid(STFileSystem *fs);
|
int64_t tsdbFSAllocEid(STFileSystem *fs);
|
||||||
|
|
|
@ -458,14 +458,19 @@ int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, STSnapRange **fsr) {
|
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr) {
|
||||||
fsr[0] = taosMemoryCalloc(1, sizeof(STSnapRange));
|
fsr[0] = taosMemoryCalloc(1, sizeof(*fsr[0]));
|
||||||
if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
fsr[0]->fid = fset1->fid;
|
fsr[0]->fid = fset1->fid;
|
||||||
// fsr[0]->sver = sver;
|
fsr[0]->sver = sver;
|
||||||
// fsr[0]->ever = ever;
|
fsr[0]->ever = ever;
|
||||||
return tsdbTFileSetInitRef(pTsdb, fset1, &fsr[0]->fset);
|
|
||||||
|
int32_t code = tsdbTFileSetInitRef(pTsdb, fset1, &fsr[0]->fset);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(fsr[0]);
|
||||||
|
fsr[0] = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
|
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
|
||||||
|
|
|
@ -46,7 +46,7 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
||||||
int32_t tsdbTFileSetClear(STFileSet **fset);
|
int32_t tsdbTFileSetClear(STFileSet **fset);
|
||||||
int32_t tsdbTFileSetRemove(STFileSet **fset);
|
int32_t tsdbTFileSetRemove(STFileSet **fset);
|
||||||
|
|
||||||
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, STSnapRange **fsr);
|
int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr);
|
||||||
int32_t tsdbTSnapRangeClear(STSnapRange **fsr);
|
int32_t tsdbTSnapRangeClear(STSnapRange **fsr);
|
||||||
|
|
||||||
// to/from json
|
// to/from json
|
||||||
|
|
|
@ -412,7 +412,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** reader) {
|
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pEx, STsdbSnapReader** reader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -424,7 +424,7 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type,
|
||||||
reader[0]->ever = ever;
|
reader[0]->ever = ever;
|
||||||
reader[0]->type = type;
|
reader[0]->type = type;
|
||||||
|
|
||||||
code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, &reader[0]->fsrArr);
|
code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TSnapRangeArray*)pEx, &reader[0]->fsrArr);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
// SVSnapReader ========================================================
|
// SVSnapReader ========================================================
|
||||||
struct SVSnapReader {
|
struct SVSnapReader {
|
||||||
|
@ -28,6 +29,7 @@ struct SVSnapReader {
|
||||||
SMetaSnapReader *pMetaReader;
|
SMetaSnapReader *pMetaReader;
|
||||||
// tsdb
|
// tsdb
|
||||||
int8_t tsdbDone;
|
int8_t tsdbDone;
|
||||||
|
TSnapRangeArray *pEx;
|
||||||
STsdbSnapReader *pTsdbReader;
|
STsdbSnapReader *pTsdbReader;
|
||||||
// tq
|
// tq
|
||||||
int8_t tqHandleDone;
|
int8_t tqHandleDone;
|
||||||
|
@ -59,6 +61,8 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
|
||||||
|
// TODO: pReader->pEx
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
|
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
@ -175,7 +179,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
if (!pReader->tsdbDone) {
|
if (!pReader->tsdbDone) {
|
||||||
// open if not
|
// open if not
|
||||||
if (pReader->pTsdbReader == NULL) {
|
if (pReader->pTsdbReader == NULL) {
|
||||||
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB,
|
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pEx,
|
||||||
&pReader->pTsdbReader);
|
&pReader->pTsdbReader);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue