more refact
This commit is contained in:
parent
2e062dc948
commit
eb888684e3
|
|
@ -42,8 +42,8 @@ extern "C" {
|
||||||
typedef struct SVnode SVnode;
|
typedef struct SVnode SVnode;
|
||||||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||||
typedef struct SVnodeCfg SVnodeCfg;
|
typedef struct SVnodeCfg SVnodeCfg;
|
||||||
typedef struct SVSnapshotReader SVSnapshotReader;
|
typedef struct SVSnapReader SVSnapReader;
|
||||||
typedef struct SVSnapshotWriter SVSnapshotWriter;
|
typedef struct SVSnapWriter SVSnapWriter;
|
||||||
|
|
||||||
extern const SVnodeCfg vnodeCfgDefault;
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
|
||||||
|
|
@ -157,14 +157,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, SStreamReader *pHandle);
|
||||||
// sma
|
// sma
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
||||||
// SVSnapshotReader
|
// SVSnapReader
|
||||||
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
|
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SVSnapReader **ppReader, int64_t sver, int64_t ever);
|
||||||
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
|
int32_t vnodeSnapReaderClose(SVSnapReader *pReader);
|
||||||
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
|
int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nData);
|
||||||
// SVSnapshotWriter;
|
// SVSnapWriter;
|
||||||
int32_t vnodeSnapshotWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapshotWriter **ppWriter);
|
int32_t vnodeSnapshotWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
|
||||||
int32_t vnodeSnapshotWrite(SVSnapshotWriter *pWriter, uint8_t *pData, uint32_t nData);
|
int32_t vnodeSnapshotWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
|
||||||
int32_t vnodeSnapshotWriterClose(SVSnapshotWriter *pWriter, int8_t rollback);
|
int32_t vnodeSnapshotWriterClose(SVSnapWriter *pWriter, int8_t rollback);
|
||||||
|
|
||||||
// structs
|
// structs
|
||||||
struct STsdbCfg {
|
struct STsdbCfg {
|
||||||
|
|
|
||||||
|
|
@ -58,8 +58,8 @@ typedef struct SVState SVState;
|
||||||
typedef struct SVBufPool SVBufPool;
|
typedef struct SVBufPool SVBufPool;
|
||||||
typedef struct SQWorker SQHandle;
|
typedef struct SQWorker SQHandle;
|
||||||
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
||||||
typedef struct SMetaSnapshotReader SMetaSnapshotReader;
|
typedef struct SMetaSnapReader SMetaSnapReader;
|
||||||
typedef struct STsdbSnapshotReader STsdbSnapshotReader;
|
typedef struct STsdbSnapReader STsdbSnapReader;
|
||||||
|
|
||||||
#define VNODE_META_DIR "meta"
|
#define VNODE_META_DIR "meta"
|
||||||
#define VNODE_TSDB_DIR "tsdb"
|
#define VNODE_TSDB_DIR "tsdb"
|
||||||
|
|
@ -108,9 +108,9 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
|
||||||
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
|
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
|
||||||
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
|
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
|
||||||
SArray* metaGetSmaTbUids(SMeta* pMeta);
|
SArray* metaGetSmaTbUids(SMeta* pMeta);
|
||||||
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
|
int32_t metaSnapReaderOpen(SMeta* pMeta, SMetaSnapReader** ppReader, int64_t sver, int64_t ever);
|
||||||
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
|
int32_t metaSnapReaderClose(SMetaSnapReader* pReader);
|
||||||
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nData);
|
||||||
void* metaGetIdx(SMeta* pMeta);
|
void* metaGetIdx(SMeta* pMeta);
|
||||||
void* metaGetIvtIdx(SMeta* pMeta);
|
void* metaGetIvtIdx(SMeta* pMeta);
|
||||||
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
||||||
|
|
@ -130,9 +130,9 @@ int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* p
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
void* pMemRef);
|
void* pMemRef);
|
||||||
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, STsdbSnapReader** ppReader, int64_t sver, int64_t ever);
|
||||||
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
|
int32_t tsdbSnapReaderClose(STsdbSnapReader* pReader);
|
||||||
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, void** ppData, uint32_t* nData);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
|
|
||||||
|
|
@ -15,19 +15,19 @@
|
||||||
|
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
|
|
||||||
struct SMetaSnapshotReader {
|
struct SMetaSnapReader {
|
||||||
SMeta* pMeta;
|
SMeta* pMeta;
|
||||||
TBC* pTbc;
|
TBC* pTbc;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever) {
|
int32_t metaSnapReaderOpen(SMeta* pMeta, SMetaSnapReader** ppReader, int64_t sver, int64_t ever) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
SMetaSnapshotReader* pMetaReader = NULL;
|
SMetaSnapReader* pMetaReader = NULL;
|
||||||
|
|
||||||
pMetaReader = (SMetaSnapshotReader*)taosMemoryCalloc(1, sizeof(*pMetaReader));
|
pMetaReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaReader));
|
||||||
if (pMetaReader == NULL) {
|
if (pMetaReader == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
@ -53,7 +53,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader) {
|
int32_t metaSnapReaderClose(SMetaSnapReader* pReader) {
|
||||||
if (pReader) {
|
if (pReader) {
|
||||||
tdbTbcClose(pReader->pTbc);
|
tdbTbcClose(pReader->pTbc);
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
|
|
@ -61,7 +61,7 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nDatap) {
|
int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nDatap) {
|
||||||
const void* pKey = NULL;
|
const void* pKey = NULL;
|
||||||
const void* pData = NULL;
|
const void* pData = NULL;
|
||||||
int32_t nKey = 0;
|
int32_t nKey = 0;
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
struct STsdbSnapshotReader {
|
struct STsdbSnapReader {
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
|
|
@ -25,22 +25,23 @@ struct STsdbSnapshotReader {
|
||||||
SDelFReader* pDelFReader;
|
SDelFReader* pDelFReader;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct STsdbSnapshotWriter {
|
typedef struct STsdbSnapWriter {
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
// for data file
|
// for data file
|
||||||
|
int32_t iDFileSet;
|
||||||
SDataFWriter* pDataFWriter;
|
SDataFWriter* pDataFWriter;
|
||||||
// for del file
|
// for del file
|
||||||
SDelFWriter* pDelFWriter;
|
SDelFWriter* pDelFWriter;
|
||||||
} STsdbSnapshotWriter;
|
} STsdbSnapWriter;
|
||||||
|
|
||||||
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever) {
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, STsdbSnapReader** ppReader, int64_t sver, int64_t ever) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbSnapshotReader* pReader = NULL;
|
STsdbSnapReader* pReader = NULL;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pReader = (STsdbSnapshotReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
if (pReader == NULL) {
|
if (pReader == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
@ -58,9 +59,13 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData) {
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, void** ppData, uint32_t* nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
|
||||||
|
// read data file
|
||||||
|
|
||||||
|
// read del file
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
@ -68,7 +73,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader) {
|
int32_t tsdbSnapReaderClose(STsdbSnapReader* pReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
||||||
|
|
@ -15,29 +15,29 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
struct SVSnapshotReader {
|
struct SVSnapReader {
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
int8_t isMetaEnd;
|
int8_t isMetaEnd;
|
||||||
int8_t isTsdbEnd;
|
int8_t isTsdbEnd;
|
||||||
SMetaSnapshotReader *pMetaReader;
|
SMetaSnapReader *pMetaReader;
|
||||||
STsdbSnapshotReader *pTsdbReader;
|
STsdbSnapReader *pTsdbReader;
|
||||||
void *pData;
|
void *pData;
|
||||||
int32_t nData;
|
int32_t nData;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SVSnapshotWriter {
|
struct SVSnapWriter {
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
};
|
};
|
||||||
|
|
||||||
// SVSnapshotReader ========================================================
|
// SVSnapReader ========================================================
|
||||||
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever) {
|
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SVSnapReader **ppReader, int64_t sver, int64_t ever) {
|
||||||
SVSnapshotReader *pReader = NULL;
|
SVSnapReader *pReader = NULL;
|
||||||
|
|
||||||
pReader = (SVSnapshotReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
if (pReader == NULL) {
|
if (pReader == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
@ -48,13 +48,13 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int
|
||||||
pReader->isMetaEnd = 0;
|
pReader->isMetaEnd = 0;
|
||||||
pReader->isTsdbEnd = 0;
|
pReader->isTsdbEnd = 0;
|
||||||
|
|
||||||
if (metaSnapshotReaderOpen(pVnode->pMeta, &pReader->pMetaReader, sver, ever) < 0) {
|
if (metaSnapReaderOpen(pVnode->pMeta, &pReader->pMetaReader, sver, ever) < 0) {
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbSnapshotReaderOpen(pVnode->pTsdb, &pReader->pTsdbReader, sver, ever) < 0) {
|
if (tsdbSnapReaderOpen(pVnode->pTsdb, &pReader->pTsdbReader, sver, ever) < 0) {
|
||||||
metaSnapshotReaderClose(pReader->pMetaReader);
|
metaSnapReaderClose(pReader->pMetaReader);
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
@ -68,21 +68,21 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader) {
|
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
|
||||||
if (pReader) {
|
if (pReader) {
|
||||||
vnodeFree(pReader->pData);
|
vnodeFree(pReader->pData);
|
||||||
tsdbSnapshotReaderClose(pReader->pTsdbReader);
|
tsdbSnapReaderClose(pReader->pTsdbReader);
|
||||||
metaSnapshotReaderClose(pReader->pMetaReader);
|
metaSnapReaderClose(pReader->pMetaReader);
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData) {
|
int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (!pReader->isMetaEnd) {
|
if (!pReader->isMetaEnd) {
|
||||||
code = metaSnapshotRead(pReader->pMetaReader, &pReader->pData, &pReader->nData);
|
code = metaSnapRead(pReader->pMetaReader, &pReader->pData, &pReader->nData);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code == TSDB_CODE_VND_READ_END) {
|
if (code == TSDB_CODE_VND_READ_END) {
|
||||||
pReader->isMetaEnd = 1;
|
pReader->isMetaEnd = 1;
|
||||||
|
|
@ -97,7 +97,7 @@ int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pReader->isTsdbEnd) {
|
if (!pReader->isTsdbEnd) {
|
||||||
code = tsdbSnapshotRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData);
|
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code == TSDB_CODE_VND_READ_END) {
|
if (code == TSDB_CODE_VND_READ_END) {
|
||||||
pReader->isTsdbEnd = 1;
|
pReader->isTsdbEnd = 1;
|
||||||
|
|
@ -115,13 +115,13 @@ int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SVSnapshotWriter ========================================================
|
// SVSnapWriter ========================================================
|
||||||
int32_t vnodeSnapshotWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapshotWriter **ppWriter) {
|
int32_t vnodeSnapshotWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVSnapshotWriter *pWriter = NULL;
|
SVSnapWriter *pWriter = NULL;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pWriter = (SVSnapshotWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
|
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
if (pWriter == NULL) {
|
if (pWriter == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
@ -136,13 +136,13 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapshotWrite(SVSnapshotWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
int32_t vnodeSnapshotWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
// TODO
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapshotWriterClose(SVSnapshotWriter *pWriter, int8_t rollback) {
|
int32_t vnodeSnapshotWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (!rollback) {
|
if (!rollback) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue