Merge pull request #29178 from taosdata/fix/main/TD-33227
Fix(cfg):can not restart taosd cause cfg trans exec failed.
This commit is contained in:
commit
40e246059e
|
@ -29,7 +29,7 @@ static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigObj *obj);
|
||||||
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigObj *obj);
|
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigObj *obj);
|
||||||
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *oldItem, SConfigObj *newObj);
|
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *oldItem, SConfigObj *newObj);
|
||||||
static int32_t mndCfgActionDeploy(SMnode *pMnode);
|
static int32_t mndCfgActionDeploy(SMnode *pMnode);
|
||||||
static int32_t mndCfgActionPrepare(SMnode *pMnode);
|
static int32_t mndCfgActionAfterRestored(SMnode *pMnode);
|
||||||
|
|
||||||
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
|
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "mndConfig.h"
|
#include "mndConfig.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
@ -52,7 +53,7 @@ int32_t mndInitConfig(SMnode *pMnode) {
|
||||||
.updateFp = (SdbUpdateFp)mndCfgActionUpdate,
|
.updateFp = (SdbUpdateFp)mndCfgActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndCfgActionDelete,
|
.deleteFp = (SdbDeleteFp)mndCfgActionDelete,
|
||||||
.deployFp = (SdbDeployFp)mndCfgActionDeploy,
|
.deployFp = (SdbDeployFp)mndCfgActionDeploy,
|
||||||
.prepareFp = (SdbPrepareFp)mndCfgActionPrepare};
|
.afterRestoredFp = (SdbAfterRestoredFp)mndCfgActionAfterRestored};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
|
||||||
|
@ -213,7 +214,7 @@ static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *pOld, SConfigObj *pNew
|
||||||
|
|
||||||
static int32_t mndCfgActionDeploy(SMnode *pMnode) { return mndInitWriteCfg(pMnode); }
|
static int32_t mndCfgActionDeploy(SMnode *pMnode) { return mndInitWriteCfg(pMnode); }
|
||||||
|
|
||||||
static int32_t mndCfgActionPrepare(SMnode *pMnode) { return mndTryRebuildCfg(pMnode); }
|
static int32_t mndCfgActionAfterRestored(SMnode *pMnode) { return mndTryRebuildCfg(pMnode); }
|
||||||
|
|
||||||
static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
|
static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
@ -340,11 +341,15 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTryRebuildCfg(SMnode *pMnode) {
|
int32_t mndTryRebuildCfg(SMnode *pMnode) {
|
||||||
|
if (!mndIsLeader(pMnode)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t sz = -1;
|
int32_t sz = -1;
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
SAcctObj *vObj = NULL, *obj = NULL;
|
SAcctObj *vObj = NULL, *obj = NULL;
|
||||||
SArray *addArray = NULL;
|
SArray *addArray = NULL;
|
||||||
|
|
||||||
vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
|
vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
|
||||||
if (vObj == NULL) {
|
if (vObj == NULL) {
|
||||||
if ((code = mndInitWriteCfg(pMnode)) < 0) goto _exit;
|
if ((code = mndInitWriteCfg(pMnode)) < 0) goto _exit;
|
||||||
|
|
|
@ -797,14 +797,7 @@ int32_t mndStart(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mndSetRestored(pMnode, true);
|
mndSetRestored(pMnode, true);
|
||||||
} else {
|
|
||||||
if (sdbPrepare(pMnode->pSdb) != 0) {
|
|
||||||
mError("failed to prepare sdb while start mnode");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
mndSetRestored(pMnode, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
grantReset(pMnode, TSDB_GRANT_ALL, 0);
|
grantReset(pMnode, TSDB_GRANT_ALL, 0);
|
||||||
|
|
||||||
return mndInitTimer(pMnode);
|
return mndInitTimer(pMnode);
|
||||||
|
|
|
@ -14,11 +14,11 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSync.h"
|
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
|
#include "mndStream.h"
|
||||||
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndStream.h"
|
|
||||||
|
|
||||||
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
if (pMsg == NULL || pMsg->pCont == NULL) {
|
if (pMsg == NULL || pMsg->pCont == NULL) {
|
||||||
|
@ -309,6 +309,9 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:1, sync restore finished, repeat call");
|
mInfo("vgId:1, sync restore finished, repeat call");
|
||||||
}
|
}
|
||||||
|
if (sdbAfterRestored(pMnode->pSdb) != 0) {
|
||||||
|
mError("failed to prepare sdb while start mnode");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:1, sync restore finished");
|
mInfo("vgId:1, sync restore finished");
|
||||||
}
|
}
|
||||||
|
@ -507,7 +510,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
mError("failed to open sync, tsem_init, since %s", tstrerror(code));
|
mError("failed to open sync, tsem_init, since %s", tstrerror(code));
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
pMgmt->sync = syncOpen(&syncInfo, 1); // always check
|
pMgmt->sync = syncOpen(&syncInfo, 1); // always check
|
||||||
if (pMgmt->sync <= 0) {
|
if (pMgmt->sync <= 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
mError("failed to open sync since %s", tstrerror(code));
|
mError("failed to open sync since %s", tstrerror(code));
|
||||||
|
@ -546,7 +549,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
||||||
// pMgmt->transSeq = 0;
|
// pMgmt->transSeq = 0;
|
||||||
// terrno = TSDB_CODE_SYN_TIMEOUT;
|
// terrno = TSDB_CODE_SYN_TIMEOUT;
|
||||||
// pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
// pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
||||||
//if (tsem_post(&pMgmt->syncSem) < 0) {
|
// if (tsem_post(&pMgmt->syncSem) < 0) {
|
||||||
// mError("failed to post sem");
|
// mError("failed to post sem");
|
||||||
//}
|
//}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -117,7 +117,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
|
||||||
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
||||||
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
||||||
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
||||||
typedef int32_t (*SdbPrepareFp)(SMnode *pMnode);
|
typedef int32_t (*SdbAfterRestoredFp)(SMnode *pMnode);
|
||||||
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, SSdbRaw *pRaw);
|
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, SSdbRaw *pRaw);
|
||||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||||
|
@ -188,31 +188,31 @@ typedef struct SSdbRow {
|
||||||
} SSdbRow;
|
} SSdbRow;
|
||||||
|
|
||||||
typedef struct SSdb {
|
typedef struct SSdb {
|
||||||
SMnode *pMnode;
|
SMnode *pMnode;
|
||||||
SWal *pWal;
|
SWal *pWal;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
char *currDir;
|
char *currDir;
|
||||||
char *tmpDir;
|
char *tmpDir;
|
||||||
int64_t commitIndex;
|
int64_t commitIndex;
|
||||||
int64_t commitTerm;
|
int64_t commitTerm;
|
||||||
int64_t commitConfig;
|
int64_t commitConfig;
|
||||||
int64_t applyIndex;
|
int64_t applyIndex;
|
||||||
int64_t applyTerm;
|
int64_t applyTerm;
|
||||||
int64_t applyConfig;
|
int64_t applyConfig;
|
||||||
int64_t tableVer[SDB_MAX];
|
int64_t tableVer[SDB_MAX];
|
||||||
int64_t maxId[SDB_MAX];
|
int64_t maxId[SDB_MAX];
|
||||||
EKeyType keyTypes[SDB_MAX];
|
EKeyType keyTypes[SDB_MAX];
|
||||||
SHashObj *hashObjs[SDB_MAX];
|
SHashObj *hashObjs[SDB_MAX];
|
||||||
TdThreadRwlock locks[SDB_MAX];
|
TdThreadRwlock locks[SDB_MAX];
|
||||||
SdbInsertFp insertFps[SDB_MAX];
|
SdbInsertFp insertFps[SDB_MAX];
|
||||||
SdbUpdateFp updateFps[SDB_MAX];
|
SdbUpdateFp updateFps[SDB_MAX];
|
||||||
SdbDeleteFp deleteFps[SDB_MAX];
|
SdbDeleteFp deleteFps[SDB_MAX];
|
||||||
SdbDeployFp deployFps[SDB_MAX];
|
SdbDeployFp deployFps[SDB_MAX];
|
||||||
SdbPrepareFp prepareFps[SDB_MAX];
|
SdbAfterRestoredFp afterRestoredFps[SDB_MAX];
|
||||||
SdbEncodeFp encodeFps[SDB_MAX];
|
SdbEncodeFp encodeFps[SDB_MAX];
|
||||||
SdbDecodeFp decodeFps[SDB_MAX];
|
SdbDecodeFp decodeFps[SDB_MAX];
|
||||||
SdbValidateFp validateFps[SDB_MAX];
|
SdbValidateFp validateFps[SDB_MAX];
|
||||||
TdThreadMutex filelock;
|
TdThreadMutex filelock;
|
||||||
} SSdb;
|
} SSdb;
|
||||||
|
|
||||||
typedef struct SSdbIter {
|
typedef struct SSdbIter {
|
||||||
|
@ -222,16 +222,16 @@ typedef struct SSdbIter {
|
||||||
} SSdbIter;
|
} SSdbIter;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
ESdbType sdbType;
|
ESdbType sdbType;
|
||||||
EKeyType keyType;
|
EKeyType keyType;
|
||||||
SdbDeployFp deployFp;
|
SdbDeployFp deployFp;
|
||||||
SdbPrepareFp prepareFp;
|
SdbAfterRestoredFp afterRestoredFp;
|
||||||
SdbEncodeFp encodeFp;
|
SdbEncodeFp encodeFp;
|
||||||
SdbDecodeFp decodeFp;
|
SdbDecodeFp decodeFp;
|
||||||
SdbInsertFp insertFp;
|
SdbInsertFp insertFp;
|
||||||
SdbUpdateFp updateFp;
|
SdbUpdateFp updateFp;
|
||||||
SdbDeleteFp deleteFp;
|
SdbDeleteFp deleteFp;
|
||||||
SdbValidateFp validateFp;
|
SdbValidateFp validateFp;
|
||||||
} SSdbTable;
|
} SSdbTable;
|
||||||
|
|
||||||
typedef struct SSdbOpt {
|
typedef struct SSdbOpt {
|
||||||
|
@ -279,7 +279,7 @@ int32_t sdbDeploy(SSdb *pSdb);
|
||||||
* @param pSdb The sdb object.
|
* @param pSdb The sdb object.
|
||||||
* @return int32_t 0 for success, -1 for failure.
|
* @return int32_t 0 for success, -1 for failure.
|
||||||
*/
|
*/
|
||||||
int32_t sdbPrepare(SSdb *pSdb);
|
int32_t sdbAfterRestored(SSdb *pSdb);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Load sdb from file.
|
* @brief Load sdb from file.
|
||||||
|
|
|
@ -127,7 +127,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
pSdb->deployFps[sdbType] = table.deployFp;
|
pSdb->deployFps[sdbType] = table.deployFp;
|
||||||
pSdb->encodeFps[sdbType] = table.encodeFp;
|
pSdb->encodeFps[sdbType] = table.encodeFp;
|
||||||
pSdb->decodeFps[sdbType] = table.decodeFp;
|
pSdb->decodeFps[sdbType] = table.decodeFp;
|
||||||
pSdb->prepareFps[sdbType] = table.prepareFp;
|
pSdb->afterRestoredFps[sdbType] = table.afterRestoredFp;
|
||||||
pSdb->validateFps[sdbType] = table.validateFp;
|
pSdb->validateFps[sdbType] = table.validateFp;
|
||||||
|
|
||||||
int32_t hashType = 0;
|
int32_t hashType = 0;
|
||||||
|
|
|
@ -48,12 +48,12 @@ static int32_t sdbDeployData(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbPrepareData(SSdb *pSdb) {
|
static int32_t sdbAfterRestoredData(SSdb *pSdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
mInfo("start to prepare sdb");
|
mInfo("start to prepare sdb");
|
||||||
|
|
||||||
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
|
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
|
||||||
SdbPrepareFp fp = pSdb->prepareFps[i];
|
SdbAfterRestoredFp fp = pSdb->afterRestoredFps[i];
|
||||||
if (fp == NULL) continue;
|
if (fp == NULL) continue;
|
||||||
|
|
||||||
mInfo("start to prepare sdb:%s", sdbTableName(i));
|
mInfo("start to prepare sdb:%s", sdbTableName(i));
|
||||||
|
@ -666,9 +666,9 @@ int32_t sdbDeploy(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbPrepare(SSdb *pSdb) {
|
int32_t sdbAfterRestored(SSdb *pSdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
code = sdbPrepareData(pSdb);
|
code = sdbAfterRestoredData(pSdb);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue