Merge pull request #14716 from taosdata/feature/3.0_mhli

refactor(sync): merge tsdb_snapshot code
This commit is contained in:
Li Minghao 2022-07-10 11:19:01 +08:00 committed by GitHub
commit d806ef3d80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1082 additions and 186 deletions

View File

@ -3,6 +3,7 @@ add_library(vnode STATIC "")
target_sources( target_sources(
vnode vnode
PRIVATE PRIVATE
# vnode # vnode
"src/vnd/vnodeOpen.c" "src/vnd/vnodeOpen.c"
"src/vnd/vnodeBufPool.c" "src/vnd/vnodeBufPool.c"
@ -13,7 +14,6 @@ target_sources(
"src/vnd/vnodeSvr.c" "src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeUtil.c"
# meta # meta
"src/meta/metaOpen.c" "src/meta/metaOpen.c"
@ -46,6 +46,7 @@ target_sources(
"src/tsdb/tsdbUtil.c" "src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c" "src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention.c"
# tq # tq
"src/tq/tq.c" "src/tq/tq.c"
@ -62,7 +63,6 @@ target_include_directories(
PUBLIC "inc" PUBLIC "inc"
PRIVATE "src/inc" PRIVATE "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
) )
target_link_libraries( target_link_libraries(
vnode vnode
@ -76,6 +76,7 @@ target_link_libraries(
PUBLIC executor PUBLIC executor
PUBLIC scheduler PUBLIC scheduler
PUBLIC tdb PUBLIC tdb
# PUBLIC bdb # PUBLIC bdb
# PUBLIC scalar # PUBLIC scalar
PUBLIC transport PUBLIC transport
@ -83,11 +84,11 @@ target_link_libraries(
PUBLIC index PUBLIC index
) )
target_compile_definitions(vnode PUBLIC -DMETA_REFACT) target_compile_definitions(vnode PUBLIC -DMETA_REFACT)
if(${BUILD_WITH_INVERTEDINDEX}) if(${BUILD_WITH_INVERTEDINDEX})
add_definitions(-DUSE_INVERTED_INDEX) add_definitions(-DUSE_INVERTED_INDEX)
endif(${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX})
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})

View File

@ -41,7 +41,8 @@ extern "C" {
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct STsdbCfg STsdbCfg; // todo: remove typedef struct STsdbCfg STsdbCfg; // todo: remove
typedef struct SVnodeCfg SVnodeCfg; typedef struct SVnodeCfg SVnodeCfg;
typedef struct SVSnapshotReader SVSnapshotReader; typedef struct SVSnapReader SVSnapReader;
typedef struct SVSnapWriter SVSnapWriter;
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
@ -57,10 +58,6 @@ void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode);
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
@ -185,7 +182,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// sma // sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// need to reposition // SVSnapReader
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader);
int32_t vnodeSnapReaderClose(SVSnapReader *pReader);
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
// SVSnapWriter
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
// structs // structs
struct STsdbCfg { struct STsdbCfg {

View File

@ -57,6 +57,9 @@ int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
// metaCommit ================== // metaCommit ==================
static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); } static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); }
// metaTable ==================
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;

View File

@ -64,6 +64,7 @@ typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger; typedef struct SRowMerger SRowMerger;
typedef struct STsdbFSState STsdbFSState; typedef struct STsdbFSState STsdbFSState;
typedef struct STsdbSnapHdr STsdbSnapHdr;
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512 #define TSDB_FHDR_SIZE 512

View File

@ -58,8 +58,11 @@ typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapshotReader SMetaSnapshotReader; typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct STsdbSnapshotReader STsdbSnapshotReader; typedef struct SMetaSnapWriter SMetaSnapWriter;
typedef struct STsdbSnapReader STsdbSnapReader;
typedef struct STsdbSnapWriter STsdbSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
@ -74,8 +77,6 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader;
// vnd.h // vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolFree(SVBufPool* pPool, void* p);
int32_t vnodeRealloc(void** pp, int32_t size);
void vnodeFree(void* p);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor; typedef struct SMCtbCursor SMCtbCursor;
@ -109,9 +110,6 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy); STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta); SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
@ -131,9 +129,6 @@ int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* p
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
// tq // tq
int tqInit(); int tqInit();
@ -183,6 +178,23 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void tdUidStoreDestory(STbUidStore* pStore); void tdUidStoreDestory(STbUidStore* pStore);
void* tdUidStoreFree(STbUidStore* pStore); void* tdUidStoreFree(STbUidStore* pStore);
// SMetaSnapReader ========================================
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader);
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData);
// SMetaSnapWriter ========================================
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter);
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ========================================
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
int8_t dstType; int8_t dstType;
@ -202,7 +214,9 @@ typedef struct {
struct SVState { struct SVState {
int64_t committed; int64_t committed;
int64_t applied; int64_t applied;
int64_t applyTerm;
int64_t commitID; int64_t commitID;
int64_t commitTerm;
}; };
struct SVnodeInfo { struct SVnodeInfo {
@ -291,6 +305,12 @@ struct SSma {
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
struct SSnapDataHdr {
int8_t type;
int64_t size;
uint8_t data[];
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -15,53 +15,57 @@
#include "meta.h" #include "meta.h"
struct SMetaSnapshotReader { // SMetaSnapReader ========================================
struct SMetaSnapReader {
SMeta* pMeta; SMeta* pMeta;
TBC* pTbc;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
TBC* pTbc;
}; };
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever) { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
int32_t c = 0; int32_t c = 0;
SMetaSnapshotReader* pMetaReader = NULL; SMetaSnapReader* pMetaSnapReader = NULL;
pMetaReader = (SMetaSnapshotReader*)taosMemoryCalloc(1, sizeof(*pMetaReader)); // alloc
if (pMetaReader == NULL) { pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader));
if (pMetaSnapReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pMetaReader->pMeta = pMeta; pMetaSnapReader->pMeta = pMeta;
pMetaReader->sver = sver; pMetaSnapReader->sver = sver;
pMetaReader->ever = ever; pMetaSnapReader->ever = ever;
code = tdbTbcOpen(pMeta->pTbDb, &pMetaReader->pTbc, NULL);
// impl
code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL);
if (code) { if (code) {
goto _err; goto _err;
} }
code = tdbTbcMoveTo(pMetaReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) { if (code) {
goto _err; goto _err;
} }
*ppReader = pMetaReader; *ppReader = pMetaSnapReader;
return code; return code;
_err: _err:
metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader) { int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
if (pReader) { tdbTbcClose((*ppReader)->pTbc);
tdbTbcClose(pReader->pTbc); taosMemoryFree(*ppReader);
taosMemoryFree(pReader); *ppReader = NULL;
}
return 0; return 0;
} }
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nDatap) { int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
const void* pKey = NULL; const void* pKey = NULL;
const void* pData = NULL; const void* pData = NULL;
int32_t nKey = 0; int32_t nKey = 0;
@ -71,23 +75,110 @@ int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t*
for (;;) { for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData); code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData);
if (code || ((STbDbKey*)pData)->version > pReader->ever) { if (code || ((STbDbKey*)pData)->version > pReader->ever) {
return TSDB_CODE_VND_READ_END; code = TSDB_CODE_VND_READ_END;
goto _exit;
} }
if (((STbDbKey*)pData)->version < pReader->sver) { if (((STbDbKey*)pData)->version < pReader->sver) {
tdbTbcMoveToNext(pReader->pTbc);
continue; continue;
} }
tdbTbcMoveToNext(pReader->pTbc);
break; break;
} }
// copy the data // copy the data
if (vnodeRealloc(ppData, nData) < 0) { if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro
((SSnapDataHdr*)(*ppData))->size = nData;
memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData);
memcpy(*ppData, pData, nData); _exit:
*nDatap = nData; return code;
}
// SMetaSnapWriter ========================================
struct SMetaSnapWriter {
SMeta* pMeta;
int64_t sver;
int64_t ever;
};
static int32_t metaSnapRollback(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t metaSnapCommit(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
int32_t code = 0;
SMetaSnapWriter* pWriter;
// alloc
pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pMeta = pMeta;
pWriter->sver = sver;
pWriter->ever = ever;
*ppWriter = pWriter;
return code;
_err:
metaError("vgId:%d meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
SMetaSnapWriter* pWriter = *ppWriter;
if (rollback) {
code = metaSnapRollback(pWriter);
if (code) goto _err;
} else {
code = metaSnapCommit(pWriter);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;
_err:
metaError("vgId:%d meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
return code;
}
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SMeta* pMeta = pWriter->pMeta;
SMetaEntry metaEntry = {0};
SDecoder* pDecoder = &(SDecoder){0};
tDecoderInit(pDecoder, pData, nData);
metaDecodeEntry(pDecoder, &metaEntry);
code = metaHandleEntry(pMeta, &metaEntry);
if (code) goto _err;
return code;
_err:
metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -17,7 +17,6 @@
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -1151,7 +1150,7 @@ _exit:
return rcode; return rcode;
} }
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
metaWLock(pMeta); metaWLock(pMeta);
// save to table.db // save to table.db

View File

@ -13,33 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "vnd.h" #include "tsdb.h"
int32_t vnodeRealloc(void** pp, int32_t size) { int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
uint8_t* p = NULL; int32_t code = 0;
int32_t csize = 0;
if (*pp) { // begin
p = (uint8_t*)(*pp) - sizeof(int32_t); code = tsdbFSBegin(pTsdb->fs);
csize = *(int32_t*)p; if (code) goto _err;
// do retention
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs->nState->aDFileSet); iSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs->nState->aDFileSet, iSet);
// TODO
} }
if (csize >= size) { // commit
return 0; code = tsdbFSCommit(pTsdb->fs);
} if (code) goto _err;
p = (uint8_t*)taosMemoryRealloc(p, size); _exit:
if (p == NULL) { return code;
return TSDB_CODE_OUT_OF_MEMORY;
}
*(int32_t*)p = size;
*pp = p + sizeof(int32_t);
return 0; _err:
} tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
void vnodeFree(void* p) {
if (p) {
taosMemoryFree(((uint8_t*)p) - sizeof(int32_t));
}
} }

View File

@ -15,22 +15,686 @@
#include "tsdb.h" #include "tsdb.h"
struct STsdbSnapshotReader { // STsdbSnapReader ========================================
struct STsdbSnapReader {
STsdb* pTsdb; STsdb* pTsdb;
// TODO int64_t sver;
int64_t ever;
// for data file
int8_t dataDone;
int32_t fid;
SDataFReader* pDataFReader;
SArray* aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlockData blkData;
// for del file
int8_t delDone;
SDelFReader* pDelFReader;
int32_t iDelIdx;
SArray* aDelIdx; // SArray<SDelIdx>
SArray* aDelData; // SArray<SDelData>
}; };
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever) { static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
// TODO int32_t code = 0;
return 0;
while (true) {
if (pReader->pDataFReader == NULL) {
SDFileSet* pSet = NULL;
// search the next data file set to read (todo)
if (0 /* TODO */) {
code = TSDB_CODE_VND_READ_END;
goto _exit;
} }
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader) { // open
// TODO code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
return 0; if (code) goto _err;
// SBlockIdx
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
if (code) goto _err;
pReader->iBlockIdx = 0;
pReader->pBlockIdx = NULL;
} }
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData) { while (true) {
// TODO if (pReader->pBlockIdx == NULL) {
return 0; if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
tsdbDataFReaderClose(&pReader->pDataFReader);
break;
}
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
pReader->iBlockIdx++;
// SBlock
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
if (code) goto _err;
pReader->iBlock = 0;
}
while (true) {
SBlock block;
SBlock* pBlock = &block;
if (pReader->iBlock >= pReader->mBlock.nItem) {
pReader->pBlockIdx = NULL;
break;
}
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
pReader->iBlock++;
if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) ||
(pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) {
// overlap (todo)
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL);
if (code) goto _err;
goto _exit;
}
}
}
}
_exit:
return code;
_err:
tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;
if (pReader->pDelFReader == NULL) {
if (pDelFile == NULL) {
code = TSDB_CODE_VND_READ_END;
goto _exit;
}
// open
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
// read index
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
if (code) goto _err;
pReader->iDelIdx = 0;
}
while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
int32_t size = 0;
pReader->iDelIdx++;
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
if (code) goto _err;
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
size += tPutDelData(NULL, pDelData);
}
}
if (size > 0) {
int64_t n = 0;
size = size + sizeof(SSnapDataHdr) + sizeof(TABLEID);
code = tRealloc(ppData, size);
if (code) goto _err;
// SSnapDataHdr
SSnapDataHdr* pSnapDataHdr = (SSnapDataHdr*)(*ppData + n);
pSnapDataHdr->type = 1;
pSnapDataHdr->size = size; // TODO: size here may incorrect
n += sizeof(SSnapDataHdr);
// TABLEID
TABLEID* pId = (TABLEID*)(*ppData + n);
pId->suid = pDelIdx->suid;
pId->uid = pDelIdx->uid;
n += sizeof(*pId);
// DATA
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
n += tPutDelData(*ppData + n, pDelData);
}
}
goto _exit;
}
}
code = TSDB_CODE_VND_READ_END;
tsdbDelFReaderClose(&pReader->pDelFReader);
_exit:
return code;
_err:
tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
int32_t code = 0;
STsdbSnapReader* pReader = NULL;
// alloc
pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTsdb = pTsdb;
pReader->sver = sver;
pReader->ever = ever;
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pReader->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->mBlock = tMapDataInit();
code = tBlockDataInit(&pReader->blkData);
if (code) goto _err;
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pReader->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pReader->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*ppReader = pReader;
return code;
_err:
tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t code = 0;
STsdbSnapReader* pReader = *ppReader;
taosArrayDestroy(pReader->aDelData);
taosArrayDestroy(pReader->aDelIdx);
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
}
tBlockDataClear(&pReader->blkData);
tMapDataClear(&pReader->mBlock);
taosArrayDestroy(pReader->aBlockIdx);
if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader);
}
taosMemoryFree(pReader);
*ppReader = NULL;
return code;
}
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
// read data file
if (!pReader->dataDone) {
code = tsdbSnapReadData(pReader, ppData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->dataDone = 1;
} else {
goto _err;
}
} else {
goto _exit;
}
}
// read del file
if (!pReader->delDone) {
code = tsdbSnapReadDel(pReader, ppData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->delDone = 1;
} else {
goto _err;
}
} else {
goto _exit;
}
}
code = TSDB_CODE_VND_READ_END;
_exit:
return code;
_err:
tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
// STsdbSnapWriter ========================================
struct STsdbSnapWriter {
STsdb* pTsdb;
int64_t sver;
int64_t ever;
// config
int32_t minutes;
int8_t precision;
// for data file
int32_t fid;
SDataFReader* pDataFReader;
SArray* aBlockIdx;
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock;
int32_t iBlock;
SBlockData blockData;
int32_t iRow;
SDataFWriter* pDataFWriter;
SArray* aBlockIdxN;
SBlockIdx blockIdx;
SMapData mBlockN;
SBlock block;
SBlockData nBlockData;
// for del file
SDelFReader* pDelFReader;
SDelFWriter* pDelFWriter;
int32_t iDelIdx;
SArray* aDelIdx;
SArray* aDelData;
SArray* aDelIdxN;
};
static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit;
// TODO
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 0);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
if (code) goto _err;
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb snapshot writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int32_t iRow = 0; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
while (iRow < nRow) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
int64_t suid = 0; // todo
int64_t uid = 0; // todo
int64_t skey; // todo
int64_t ekey; // todo
int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
// begin
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
pWriter->fid = fid;
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid);
// reader
if (pSet) {
// open
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
// SBlockIdx
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL);
if (code) goto _err;
} else {
taosArrayClear(pWriter->aBlockIdx);
}
pWriter->iBlockIdx = 0;
// writer
SDFileSet wSet = {0};
if (pSet == NULL) {
wSet = (SDFileSet){0}; // todo
} else {
wSet = (SDFileSet){0}; // todo
}
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxN);
}
// process
TABLEID id = {0}; // TODO
TSKEY minKey = 0; // TODO
TSKEY maxKey = 0; // TODO
while (true) {
if (pWriter->pBlockIdx) {
int32_t c = tTABLEIDCmprFn(&id, pWriter->pBlockIdx);
if (c == 0) {
} else if (c < 0) {
// keep merge
} else {
// code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
pWriter->iBlockIdx++;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
} else {
pWriter->pBlockIdx = NULL;
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
}
}
} else {
int32_t c = tTABLEIDCmprFn(&id, &pWriter->blockIdx);
if (c == 0) {
// merge commit the block data
} else if (c > 0) {
// code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
} else {
ASSERT(0);
}
}
}
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) {
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->nState);
// reader
if (pDelFile) {
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdx, NULL);
if (code) goto _err;
}
// writer
SDelFile delFile = {.commitID = pTsdb->pVnode->state.commitID, .offset = 0, .size = 0};
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
if (code) goto _err;
}
// process the del data
TABLEID id = {0}; // todo
while (true) {
SDelIdx* pDelIdx = NULL;
int64_t n = 0;
SDelData delData;
SDelIdx delIdx;
int8_t toBreak = 0;
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx)) {
pDelIdx = taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
}
if (pDelIdx) {
int32_t c = tTABLEIDCmprFn(&id, pDelIdx);
if (c < 0) {
goto _new_del;
} else {
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
if (code) goto _err;
pWriter->iDelIdx++;
if (c == 0) {
toBreak = 1;
delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
goto _merge_del;
} else {
delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
goto _write_del;
}
}
}
_new_del:
toBreak = 1;
delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
taosArrayClear(pWriter->aDelData);
_merge_del:
while (n < nData) {
n += tGetDelData(pData + n, &delData);
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
_write_del:
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxN, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (toBreak) break;
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit;
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
if (code) goto _err;
SDelIdx delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdx, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
if (code) goto _err;
code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pWriter->pDelFWriter->fDel);
if (code) goto _err;
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
if (code) goto _err;
if (pWriter->pDelFReader) {
code = tsdbDelFReaderClose(&pWriter->pDelFReader);
if (code) goto _err;
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb snapshow write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
int32_t code = 0;
STsdbSnapWriter* pWriter = NULL;
// alloc
pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTsdb = pTsdb;
pWriter->sver = sver;
pWriter->ever = ever;
*ppWriter = pWriter;
return code;
_err:
tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STsdbSnapWriter* pWriter = *ppWriter;
if (rollback) {
code = tsdbSnapRollback(pWriter);
if (code) goto _err;
} else {
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err;
code = tsdbSnapCommit(pWriter);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;
_err:
tsdbError("vgId:%d tsdb snapshot writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int8_t type = pData[0];
// ts data
if (type == 0) {
code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1);
if (code) goto _err;
} else {
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
}
// del data
if (type == 1) {
code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
} }

View File

@ -87,8 +87,10 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
n += tPutI32v(p ? p + n : p, pMapData->nItem); n += tPutI32v(p ? p + n : p, pMapData->nItem);
if (pMapData->nItem) { if (pMapData->nItem) {
int32_t lOffset = 0;
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem]); n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset);
lOffset = pMapData->aOffset[iItem];
} }
n += tPutI32v(p ? p + n : p, pMapData->nData); n += tPutI32v(p ? p + n : p, pMapData->nData);
@ -111,8 +113,11 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
if (pMapData->nItem) { if (pMapData->nItem) {
if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1; if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1;
int32_t lOffset = 0;
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tGetI32v(p + n, &pMapData->aOffset[iItem]); n += tGetI32v(p + n, &pMapData->aOffset[iItem]);
pMapData->aOffset[iItem] += lOffset;
lOffset = pMapData->aOffset[iItem];
} }
n += tGetI32v(p + n, &pMapData->nData); n += tGetI32v(p + n, &pMapData->nData);

View File

@ -223,6 +223,7 @@ int vnodeCommit(SVnode *pVnode) {
// save info // save info
info.config = pVnode->config; info.config = pVnode->config;
info.state.committed = pVnode->state.applied; info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID; info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
if (vnodeSaveInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0) {
@ -316,6 +317,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1;
return 0; return 0;
} }
@ -328,6 +330,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code); tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
if (code < 0) return -1;
return 0; return 0;
} }

View File

@ -79,8 +79,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
strcpy(pVnode->path, path); strcpy(pVnode->path, path);
pVnode->config = info.config; pVnode->config = info.config;
pVnode->state.committed = info.state.committed; pVnode->state.committed = info.state.committed;
pVnode->state.commitTerm = info.state.commitTerm;
pVnode->state.applied = info.state.committed; pVnode->state.applied = info.state.committed;
pVnode->state.commitID = info.state.commitID; pVnode->state.commitID = info.state.commitID;
pVnode->state.commitTerm = info.state.commitTerm;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
pVnode->blockCount = 0; pVnode->blockCount = 0;
@ -194,4 +196,9 @@ void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; } void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = pVnode->state.committed;
pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
pSnapshot->lastConfigIndex = -1;
}

View File

@ -13,24 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "vnodeInt.h" #include "vnd.h"
struct SVSnapshotReader { // SVSnapReader ========================================================
struct SVSnapReader {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int8_t isMetaEnd; // meta
int8_t isTsdbEnd; int8_t metaDone;
SMetaSnapshotReader *pMetaReader; SMetaSnapReader *pMetaReader;
STsdbSnapshotReader *pTsdbReader; // tsdb
void *pData; int8_t tsdbDone;
int32_t nData; STsdbSnapReader *pTsdbReader;
uint8_t *pData;
}; };
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
SVSnapshotReader *pReader = NULL; int32_t code = 0;
SVSnapReader *pReader = NULL;
pReader = (SVSnapshotReader *)taosMemoryCalloc(1, sizeof(*pReader)); pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) { if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
@ -38,72 +41,169 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int
pReader->pVnode = pVnode; pReader->pVnode = pVnode;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->isMetaEnd = 0;
pReader->isTsdbEnd = 0;
if (metaSnapshotReaderOpen(pVnode->pMeta, &pReader->pMetaReader, sver, ever) < 0) { code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader);
taosMemoryFree(pReader); if (code) goto _err;
goto _err;
}
if (tsdbSnapshotReaderOpen(pVnode->pTsdb, &pReader->pTsdbReader, sver, ever) < 0) { code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader);
metaSnapshotReaderClose(pReader->pMetaReader); if (code) goto _err;
taosMemoryFree(pReader);
goto _err;
}
_exit:
*ppReader = pReader; *ppReader = pReader;
return 0; return code;
_err: _err:
vError("vgId:%d vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return -1; return code;
} }
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
if (pReader) {
vnodeFree(pReader->pData);
tsdbSnapshotReaderClose(pReader->pTsdbReader);
metaSnapshotReaderClose(pReader->pMetaReader);
taosMemoryFree(pReader);
}
return 0;
}
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
if (!pReader->isMetaEnd) { tFree(pReader->pData);
code = metaSnapshotRead(pReader->pMetaReader, &pReader->pData, &pReader->nData); if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader);
if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader);
taosMemoryFree(pReader);
return code;
}
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0;
if (!pReader->metaDone) {
code = metaSnapRead(pReader->pMetaReader, &pReader->pData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { if (code == TSDB_CODE_VND_READ_END) {
pReader->isMetaEnd = 1; pReader->metaDone = 1;
} else { } else {
return code; goto _err;
} }
} else { } else {
*ppData = pReader->pData; *ppData = pReader->pData;
*nData = pReader->nData; *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
return code; goto _exit;
} }
} }
if (!pReader->isTsdbEnd) { if (!pReader->tsdbDone) {
code = tsdbSnapshotRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData); code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { if (code == TSDB_CODE_VND_READ_END) {
pReader->isTsdbEnd = 1; pReader->tsdbDone = 1;
} else { } else {
return code; goto _err;
} }
} else { } else {
*ppData = pReader->pData; *ppData = pReader->pData;
*nData = pReader->nData; *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
return code; goto _exit;
} }
} }
code = TSDB_CODE_VND_READ_END; code = TSDB_CODE_VND_READ_END;
_exit:
return code;
_err:
vError("vgId:% snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
return code;
}
// SVSnapWriter ========================================================
struct SVSnapWriter {
SVnode *pVnode;
int64_t sver;
int64_t ever;
// meta
SMetaSnapWriter *pMetaSnapWriter;
// tsdb
STsdbSnapWriter *pTsdbSnapWriter;
};
static int32_t vnodeSnapRollback(SVSnapWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t vnodeSnapCommit(SVSnapWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
int32_t code = 0;
SVSnapWriter *pWriter = NULL;
// alloc
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pVnode = pVnode;
pWriter->sver = sver;
pWriter->ever = ever;
return code;
_err:
vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
return code;
}
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0;
if (rollback) {
code = vnodeSnapRollback(pWriter);
if (code) goto _err;
} else {
code = vnodeSnapCommit(pWriter);
if (code) goto _err;
}
taosMemoryFree(pWriter);
return code;
_err:
vError("vgId:%d vnode snapshow writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
return code;
}
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0;
SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode;
ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData);
if (pSnapDataHdr->type == 0) {
// meta
if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err;
}
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
} else {
// tsdb
if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err;
}
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
}
return code;
_err:
vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -143,6 +143,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
version); version);
pVnode->state.applied = version; pVnode->state.applied = version;
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
// skip header // skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));

View File

@ -453,20 +453,19 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
int32_t code = int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
vnodeSnapshotReaderOpen(pVnode, (SVSnapshotReader **)ppReader, pSnapshotParam->start, pSnapshotParam->end);
return code; return code;
} }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapshotReaderClose(pReader); int32_t code = vnodeSnapReaderClose(pReader);
return code; return code;
} }
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapshotRead(pReader, (const void **)ppBuf, len); int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
return code; return code;
} }