diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 52407dc7a4..fbed164839 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -157,8 +157,8 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char if (rpcRsp.code != 0) { dError("user:%s, auth msg received from mnode, error:%s", user, tstrerror(rpcRsp.code)); } else { - dTrace("user:%s, auth msg received from mnode", user); SDMAuthRsp *pRsp = rpcRsp.pCont; + dTrace("user:%s, auth msg received from mnode", user); memcpy(secret, pRsp->secret, TSDB_KEY_LEN); memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); *spi = pRsp->spi; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 0d6004bba5..137b97e287 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -57,7 +57,7 @@ typedef struct { // if name is empty(name[0] is zero), get the file from index or after, used by master // if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node // it returns the file magic number and size, if file not there, magic shall be 0. -typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); +typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file @@ -73,7 +73,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FNotifyRole)(void *ahandle, int8_t role); // when data file is synced successfully, notity app -typedef void (*FNotifyFileSynced)(void *ahandle); +typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 0520bf8493..c0d389f305 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -185,7 +185,7 @@ void sdbUpdateMnodeRoles() { } } -static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { +static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) { sdbUpdateMnodeRoles(); return 0; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 3e638eb3d3..7e80f0d282 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -898,9 +898,9 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { if (pContext->pRsp) { // for synchronous API - tsem_post(pContext->pSem); memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); + tsem_post(pContext->pSem); } else { // for asynchronous API SRpcIpSet *pIpSet = NULL; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index d41957d8f4..dea9369dd8 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,8 +37,8 @@ typedef struct { int32_t refCount; // reference count int status; int8_t role; - int64_t version; - int64_t savedVersion; + int64_t version; // current version + int64_t fversion; // version on saved data file void *wqueue; void *rqueue; void *wal; @@ -46,11 +46,11 @@ typedef struct { void *sync; void *events; void *cq; // continuous query - int32_t cfgVersion; - STsdbCfg tsdbCfg; - SSyncCfg syncCfg; - SWalCfg walCfg; - char * rootDir; + int32_t cfgVersion; + STsdbCfg tsdbCfg; + SSyncCfg syncCfg; + SWalCfg walCfg; + char *rootDir; } SVnodeObj; int vnodeWriteToQueue(void *param, void *pHead, int type); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3b27344f20..0f92be0967 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -37,10 +37,10 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static bool vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeNotifyFileSynced(void *ahandle); +static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -196,6 +196,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } vnodeReadVersion(pVnode); + pVnode->fversion = pVnode->version; pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); @@ -394,7 +395,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { SVnodeObj *pVnode = arg; if (status == TSDB_STATUS_COMMIT_START) { - pVnode->savedVersion = pVnode->version; + pVnode->fversion = pVnode->version; return walRenew(pVnode->wal); } @@ -404,8 +405,9 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { return 0; } -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) { SVnodeObj *pVnode = ahandle; + *fversion = pVnode->fversion; return tsdbGetFileInfo(pVnode->tsdb, name, index, size); } @@ -425,10 +427,14 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } -static void vnodeNotifyFileSynced(void *ahandle) { +static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { SVnodeObj *pVnode = ahandle; vTrace("vgId:%d, data file is synced", pVnode->vgId); + pVnode->fversion = fversion; + pVnode->version = fversion; + vnodeSaveVersion(pVnode); + char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); // clsoe tsdb, then open tsdb @@ -706,14 +712,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { char * content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->savedVersion); + len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); fclose(fp); free(content); - vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->savedVersion); + vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion); return 0; }