diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 0bd15c3418..ce81356f86 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -19,6 +19,7 @@ #include "tarray.h" #include "tconfig.h" #include "tdef.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -49,6 +50,7 @@ extern char tsLocalEp[]; extern char tsVersionName[]; extern uint16_t tsServerPort; extern int32_t tsVersion; +extern int32_t tsForceReadConfig; extern int32_t tsConfigVersion; extern int32_t tsConfigInited; extern int32_t tsStatusInterval; @@ -294,8 +296,8 @@ int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope); int32_t persistGlobalConfig(const char *path, int32_t version); int32_t persistLocalConfig(const char *path); int32_t localConfigSerialize(SArray *array, char **serialized); -int32_t tSerializeSConfigArray(void *buf, int32_t bufLen, SArray *array); -int32_t tDeserializeSConfigArray(void *buf, int32_t bufLen, SArray *pReq); +int32_t tSerializeSConfigArray(SEncoder *pEncoder, int32_t bufLen, SArray *array); +int32_t tDeserializeSConfigArray(SDecoder *pDecoder, int32_t bufLen, SArray *array); #ifdef __cplusplus } #endif diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c5bca8a5f7..6cbf814cb9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1807,12 +1807,14 @@ int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); typedef struct { + int32_t forceReadConfig; int32_t cver; + SArray* array; } SConfigReq; int32_t tSerializeSConfigReq(void* buf, int32_t bufLen, SConfigReq* pReq); int32_t tDeserializeSConfigReq(void* buf, int32_t bufLen, SConfigReq* pReq); -// void tFreeSStatusReq(SStatusReq* pReq); +void tFreeSStatusReq(SStatusReq* pReq); typedef struct { int32_t dnodeId; @@ -1891,6 +1893,17 @@ int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); void tFreeSStatusRsp(SStatusRsp* pRsp); +typedef struct { + int32_t forceReadConfig; + int32_t isConifgVerified; + int32_t isVersionVerified; + SArray* pArray; +} SConfigRsp; + +int32_t tSerializeSConfigRsp(void* buf, int32_t bufLen, SConfigRsp* pRsp); +int32_t tDeserializeSConfigRsp(void* buf, int32_t bufLen, SConfigRsp* pRsp); +void tFreeSConfigRsp(SConfigRsp* pRsp); + typedef struct { int32_t reserved; } SMTimerReq; diff --git a/include/util/tencode.h b/include/util/tencode.h index b66e79fa60..89a675d817 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -88,6 +88,7 @@ static int32_t tEncodeU64v(SEncoder* pCoder, uint64_t val); static int32_t tEncodeI64v(SEncoder* pCoder, int64_t val); static int32_t tEncodeFloat(SEncoder* pCoder, float val); static int32_t tEncodeDouble(SEncoder* pCoder, double val); +static int32_t tEncodeBool(SEncoder* pCoder, bool val); static int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, uint32_t len); static int32_t tEncodeBinaryEx(SEncoder* pCoder, const uint8_t* val, uint32_t len); static int32_t tEncodeCStrWithLen(SEncoder* pCoder, const char* val, uint32_t len); @@ -116,6 +117,7 @@ static int32_t tDecodeU64v(SDecoder* pCoder, uint64_t* val); static int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val); static int32_t tDecodeFloat(SDecoder* pCoder, float* val); static int32_t tDecodeDouble(SDecoder* pCoder, double* val); +static int32_t tDecodeBool(SDecoder* pCoder, bool* val); static int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len); static int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len); static int32_t tDecodeCStr(SDecoder* pCoder, char** val); @@ -205,6 +207,8 @@ static FORCE_INLINE int32_t tEncodeDouble(SEncoder* pCoder, double val) { return tEncodeU64(pCoder, v.ui); } +static int32_t tEncodeBool(SEncoder* pCoder, bool val) { return tEncodeU8(pCoder, val ? 1 : 0); } + static FORCE_INLINE int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, uint32_t len) { TAOS_CHECK_RETURN(tEncodeU32v(pCoder, len)); if (len) { @@ -391,6 +395,15 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) { return 0; } +static int32_t tDecodeBool(SDecoder* pCoder, bool* val) { + uint8_t v; + TAOS_CHECK_RETURN(tDecodeU8(pCoder, &v)); + if (val) { + *val = v ? true : false; + } + return 0; +} + static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) { uint32_t length = 0; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2919c2cb9f..0fac0c9c8e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -23,6 +23,7 @@ #include "tlog.h" #include "tmisce.h" #include "tunit.h" +#include "tutil.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" @@ -43,6 +44,7 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port char tsVersionName[16] = "community"; uint16_t tsServerPort = 6030; int32_t tsVersion = 30000000; +int32_t tsForceReadConfig = 0; int32_t tsConfigVersion = 0; int32_t tsConfigInited = 0; int32_t tsStatusInterval = 1; // second @@ -2676,3 +2678,79 @@ int32_t persistLocalConfig(const char *path) { taosWriteFile(pConfigFile, serialized, strlen(serialized)); return TSDB_CODE_SUCCESS; } + +int32_t tSerializeSConfigArray(SEncoder *pEncoder, int32_t bufLen, SArray *array) { + int32_t code = 0; + int32_t lino = 0; + int sz = taosArrayGetSize(array); + + for (int i = 0; i < sz; i++) { + SConfigItem *item = (SConfigItem *)taosArrayGet(array, i); + switch (item->dtype) { + { + case CFG_DTYPE_NONE: + break; + case CFG_DTYPE_BOOL: + TAOS_CHECK_EXIT(tEncodeBool(pEncoder, item->bval)); + break; + case CFG_DTYPE_INT32: + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, item->i32)); + break; + case CFG_DTYPE_INT64: + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, item->i64)); + break; + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: + TAOS_CHECK_EXIT(tEncodeFloat(pEncoder, item->fval)); + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, item->str)); + break; + } + } + } +_exit: + return code; +} + +int32_t tDeserializeSConfigArray(SDecoder *pDecoder, int32_t bufLen, SArray *array) { + int32_t code = 0; + int32_t lino = 0; + int sz = taosArrayGetSize(array); + + for (int i = 0; i < sz; i++) { + SConfigItem *item = (SConfigItem *)taosArrayGet(array, i); + switch (item->dtype) { + { + case CFG_DTYPE_NONE: + break; + case CFG_DTYPE_BOOL: + TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &item->bval)); + break; + case CFG_DTYPE_INT32: + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &item->i32)); + break; + case CFG_DTYPE_INT64: + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &item->i64)); + break; + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: + TAOS_CHECK_EXIT(tDecodeFloat(pDecoder, &item->fval)); + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &item->str)); + break; + } + } + } +_exit: + return code; +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ab63d8083c..40750a1b86 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1600,6 +1600,7 @@ _exit: return code; } + int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; @@ -1607,6 +1608,11 @@ int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) { int32_t tlen; tEncoderInit(&encoder, buf, bufLen); TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->forceReadConfig)); + if (pReq->forceReadConfig) { + } else { + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver)); + } TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver)); tEndEncode(&encoder); _exit: diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index f21f6a820f..a2aa16e2af 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -288,6 +288,8 @@ void dmSendConfigReq(SDnodeMgmt *pMgmt) { SConfigReq req = {0}; req.cver = tsConfigVersion; + req.forceReadConfig = tsForceReadConfig; + req.array = cfgGetGlobalCfg(tsCfg); dDebug("send config req to mnode, configVersion:%d", req.cver); int32_t contLen = tSerializeSConfigReq(NULL, 0, &req); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 0d804eadf0..7b01609c29 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -200,6 +200,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 24ae8382f9..8e042e2091 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -14,11 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "mndDnode.h" #include #include "audit.h" #include "mndCluster.h" #include "mndDb.h" +#include "mndDnode.h" #include "mndMnode.h" #include "mndPrivilege.h" #include "mndQnode.h" @@ -83,6 +83,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessStatusReq(SRpcMsg *pReq); +static int32_t mndProcessConfigReq(SRpcMsg *pReq); static int32_t mndProcessNotifyReq(SRpcMsg *pReq); static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq); static int32_t mndProcessStatisReq(SRpcMsg *pReq); @@ -121,6 +122,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq); + mndSetMsgHandle(pMnode, TDMT_MND_CONFIG, mndProcessConfigReq); mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq); mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq); mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq); @@ -462,7 +464,7 @@ int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) { dInfo.isMnode = 0; } - if(taosArrayPush(pDnodeInfo, &dInfo) == NULL){ + if (taosArrayPush(pDnodeInfo, &dInfo) == NULL) { code = terrno; sdbCancelFetch(pSdb, pIter); break; @@ -471,12 +473,12 @@ int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) { TAOS_RETURN(code); } -#define CHECK_MONITOR_PARA(para,err) \ -if (pCfg->monitorParas.para != para) { \ - mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \ - terrno = err; \ - return err;\ -} +#define CHECK_MONITOR_PARA(para, err) \ + if (pCfg->monitorParas.para != para) { \ + mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \ + terrno = err; \ + return err; \ + } static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) { CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH); @@ -487,7 +489,8 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH); if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) { - mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb); + mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, + pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb); terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS; return DND_REASON_STATUS_MONITOR_NOT_MATCH; } @@ -583,8 +586,8 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) { return stateChanged; } -extern char* tsMonFwUri; -extern char* tsMonSlowLogUri; +extern char *tsMonFwUri; +extern char *tsMonSlowLogUri; static int32_t mndProcessStatisReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStatisReq statisReq = {0}; @@ -596,9 +599,9 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { mInfo("process statis req,\n %s", statisReq.pCont); } - if (statisReq.type == MONITOR_TYPE_COUNTER){ + if (statisReq.type == MONITOR_TYPE_COUNTER) { monSendContent(statisReq.pCont, tsMonFwUri); - }else if(statisReq.type == MONITOR_TYPE_SLOW_LOG){ + } else if (statisReq.type == MONITOR_TYPE_SLOW_LOG) { monSendContent(statisReq.pCont, tsMonSlowLogUri); } @@ -920,6 +923,40 @@ _OVER: return mndUpdClusterInfo(pReq); } +static int32_t mndProcessConfigReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SConfigReq configReq = {0}; + SDnodeObj *pDnode = NULL; + int32_t code = -1; + + tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq); + + SStatusRsp statusRsp = {0}; + statusRsp.statusSeq++; + statusRsp.dnodeCfg.dnodeId = pDnode->id; + statusRsp.dnodeCfg.clusterId = pMnode->clusterId; + statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); + if (statusRsp.pDnodeEps == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + mndGetDnodeEps(pMnode, statusRsp.pDnodeEps); + statusRsp.ipWhiteVer = pMnode->ipWhiteVer; + + int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp); + void *pHead = rpcMallocCont(contLen); + contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp); + taosArrayDestroy(statusRsp.pDnodeEps); + if (contLen < 0) { + code = contLen; + goto _OVER; + } +_OVER: + mndReleaseDnode(pMnode, pDnode); + return mndUpdClusterInfo(pReq); +} + static int32_t mndProcessNotifyReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SNotifyReq notifyReq = {0}; @@ -1058,27 +1095,27 @@ _OVER: TAOS_RETURN(code); } -static void getSlowLogScopeString(int32_t scope, char* result){ - if(scope == SLOW_LOG_TYPE_NULL) { +static void getSlowLogScopeString(int32_t scope, char *result) { + if (scope == SLOW_LOG_TYPE_NULL) { (void)strcat(result, "NONE"); return; } - while(scope > 0){ - if(scope & SLOW_LOG_TYPE_QUERY) { + while (scope > 0) { + if (scope & SLOW_LOG_TYPE_QUERY) { (void)strcat(result, "QUERY"); scope &= ~SLOW_LOG_TYPE_QUERY; - } else if(scope & SLOW_LOG_TYPE_INSERT) { + } else if (scope & SLOW_LOG_TYPE_INSERT) { (void)strcat(result, "INSERT"); scope &= ~SLOW_LOG_TYPE_INSERT; - } else if(scope & SLOW_LOG_TYPE_OTHERS) { + } else if (scope & SLOW_LOG_TYPE_OTHERS) { (void)strcat(result, "OTHERS"); scope &= ~SLOW_LOG_TYPE_OTHERS; - } else{ + } else { (void)printf("invalid slow log scope:%d", scope); return; } - if(scope > 0) { + if (scope > 0) { (void)strcat(result, "|"); } } @@ -1439,7 +1476,7 @@ _OVER: static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) { int32_t code = 0; - char *p = pMCfgReq->config; + char *p = pMCfgReq->config; while (*p) { if (*p == ' ') { break; @@ -1537,7 +1574,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #endif } else { - TAOS_CHECK_GOTO (mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out); + TAOS_CHECK_GOTO(mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out); if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) { mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId); code = TSDB_CODE_INVALID_CFG;