minor changes
This commit is contained in:
parent
ed9709c3f0
commit
cc5563716c
|
@ -19,6 +19,7 @@
|
||||||
#include "mallocator.h"
|
#include "mallocator.h"
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
|
#include "tfs.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
#include "tfs.h"
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
||||||
#define TSDB_FILE_F(tf) (&((tf)->f))
|
#define TSDB_FILE_F(tf) (&((tf)->f))
|
||||||
#define TSDB_FILE_FD(tf) ((tf)->fd)
|
#define TSDB_FILE_FD(tf) ((tf)->fd)
|
||||||
#define TSDB_FILE_FULL_NAME(tf) TFILE_NAME(TSDB_FILE_F(tf))
|
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||||
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
|
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
|
||||||
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
|
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
|
||||||
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
|
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
|
||||||
|
@ -181,10 +181,10 @@ typedef struct {
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
} SDFile;
|
} SDFile;
|
||||||
|
|
||||||
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
||||||
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
||||||
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
||||||
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
|
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
|
||||||
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
|
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
|
||||||
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
||||||
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
|
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
|
||||||
|
@ -311,10 +311,10 @@ typedef struct {
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
|
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver);
|
||||||
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
||||||
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
|
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
|
||||||
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
|
void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet);
|
||||||
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
|
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
|
||||||
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
|
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
|
||||||
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
||||||
|
|
||||||
if (did.level > TSDB_FSET_LEVEL(pSet)) {
|
if (did.level > TSDB_FSET_LEVEL(pSet)) {
|
||||||
// Need to move the FSET to higher level
|
// Need to move the FSET to higher level
|
||||||
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
|
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
|
||||||
|
|
||||||
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
||||||
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
||||||
|
@ -482,7 +482,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
// Set and open commit FSET
|
// Set and open commit FSET
|
||||||
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
|
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
|
||||||
// Create a new FSET to write data
|
// Create a new FSET to write data
|
||||||
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
|
tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));
|
||||||
|
|
||||||
if (tsdbCreateDFileSet(pWSet, true) < 0) {
|
if (tsdbCreateDFileSet(pWSet, true) < 0) {
|
||||||
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
|
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
|
||||||
|
@ -507,7 +507,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
|
|
||||||
// TSDB_FILE_HEAD
|
// TSDB_FILE_HEAD
|
||||||
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
||||||
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
|
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
|
||||||
if (tsdbCreateDFile(pWHeadf, true) < 0) {
|
if (tsdbCreateDFile(pWHeadf, true) < 0) {
|
||||||
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
|
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
|
@ -554,7 +554,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
|
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
|
||||||
pCommith->isLFileSame = false;
|
pCommith->isLFileSame = false;
|
||||||
|
|
||||||
if (tsdbCreateDFile(pWLastf, true) < 0) {
|
if (tsdbCreateDFile(pWLastf, true) < 0) {
|
||||||
|
|
|
@ -97,7 +97,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
|
static void *tsdbDecodeDFileSetArray(STsdb*pRepo, void *buf, SArray *pArray) {
|
||||||
uint64_t nset;
|
uint64_t nset;
|
||||||
SDFileSet dset;
|
SDFileSet dset;
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
|
||||||
|
|
||||||
buf = taosDecodeFixedU64(buf, &nset);
|
buf = taosDecodeFixedU64(buf, &nset);
|
||||||
for (size_t i = 0; i < nset; i++) {
|
for (size_t i = 0; i < nset; i++) {
|
||||||
buf = tsdbDecodeDFileSet(buf, &dset);
|
buf = tsdbDecodeDFileSet(pRepo, buf, &dset);
|
||||||
taosArrayPush(pArray, (void *)(&dset));
|
taosArrayPush(pArray, (void *)(&dset));
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
|
@ -122,13 +122,13 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
|
static void *tsdbDecodeFSStatus(STsdb*pRepo, void *buf, SFSStatus *pStatus) {
|
||||||
tsdbResetFSStatus(pStatus);
|
tsdbResetFSStatus(pStatus);
|
||||||
|
|
||||||
// pStatus->pmf = &(pStatus->mf);
|
// pStatus->pmf = &(pStatus->mf);
|
||||||
|
|
||||||
// buf = tsdbDecodeSMFile(buf, pStatus->pmf);
|
// buf = tsdbDecodeSMFile(buf, pStatus->pmf);
|
||||||
buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
|
buf = tsdbDecodeDFileSetArray(pRepo, buf, pStatus->df);
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -725,7 +725,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ptr = buffer;
|
ptr = buffer;
|
||||||
ptr = tsdbDecodeFSStatus(ptr, pStatus);
|
ptr = tsdbDecodeFSStatus(pRepo, ptr, pStatus);
|
||||||
} else {
|
} else {
|
||||||
tsdbResetFSStatus(pStatus);
|
tsdbResetFSStatus(pStatus);
|
||||||
}
|
}
|
||||||
|
@ -913,7 +913,7 @@ static int tsdbScanRootDir(STsdb *pRepo) {
|
||||||
const STfsFile *pf;
|
const STfsFile *pf;
|
||||||
|
|
||||||
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
|
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
|
||||||
STfsDir *tdir = tfsOpendir(rootDir);
|
STfsDir *tdir = tfsOpendir(pRepo->pTfs, rootDir);
|
||||||
if (tdir == NULL) {
|
if (tdir == NULL) {
|
||||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
|
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -947,7 +947,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
|
||||||
const STfsFile *pf;
|
const STfsFile *pf;
|
||||||
|
|
||||||
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
|
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
|
||||||
STfsDir *tdir = tfsOpendir(dataDir);
|
STfsDir *tdir = tfsOpendir(pRepo->pTfs, dataDir);
|
||||||
if (tdir == NULL) {
|
if (tdir == NULL) {
|
||||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
|
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -295,7 +295,7 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// ============== Operations on SDFile
|
// ============== Operations on SDFile
|
||||||
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
|
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
|
||||||
|
@ -305,8 +305,8 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
|
||||||
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
||||||
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
|
|
||||||
tsdbGetFilename(vid, fid, ver, ftype, fname);
|
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
|
||||||
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
|
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
|
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
|
||||||
|
@ -323,9 +323,9 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
|
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile) {
|
||||||
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
||||||
buf = tfsDecodeFile(buf, &(pDFile->f));
|
buf = tfsDecodeFile(pRepo->pTfs, buf, &(pDFile->f));
|
||||||
TSDB_FILE_SET_CLOSED(pDFile);
|
TSDB_FILE_SET_CLOSED(pDFile);
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
|
@ -559,13 +559,13 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============== Operations on SDFileSet
|
// ============== Operations on SDFileSet
|
||||||
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) {
|
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
|
||||||
pSet->fid = fid;
|
pSet->fid = fid;
|
||||||
pSet->state = 0;
|
pSet->state = 0;
|
||||||
|
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
|
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
|
||||||
tsdbInitDFile(pDFile, did, vid, fid, ver, ftype);
|
tsdbInitDFile(pRepo->pTfs, pDFile, did, fid, ver, ftype);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -587,14 +587,14 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
|
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
|
|
||||||
buf = taosDecodeFixedI32(buf, &(fid));
|
buf = taosDecodeFixedI32(buf, &(fid));
|
||||||
pSet->state = 0;
|
pSet->state = 0;
|
||||||
pSet->fid = fid;
|
pSet->fid = fid;
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
buf = tsdbDecodeSDFile(pRepo->pTfs, buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,7 +146,7 @@ void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname)
|
||||||
tstrncpy(pFile->rname, rname, TSDB_FILENAME_LEN);
|
tstrncpy(pFile->rname, rname, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
char tmpName[TMPNAME_LEN] = {0};
|
char tmpName[TMPNAME_LEN] = {0};
|
||||||
snprintf(tmpName, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TD_DIRSEP, rname);
|
snprintf(tmpName, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
|
||||||
tstrncpy(pFile->aname, tmpName, TSDB_FILENAME_LEN);
|
tstrncpy(pFile->aname, tmpName, TSDB_FILENAME_LEN);
|
||||||
pFile->pTfs = pTfs;
|
pFile->pTfs = pTfs;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue