Add mnd init cfg and mnd cfg trans.

This commit is contained in:
xiao-77 2024-11-14 17:20:50 +08:00
parent 748bb000b0
commit cc3f4504bf
11 changed files with 444 additions and 72 deletions

View File

@ -118,6 +118,7 @@ void cfgCleanup(SConfig *pCfg);
int32_t cfgGetSize(SConfig *pCfg);
SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName);
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock);
int32_t cfgUpdateItem(SConfigItem *pItem, SConfigItem *newItem);
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter);

View File

@ -200,7 +200,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_CONFIG_H_
#define _TD_MND_CONFIG_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitConfig(SMnode *pMnode);
SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg);
SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw);
static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item);
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item);
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *oldItem, SConfigItem *newItem);
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_ARBGROUP_H_*/

View File

@ -106,8 +106,8 @@ typedef enum {
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
// TRN_CONFLICT_TOPIC = 4,
// TRN_CONFLICT_TOPIC_INSIDE = 5,
// TRN_CONFLICT_TOPIC = 4,
// TRN_CONFLICT_TOPIC_INSIDE = 5,
TRN_CONFLICT_ARBGROUP = 6,
TRN_CONFLICT_TSMA = 7,
} ETrnConflct;
@ -316,6 +316,7 @@ typedef struct {
TdThreadMutex mutex;
} SArbGroup;
typedef struct {
int32_t maxUsers;
int32_t maxDbs;
@ -649,12 +650,12 @@ typedef struct {
int32_t maxPollIntervalMs;
} SMqConsumerObj;
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer);
void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
int32_t tNewSMqConsumerObj(int64_t consumerId, char* cgroup, int8_t updateType, char* topic, SCMSubscribeReq* subscribe,
SMqConsumerObj** ppConsumer);
void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
typedef struct {
int32_t vgId;
@ -693,11 +694,11 @@ typedef struct {
char* qmsg; // SubPlanToString
} SMqSubscribeObj;
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub);
int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj **ppSub);
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
int32_t tNewSubscribeObj(const char* key, SMqSubscribeObj** ppSub);
int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj** ppSub);
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
// typedef struct {
// int32_t epoch;

View File

@ -0,0 +1,305 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndConfig.h"
#include "mndDnode.h"
#include "mndTrans.h"
#define CFG_VER_NUMBER 1
#define CFG_RESERVE_SIZE 63
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
static int32_t mndInitWriteCfg(SMnode *pMnode);
static int32_t mndInitReadCfg(SMnode *pMnode);
int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item);
int32_t mndInitConfig(SMnode *pMnode) {
int32_t code = 0;
SSdbTable table = {
.sdbType = SDB_CFG,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mnCfgActionEncode,
.decodeFp = (SdbDecodeFp)mndCfgActionDecode,
.insertFp = (SdbInsertFp)mndCfgActionInsert,
.updateFp = (SdbUpdateFp)mndCfgActionUpdate,
.deleteFp = (SdbDeleteFp)mndCfgActionDelete,
};
if (pMnode->deploy) {
mndInitWriteCfg(pMnode);
} else {
mndInitReadCfg(pMnode);
}
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq);
return sdbSetTable(pMnode->pSdb, table);
}
SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg) {
int32_t code = 0;
int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY;
char buf[30];
int32_t size = sizeof(SConfigItem) + CFG_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
SConfigItem *item = NULL;
SDB_SET_INT32(pRaw, dataPos, strlen(item->name), _OVER)
SDB_SET_BINARY(pRaw, dataPos, item->name, strlen(item->name), _OVER)
SDB_SET_INT32(pRaw, dataPos, item->dtype, _OVER)
switch (item->dtype) {
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
SDB_SET_BOOL(pRaw, dataPos, item->bval, _OVER)
break;
case CFG_DTYPE_INT32:
SDB_SET_INT32(pRaw, dataPos, item->i32, _OVER);
break;
case CFG_DTYPE_INT64:
SDB_SET_INT64(pRaw, dataPos, item->i64, _OVER);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
(void)sprintf(buf, "%f", item->fval);
SDB_SET_INT32(pRaw, dataPos, strlen(buf), _OVER)
SDB_SET_BINARY(pRaw, dataPos, buf, strlen(buf), _OVER)
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
SDB_SET_INT32(pRaw, dataPos, strlen(item->str), _OVER)
SDB_SET_BINARY(pRaw, dataPos, item->str, strlen(item->str), _OVER)
break;
}
terrno = 0;
_OVER:
if (terrno != 0) {
mError("cfg failed to encode to raw:%p since %s", pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("cfg encode to raw:%p, row:%p", pRaw, pCfg);
return pRaw;
}
SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) {
int32_t code = 0;
int32_t lino = 0;
int32_t len = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SConfigItem *item = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != CFG_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
pRow = sdbAllocRow(sizeof(SSdbRaw));
if (pRow == NULL) goto _OVER;
item = sdbGetRowObj(pRow);
if (item == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &len, _OVER)
SDB_GET_BINARY(pRaw, dataPos, item->name, len, _OVER)
SDB_GET_INT32(pRaw, dataPos, (int32_t *)&item->dtype, _OVER)
switch (item->dtype) {
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
SDB_GET_BOOL(pRaw, dataPos, &item->bval, _OVER)
break;
case CFG_DTYPE_INT32:
SDB_GET_INT32(pRaw, dataPos, &item->i32, _OVER);
break;
case CFG_DTYPE_INT64:
SDB_GET_INT64(pRaw, dataPos, &item->i64, _OVER);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
SDB_GET_INT32(pRaw, dataPos, &len, _OVER)
char *buf = taosMemoryMalloc(len + 1);
SDB_GET_BINARY(pRaw, dataPos, buf, len, _OVER)
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
SDB_GET_INT32(pRaw, dataPos, &len, _OVER)
SDB_GET_BINARY(pRaw, dataPos, item->str, len, _OVER)
break;
}
_OVER:
if (terrno != 0) {
mError("cfg failed to decode from raw:%p since %s", pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
mTrace("cfg decode from raw:%p, row:%p", pRaw, item);
return pRow;
}
static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item) {
mTrace("cfg:%s, perform insert action, row:%p", item->name, item);
return 0;
}
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item) {
mTrace("cfg:%s, perform delete action, row:%p", item->name, item);
return 0;
}
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *pOld, SConfigItem *pNew) {
mTrace("cfg:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
return 0;
}
static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SConfigReq configReq = {0};
SDnodeObj *pDnode = NULL;
int32_t code = -1;
tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq);
SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem));
SConfigRsp configRsp = {0};
configRsp.forceReadConfig = configReq.forceReadConfig;
configRsp.cver = tsmmConfigVersion;
if (configRsp.forceReadConfig) {
// compare config array from configReq with current config array
if (compareSConfigItemArrays(getGlobalCfg(tsCfg), configReq.array, diffArray)) {
configRsp.array = diffArray;
} else {
configRsp.isConifgVerified = 1;
taosArrayDestroy(diffArray);
}
} else {
configRsp.array = getGlobalCfg(tsCfg);
if (configReq.cver == tsmmConfigVersion) {
configRsp.isVersionVerified = 1;
} else {
configRsp.array = getGlobalCfg(tsCfg);
}
}
int32_t contLen = tSerializeSConfigRsp(NULL, 0, &configRsp);
void *pHead = rpcMallocCont(contLen);
contLen = tSerializeSConfigRsp(pHead, contLen, &configRsp);
taosArrayDestroy(diffArray);
if (contLen < 0) {
code = contLen;
goto _OVER;
}
pReq->info.rspLen = contLen;
pReq->info.rsp = pHead;
_OVER:
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_SUCCESS;
}
int32_t mndInitWriteCfg(SMnode *pMnode) {
int code = -1;
size_t sz = 0;
SConfigItem item = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "init-write-config");
if (pTrans == NULL) {
mError("failed to init write cfg in create trans, since %s", terrstr());
goto _OVER;
}
// encode mnd config version
item = (SConfigItem){.name = "tsmmConfigVersion", .dtype = CFG_DTYPE_INT32, .i32 = tsmmConfigVersion};
if ((code = mndSetCreateConfigCommitLogs(pTrans, &item)) != 0) {
mError("failed to init mnd config version, since %s", terrstr());
}
sz = taosArrayGetSize(getGlobalCfg(tsCfg));
for (int i = 0; i < sz; ++i) {
SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i);
if ((code = mndSetCreateConfigCommitLogs(pTrans, item)) != 0) {
mError("failed to init mnd config:%s, since %s", item->name, terrstr());
}
}
if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
_OVER:
mndTransDrop(pTrans);
return TSDB_CODE_SUCCESS;
}
int32_t mndInitReadCfg(SMnode *pMnode) {
int32_t code = 0;
int32_t sz = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "init-read-config");
if (pTrans == NULL) {
mError("failed to init read cfg in create trans, since %s", terrstr());
goto _OVER;
}
SConfigItem *item = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
if (item == NULL) {
mInfo("failed to acquire mnd config version, since %s", terrstr());
goto _OVER;
} else {
tsmmConfigVersion = item->i32;
sdbRelease(pMnode->pSdb, item);
}
for (int i = 0; i < sz; ++i) {
SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i);
SConfigItem *newItem = sdbAcquire(pMnode->pSdb, SDB_CFG, item->name);
if (newItem == NULL) {
mInfo("failed to acquire mnd config:%s, since %s", item->name, terrstr());
continue;
}
cfgUpdateItem(item, newItem);
sdbRelease(pMnode->pSdb, newItem);
}
_OVER:
mndTransDrop(pTrans);
return TSDB_CODE_SUCCESS;
}
int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mnCfgActionEncode(item);
if (pCommitRaw == NULL) {
code = terrno;
TAOS_RETURN(code);
}
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0)) TAOS_RETURN(code);
if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
return TSDB_CODE_SUCCESS;
}

View File

@ -14,10 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "mndDb.h"
#include "audit.h"
#include "command.h"
#include "mndArbGroup.h"
#include "mndCluster.h"
#include "mndConfig.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndIndex.h"
#include "mndPrivilege.h"
@ -34,7 +36,6 @@
#include "systable.h"
#include "thttp.h"
#include "tjson.h"
#include "command.h"
#define DB_VER_NUMBER 1
#define DB_RESERVE_SIZE 27
@ -1560,8 +1561,8 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
@ -1941,9 +1942,9 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
continue;
} else {
mTrace("db:%s, valid dbinfo, vgVersion:%d cfgVersion:%d stateTs:%" PRId64
" numOfTables:%d, changed to vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs,
pDbCacheInfo->numOfTable, pDb->vgVersion, pDb->cfgVersion, pDb->stateTs, numOfTable);
" numOfTables:%d, changed to vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs,
pDbCacheInfo->numOfTable, pDb->vgVersion, pDb->cfgVersion, pDb->stateTs, numOfTable);
}
if (pDbCacheInfo->cfgVersion < pDb->cfgVersion) {
@ -1955,7 +1956,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES);
if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) {
bool exist = false;
bool exist = false;
int32_t code = mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist);
if (TSDB_CODE_SUCCESS != code) {
mndReleaseDb(pMnode, pDb);
@ -2386,7 +2387,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)strictVstr, false), &lino, _OVER);
char durationVstr[128] = {0};
int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE, pDb->cfg.daysPerFile);
int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE,
pDb->cfg.daysPerFile);
varDataSetLen(durationVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2402,9 +2404,9 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
int32_t lenKeep2 = formatDurationOrKeep(keep2Str, sizeof(keep2Str), pDb->cfg.daysToKeep2);
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep1Str, keep2Str, keep0Str);
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep1Str, keep2Str, keep0Str);
} else {
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep0Str, keep1Str, keep2Str);
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep0Str, keep1Str, keep2Str);
}
varDataSetLen(keepVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

View File

@ -84,7 +84,6 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
@ -123,7 +122,6 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq);
mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
@ -924,49 +922,7 @@ _OVER:
return mndUpdClusterInfo(pReq);
}
static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SConfigReq configReq = {0};
SDnodeObj *pDnode = NULL;
int32_t code = -1;
tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq);
SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem));
SConfigRsp configRsp = {0};
configRsp.forceReadConfig = configReq.forceReadConfig;
configRsp.cver = tsmmConfigVersion;
if (configRsp.forceReadConfig) {
// compare config array from configReq with current config array
if (compareSConfigItemArrays(getGlobalCfg(tsCfg), configReq.array, diffArray)) {
configRsp.array = diffArray;
} else {
configRsp.isConifgVerified = 1;
taosArrayDestroy(diffArray);
}
} else {
configRsp.array = getGlobalCfg(tsCfg);
if (configReq.cver == tsmmConfigVersion) {
configRsp.isVersionVerified = 1;
} else {
configRsp.array = getGlobalCfg(tsCfg);
}
}
int32_t contLen = tSerializeSConfigRsp(NULL, 0, &configRsp);
void *pHead = rpcMallocCont(contLen);
contLen = tSerializeSConfigRsp(pHead, contLen, &configRsp);
taosArrayDestroy(diffArray);
if (contLen < 0) {
code = contLen;
goto _OVER;
}
pReq->info.rspLen = contLen;
pReq->info.rsp = pHead;
_OVER:
mndReleaseDnode(pMnode, pDnode);
return mndUpdClusterInfo(pReq);
}
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;

View File

@ -15,11 +15,12 @@
#define _DEFAULT_SOURCE
#include "mndAcct.h"
#include "mndArbGroup.h"
#include "mndAnode.h"
#include "mndArbGroup.h"
#include "mndCluster.h"
#include "mndCompact.h"
#include "mndCompactDetail.h"
#include "mndConfig.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
@ -603,6 +604,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
static int32_t mndInitSteps(SMnode *pMnode) {
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-config", mndInitConfig, mndCleanupArbGroup));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster));

View File

@ -60,6 +60,7 @@ extern "C" {
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
#define SDB_GET_UINT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawUInt8, uint8_t)
#define SDB_GET_BOOL(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawBool, bool)
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
{ \
@ -81,6 +82,7 @@ extern "C" {
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
#define SDB_SET_UINT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawUInt8, uint8_t)
#define SDB_SET_BOOL(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawBool, bool)
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
{ \
@ -162,7 +164,8 @@ typedef enum {
SDB_GRANT = 26, // grant log
SDB_ARBGROUP = 27,
SDB_ANODE = 28,
SDB_MAX = 29
SDB_MAX = 29,
SDB_CFG = 30
} ESdbType;
typedef struct SSdbRaw {
@ -373,7 +376,7 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
/**
* @brief get valid number of rows, removed rows are ignored
*/
int32_t sdbGetValidSize(SSdb* pSdb, ESdbType type);
int32_t sdbGetValidSize(SSdb *pSdb, ESdbType type);
/**
* @brief Get the max id of the table, keyType of table should be INT32
@ -407,6 +410,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw);
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val);
int32_t sdbSetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool val);
int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val);
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val);
int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val);
@ -415,6 +419,7 @@ int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen);
int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status);
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val);
int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val);
int32_t sdbGetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool *val);
int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val);
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val);
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val);

View File

@ -84,6 +84,14 @@ int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val) {
return 0;
}
int32_t sdbSetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool val) {
if (val) {
return sdbSetRawUInt8(pRaw, dataPos, 1);
} else {
return sdbSetRawUInt8(pRaw, dataPos, 0);
}
}
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) {
int32_t code = 0;
if (pRaw == NULL) {
@ -214,6 +222,21 @@ int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val) {
return 0;
}
int32_t sdbGetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool *val) {
int32_t code = 0;
uint8_t v = 0;
code = sdbGetRawUInt8(pRaw, dataPos, &v);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (v) {
*val = true;
} else {
*val = false;
}
return TSDB_CODE_SUCCESS;
}
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) {
int32_t code = 0;
if (pRaw == NULL) {

View File

@ -404,6 +404,47 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
TAOS_RETURN(code);
}
int32_t cfgUpdateItem(SConfigItem *pItem, SConfigItem *newItem) {
int32_t code = TSDB_CODE_SUCCESS;
if (pItem == NULL || newItem == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
pItem->bval = newItem->bval;
break;
}
case CFG_DTYPE_INT32: {
pItem->i32 = newItem->i32;
break;
}
case CFG_DTYPE_INT64: {
pItem->i64 = newItem->i64;
break;
}
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
pItem->fval = newItem->fval;
break;
}
case CFG_DTYPE_DIR:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_NONE:
case CFG_DTYPE_STRING: {
pItem->str = newItem->str;
break;
}
default:
code = TSDB_CODE_INVALID_CFG;
break;
}
TAOS_RETURN(code);
}
SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) {
if (pCfg == NULL) return NULL;
int32_t size = taosArrayGetSize(pCfg->localArray);