show code
This commit is contained in:
parent
150dd5af3f
commit
0da606c7fb
|
@ -292,7 +292,7 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm);
|
||||||
|
|
||||||
SSyncState syncGetState(int64_t rid);
|
SSyncState syncGetState(int64_t rid);
|
||||||
int32_t syncGetArbToken(int64_t rid, char* outToken);
|
int32_t syncGetArbToken(int64_t rid, char* outToken);
|
||||||
int32_t syncGetAssignedLogSynced(int64_t rid);
|
int32_t syncCheckSynced(int64_t rid);
|
||||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,8 @@ static const SSysDbTableSchema arbGroupsSchema[] = {
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
{.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "is_sync", .bytes = 100, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
|
//{.name = "sync_code", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
|
|
|
@ -41,7 +41,7 @@ int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||||
SArbGroup *pNewGroup);
|
SArbGroup *pNewGroup);
|
||||||
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
||||||
bool newIsSync, SArbGroup *pNewGroup);
|
bool newIsSync, SArbGroup *pNewGroup, int32_t code);
|
||||||
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
||||||
SArbGroup *pNewGroup);
|
SArbGroup *pNewGroup);
|
||||||
|
|
||||||
|
|
|
@ -308,6 +308,7 @@ typedef struct {
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM];
|
SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM];
|
||||||
int8_t isSync;
|
int8_t isSync;
|
||||||
|
int32_t code;
|
||||||
SArbAssignedLeader assignedLeader;
|
SArbAssignedLeader assignedLeader;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
|
||||||
|
|
|
@ -1066,7 +1066,7 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
||||||
bool newIsSync, SArbGroup *pNewGroup) {
|
bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
|
||||||
bool updateIsSync = false;
|
bool updateIsSync = false;
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pGroup->mutex);
|
(void)taosThreadMutexLock(&pGroup->mutex);
|
||||||
|
@ -1089,6 +1089,7 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0
|
||||||
if (pGroup->isSync != newIsSync) {
|
if (pGroup->isSync != newIsSync) {
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||||
pNewGroup->isSync = newIsSync;
|
pNewGroup->isSync = newIsSync;
|
||||||
|
pNewGroup->code = code;
|
||||||
|
|
||||||
mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
|
mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
|
||||||
updateIsSync = true;
|
updateIsSync = true;
|
||||||
|
@ -1099,7 +1100,8 @@ _OVER:
|
||||||
return updateIsSync;
|
return updateIsSync;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync) {
|
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync,
|
||||||
|
int32_t rsp_code) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||||
if (pGroup == NULL) {
|
if (pGroup == NULL) {
|
||||||
|
@ -1110,7 +1112,8 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
|
||||||
}
|
}
|
||||||
|
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
bool updateIsSync = mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup);
|
bool updateIsSync =
|
||||||
|
mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code);
|
||||||
if (updateIsSync) {
|
if (updateIsSync) {
|
||||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||||
mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
|
mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
|
||||||
|
@ -1186,6 +1189,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mInfo("vgId:%d, arb check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
|
||||||
if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
|
if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
|
||||||
mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
|
mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
|
||||||
syncRsp.arbToken);
|
syncRsp.arbToken);
|
||||||
|
@ -1194,7 +1198,8 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
||||||
if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync)) != 0) {
|
if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
|
||||||
|
syncRsp.errCode)) != 0) {
|
||||||
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -1333,8 +1338,16 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
&lino, _OVER);
|
&lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char strSync[100] = {0};
|
||||||
|
sprintf(strSync, "%d,%d", pGroup->isSync, pGroup->code);
|
||||||
|
char sync[100 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(sync, strSync, 100 + VARSTR_HEADER_SIZE);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);
|
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)sync, false), pGroup, &lino, _OVER);
|
||||||
|
|
||||||
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
// RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->code, false), pGroup, &lino,
|
||||||
|
// _OVER);
|
||||||
|
|
||||||
if (pGroup->assignedLeader.dnodeId != 0) {
|
if (pGroup->assignedLeader.dnodeId != 0) {
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
|
|
@ -53,8 +53,8 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
|
||||||
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||||
SRpcMsg *pOriginRpc);
|
SRpcMsg *pOriginRpc);
|
||||||
|
|
||||||
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||||
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||||
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
|
||||||
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -480,7 +480,7 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ret = vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token);
|
int32_t ret = vnodeCheckToken(pVnode, syncReq.member0Token, syncReq.member1Token);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(ret));
|
vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(ret));
|
||||||
}
|
}
|
||||||
|
@ -2501,7 +2501,7 @@ static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
|
static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token) {
|
||||||
SSyncState syncState = syncGetState(pVnode->sync);
|
SSyncState syncState = syncGetState(pVnode->sync);
|
||||||
if (syncState.state != TAOS_SYNC_STATE_LEADER) {
|
if (syncState.state != TAOS_SYNC_STATE_LEADER) {
|
||||||
return terrno = TSDB_CODE_SYN_NOT_LEADER;
|
return terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
|
@ -2521,13 +2521,13 @@ static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
|
static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
|
||||||
int32_t code = vnodePreCheckAssignedLogSyncd(pVnode, member0Token, member1Token);
|
int32_t code = vnodeCheckToken(pVnode, member0Token, member1Token);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return syncGetAssignedLogSynced(pVnode->sync);
|
return syncCheckSynced(pVnode->sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
|
@ -2551,7 +2551,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
|
||||||
syncRsp.member1Token = syncReq.member1Token;
|
syncRsp.member1Token = syncReq.member1Token;
|
||||||
syncRsp.vgId = TD_VID(pVnode);
|
syncRsp.vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
if (vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
|
if (vnodeCheckSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
|
||||||
vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode));
|
vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode));
|
||||||
}
|
}
|
||||||
syncRsp.errCode = terrno;
|
syncRsp.errCode = terrno;
|
||||||
|
|
|
@ -699,7 +699,7 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) {
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncGetAssignedLogSynced(int64_t rid) {
|
int32_t syncCheckSynced(int64_t rid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
|
|
@ -84,7 +84,7 @@ int32_t taosSetSystemLocale(const char *inLocale) {
|
||||||
if (NULL == locale) {
|
if (NULL == locale) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
uError("failed to set locale:%s", inLocale);
|
uError("failed to set locale:%s", inLocale);
|
||||||
return terrno;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(tsLocale, locale, TD_LOCALE_LEN);
|
tstrncpy(tsLocale, locale, TD_LOCALE_LEN);
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import glob
|
||||||
|
import shutil
|
||||||
|
import time
|
||||||
|
|
||||||
|
from frame.log import *
|
||||||
|
from frame.cases import *
|
||||||
|
from frame.sql import *
|
||||||
|
from frame.srvCtl import *
|
||||||
|
from frame.caseBase import *
|
||||||
|
from frame import *
|
||||||
|
from frame.autogen import *
|
||||||
|
from frame import epath
|
||||||
|
# from frame.server.dnodes import *
|
||||||
|
# from frame.server.cluster import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase(TBase):
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
updatecfgDict = {'dDebugFlag':131}
|
||||||
|
super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1")
|
||||||
|
|
||||||
|
self.valgrind = 0
|
||||||
|
self.db = "test"
|
||||||
|
self.stb = "meters"
|
||||||
|
self.childtable_count = 10
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
while count < 100:
|
||||||
|
tdSql.query("show arbgroups;")
|
||||||
|
|
||||||
|
if tdSql.getData(0, 4) == 1:
|
||||||
|
break
|
||||||
|
|
||||||
|
tdLog.info("wait 1 seconds for is sync")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.query("show db.vgroups;")
|
||||||
|
|
||||||
|
if(tdSql.getData(0, 4) == "follower") and (tdSql.getData(0, 6) == "leader"):
|
||||||
|
tdLog.info("stop dnode2")
|
||||||
|
sc.dnodeStop(2)
|
||||||
|
|
||||||
|
if(tdSql.getData(0, 6) == "follower") and (tdSql.getData(0, 4) == "leader"):
|
||||||
|
tdLog.info("stop dnode 3")
|
||||||
|
sc.dnodeStop(3)
|
||||||
|
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
while count < 100:
|
||||||
|
tdSql.query("show db.vgroups;")
|
||||||
|
|
||||||
|
if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "):
|
||||||
|
break
|
||||||
|
|
||||||
|
tdLog.info("wait 1 seconds for set assigned")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue