enh: call walEndSnapshot while sdb writefile
This commit is contained in:
parent
3148df2107
commit
4da408afe6
|
@ -76,7 +76,6 @@ typedef struct {
|
||||||
} STelemMgmt;
|
} STelemMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SWal *pWal;
|
|
||||||
sem_t syncSem;
|
sem_t syncSem;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
bool standby;
|
bool standby;
|
||||||
|
@ -109,6 +108,7 @@ typedef struct SMnode {
|
||||||
SQHandle *pQuery;
|
SQHandle *pQuery;
|
||||||
SHashObj *infosMeta;
|
SHashObj *infosMeta;
|
||||||
SHashObj *perfsMeta;
|
SHashObj *perfsMeta;
|
||||||
|
SWal *pWal;
|
||||||
SShowMgmt showMgmt;
|
SShowMgmt showMgmt;
|
||||||
SProfileMgmt profileMgmt;
|
SProfileMgmt profileMgmt;
|
||||||
STelemMgmt telemMgmt;
|
STelemMgmt telemMgmt;
|
||||||
|
|
|
@ -139,10 +139,40 @@ static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndInitWal(SMnode *pMnode) {
|
||||||
|
char path[PATH_MAX + 20] = {0};
|
||||||
|
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||||
|
SWalCfg cfg = {
|
||||||
|
.vgId = 1,
|
||||||
|
.fsyncPeriod = 0,
|
||||||
|
.rollPeriod = -1,
|
||||||
|
.segSize = -1,
|
||||||
|
.retentionPeriod = -1,
|
||||||
|
.retentionSize = -1,
|
||||||
|
.level = TAOS_WAL_FSYNC,
|
||||||
|
};
|
||||||
|
|
||||||
|
pMnode->pWal = walOpen(path, &cfg);
|
||||||
|
if (pMnode->pWal == NULL) {
|
||||||
|
mError("failed to open wal since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCloseWal(SMnode *pMnode) {
|
||||||
|
if (pMnode->pWal != NULL) {
|
||||||
|
walClose(pMnode->pWal);
|
||||||
|
pMnode->pWal = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndInitSdb(SMnode *pMnode) {
|
static int32_t mndInitSdb(SMnode *pMnode) {
|
||||||
SSdbOpt opt = {0};
|
SSdbOpt opt = {0};
|
||||||
opt.path = pMnode->path;
|
opt.path = pMnode->path;
|
||||||
opt.pMnode = pMnode;
|
opt.pMnode = pMnode;
|
||||||
|
opt.pWal = pMnode->pWal;
|
||||||
|
|
||||||
pMnode->pSdb = sdbInit(&opt);
|
pMnode->pSdb = sdbInit(&opt);
|
||||||
if (pMnode->pSdb == NULL) {
|
if (pMnode->pSdb == NULL) {
|
||||||
|
@ -156,7 +186,6 @@ static int32_t mndOpenSdb(SMnode *pMnode) {
|
||||||
if (!pMnode->deploy) {
|
if (!pMnode->deploy) {
|
||||||
return sdbReadFile(pMnode->pSdb);
|
return sdbReadFile(pMnode->pSdb);
|
||||||
} else {
|
} else {
|
||||||
// return sdbDeploy(pMnode->pSdb);;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -182,6 +211,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndInitSteps(SMnode *pMnode) {
|
static int32_t mndInitSteps(SMnode *pMnode) {
|
||||||
|
if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
|
||||||
|
@ -201,7 +231,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
|
||||||
if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
|
||||||
|
|
|
@ -17,15 +17,27 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
|
|
||||||
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = tmsgSendReq(pEpSet, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
@ -89,8 +101,8 @@ void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
|
||||||
pMgmt->errCode = cbMeta.code;
|
pMgmt->errCode = cbMeta.code;
|
||||||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64,
|
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
||||||
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
|
cbMeta.code, cbMeta.index, cbMeta.term);
|
||||||
|
|
||||||
if (pMgmt->transId == -1) {
|
if (pMgmt->transId == -1) {
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
|
@ -155,27 +167,9 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
int32_t mndInitSync(SMnode *pMnode) {
|
int32_t mndInitSync(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
|
||||||
char path[PATH_MAX + 20] = {0};
|
|
||||||
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
|
||||||
SWalCfg cfg = {
|
|
||||||
.vgId = 1,
|
|
||||||
.fsyncPeriod = 0,
|
|
||||||
.rollPeriod = -1,
|
|
||||||
.segSize = -1,
|
|
||||||
.retentionPeriod = -1,
|
|
||||||
.retentionSize = -1,
|
|
||||||
.level = TAOS_WAL_FSYNC,
|
|
||||||
};
|
|
||||||
|
|
||||||
pMgmt->pWal = walOpen(path, &cfg);
|
|
||||||
if (pMgmt->pWal == NULL) {
|
|
||||||
mError("failed to open wal since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||||
syncInfo.pWal = pMgmt->pWal;
|
syncInfo.pWal = pMnode->pWal;
|
||||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||||
syncInfo.isStandBy = pMgmt->standby;
|
syncInfo.isStandBy = pMgmt->standby;
|
||||||
syncInfo.snapshotEnable = true;
|
syncInfo.snapshotEnable = true;
|
||||||
|
@ -208,10 +202,6 @@ void mndCleanupSync(SMnode *pMnode) {
|
||||||
mDebug("mnode sync is stopped, id:%" PRId64, pMgmt->sync);
|
mDebug("mnode sync is stopped, id:%" PRId64, pMgmt->sync);
|
||||||
|
|
||||||
tsem_destroy(&pMgmt->syncSem);
|
tsem_destroy(&pMgmt->syncSem);
|
||||||
if (pMgmt->pWal != NULL) {
|
|
||||||
walClose(pMgmt->pWal);
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -165,6 +166,7 @@ typedef struct SSdbRow {
|
||||||
|
|
||||||
typedef struct SSdb {
|
typedef struct SSdb {
|
||||||
SMnode *pMnode;
|
SMnode *pMnode;
|
||||||
|
SWal *pWal;
|
||||||
char *currDir;
|
char *currDir;
|
||||||
char *tmpDir;
|
char *tmpDir;
|
||||||
int64_t lastCommitVer;
|
int64_t lastCommitVer;
|
||||||
|
@ -206,6 +208,7 @@ typedef struct {
|
||||||
typedef struct SSdbOpt {
|
typedef struct SSdbOpt {
|
||||||
const char *path;
|
const char *path;
|
||||||
SMnode *pMnode;
|
SMnode *pMnode;
|
||||||
|
SWal *pWal;
|
||||||
} SSdbOpt;
|
} SSdbOpt;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -52,10 +52,12 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
pSdb->keyTypes[i] = SDB_KEY_INT32;
|
pSdb->keyTypes[i] = SDB_KEY_INT32;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSdb->pWal = pOption->pWal;
|
||||||
pSdb->curVer = -1;
|
pSdb->curVer = -1;
|
||||||
pSdb->curTerm = -1;
|
pSdb->curTerm = -1;
|
||||||
pSdb->lastCommitVer = -1;
|
pSdb->lastCommitVer = -1;
|
||||||
pSdb->lastCommitTerm = -1;
|
pSdb->lastCommitTerm = -1;
|
||||||
|
pSdb->curConfig = -1;
|
||||||
pSdb->pMnode = pOption->pMnode;
|
pSdb->pMnode = pOption->pMnode;
|
||||||
taosThreadMutexInit(&pSdb->filelock, NULL);
|
taosThreadMutexInit(&pSdb->filelock, NULL);
|
||||||
mDebug("sdb init successfully");
|
mDebug("sdb init successfully");
|
||||||
|
|
|
@ -441,12 +441,23 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbWriteFile(SSdb *pSdb) {
|
int32_t sdbWriteFile(SSdb *pSdb) {
|
||||||
|
int32_t code = 0;
|
||||||
if (pSdb->curVer == pSdb->lastCommitVer) {
|
if (pSdb->curVer == pSdb->lastCommitVer) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
taosThreadMutexLock(&pSdb->filelock);
|
||||||
int32_t code = sdbWriteFileImp(pSdb);
|
if (pSdb->pWal != NULL) {
|
||||||
|
code = walBeginSnapshot(pSdb->pWal, pSdb->curVer);
|
||||||
|
}
|
||||||
|
if (code == 0) {
|
||||||
|
code = sdbWriteFileImp(pSdb);
|
||||||
|
}
|
||||||
|
if (code == 0) {
|
||||||
|
if (pSdb->pWal != NULL) {
|
||||||
|
code = walEndSnapshot(pSdb->pWal);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to write sdb file since %s", terrstr());
|
mError("failed to write sdb file since %s", terrstr());
|
||||||
}
|
}
|
||||||
|
|
|
@ -252,7 +252,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
SSyncInfo syncInfo = {
|
SSyncInfo syncInfo = {
|
||||||
.isStandBy = false,
|
.isStandBy = false,
|
||||||
.snapshotEnable = false,
|
.snapshotEnable = false,
|
||||||
.vgId = pVnode->config.vgId,
|
.vgId = pVnode->config.vgId,
|
||||||
.isStandBy = pVnode->config.standby,
|
.isStandBy = pVnode->config.standby,
|
||||||
|
|
Loading…
Reference in New Issue