refact
This commit is contained in:
parent
8c75980679
commit
0633c25a22
|
|
@ -58,8 +58,31 @@ void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
|
||||||
int32_t tsdbBegin2(STsdb *pTsdb);
|
int32_t tsdbBegin2(STsdb *pTsdb);
|
||||||
int32_t tsdbCommit2(STsdb *pTsdb);
|
int32_t tsdbCommit2(STsdb *pTsdb);
|
||||||
|
|
||||||
|
// tsdbFile.c ==============================================================================================
|
||||||
|
typedef int32_t TSDB_FILE_T;
|
||||||
|
typedef struct SDFInfo SDFInfo;
|
||||||
|
typedef struct SDFile SDFile;
|
||||||
|
typedef struct SDFileSet SDFileSet;
|
||||||
|
|
||||||
|
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
||||||
|
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
|
||||||
|
int tsdbOpenDFile(SDFile *pDFile, int flags);
|
||||||
|
void tsdbCloseDFile(SDFile *pDFile);
|
||||||
|
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence);
|
||||||
|
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte);
|
||||||
|
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm);
|
||||||
|
int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset);
|
||||||
|
int tsdbRemoveDFile(SDFile *pDFile);
|
||||||
|
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte);
|
||||||
|
int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest);
|
||||||
|
int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
|
||||||
|
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
|
||||||
|
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
|
||||||
|
int tsdbUpdateDFileHeader(SDFile *pDFile);
|
||||||
|
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
|
||||||
|
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
|
||||||
|
|
||||||
// tsdbMemTable ================
|
// tsdbMemTable ================
|
||||||
typedef struct STsdbRow STsdbRow;
|
|
||||||
typedef struct STbData STbData;
|
typedef struct STbData STbData;
|
||||||
typedef struct STsdbMemTable STsdbMemTable;
|
typedef struct STsdbMemTable STsdbMemTable;
|
||||||
typedef struct SMergeInfo SMergeInfo;
|
typedef struct SMergeInfo SMergeInfo;
|
||||||
|
|
@ -75,10 +98,6 @@ int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIte
|
||||||
// tsdbFS ================
|
// tsdbFS ================
|
||||||
typedef struct STsdbFS STsdbFS;
|
typedef struct STsdbFS STsdbFS;
|
||||||
|
|
||||||
// tsdbSma ================
|
|
||||||
typedef struct SSmaEnv SSmaEnv;
|
|
||||||
typedef struct SSmaEnvs SSmaEnvs;
|
|
||||||
|
|
||||||
// structs
|
// structs
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int minFid;
|
int minFid;
|
||||||
|
|
@ -113,7 +132,7 @@ struct STable {
|
||||||
#define TABLE_TID(t) (t)->tid
|
#define TABLE_TID(t) (t)->tid
|
||||||
#define TABLE_UID(t) (t)->uid
|
#define TABLE_UID(t) (t)->uid
|
||||||
|
|
||||||
int tsdbPrepareCommit(STsdb *pTsdb);
|
// int tsdbPrepareCommit(STsdb *pTsdb);
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FILE_HEAD = 0, // .head
|
TSDB_FILE_HEAD = 0, // .head
|
||||||
TSDB_FILE_DATA, // .data
|
TSDB_FILE_DATA, // .data
|
||||||
|
|
@ -124,7 +143,7 @@ typedef enum {
|
||||||
TSDB_FILE_META, // meta
|
TSDB_FILE_META, // meta
|
||||||
} E_TSDB_FILE_T;
|
} E_TSDB_FILE_T;
|
||||||
|
|
||||||
typedef struct {
|
struct SDFInfo {
|
||||||
uint32_t magic;
|
uint32_t magic;
|
||||||
uint32_t fver;
|
uint32_t fver;
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
|
|
@ -133,22 +152,22 @@ typedef struct {
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
uint64_t size;
|
uint64_t size;
|
||||||
uint64_t tombSize;
|
uint64_t tombSize;
|
||||||
} SDFInfo;
|
};
|
||||||
|
|
||||||
typedef struct {
|
struct SDFile {
|
||||||
SDFInfo info;
|
SDFInfo info;
|
||||||
STfsFile f;
|
STfsFile f;
|
||||||
TdFilePtr pFile;
|
TdFilePtr pFile;
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
} SDFile;
|
};
|
||||||
|
|
||||||
typedef struct {
|
struct SDFileSet {
|
||||||
int fid;
|
int fid;
|
||||||
int8_t state; // -128~127
|
int8_t state; // -128~127
|
||||||
uint8_t ver; // 0~255, DFileSet version
|
uint8_t ver; // 0~255, DFileSet version
|
||||||
uint16_t reserve;
|
uint16_t reserve;
|
||||||
SDFile files[TSDB_FILE_MAX];
|
SDFile files[TSDB_FILE_MAX];
|
||||||
} SDFileSet;
|
};
|
||||||
|
|
||||||
struct STbData {
|
struct STbData {
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
|
|
@ -521,7 +540,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
#define TSDB_FILE_STATE_OK 0
|
#define TSDB_FILE_STATE_OK 0
|
||||||
#define TSDB_FILE_STATE_BAD 1
|
#define TSDB_FILE_STATE_BAD 1
|
||||||
|
|
||||||
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
|
||||||
#define TSDB_FILE_F(tf) (&((tf)->f))
|
#define TSDB_FILE_F(tf) (&((tf)->f))
|
||||||
#define TSDB_FILE_PFILE(tf) ((tf)->pFile)
|
#define TSDB_FILE_PFILE(tf) ((tf)->pFile)
|
||||||
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||||
|
|
@ -539,7 +557,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
||||||
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
||||||
|
|
||||||
typedef int32_t TSDB_FILE_T;
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FS_VER_0 = 0,
|
TSDB_FS_VER_0 = 0,
|
||||||
TSDB_FS_VER_MAX,
|
TSDB_FS_VER_MAX,
|
||||||
|
|
@ -560,112 +577,6 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
|
||||||
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
|
|
||||||
int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
|
|
||||||
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
|
|
||||||
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
|
|
||||||
int tsdbUpdateDFileHeader(SDFile *pDFile);
|
|
||||||
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
|
|
||||||
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbSetDFileInfo(SDFile *pDFile, SDFInfo *pInfo) { pDFile->info = *pInfo; }
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) {
|
|
||||||
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
|
|
||||||
if (pDFile->pFile == NULL) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbCloseDFile(SDFile *pDFile) {
|
|
||||||
if (TSDB_FILE_OPENED(pDFile)) {
|
|
||||||
taosCloseFile(&pDFile->pFile);
|
|
||||||
TSDB_FILE_SET_CLOSED(pDFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
|
|
||||||
// ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
|
|
||||||
if (loffset < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return loffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
|
|
||||||
if (nwrite < nbyte) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return nwrite;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
|
|
||||||
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t toffset;
|
|
||||||
|
|
||||||
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pDFile->info.size == toffset);
|
|
||||||
|
|
||||||
if (offset) {
|
|
||||||
*offset = toffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDFile->info.size += nbyte;
|
|
||||||
|
|
||||||
return (int)nbyte;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
|
|
||||||
if (nread < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return nread;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) {
|
|
||||||
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbSetDFileInfo(pDest, TSDB_FILE_INFO(pSrc));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// =============== SDFileSet
|
// =============== SDFileSet
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
||||||
|
|
@ -218,15 +218,15 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbPrepareCommit(STsdb *pTsdb) {
|
// int tsdbPrepareCommit(STsdb *pTsdb) {
|
||||||
if (pTsdb->mem == NULL) return 0;
|
// if (pTsdb->mem == NULL) return 0;
|
||||||
|
|
||||||
ASSERT(pTsdb->imem == NULL);
|
// ASSERT(pTsdb->imem == NULL);
|
||||||
|
|
||||||
pTsdb->imem = pTsdb->mem;
|
// pTsdb->imem = pTsdb->mem;
|
||||||
pTsdb->mem = NULL;
|
// pTsdb->mem = NULL;
|
||||||
return 0;
|
// return 0;
|
||||||
}
|
// }
|
||||||
|
|
||||||
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
|
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
|
||||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
|
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
|
||||||
|
|
|
||||||
|
|
@ -448,3 +448,98 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbOpenDFile(SDFile *pDFile, int flags) {
|
||||||
|
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
|
||||||
|
if (pDFile->pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbCloseDFile(SDFile *pDFile) {
|
||||||
|
if (TSDB_FILE_OPENED(pDFile)) {
|
||||||
|
taosCloseFile(&pDFile->pFile);
|
||||||
|
TSDB_FILE_SET_CLOSED(pDFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
|
||||||
|
// ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
|
||||||
|
if (loffset < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return loffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
|
||||||
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
|
||||||
|
if (nwrite < nbyte) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nwrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
|
||||||
|
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
|
||||||
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t toffset;
|
||||||
|
|
||||||
|
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pDFile->info.size == toffset);
|
||||||
|
|
||||||
|
if (offset) {
|
||||||
|
*offset = toffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDFile->info.size += nbyte;
|
||||||
|
|
||||||
|
return (int)nbyte;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
|
||||||
|
|
||||||
|
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
|
||||||
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
|
||||||
|
if (nread < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nread;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) {
|
||||||
|
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDest->info = pSrc->info;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue