Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TS-5445-3.0

This commit is contained in:
Hongze Cheng 2025-02-27 17:18:33 +08:00
commit 0d2ca243b1
19 changed files with 311 additions and 9 deletions

View File

@ -37,6 +37,7 @@ The list of keywords is as follows:
| ASOF | |
| AT_ONCE | |
| ATTACH | |
| ASSIGN | Version 3.3.6.0 and later |
### B

View File

@ -69,9 +69,13 @@ alter database <dbname> replica 2|1
| ------- | ------ |
| 没有 Vnode 发生故障: Arbitrator 故障Mnode 宕机节点超过一个,导致 Mnode 无法选主)| **持续提供服务** |
| 仅一个 Vnode 故障VGroup 已经达成同步后,某一个 Vnode 才发生故障的 | **持续提供服务** |
| 仅一个 Vnode 故障2个 Vnode 同时故障,故障前 VGroup 达成同步,但是只有一个 Vnode 从故障中恢复服务,另一个 Vnode 服务故障 | **通过下面的命令强制指定leader, 继续提供服务** |
| 仅一个 Vnode 故障:离线 Vnode 启动后VGroup 未达成同步前,另一个 Vnode 服务故障的 | **无法提供服务** |
| 两个 Vnode 都发生故障 | **无法提供服务** |
```sql
ASSIGN LEADER FORCE;
```
## 常见问题

View File

@ -38,6 +38,7 @@ description: TDengine 保留关键字的详细列表
| AT_ONCE | |
| ATTACH | |
| AUTO | 3.3.5.0 及后续版本 |
| ASSIGN | 3.3.6.0 及后续版本 |
### B
|关键字|说明|

View File

@ -359,6 +359,7 @@ typedef enum ENodeType {
QUERY_NODE_CREATE_ANODE_STMT,
QUERY_NODE_DROP_ANODE_STMT,
QUERY_NODE_UPDATE_ANODE_STMT,
QUERY_NODE_ASSIGN_LEADER_STMT,
// show statement nodes
// see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET'
@ -2585,6 +2586,7 @@ typedef struct {
char* arbToken;
int64_t arbTerm;
char* memberToken;
int8_t force;
} SVArbSetAssignedLeaderReq;
int32_t tSerializeSVArbSetAssignedLeaderReq(void* buf, int32_t bufLen, SVArbSetAssignedLeaderReq* pReq);
@ -2663,6 +2665,15 @@ int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq
int32_t tDeserializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
void tFreeSBalanceVgroupReq(SBalanceVgroupReq* pReq);
typedef struct {
int32_t useless; // useless
int32_t sqlLen;
char* sql;
} SAssignLeaderReq;
int32_t tSerializeSAssignLeaderReq(void* buf, int32_t bufLen, SAssignLeaderReq* pReq);
int32_t tDeserializeSAssignLeaderReq(void* buf, int32_t bufLen, SAssignLeaderReq* pReq);
void tFreeSAssignLeaderReq(SAssignLeaderReq* pReq);
typedef struct {
int32_t vgId1;
int32_t vgId2;

View File

@ -421,6 +421,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_ASSIGN_LEADER, "mnd-arb-assign-leader", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_MND_ARB_MSG)
TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark

View File

@ -685,6 +685,10 @@ typedef struct SBalanceVgroupStmt {
ENodeType type;
} SBalanceVgroupStmt;
typedef struct SAssignLeaderStmt {
ENodeType type;
} SAssignLeaderStmt;
typedef struct SBalanceVgroupLeaderStmt {
ENodeType type;
int32_t vgId;

View File

@ -7673,6 +7673,46 @@ _exit:
void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) { FREESQL(); }
int32_t tSerializeSAssignLeaderReq(void *buf, int32_t bufLen, SAssignLeaderReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->useless));
ENCODESQL();
tEndEncode(&encoder);
_exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSAssignLeaderReq(void *buf, int32_t bufLen, SAssignLeaderReq *pReq) {
SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->useless));
DECODESQL();
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
return code;
}
void tFreeSAssignLeaderReq(SAssignLeaderReq *pReq) { FREESQL(); }
int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;
@ -8199,6 +8239,7 @@ int32_t tSerializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSetA
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->arbToken));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->arbTerm));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->memberToken));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force));
tEndEncode(&encoder);
@ -8228,6 +8269,9 @@ int32_t tDeserializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSe
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->memberToken));
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force));
}
tEndDecode(&decoder);

View File

@ -160,6 +160,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ARB_ASSIGN_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -45,6 +45,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
if (token1 == NULL || token2 == NULL) return -1;
@ -70,6 +71,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
@ -535,11 +537,12 @@ static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int6
}
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
char *memberToken) {
char *memberToken, bool force) {
SVArbSetAssignedLeaderReq req = {0};
req.arbToken = arbToken;
req.arbTerm = arbTerm;
req.memberToken = memberToken;
if (force) req.force = 1;
int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
int32_t contLen = reqLen + sizeof(SMsgHead);
@ -559,10 +562,10 @@ static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, ch
}
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
int64_t term, char *memberToken) {
int64_t term, char *memberToken, bool force) {
int32_t code = 0;
int32_t contLen = 0;
void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken);
void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken, force);
if (!pHead) {
mError("vgId:%d, failed to build set-assigned request", vgId);
code = -1;
@ -649,6 +652,73 @@ void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SAr
*pOp = CHECK_SYNC_UPDATE;
}
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1, lino = 0;
SArray *pArray = NULL;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SArbGroup *pArbGroup = NULL;
SAssignLeaderReq req = {0};
if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
mInfo("begin to process assign leader");
char arbToken[TSDB_ARB_TOKEN_SIZE];
TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));
int64_t term = mndGetTerm(pMnode);
if (term < 0) {
mError("arb failed to get term since %s", terrstr());
code = -1;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
while (1) {
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break;
SArbGroup arbGroupDup = {0};
(void)taosThreadMutexLock(&pArbGroup->mutex);
mndArbGroupDupObj(pArbGroup, &arbGroupDup);
(void)taosThreadMutexUnlock(&pArbGroup->mutex);
sdbRelease(pSdb, pArbGroup);
int32_t dnodeId = 0;
for (int32_t i = 0; i < 2; i++) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, arbGroupDup.members[i].info.dnodeId);
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
mndReleaseDnode(pMnode, pDnode);
if (isonline) {
dnodeId = arbGroupDup.members[i].info.dnodeId;
break;
}
}
(void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
}
code = 0;
// auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen);
_exit:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to assign leader since %s", tstrerror(code));
}
tFreeSAssignLeaderReq(&req);
TAOS_RETURN(code);
}
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
int32_t code = 0, lino = 0;
SMnode *pMnode = pReq->info.node;
@ -701,7 +771,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
break;
case CHECK_SYNC_SET_ASSIGNED_LEADER:
(void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token);
(void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
break;
case CHECK_SYNC_CHECK_SYNC:
@ -1361,7 +1431,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
char strCheckSyncCode[100] = {0};
char bufUpdateTime[40] = {0};
(void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);
(void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);
char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);

View File

@ -199,6 +199,8 @@ const char* nodesNodeName(ENodeType type) {
return "ResetStreamStmt";
case QUERY_NODE_BALANCE_VGROUP_STMT:
return "BalanceVgroupStmt";
case QUERY_NODE_ASSIGN_LEADER_STMT:
return "AssignLeaderStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return "BalanceVgroupLeaderStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
@ -8195,6 +8197,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return dropStreamStmtToJson(pObj, pJson);
case QUERY_NODE_BALANCE_VGROUP_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize.
case QUERY_NODE_ASSIGN_LEADER_STMT:
return TSDB_CODE_SUCCESS;
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize.
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
@ -8564,6 +8568,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToDropStreamStmt(pJson, pObj);
case QUERY_NODE_BALANCE_VGROUP_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize.
case QUERY_NODE_ASSIGN_LEADER_STMT:
return TSDB_CODE_SUCCESS;
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return TSDB_CODE_SUCCESS;
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:

View File

@ -625,6 +625,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_BALANCE_VGROUP_STMT:
code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode);
break;
case QUERY_NODE_ASSIGN_LEADER_STMT:
code = makeNode(type, sizeof(SAssignLeaderStmt), &pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode);
break;
@ -1506,6 +1509,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field
case QUERY_NODE_RESET_STREAM_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
case QUERY_NODE_ASSIGN_LEADER_STMT:
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field

View File

@ -309,6 +309,7 @@ SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, STok
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
SNode* createAssignLeaderStmt(SAstCreateContext* pCxt);
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId);
SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName);
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);

View File

@ -871,6 +871,9 @@ cmd ::= KILL COMPACT NK_INTEGER(A).
/************************************************ merge/redistribute/ vgroup ******************************************/
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); }
cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); }
cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }

View File

@ -3929,6 +3929,16 @@ _err:
return NULL;
}
SNode* createAssignLeaderStmt(SAstCreateContext* pCxt) {
CHECK_PARSER_STATUS(pCxt);
SAssignLeaderStmt* pStmt = NULL;
pCxt->errCode = nodesMakeNode(QUERY_NODE_ASSIGN_LEADER_STMT, (SNode**)&pStmt);
CHECK_MAKE_NODE(pStmt);
return (SNode*)pStmt;
_err:
return NULL;
}
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId) {
CHECK_PARSER_STATUS(pCxt);
SBalanceVgroupLeaderStmt* pStmt = NULL;

View File

@ -360,6 +360,7 @@ static SKeyword keywordTable[] = {
{"ON_FAILURE", TK_ON_FAILURE},
{"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
{"REGEXP", TK_REGEXP},
{"ASSIGN", TK_ASSIGN},
{"TRUE_FOR", TK_TRUE_FOR},
{"META_ONLY", TK_META_ONLY}
};

View File

@ -8934,6 +8934,10 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq)
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SBalanceVgroupReq, pReq);
break;
}
case TDMT_MND_ARB_ASSIGN_LEADER: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SAssignLeaderReq, pReq);
break;
}
case TDMT_MND_REDISTRIBUTE_VGROUP: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SRedistributeVgroupReq, pReq);
break;
@ -13186,6 +13190,13 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm
return code;
}
static int32_t translateAssignLeader(STranslateContext* pCxt, SAssignLeaderStmt* pStmt) {
SAssignLeaderReq req = {0};
int32_t code = buildCmdMsg(pCxt, TDMT_MND_ARB_ASSIGN_LEADER, (FSerializeFunc)tSerializeSAssignLeaderReq, &req);
tFreeSAssignLeaderReq(&req);
return code;
}
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
SBalanceVgroupLeaderReq req = {0};
req.vgId = pStmt->vgId;
@ -13943,6 +13954,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_BALANCE_VGROUP_STMT:
code = translateBalanceVgroup(pCxt, (SBalanceVgroupStmt*)pNode);
break;
case QUERY_NODE_ASSIGN_LEADER_STMT:
code = translateAssignLeader(pCxt, (SAssignLeaderStmt*)pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
break;

View File

@ -334,10 +334,12 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);
if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
req.memberToken);
goto _OVER;
if (!req.force) {
if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
req.memberToken);
goto _OVER;
}
}
if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {

View File

@ -0,0 +1,123 @@
import taos
import sys
import os
import subprocess
import glob
import shutil
import time
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.srvCtl import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
from frame import epath
# from frame.server.dnodes import *
# from frame.server.cluster import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
updatecfgDict = {'dDebugFlag':131}
super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1")
self.valgrind = 0
self.db = "test"
self.stb = "meters"
self.childtable_count = 10
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
time.sleep(1)
tdSql.execute("use db;")
tdSql.execute("CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);")
tdSql.execute("CREATE TABLE d0 USING meters TAGS (\"California.SanFrancisco\", 2);");
count = 0
while count < 100:
tdSql.query("show arbgroups;")
if tdSql.getData(0, 4) == True:
break
tdLog.info("wait 1 seconds for is sync")
time.sleep(1)
count += 1
if count == 100:
tdLog.exit("arbgroup sync failed")
return
sc.dnodeStop(2)
sc.dnodeStop(3)
sc.dnodeStart(2)
count = 0
while count < 100:
tdSql.query("show db.vgroups;")
if(tdSql.getData(0, 4) == "candidate") or (tdSql.getData(0, 6) == "candidate"):
break
tdLog.info("wait 1 seconds for candidate")
time.sleep(1)
count += 1
if count == 100:
tdLog.exit("wait candidate failed")
return
tdSql.execute("ASSIGN LEADER FORCE;")
count = 0
while count < 100:
tdSql.query("show db.vgroups;")
if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "):
break
tdLog.info("wait 1 seconds for set assigned")
time.sleep(1)
count += 1
if count == 100:
tdLog.exit("assign leader failed")
return
tdSql.execute("INSERT INTO d0 VALUES (NOW, 10.3, 219, 0.31);")
sc.dnodeStart(3)
count = 0
while count < 100:
tdSql.query("show arbgroups;")
if tdSql.getData(0, 4) == 1:
break
tdLog.info("wait 1 seconds for is sync")
time.sleep(1)
count += 1
if count == 100:
tdLog.exit("arbgroup sync failed")
return
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -22,6 +22,7 @@
,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py -N 3 -M 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/arbitrator.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/arbitrator_restart.py -N 3
,,n,army,python3 ./test.py -f storage/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py