add prepare func.
This commit is contained in:
parent
cc3f4504bf
commit
4a03483683
|
@ -45,10 +45,6 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void mmBuildConfigForDeploy(SMnodeMgmt *pMgmt) {
|
||||
persistGlobalConfig(getGlobalCfg(tsCfg), pMgmt->path, tsmmConfigVersion);
|
||||
}
|
||||
|
||||
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
|
||||
pOption->deploy = true;
|
||||
pOption->msgCb = pMgmt->msgCb;
|
||||
|
@ -124,7 +120,6 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
dInfo("mnode start to deploy");
|
||||
pMgmt->pData->dnodeId = 1;
|
||||
mmBuildOptionForDeploy(pMgmt, pInput, &option);
|
||||
mmBuildConfigForDeploy(pMgmt);
|
||||
} else {
|
||||
dInfo("mnode start to open");
|
||||
mmBuildOptionForOpen(pMgmt, &option);
|
||||
|
|
|
@ -21,12 +21,15 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
int32_t mndInitConfig(SMnode *pMnode);
|
||||
int32_t mndInitConfig(SMnode *pMnode);
|
||||
|
||||
SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg);
|
||||
SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item);
|
||||
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item);
|
||||
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *oldItem, SConfigItem *newItem);
|
||||
static int32_t mndCfgActionDeploy(SMnode *pMnode);
|
||||
static int32_t mndCfgActionPrepare(SMnode *pMnode);
|
||||
|
||||
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -29,28 +29,22 @@ int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item);
|
|||
|
||||
int32_t mndInitConfig(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_CFG,
|
||||
.keyType = SDB_KEY_INT32,
|
||||
.encodeFp = (SdbEncodeFp)mnCfgActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndCfgActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndCfgActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndCfgActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndCfgActionDelete,
|
||||
};
|
||||
|
||||
if (pMnode->deploy) {
|
||||
mndInitWriteCfg(pMnode);
|
||||
} else {
|
||||
mndInitReadCfg(pMnode);
|
||||
}
|
||||
SSdbTable table = {.sdbType = SDB_CFG,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mnCfgActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndCfgActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndCfgActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndCfgActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndCfgActionDelete,
|
||||
.deployFp = (SdbDeployFp)mndCfgActionDeploy,
|
||||
.prepareFp = (SdbPrepareFp)mndCfgActionPrepare};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg) {
|
||||
SSdbRaw *mnCfgActionEncode(SConfigItem *pItem) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -60,26 +54,25 @@ SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg) {
|
|||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SConfigItem *item = NULL;
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(item->name), _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, item->name, strlen(item->name), _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, item->dtype, _OVER)
|
||||
switch (item->dtype) {
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(pItem->name), _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pItem->name, strlen(pItem->name), _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pItem->dtype, _OVER)
|
||||
switch (pItem->dtype) {
|
||||
case CFG_DTYPE_NONE:
|
||||
break;
|
||||
case CFG_DTYPE_BOOL:
|
||||
SDB_SET_BOOL(pRaw, dataPos, item->bval, _OVER)
|
||||
SDB_SET_BOOL(pRaw, dataPos, pItem->bval, _OVER)
|
||||
break;
|
||||
case CFG_DTYPE_INT32:
|
||||
SDB_SET_INT32(pRaw, dataPos, item->i32, _OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pItem->i32, _OVER);
|
||||
break;
|
||||
case CFG_DTYPE_INT64:
|
||||
SDB_SET_INT64(pRaw, dataPos, item->i64, _OVER);
|
||||
SDB_SET_INT64(pRaw, dataPos, pItem->i64, _OVER);
|
||||
break;
|
||||
case CFG_DTYPE_FLOAT:
|
||||
case CFG_DTYPE_DOUBLE:
|
||||
(void)sprintf(buf, "%f", item->fval);
|
||||
(void)sprintf(buf, "%f", pItem->fval);
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(buf), _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, strlen(buf), _OVER)
|
||||
break;
|
||||
|
@ -88,8 +81,8 @@ SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg) {
|
|||
case CFG_DTYPE_LOCALE:
|
||||
case CFG_DTYPE_CHARSET:
|
||||
case CFG_DTYPE_TIMEZONE:
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(item->str), _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, item->str, strlen(item->str), _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(pItem->str), _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pItem->str, strlen(pItem->str), _OVER)
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -101,7 +94,7 @@ _OVER:
|
|||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
mTrace("cfg encode to raw:%p, row:%p", pRaw, pCfg);
|
||||
mTrace("cfg encode to raw:%p, row:%p", pRaw, pItem);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
|
@ -121,7 +114,7 @@ SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pRow = sdbAllocRow(sizeof(SSdbRaw));
|
||||
pRow = sdbAllocRow(sizeof(SConfigItem));
|
||||
if (pRow == NULL) goto _OVER;
|
||||
|
||||
item = sdbGetRowObj(pRow);
|
||||
|
@ -184,6 +177,10 @@ static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *pOld, SConfigItem *pN
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCfgActionDeploy(SMnode *pMnode) { return mndInitWriteCfg(pMnode); }
|
||||
|
||||
static int32_t mndCfgActionPrepare(SMnode *pMnode) { return mndInitConfig(pMnode); }
|
||||
|
||||
static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SConfigReq configReq = {0};
|
||||
|
|
|
@ -604,7 +604,6 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
|
|||
|
||||
static int32_t mndInitSteps(SMnode *pMnode) {
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-config", mndInitConfig, mndCleanupArbGroup));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster));
|
||||
|
@ -639,7 +638,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
|
|||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem));
|
||||
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-config", mndInitConfig, mndCleanupArbGroup));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -798,6 +797,12 @@ int32_t mndStart(SMnode *pMnode) {
|
|||
return -1;
|
||||
}
|
||||
mndSetRestored(pMnode, true);
|
||||
} else {
|
||||
if (sdbPrepare(pMnode->pSdb) != 0) {
|
||||
mError("failed to prepare sdb while start mnode");
|
||||
return -1;
|
||||
}
|
||||
mndSetRestored(pMnode, true);
|
||||
}
|
||||
|
||||
grantReset(pMnode, TSDB_GRANT_ALL, 0);
|
||||
|
|
|
@ -115,6 +115,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
|
|||
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
||||
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
||||
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
||||
typedef int32_t (*SdbPrepareFp)(SMnode *pMnode);
|
||||
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, SSdbRaw *pRaw);
|
||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||
|
@ -164,8 +165,8 @@ typedef enum {
|
|||
SDB_GRANT = 26, // grant log
|
||||
SDB_ARBGROUP = 27,
|
||||
SDB_ANODE = 28,
|
||||
SDB_MAX = 29,
|
||||
SDB_CFG = 30
|
||||
SDB_CFG = 29,
|
||||
SDB_MAX = 30
|
||||
} ESdbType;
|
||||
|
||||
typedef struct SSdbRaw {
|
||||
|
@ -205,6 +206,7 @@ typedef struct SSdb {
|
|||
SdbUpdateFp updateFps[SDB_MAX];
|
||||
SdbDeleteFp deleteFps[SDB_MAX];
|
||||
SdbDeployFp deployFps[SDB_MAX];
|
||||
SdbPrepareFp prepareFps[SDB_MAX];
|
||||
SdbEncodeFp encodeFps[SDB_MAX];
|
||||
SdbDecodeFp decodeFps[SDB_MAX];
|
||||
SdbValidateFp validateFps[SDB_MAX];
|
||||
|
@ -221,6 +223,7 @@ typedef struct {
|
|||
ESdbType sdbType;
|
||||
EKeyType keyType;
|
||||
SdbDeployFp deployFp;
|
||||
SdbPrepareFp prepareFp;
|
||||
SdbEncodeFp encodeFp;
|
||||
SdbDecodeFp decodeFp;
|
||||
SdbInsertFp insertFp;
|
||||
|
@ -268,6 +271,14 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table);
|
|||
*/
|
||||
int32_t sdbDeploy(SSdb *pSdb);
|
||||
|
||||
/**
|
||||
* @brief prepare the initial rows of sdb.
|
||||
*
|
||||
* @param pSdb The sdb object.
|
||||
* @return int32_t 0 for success, -1 for failure.
|
||||
*/
|
||||
int32_t sdbPrepare(SSdb *pSdb);
|
||||
|
||||
/**
|
||||
* @brief Load sdb from file.
|
||||
*
|
||||
|
|
|
@ -48,6 +48,26 @@ static int32_t sdbDeployData(SSdb *pSdb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sdbPrepareData(SSdb *pSdb) {
|
||||
int32_t code = 0;
|
||||
mInfo("start to prepare sdb");
|
||||
|
||||
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
|
||||
SdbDeployFp fp = pSdb->prepareFps[i];
|
||||
if (fp == NULL) continue;
|
||||
|
||||
mInfo("start to prepare sdb:%s", sdbTableName(i));
|
||||
code = (*fp)(pSdb->pMnode);
|
||||
if (code != 0) {
|
||||
mError("failed to prepare sdb:%s since %s", sdbTableName(i), tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
mInfo("sdb prepare success");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void sdbResetData(SSdb *pSdb) {
|
||||
mInfo("start to reset sdb");
|
||||
|
||||
|
@ -641,6 +661,15 @@ int32_t sdbDeploy(SSdb *pSdb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t sdbPrepare(SSdb *pSdb) {
|
||||
int32_t code = 0;
|
||||
code = sdbPrepareData(pSdb);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
|
||||
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
|
||||
if (pIter == NULL) {
|
||||
|
|
|
@ -76,6 +76,8 @@ const char *sdbTableName(ESdbType type) {
|
|||
return "arb_group";
|
||||
case SDB_ANODE:
|
||||
return "anode";
|
||||
case SDB_CFG:
|
||||
return "config";
|
||||
default:
|
||||
return "undefine";
|
||||
}
|
||||
|
@ -444,7 +446,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
|
|||
|
||||
void sdbCancelFetch(SSdb *pSdb, void *pIter) {
|
||||
if (pIter == NULL) return;
|
||||
SSdbRow *pRow = *(SSdbRow **)pIter;
|
||||
SSdbRow *pRow = *(SSdbRow **)pIter;
|
||||
mTrace("cancel fetch row:%p", pRow);
|
||||
SHashObj *hash = sdbGetHash(pSdb, pRow->type);
|
||||
if (hash == NULL) return;
|
||||
|
@ -532,12 +534,12 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type) {
|
|||
}
|
||||
|
||||
bool countValid(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||
int32_t* pInt = p1;
|
||||
int32_t *pInt = p1;
|
||||
(*pInt) += 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t sdbGetValidSize(SSdb* pSdb, ESdbType type) {
|
||||
int32_t sdbGetValidSize(SSdb *pSdb, ESdbType type) {
|
||||
int32_t num = 0;
|
||||
sdbTraverse(pSdb, type, countValid, &num, 0, 0);
|
||||
return num;
|
||||
|
|
Loading…
Reference in New Issue