Merge pull request #21358 from taosdata/feat/TD-23362-3.0
feat: support update active code by SQL
This commit is contained in:
commit
6e5ec3d097
|
@ -267,8 +267,8 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_DNODE_CONFIG_LEN 128
|
#define TSDB_DNODE_CONFIG_LEN 128
|
||||||
#define TSDB_DNODE_VALUE_LEN 256
|
#define TSDB_DNODE_VALUE_LEN 256
|
||||||
|
|
||||||
#define TSDB_ACTIVE_KEY_LEN 109 // history 109:?
|
#define TSDB_ACTIVE_KEY_LEN 109
|
||||||
#define TSDB_CONN_ACTIVE_KEY_LEN 257 // history 257:?
|
#define TSDB_CONN_ACTIVE_KEY_LEN 255
|
||||||
|
|
||||||
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
|
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,11 @@ static const char *offlineReason[] = {
|
||||||
"unknown",
|
"unknown",
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
DND_ACTIVE_CODE,
|
||||||
|
DND_CONN_ACTIVE_CODE,
|
||||||
|
};
|
||||||
|
|
||||||
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
|
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
|
||||||
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
||||||
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
||||||
|
@ -227,6 +232,14 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
|
||||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
|
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
|
||||||
mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||||
pOld->updateTime = pNew->updateTime;
|
pOld->updateTime = pNew->updateTime;
|
||||||
|
#ifdef TD_ENTERPRISE
|
||||||
|
if (strncmp(pOld->active, pNew->active, TSDB_ACTIVE_KEY_LEN) != 0) {
|
||||||
|
strncpy(pOld->active, pNew->active, TSDB_ACTIVE_KEY_LEN);
|
||||||
|
}
|
||||||
|
if (strncmp(pOld->connActive, pNew->connActive, TSDB_CONN_ACTIVE_KEY_LEN) != 0) {
|
||||||
|
strncpy(pOld->connActive, pNew->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -635,6 +648,69 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfgReq, int8_t action) {
|
||||||
|
SSdbRaw *pRaw = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
SDnodeObj *pDnode = NULL;
|
||||||
|
bool cfgAll = pCfgReq->dnodeId == -1;
|
||||||
|
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
if (cfgAll) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
} else if(!(pDnode = mndAcquireDnode(pMnode, pCfgReq->dnodeId))) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pTrans) {
|
||||||
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "config-dnode");
|
||||||
|
if (!pTrans) goto _OVER;
|
||||||
|
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDnodeObj tmpDnode = *pDnode;
|
||||||
|
if (action == DND_ACTIVE_CODE) {
|
||||||
|
strncpy(tmpDnode.active, pCfgReq->value, TSDB_ACTIVE_KEY_LEN);
|
||||||
|
} else if (action == DND_CONN_ACTIVE_CODE) {
|
||||||
|
strncpy(tmpDnode.connActive, pCfgReq->value, TSDB_CONN_ACTIVE_KEY_LEN);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRaw = mndDnodeActionEncode(&tmpDnode);
|
||||||
|
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||||
|
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
pRaw = NULL;
|
||||||
|
|
||||||
|
mInfo("dnode:%d, config dnode, cfg:%d, app:%p config:%s value:%s", pDnode->id, pCfgReq->dnodeId, pReq->info.ahandle,
|
||||||
|
pCfgReq->config, pCfgReq->value);
|
||||||
|
|
||||||
|
if (cfgAll) {
|
||||||
|
sdbRelease(pSdb, pDnode);
|
||||||
|
pDnode = NULL;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTrans && mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (cfgAll) {
|
||||||
|
sdbRelease(pSdb, pDnode);
|
||||||
|
} else {
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
}
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
sdbFreeRaw(pRaw);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -977,6 +1053,34 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
strcpy(dcfgReq.config, "monitor");
|
strcpy(dcfgReq.config, "monitor");
|
||||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
#ifdef TD_ENTERPRISE
|
||||||
|
} else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) {
|
||||||
|
int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE;
|
||||||
|
int8_t index = opt == DND_ACTIVE_CODE ? 10 : 11;
|
||||||
|
if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
|
||||||
|
mError("dnode:%d, failed to config activeCode since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int32_t vlen = strlen(cfgReq.value);
|
||||||
|
if (vlen > 0 && ((opt == DND_ACTIVE_CODE && vlen != (TSDB_ACTIVE_KEY_LEN - 1)) ||
|
||||||
|
(opt == DND_CONN_ACTIVE_CODE &&
|
||||||
|
(vlen > (TSDB_CONN_ACTIVE_KEY_LEN - 1) || vlen < (TSDB_ACTIVE_KEY_LEN - 1))))) {
|
||||||
|
mError("dnode:%d, failed to config activeCode since invalid vlen:%d. conf:%s, val:%s", cfgReq.dnodeId, vlen,
|
||||||
|
cfgReq.config, cfgReq.value);
|
||||||
|
terrno = TSDB_CODE_INVALID_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? "activeCode" : "cActiveCode");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%s", cfgReq.value);
|
||||||
|
|
||||||
|
if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
|
||||||
|
mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
} else {
|
} else {
|
||||||
bool findOpt = false;
|
bool findOpt = false;
|
||||||
for (int32_t d = 0; d < optionSize; ++d) {
|
for (int32_t d = 0; d < optionSize; ++d) {
|
||||||
|
@ -1030,7 +1134,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
|
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
|
||||||
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
|
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
|
||||||
dcfgReq.config, dcfgReq.value);
|
dcfgReq.config, dcfgReq.value);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
|
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
|
||||||
tmsgSendReq(&epSet, &rpcMsg);
|
tmsgSendReq(&epSet, &rpcMsg);
|
||||||
code = 0;
|
code = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -305,6 +305,7 @@ int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen) {
|
||||||
dst[j] = src[k];
|
dst[j] = src[k];
|
||||||
j++;
|
j++;
|
||||||
}
|
}
|
||||||
|
if (j >= dlen) j = dlen - 1;
|
||||||
dst[j] = '\0';
|
dst[j] = '\0';
|
||||||
return j;
|
return j;
|
||||||
}
|
}
|
||||||
|
|
|
@ -163,6 +163,52 @@ class TDTestCase:
|
||||||
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
|
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
|
||||||
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
||||||
|
|
||||||
|
def ins_dnodes_check(self):
|
||||||
|
tdSql.execute('drop database if exists db2')
|
||||||
|
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
|
||||||
|
tdSql.query(f'select * from information_schema.ins_dnodes')
|
||||||
|
result = tdSql.queryResult
|
||||||
|
tdSql.checkEqual(result[0][0],1)
|
||||||
|
tdSql.checkEqual(result[0][8],"")
|
||||||
|
tdSql.checkEqual(result[0][9],"")
|
||||||
|
self.str107 = 'Hc7VCc+'
|
||||||
|
for t in range (10):
|
||||||
|
self.str107 += 'tP+2soIXpP'
|
||||||
|
self.str108 = self.str107 + '='
|
||||||
|
self.str109 = self.str108 + '+'
|
||||||
|
self.str254 = self.str108 + self.str108 + '01234567890123456789012345678901234567'
|
||||||
|
self.str255 = self.str254 + '='
|
||||||
|
self.str256 = self.str254 + '=('
|
||||||
|
self.str257 = self.str254 + '=()'
|
||||||
|
self.str510 = self.str255 + self.str255
|
||||||
|
tdSql.error('alter dnode 1 "activeCode" "a"')
|
||||||
|
tdSql.error('alter dnode 1 "activeCode" "' + self.str107 + '"')
|
||||||
|
tdSql.execute('alter all dnodes "activeCode" "' + self.str108 + '"')
|
||||||
|
tdSql.error('alter dnode 1 "activeCode" "' + self.str109 + '"')
|
||||||
|
tdSql.error('alter all dnodes "activeCode" "' + self.str510 + '"')
|
||||||
|
tdSql.query(f'select * from information_schema.ins_dnodes')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][8],self.str108)
|
||||||
|
tdSql.execute('alter dnode 1 "activeCode" ""')
|
||||||
|
tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][0],"")
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][1],'')
|
||||||
|
tdSql.error('alter dnode 1 "cActiveCode" "a"')
|
||||||
|
tdSql.error('alter dnode 1 "cActiveCode" "' + self.str107 + '"')
|
||||||
|
tdSql.error('alter dnode 1 "cActiveCode" "' + self.str256 + '"')
|
||||||
|
tdSql.error('alter all dnodes "cActiveCode" "' + self.str255 + '"')
|
||||||
|
tdSql.error('alter all dnodes "cActiveCode" "' + self.str256 + '"')
|
||||||
|
tdSql.error('alter all dnodes "cActiveCode" "' + self.str257 + '"')
|
||||||
|
tdSql.execute('alter all dnodes "cActiveCode" "' + self.str254 + '"')
|
||||||
|
tdSql.error('alter dnode 1 "cActiveCode" "' + self.str510 + '"')
|
||||||
|
tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][0],"")
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][1],self.str254)
|
||||||
|
tdSql.execute('alter dnode 1 "cActiveCode" "' + self.str109 + '"')
|
||||||
|
tdSql.query(f'show dnodes')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][9],self.str109)
|
||||||
|
tdSql.execute('alter all dnodes "cActiveCode" ""')
|
||||||
|
tdSql.query(f'select c_active_code from information_schema.ins_dnodes')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][0],'')
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepare_data()
|
self.prepare_data()
|
||||||
|
@ -170,6 +216,7 @@ class TDTestCase:
|
||||||
self.ins_columns_check()
|
self.ins_columns_check()
|
||||||
# self.ins_col_check_4096()
|
# self.ins_col_check_4096()
|
||||||
self.ins_stable_check()
|
self.ins_stable_check()
|
||||||
|
self.ins_dnodes_check()
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Loading…
Reference in New Issue