Send alter msg sync.

This commit is contained in:
xiao-77 2024-12-11 15:46:44 +08:00
parent 2abb383ec0
commit c50781d089
6 changed files with 59 additions and 31 deletions

View File

@ -54,6 +54,7 @@ typedef enum {
typedef enum { CFG_SCOPE_SERVER, CFG_SCOPE_CLIENT, CFG_SCOPE_BOTH } ECfgScopeType; typedef enum { CFG_SCOPE_SERVER, CFG_SCOPE_CLIENT, CFG_SCOPE_BOTH } ECfgScopeType;
typedef enum { CFG_CATEGORY_GLOBAL, CFG_CATEGORY_LOCAL } ECfgCategoryType; typedef enum { CFG_CATEGORY_GLOBAL, CFG_CATEGORY_LOCAL } ECfgCategoryType;
typedef enum { CFG_ALTER_LOCAL, CFG_ALTER_DNODE, CFG_ALTER_ALL_DNODES } CfgAlterType;
typedef enum { typedef enum {
CFG_DYN_NONE = 0, CFG_DYN_NONE = 0,
@ -123,7 +124,8 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName);
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock); int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock);
int32_t cfgGetAndSetItem(SConfig *pCfg, SConfigItem **ppItem, const char *name, const char *value, ECfgSrcType stype, int32_t cfgGetAndSetItem(SConfig *pCfg, SConfigItem **ppItem, const char *name, const char *value, ECfgSrcType stype,
bool lock); bool lock);
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer, bool isUpdateAll); int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer,
CfgAlterType alterType);
int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter); int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter);
SConfigItem *cfgNextIter(SConfigIter *pIter); SConfigItem *cfgNextIter(SConfigIter *pIter);

View File

@ -22,13 +22,9 @@
#include "mndUser.h" #include "mndUser.h"
#include "tutil.h" #include "tutil.h"
#define CFG_VER_NUMBER 1 #define CFG_VER_NUMBER 1
#define CFG_RESERVE_SIZE 63 #define CFG_RESERVE_SIZE 63
#define CFG_ALTER_TIMEOUT 3 * 1000
enum CfgAlterType {
CFG_ALTER_DNODE,
CFG_ALTER_ALL_DNODES,
};
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue); static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
static int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj); static int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj);
@ -458,29 +454,59 @@ static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq
int32_t code = -1; int32_t code = -1;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
int64_t curMs = taosGetTimestampMs();
while (1) { while (1) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) { if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
bool online = mndIsDnodeOnline(pDnode, curMs);
if (!online) {
mWarn("dnode:%d, is offline, skip to send config req", pDnode->id);
continue;
}
SEpSet epSet = mndGetDnodeEpset(pDnode); SEpSet epSet = mndGetDnodeEpset(pDnode);
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq); int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
void *pBuf = rpcMallocCont(bufLen); void *pBuf = rpcMallocCont(bufLen);
if (pBuf != NULL) { if (pBuf == NULL) {
if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) { sdbCancelFetch(pMnode->pSdb, pIter);
sdbCancelFetch(pMnode->pSdb, pIter); sdbRelease(pMnode->pSdb, pDnode);
sdbRelease(pMnode->pSdb, pDnode); code = TSDB_CODE_OUT_OF_MEMORY;
code = bufLen; return code;
return code; }
}
mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value); if ((bufLen = tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq)) <= 0) {
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen}; sdbCancelFetch(pMnode->pSdb, pIter);
code = tmsgSendReq(&epSet, &rpcMsg); sdbRelease(pMnode->pSdb, pDnode);
code = bufLen;
rpcFreeCont(pBuf);
return code;
}
mInfo("dnode:%d, send config req to dnode, config:%s value:%s", pDnode->id, pDcfgReq->config, pDcfgReq->value);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
SRpcMsg rpcRsp = {0};
code = rpcSendRecvWithTimeout(pMnode->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, NULL, CFG_ALTER_TIMEOUT);
if (code != 0) {
mError("failed to send config req to dnode:%d, since %s", pDnode->id, tstrerror(code));
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDnode);
return code;
}
code = rpcRsp.code;
if (code != 0) {
mError("failed to alter config %s,on dnode:%d, since %s", pDcfgReq->config, pDnode->id, tstrerror(code));
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDnode);
return code;
} }
} }
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
} }
@ -541,8 +567,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
updateIpWhiteList = 1; updateIpWhiteList = 1;
} }
bool isUpdateAll = (cfgReq.dnodeId == 0 || cfgReq.dnodeId == -1) ? true : false; CfgAlterType alterType = (cfgReq.dnodeId == 0 || cfgReq.dnodeId == -1) ? CFG_ALTER_ALL_DNODES : CFG_ALTER_DNODE;
TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true, isUpdateAll), &lino, TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true, alterType), &lino,
_err_out); _err_out);
} }
SConfigItem *pItem = cfgGetItem(taosGetCfg(), dcfgReq.config); SConfigItem *pItem = cfgGetItem(taosGetCfg(), dcfgReq.config);
@ -658,6 +684,7 @@ static int32_t initConfigArrayFromSdb(SMnode *pMnode, SArray *array) {
goto _exit; goto _exit;
} }
if (strcasecmp(obj->name, "tsmmConfigVersion") == 0) { if (strcasecmp(obj->name, "tsmmConfigVersion") == 0) {
sdbRelease(pSdb, obj);
continue; continue;
} }
SConfigItem item = {0}; SConfigItem item = {0};

View File

@ -726,7 +726,8 @@ static void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STab
} }
} }
static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* pDbCfg, char* tbName, STableCfg* pCfg, void* charsetCxt) { static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* pDbCfg, char* tbName, STableCfg* pCfg,
void* charsetCxt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
QRY_ERR_RET(blockDataEnsureCapacity(pBlock, 1)); QRY_ERR_RET(blockDataEnsureCapacity(pBlock, 1));
pBlock->info.rows = 1; pBlock->info.rows = 1;
@ -905,7 +906,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
goto _return; goto _return;
} }
if (cfgCheckRangeForDynUpdate(tsCfg, pStmt->config, pStmt->value, false, false)) { if (cfgCheckRangeForDynUpdate(tsCfg, pStmt->config, pStmt->value, false, CFG_ALTER_LOCAL)) {
return terrno; return terrno;
} }
@ -1064,7 +1065,8 @@ static int32_t execShowCreateView(SShowCreateViewStmt* pStmt, SRetrieveTableRsp*
return code; return code;
} }
int32_t qExecCommand(int64_t* pConnId, bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp, int8_t biMode, void* charsetCxt) { int32_t qExecCommand(int64_t* pConnId, bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp, int8_t biMode,
void* charsetCxt) {
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
case QUERY_NODE_DESCRIBE_STMT: case QUERY_NODE_DESCRIBE_STMT:
return execDescribe(sysInfoUser, pStmt, pRsp, biMode); return execDescribe(sysInfoUser, pStmt, pRsp, biMode);

View File

@ -48,6 +48,7 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
} }
return code; return code;
} }
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg) { int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg) {
int32_t code = (*defaultMsgCb.sendSyncReqFp)(epSet, pMsg); int32_t code = (*defaultMsgCb.sendSyncReqFp)(epSet, pMsg);
if (code != 0) { if (code != 0) {

View File

@ -585,7 +585,8 @@ int32_t checkItemDyn(SConfigItem *pItem, bool isServer) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer, bool isUpdateAll) { int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer,
CfgAlterType alterType) {
cfgLock(pCfg); cfgLock(pCfg);
SConfigItem *pItem = cfgGetItem(pCfg, name); SConfigItem *pItem = cfgGetItem(pCfg, name);
@ -598,7 +599,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
cfgUnLock(pCfg); cfgUnLock(pCfg);
TAOS_RETURN(code); TAOS_RETURN(code);
} }
if ((!isUpdateAll) && (pItem->category == CFG_CATEGORY_GLOBAL)) { if ((pItem->category == CFG_CATEGORY_GLOBAL) && alterType == CFG_ALTER_DNODE) {
uError("failed to config:%s, not support update global config on only one dnode", name); uError("failed to config:%s, not support update global config on only one dnode", name);
cfgUnLock(pCfg); cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_INVALID_CFG); TAOS_RETURN(TSDB_CODE_INVALID_CFG);

View File

@ -315,11 +315,6 @@ class TDTestCase:
"value": 500, "value": 500,
"category": "global" "category": "global"
}, },
{
"name": "syncElectInterval",
"value": 50000,
"category": "global"
},
{ {
"name": "syncHeartbeatInterval", "name": "syncHeartbeatInterval",
"value": 3000, "value": 3000,