feat: support alter cluster

This commit is contained in:
kailixu 2023-12-18 16:34:31 +08:00
parent 9fa90fc953
commit 70006bb711
13 changed files with 3453 additions and 4580 deletions

View File

@ -2023,6 +2023,17 @@ int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
void tFreeSExplainRsp(SExplainRsp* pRsp);
typedef struct {
char config[TSDB_DNODE_CONFIG_LEN];
char value[TSDB_DNODE_VALUE_LEN];
int32_t sqlLen;
char* sql;
} SMCfgClusterReq;
int32_t tSerializeSMCfgClusterReq(void* buf, int32_t bufLen, SMCfgClusterReq* pReq);
int32_t tDeserializeSMCfgClusterReq(void* buf, int32_t bufLen, SMCfgClusterReq* pReq);
void tFreeSMCfgClusterReq(SMCfgClusterReq* pReq);
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
int32_t port;

View File

@ -219,6 +219,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_CLUSTER, "config-cluster", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)

View File

@ -2264,6 +2264,37 @@ int32_t tDeserializeSGetUserWhiteListRsp(void *buf, int32_t bufLen, SGetUserWhit
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp *pRsp) { taosMemoryFree(pRsp->pWhiteLists); }
int32_t tSerializeSMCfgClusterReq(void *buf, int32_t bufLen, SMCfgClusterReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->config) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->value) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMCfgClusterReq(void *buf, int32_t bufLen, SMCfgClusterReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->config) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->value) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMCfgClusterReq(SMCfgClusterReq *pReq) { FREESQL(); }
int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -194,6 +194,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_COMPACT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_CLUSTER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;

View File

@ -97,12 +97,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
int32_t dmInitVars(SDnode *pDnode);
void dmClearVars(SDnode *pDnode);
#ifdef TD_MODULE_OPTIMIZE
int32_t dmInitModule(SDnode *pDnode, SMgmtWrapper *wrappers);
bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper);
#else
int32_t dmInitModule(SDnode *pDnode);
#endif
int32_t dmInitModule(SDnode *pDnode);
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
@ -123,11 +118,7 @@ int32_t dmInitStatusClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode);
void dmCleanupStatusClient(SDnode *pDnode);
SMsgCb dmGetMsgcb(SDnode *pDnode);
#ifdef TD_MODULE_OPTIMIZE
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);
#else
int32_t dmInitMsgHandle(SDnode *pDnode);
#endif
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c

View File

@ -24,7 +24,6 @@
#include "tglobal.h"
#endif
#ifndef TD_MODULE_OPTIMIZE
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@ -38,7 +37,6 @@ static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
return required;
}
#endif
int32_t dmInitDnode(SDnode *pDnode) {
dDebug("start to create dnode");
@ -81,15 +79,9 @@ int32_t dmInitDnode(SDnode *pDnode) {
if (pDnode->lockfile == NULL) {
goto _OVER;
}
#ifdef TD_MODULE_OPTIMIZE
if (dmInitModule(pDnode, pDnode->wrappers) != 0) {
goto _OVER;
}
#else
if (dmInitModule(pDnode) != 0) {
goto _OVER;
}
#endif
indexInit(tsNumOfCommitThreads);
streamMetaInit();

View File

@ -251,33 +251,6 @@ _OVER:
dmReleaseWrapper(pWrapper);
}
#ifdef TD_MODULE_OPTIMIZE
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers) {
SDnodeTrans *pTrans = &pDnode->trans;
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = wrappers + ntype;
SArray *pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
}
if (!pMgmt->needCheckVgId) {
pHandle->defaultNtype = ntype;
}
pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
}
taosArrayDestroy(pArray);
}
return 0;
}
#else
int32_t dmInitMsgHandle(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
@ -303,7 +276,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0;
}
#endif
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();

View File

@ -76,6 +76,7 @@ typedef enum {
MND_OPER_DROP_TOPIC,
MND_OPER_CREATE_VIEW,
MND_OPER_DROP_VIEW,
MND_OPER_CONFIG_CLUSTER,
} EOperType;
typedef enum {

View File

@ -14,7 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "audit.h"
#include "mndCluster.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndTrans.h"
@ -31,6 +33,8 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq);
static int32_t mndProcessConfigClusterReq(SRpcMsg *pReq);
static int32_t mndProcessConfigClusterRsp(SRpcMsg *pReq);
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {
@ -45,6 +49,8 @@ int32_t mndInitCluster(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_UPTIME_TIMER, mndProcessUptimeTimer);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_CLUSTER, mndProcessConfigClusterReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_CLUSTER_RSP, mndProcessConfigClusterRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
@ -401,3 +407,54 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
mndTransDrop(pTrans);
return 0;
}
static int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMCfgClusterReq cfgReq = {0};
if (tDeserializeSMCfgClusterReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t code = 0;
mInfo("cluster: start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_CLUSTER) != 0) {
code = terrno != 0 ? terrno : TSDB_CODE_MND_NO_RIGHTS;
goto _exit;
}
if (strncmp(cfgReq.config, GRANT_ACTIVE_CODE, 11) == 0) {
#ifdef TD_ENTERPRISE
if (strlen(cfgReq.config) >= TSDB_DNODE_CONFIG_LEN) {
code = TSDB_CODE_INVALID_CFG;
goto _exit;
}
if (strlen(cfgReq.value) >= TSDB_DNODE_VALUE_LEN) {
code = TSDB_CODE_INVALID_CFG_VALUE;
goto _exit;
}
// code = xxx;
#else
code = TSDB_CODE_OPS_NOT_SUPPORT;
goto _exit;
#endif
}
{ // audit
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, cfgReq.sqlLen);
}
_exit:
tFreeSMCfgClusterReq(&cfgReq);
if (code != 0) {
terrno = code;
mError("cluster: failed to config:%s, %s since %s", cfgReq.config, cfgReq.value, terrstr());
} else {
mError("cluster: success to config:%s, %s", cfgReq.config, cfgReq.value);
}
return -1;
}
static int32_t mndProcessConfigClusterRsp(SRpcMsg *pRsp) {
mInfo("config rsp from cluster");
return 0;
}

View File

@ -193,6 +193,8 @@ const char* nodesNodeName(ENodeType type) {
return "GrantStmt";
case QUERY_NODE_REVOKE_STMT:
return "RevokeStmt";
case QUERY_NODE_ALTER_CLUSTER_STMT:
return "AlterClusterStmt";
case QUERY_NODE_SHOW_DNODES_STMT:
return "ShowDnodesStmt";
case QUERY_NODE_SHOW_MNODES_STMT:
@ -6122,6 +6124,31 @@ static int32_t jsonToDropConsumerGroupStmt(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkAlterClusterStmtConfig = "Config";
static const char* jkAlterClusterStmtValue = "Value";
static int32_t alterClusterStmtToJson(const void* pObj, SJson* pJson) {
const SAlterClusterStmt* pNode = (const SAlterClusterStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkAlterClusterStmtConfig, pNode->config);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAlterClusterStmtValue, pNode->value);
}
return code;
}
static int32_t jsonToAlterClusterStmt(const SJson* pJson, void* pObj) {
SAlterClusterStmt* pNode = (SAlterClusterStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkAlterClusterStmtConfig, pNode->config);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAlterClusterStmtValue, pNode->value);
}
return code;
}
static const char* jkAlterLocalStmtConfig = "Config";
static const char* jkAlterLocalStmtValue = "Value";
@ -6974,6 +7001,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return grantStmtToJson(pObj, pJson);
case QUERY_NODE_REVOKE_STMT:
return revokeStmtToJson(pObj, pJson);
case QUERY_NODE_ALTER_CLUSTER_STMT:
return alterClusterStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_DNODES_STMT:
return showDnodesStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_MNODES_STMT:
@ -7297,6 +7326,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToGrantStmt(pJson, pObj);
case QUERY_NODE_REVOKE_STMT:
return jsonToRevokeStmt(pJson, pObj);
case QUERY_NODE_ALTER_CLUSTER_STMT:
return jsonToAlterClusterStmt(pJson, pObj);
case QUERY_NODE_SHOW_DNODES_STMT:
return jsonToShowDnodesStmt(pJson, pObj);
case QUERY_NODE_SHOW_MNODES_STMT:

View File

@ -174,7 +174,7 @@ unsafe_opt(A) ::= UNSAFE.
/************************************************ alter cluster *********************************************************/
cmd ::= ALTER CLUSTER NK_STRING(A). { pCxt->pRootNode = createAlterClusterStmt(pCxt, &A, NULL); }
cmd ::= ALTER CLUSTER NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterClsuterStmt(pCxt, &A, &B); }
cmd ::= ALTER CLUSTER NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterClusterStmt(pCxt, &A, &B); }
/************************************************ alter local *********************************************************/
cmd ::= ALTER LOCAL NK_STRING(A). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, NULL); }

View File

@ -5309,6 +5309,11 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq)
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropStreamReq, pReq);
break;
}
case TDMT_MND_CONFIG_CLUSTER: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCfgClusterReq, pReq);
break;
}
default: {
break;
}
@ -6533,6 +6538,16 @@ static int32_t translateRestoreDnode(STranslateContext* pCxt, SRestoreComponentN
return code;
}
static int32_t translateAlterCluster(STranslateContext* pCxt, SAlterClusterStmt* pStmt) {
SMCfgClusterReq cfgReq = {0};
strcpy(cfgReq.config, pStmt->config);
strcpy(cfgReq.value, pStmt->value);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_CLUSTER, (FSerializeFunc)tSerializeSMCfgClusterReq, &cfgReq);
tFreeSMCfgClusterReq(&cfgReq);
return code;
}
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
int32_t* pVgId) {
SVgroupInfo vg = {0};
@ -8526,6 +8541,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_COMPACT_DATABASE_STMT:
code = translateCompact(pCxt, (SCompactDatabaseStmt*)pNode);
break;
case QUERY_NODE_ALTER_CLUSTER_STMT:
code = translateAlterCluster(pCxt, (SAlterClusterStmt*)pNode);
break;
case QUERY_NODE_KILL_CONNECTION_STMT:
code = translateKillConnection(pCxt, (SKillStmt*)pNode);
break;

File diff suppressed because it is too large Load Diff