diff --git a/include/util/tconfig.h b/include/util/tconfig.h index 98e0df9655..312a10e2d6 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -118,6 +118,7 @@ void cfgCleanup(SConfig *pCfg); int32_t cfgGetSize(SConfig *pCfg); SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName); int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock); +int32_t cfgUpdateItem(SConfigItem *pItem, SConfigItem *newItem); int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer); int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 7b01609c29..8e595f76c9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -200,7 +200,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndConfig.h b/source/dnode/mnode/impl/inc/mndConfig.h new file mode 100644 index 0000000000..22a115b30b --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndConfig.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_CONFIG_H_ +#define _TD_MND_CONFIG_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif +int32_t mndInitConfig(SMnode *pMnode); +SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg); +SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw); +static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item); +static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item); +static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *oldItem, SConfigItem *newItem); + +static int32_t mndProcessConfigReq(SRpcMsg *pReq); +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_ARBGROUP_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d2d9b2e8eb..c2ee3944e5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -106,8 +106,8 @@ typedef enum { TRN_CONFLICT_GLOBAL = 1, TRN_CONFLICT_DB = 2, TRN_CONFLICT_DB_INSIDE = 3, -// TRN_CONFLICT_TOPIC = 4, -// TRN_CONFLICT_TOPIC_INSIDE = 5, + // TRN_CONFLICT_TOPIC = 4, + // TRN_CONFLICT_TOPIC_INSIDE = 5, TRN_CONFLICT_ARBGROUP = 6, TRN_CONFLICT_TSMA = 7, } ETrnConflct; @@ -316,6 +316,7 @@ typedef struct { TdThreadMutex mutex; } SArbGroup; + typedef struct { int32_t maxUsers; int32_t maxDbs; @@ -649,12 +650,12 @@ typedef struct { int32_t maxPollIntervalMs; } SMqConsumerObj; -int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, - char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer); -void tClearSMqConsumerObj(SMqConsumerObj* pConsumer); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); -int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); -void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); +int32_t tNewSMqConsumerObj(int64_t consumerId, char* cgroup, int8_t updateType, char* topic, SCMSubscribeReq* subscribe, + SMqConsumerObj** ppConsumer); +void tClearSMqConsumerObj(SMqConsumerObj* pConsumer); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); +int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); +void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); typedef struct { int32_t vgId; @@ -693,11 +694,11 @@ typedef struct { char* qmsg; // SubPlanToString } SMqSubscribeObj; -int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub); -int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj **ppSub); -void tDeleteSubscribeObj(SMqSubscribeObj* pSub); -int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub); -void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver); +int32_t tNewSubscribeObj(const char* key, SMqSubscribeObj** ppSub); +int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj** ppSub); +void tDeleteSubscribeObj(SMqSubscribeObj* pSub); +int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub); +void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver); // typedef struct { // int32_t epoch; diff --git a/source/dnode/mnode/impl/src/mndConfig.c b/source/dnode/mnode/impl/src/mndConfig.c new file mode 100644 index 0000000000..59fdc6260a --- /dev/null +++ b/source/dnode/mnode/impl/src/mndConfig.c @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndConfig.h" +#include "mndDnode.h" +#include "mndTrans.h" + +#define CFG_VER_NUMBER 1 +#define CFG_RESERVE_SIZE 63 + +static int32_t mndProcessConfigReq(SRpcMsg *pReq); +static int32_t mndInitWriteCfg(SMnode *pMnode); +static int32_t mndInitReadCfg(SMnode *pMnode); + +int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item); + +int32_t mndInitConfig(SMnode *pMnode) { + int32_t code = 0; + SSdbTable table = { + .sdbType = SDB_CFG, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mnCfgActionEncode, + .decodeFp = (SdbDecodeFp)mndCfgActionDecode, + .insertFp = (SdbInsertFp)mndCfgActionInsert, + .updateFp = (SdbUpdateFp)mndCfgActionUpdate, + .deleteFp = (SdbDeleteFp)mndCfgActionDelete, + }; + + if (pMnode->deploy) { + mndInitWriteCfg(pMnode); + } else { + mndInitReadCfg(pMnode); + } + + mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq); + + return sdbSetTable(pMnode->pSdb, table); +} + +SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg) { + int32_t code = 0; + int32_t lino = 0; + terrno = TSDB_CODE_OUT_OF_MEMORY; + char buf[30]; + + int32_t size = sizeof(SConfigItem) + CFG_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size); + if (pRaw == NULL) goto _OVER; + + int32_t dataPos = 0; + SConfigItem *item = NULL; + SDB_SET_INT32(pRaw, dataPos, strlen(item->name), _OVER) + SDB_SET_BINARY(pRaw, dataPos, item->name, strlen(item->name), _OVER) + SDB_SET_INT32(pRaw, dataPos, item->dtype, _OVER) + switch (item->dtype) { + case CFG_DTYPE_NONE: + break; + case CFG_DTYPE_BOOL: + SDB_SET_BOOL(pRaw, dataPos, item->bval, _OVER) + break; + case CFG_DTYPE_INT32: + SDB_SET_INT32(pRaw, dataPos, item->i32, _OVER); + break; + case CFG_DTYPE_INT64: + SDB_SET_INT64(pRaw, dataPos, item->i64, _OVER); + break; + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: + (void)sprintf(buf, "%f", item->fval); + SDB_SET_INT32(pRaw, dataPos, strlen(buf), _OVER) + SDB_SET_BINARY(pRaw, dataPos, buf, strlen(buf), _OVER) + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + SDB_SET_INT32(pRaw, dataPos, strlen(item->str), _OVER) + SDB_SET_BINARY(pRaw, dataPos, item->str, strlen(item->str), _OVER) + break; + } + + terrno = 0; + +_OVER: + if (terrno != 0) { + mError("cfg failed to encode to raw:%p since %s", pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + mTrace("cfg encode to raw:%p, row:%p", pRaw, pCfg); + return pRaw; +} + +SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) { + int32_t code = 0; + int32_t lino = 0; + int32_t len = -1; + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SConfigItem *item = NULL; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; + + if (sver != CFG_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto _OVER; + } + + pRow = sdbAllocRow(sizeof(SSdbRaw)); + if (pRow == NULL) goto _OVER; + + item = sdbGetRowObj(pRow); + if (item == NULL) goto _OVER; + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &len, _OVER) + SDB_GET_BINARY(pRaw, dataPos, item->name, len, _OVER) + SDB_GET_INT32(pRaw, dataPos, (int32_t *)&item->dtype, _OVER) + switch (item->dtype) { + case CFG_DTYPE_NONE: + break; + case CFG_DTYPE_BOOL: + SDB_GET_BOOL(pRaw, dataPos, &item->bval, _OVER) + break; + case CFG_DTYPE_INT32: + SDB_GET_INT32(pRaw, dataPos, &item->i32, _OVER); + break; + case CFG_DTYPE_INT64: + SDB_GET_INT64(pRaw, dataPos, &item->i64, _OVER); + break; + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: + SDB_GET_INT32(pRaw, dataPos, &len, _OVER) + char *buf = taosMemoryMalloc(len + 1); + SDB_GET_BINARY(pRaw, dataPos, buf, len, _OVER) + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + SDB_GET_INT32(pRaw, dataPos, &len, _OVER) + SDB_GET_BINARY(pRaw, dataPos, item->str, len, _OVER) + break; + } + +_OVER: + if (terrno != 0) { + mError("cfg failed to decode from raw:%p since %s", pRaw, terrstr()); + taosMemoryFreeClear(pRow); + return NULL; + } + + mTrace("cfg decode from raw:%p, row:%p", pRaw, item); + return pRow; +} + +static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item) { + mTrace("cfg:%s, perform insert action, row:%p", item->name, item); + return 0; +} + +static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item) { + mTrace("cfg:%s, perform delete action, row:%p", item->name, item); + return 0; +} + +static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *pOld, SConfigItem *pNew) { + mTrace("cfg:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); + return 0; +} + +static int32_t mndProcessConfigReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SConfigReq configReq = {0}; + SDnodeObj *pDnode = NULL; + int32_t code = -1; + + tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq); + SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem)); + SConfigRsp configRsp = {0}; + configRsp.forceReadConfig = configReq.forceReadConfig; + configRsp.cver = tsmmConfigVersion; + if (configRsp.forceReadConfig) { + // compare config array from configReq with current config array + if (compareSConfigItemArrays(getGlobalCfg(tsCfg), configReq.array, diffArray)) { + configRsp.array = diffArray; + } else { + configRsp.isConifgVerified = 1; + taosArrayDestroy(diffArray); + } + } else { + configRsp.array = getGlobalCfg(tsCfg); + if (configReq.cver == tsmmConfigVersion) { + configRsp.isVersionVerified = 1; + } else { + configRsp.array = getGlobalCfg(tsCfg); + } + } + + int32_t contLen = tSerializeSConfigRsp(NULL, 0, &configRsp); + void *pHead = rpcMallocCont(contLen); + contLen = tSerializeSConfigRsp(pHead, contLen, &configRsp); + taosArrayDestroy(diffArray); + if (contLen < 0) { + code = contLen; + goto _OVER; + } + pReq->info.rspLen = contLen; + pReq->info.rsp = pHead; +_OVER: + + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_SUCCESS; +} + +int32_t mndInitWriteCfg(SMnode *pMnode) { + int code = -1; + size_t sz = 0; + + SConfigItem item = {0}; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "init-write-config"); + if (pTrans == NULL) { + mError("failed to init write cfg in create trans, since %s", terrstr()); + goto _OVER; + } + + // encode mnd config version + item = (SConfigItem){.name = "tsmmConfigVersion", .dtype = CFG_DTYPE_INT32, .i32 = tsmmConfigVersion}; + if ((code = mndSetCreateConfigCommitLogs(pTrans, &item)) != 0) { + mError("failed to init mnd config version, since %s", terrstr()); + } + sz = taosArrayGetSize(getGlobalCfg(tsCfg)); + + for (int i = 0; i < sz; ++i) { + SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i); + if ((code = mndSetCreateConfigCommitLogs(pTrans, item)) != 0) { + mError("failed to init mnd config:%s, since %s", item->name, terrstr()); + } + } + if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER; + if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; + +_OVER: + mndTransDrop(pTrans); + return TSDB_CODE_SUCCESS; +} + +int32_t mndInitReadCfg(SMnode *pMnode) { + int32_t code = 0; + int32_t sz = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "init-read-config"); + if (pTrans == NULL) { + mError("failed to init read cfg in create trans, since %s", terrstr()); + goto _OVER; + } + SConfigItem *item = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion"); + if (item == NULL) { + mInfo("failed to acquire mnd config version, since %s", terrstr()); + goto _OVER; + } else { + tsmmConfigVersion = item->i32; + sdbRelease(pMnode->pSdb, item); + } + + for (int i = 0; i < sz; ++i) { + SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i); + SConfigItem *newItem = sdbAcquire(pMnode->pSdb, SDB_CFG, item->name); + if (newItem == NULL) { + mInfo("failed to acquire mnd config:%s, since %s", item->name, terrstr()); + continue; + } + cfgUpdateItem(item, newItem); + sdbRelease(pMnode->pSdb, newItem); + } +_OVER: + mndTransDrop(pTrans); + return TSDB_CODE_SUCCESS; +} + +int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item) { + int32_t code = 0; + SSdbRaw *pCommitRaw = mnCfgActionEncode(item); + if (pCommitRaw == NULL) { + code = terrno; + TAOS_RETURN(code); + } + if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0)) TAOS_RETURN(code); + if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 0d17ccd0b0..6f6a45d374 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -14,10 +14,12 @@ */ #define _DEFAULT_SOURCE -#include "mndDb.h" #include "audit.h" +#include "command.h" #include "mndArbGroup.h" #include "mndCluster.h" +#include "mndConfig.h" +#include "mndDb.h" #include "mndDnode.h" #include "mndIndex.h" #include "mndPrivilege.h" @@ -34,7 +36,6 @@ #include "systable.h" #include "thttp.h" #include "tjson.h" -#include "command.h" #define DB_VER_NUMBER 1 #define DB_RESERVE_SIZE 27 @@ -1560,8 +1561,8 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SVgObj *pVgroup = NULL; @@ -1941,9 +1942,9 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, continue; } else { mTrace("db:%s, valid dbinfo, vgVersion:%d cfgVersion:%d stateTs:%" PRId64 - " numOfTables:%d, changed to vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d", - pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs, - pDbCacheInfo->numOfTable, pDb->vgVersion, pDb->cfgVersion, pDb->stateTs, numOfTable); + " numOfTables:%d, changed to vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d", + pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs, + pDbCacheInfo->numOfTable, pDb->vgVersion, pDb->cfgVersion, pDb->stateTs, numOfTable); } if (pDbCacheInfo->cfgVersion < pDb->cfgVersion) { @@ -1955,7 +1956,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp)); if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES); if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) { - bool exist = false; + bool exist = false; int32_t code = mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist); if (TSDB_CODE_SUCCESS != code) { mndReleaseDb(pMnode, pDb); @@ -2386,7 +2387,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)strictVstr, false), &lino, _OVER); char durationVstr[128] = {0}; - int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE, pDb->cfg.daysPerFile); + int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE, + pDb->cfg.daysPerFile); varDataSetLen(durationVstr, len); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2402,9 +2404,9 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, int32_t lenKeep2 = formatDurationOrKeep(keep2Str, sizeof(keep2Str), pDb->cfg.daysToKeep2); if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { - len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep1Str, keep2Str, keep0Str); + len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep1Str, keep2Str, keep0Str); } else { - len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep0Str, keep1Str, keep2Str); + len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep0Str, keep1Str, keep2Str); } varDataSetLen(keepVstr, len); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d981831c12..b4036e328e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -84,7 +84,6 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessStatusReq(SRpcMsg *pReq); -static int32_t mndProcessConfigReq(SRpcMsg *pReq); static int32_t mndProcessNotifyReq(SRpcMsg *pReq); static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq); static int32_t mndProcessStatisReq(SRpcMsg *pReq); @@ -123,7 +122,6 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq); - mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq); mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq); mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq); mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq); @@ -924,49 +922,7 @@ _OVER: return mndUpdClusterInfo(pReq); } -static int32_t mndProcessConfigReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SConfigReq configReq = {0}; - SDnodeObj *pDnode = NULL; - int32_t code = -1; - tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq); - SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem)); - SConfigRsp configRsp = {0}; - configRsp.forceReadConfig = configReq.forceReadConfig; - configRsp.cver = tsmmConfigVersion; - if (configRsp.forceReadConfig) { - // compare config array from configReq with current config array - if (compareSConfigItemArrays(getGlobalCfg(tsCfg), configReq.array, diffArray)) { - configRsp.array = diffArray; - } else { - configRsp.isConifgVerified = 1; - taosArrayDestroy(diffArray); - } - } else { - configRsp.array = getGlobalCfg(tsCfg); - if (configReq.cver == tsmmConfigVersion) { - configRsp.isVersionVerified = 1; - } else { - configRsp.array = getGlobalCfg(tsCfg); - } - } - - int32_t contLen = tSerializeSConfigRsp(NULL, 0, &configRsp); - void *pHead = rpcMallocCont(contLen); - contLen = tSerializeSConfigRsp(pHead, contLen, &configRsp); - taosArrayDestroy(diffArray); - if (contLen < 0) { - code = contLen; - goto _OVER; - } - pReq->info.rspLen = contLen; - pReq->info.rsp = pHead; -_OVER: - - mndReleaseDnode(pMnode, pDnode); - return mndUpdClusterInfo(pReq); -} static int32_t mndProcessNotifyReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 6c30193ea7..4cdad9825f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -15,11 +15,12 @@ #define _DEFAULT_SOURCE #include "mndAcct.h" -#include "mndArbGroup.h" #include "mndAnode.h" +#include "mndArbGroup.h" #include "mndCluster.h" #include "mndCompact.h" #include "mndCompactDetail.h" +#include "mndConfig.h" #include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" @@ -603,6 +604,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle static int32_t mndInitSteps(SMnode *pMnode) { TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal)); + TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-config", mndInitConfig, mndCleanupArbGroup)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster)); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index f6d1587bb2..c9b352d79e 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -60,6 +60,7 @@ extern "C" { #define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) #define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) #define SDB_GET_UINT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawUInt8, uint8_t) +#define SDB_GET_BOOL(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawBool, bool) #define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ { \ @@ -81,6 +82,7 @@ extern "C" { #define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) #define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) #define SDB_SET_UINT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawUInt8, uint8_t) +#define SDB_SET_BOOL(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawBool, bool) #define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ { \ @@ -162,7 +164,8 @@ typedef enum { SDB_GRANT = 26, // grant log SDB_ARBGROUP = 27, SDB_ANODE = 28, - SDB_MAX = 29 + SDB_MAX = 29, + SDB_CFG = 30 } ESdbType; typedef struct SSdbRaw { @@ -373,7 +376,7 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); /** * @brief get valid number of rows, removed rows are ignored */ -int32_t sdbGetValidSize(SSdb* pSdb, ESdbType type); +int32_t sdbGetValidSize(SSdb *pSdb, ESdbType type); /** * @brief Get the max id of the table, keyType of table should be INT32 @@ -407,6 +410,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val); +int32_t sdbSetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool val); int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val); int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val); int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); @@ -415,6 +419,7 @@ int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val); +int32_t sdbGetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool *val); int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val); int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 24ac2ac462..c71e415d1b 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -84,6 +84,14 @@ int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val) { return 0; } +int32_t sdbSetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool val) { + if (val) { + return sdbSetRawUInt8(pRaw, dataPos, 1); + } else { + return sdbSetRawUInt8(pRaw, dataPos, 0); + } +} + int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) { int32_t code = 0; if (pRaw == NULL) { @@ -214,6 +222,21 @@ int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val) { return 0; } +int32_t sdbGetRawBool(SSdbRaw *pRaw, int32_t dataPos, bool *val) { + int32_t code = 0; + uint8_t v = 0; + code = sdbGetRawUInt8(pRaw, dataPos, &v); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (v) { + *val = true; + } else { + *val = false; + } + return TSDB_CODE_SUCCESS; +} + int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) { int32_t code = 0; if (pRaw == NULL) { diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 3fed23e8a2..ca3b123472 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -404,6 +404,47 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy TAOS_RETURN(code); } +int32_t cfgUpdateItem(SConfigItem *pItem, SConfigItem *newItem) { + int32_t code = TSDB_CODE_SUCCESS; + if (pItem == NULL || newItem == NULL) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + + switch (pItem->dtype) { + case CFG_DTYPE_BOOL: { + pItem->bval = newItem->bval; + break; + } + case CFG_DTYPE_INT32: { + pItem->i32 = newItem->i32; + break; + } + case CFG_DTYPE_INT64: { + pItem->i64 = newItem->i64; + break; + } + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: { + pItem->fval = newItem->fval; + break; + } + case CFG_DTYPE_DIR: + case CFG_DTYPE_TIMEZONE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_NONE: + case CFG_DTYPE_STRING: { + pItem->str = newItem->str; + break; + } + default: + code = TSDB_CODE_INVALID_CFG; + break; + } + + TAOS_RETURN(code); +} + SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) { if (pCfg == NULL) return NULL; int32_t size = taosArrayGetSize(pCfg->localArray);