set file version correctly
This commit is contained in:
parent
c094c957f6
commit
530aa3e5e7
|
@ -57,7 +57,7 @@ typedef struct {
|
||||||
// if name is empty(name[0] is zero), get the file from index or after, used by master
|
// if name is empty(name[0] is zero), get the file from index or after, used by master
|
||||||
// if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node
|
// if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node
|
||||||
// it returns the file magic number and size, if file not there, magic shall be 0.
|
// it returns the file magic number and size, if file not there, magic shall be 0.
|
||||||
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size);
|
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
|
||||||
|
|
||||||
// get the wal file from index or after
|
// get the wal file from index or after
|
||||||
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
|
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
|
||||||
|
@ -73,7 +73,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
|
||||||
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
|
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
|
||||||
|
|
||||||
// when data file is synced successfully, notity app
|
// when data file is synced successfully, notity app
|
||||||
typedef void (*FNotifyFileSynced)(void *ahandle);
|
typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // vgroup ID
|
int32_t vgId; // vgroup ID
|
||||||
|
|
|
@ -37,8 +37,8 @@ typedef struct {
|
||||||
int32_t refCount; // reference count
|
int32_t refCount; // reference count
|
||||||
int status;
|
int status;
|
||||||
int8_t role;
|
int8_t role;
|
||||||
int64_t version;
|
int64_t version; // current version
|
||||||
int64_t savedVersion;
|
int64_t fversion; // version on saved data file
|
||||||
void *wqueue;
|
void *wqueue;
|
||||||
void *rqueue;
|
void *rqueue;
|
||||||
void *wal;
|
void *wal;
|
||||||
|
@ -46,11 +46,11 @@ typedef struct {
|
||||||
void *sync;
|
void *sync;
|
||||||
void *events;
|
void *events;
|
||||||
void *cq; // continuous query
|
void *cq; // continuous query
|
||||||
int32_t cfgVersion;
|
int32_t cfgVersion;
|
||||||
STsdbCfg tsdbCfg;
|
STsdbCfg tsdbCfg;
|
||||||
SSyncCfg syncCfg;
|
SSyncCfg syncCfg;
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
char * rootDir;
|
char *rootDir;
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
||||||
|
|
|
@ -196,6 +196,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeReadVersion(pVnode);
|
vnodeReadVersion(pVnode);
|
||||||
|
pVnode->fversion = pVnode->version;
|
||||||
|
|
||||||
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||||
|
@ -394,7 +395,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
|
||||||
SVnodeObj *pVnode = arg;
|
SVnodeObj *pVnode = arg;
|
||||||
|
|
||||||
if (status == TSDB_STATUS_COMMIT_START) {
|
if (status == TSDB_STATUS_COMMIT_START) {
|
||||||
pVnode->savedVersion = pVnode->version;
|
pVnode->fversion = pVnode->version;
|
||||||
return walRenew(pVnode->wal);
|
return walRenew(pVnode->wal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,8 +405,9 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
|
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
|
||||||
SVnodeObj *pVnode = ahandle;
|
SVnodeObj *pVnode = ahandle;
|
||||||
|
*fversion = pVnode->fversion;
|
||||||
return tsdbGetFileInfo(pVnode->tsdb, name, index, size);
|
return tsdbGetFileInfo(pVnode->tsdb, name, index, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,12 +427,17 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||||
cqStop(pVnode->cq);
|
cqStop(pVnode->cq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeNotifyFileSynced(void *ahandle) {
|
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
|
||||||
SVnodeObj *pVnode = ahandle;
|
SVnodeObj *pVnode = ahandle;
|
||||||
vTrace("vgId:%d, data file is synced", pVnode->vgId);
|
vTrace("vgId:%d, data file is synced", pVnode->vgId);
|
||||||
|
|
||||||
|
pVnode->fversion = fversion;
|
||||||
|
pVnode->version = fversion;
|
||||||
|
vnodeSaveVersion(pVnode);
|
||||||
|
|
||||||
char rootDir[128] = "\0";
|
char rootDir[128] = "\0";
|
||||||
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
||||||
|
|
||||||
// close tsdb, then open tsdb
|
// close tsdb, then open tsdb
|
||||||
tsdbCloseRepo(pVnode->tsdb);
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
STsdbAppH appH = {0};
|
STsdbAppH appH = {0};
|
||||||
|
@ -706,14 +713,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
char * content = calloc(1, maxLen + 1);
|
char * content = calloc(1, maxLen + 1);
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->savedVersion);
|
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion);
|
||||||
len += snprintf(content + len, maxLen - len, "}\n");
|
len += snprintf(content + len, maxLen - len, "}\n");
|
||||||
|
|
||||||
fwrite(content, 1, len, fp);
|
fwrite(content, 1, len, fp);
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
free(content);
|
free(content);
|
||||||
|
|
||||||
vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->savedVersion);
|
vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue