refactor(sync): add double queues in vnode

This commit is contained in:
Minghao Li 2022-10-13 20:57:36 +08:00
parent 733440b352
commit 2ae4081a3f
22 changed files with 413 additions and 104 deletions

View File

@ -219,6 +219,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncQ, pMsg);
break;
case SYNC_CTRL_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncCtrlQ, pMsg);
break;
case APPLY_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pApplyQ, pMsg);

View File

@ -57,9 +57,32 @@ typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
typedef struct SSyncTimer SSyncTimer;
typedef struct SSyncHbTimerData SSyncHbTimerData;
extern bool gRaftDetailLog;
// Context handed to the peer-heartbeat timer callback as its `param` argument.
// An instance is heap-allocated and filled in by syncHbTimerStart() every time a
// peer heartbeat timer is armed.
typedef struct SSyncHbTimerData {
SSyncNode* pSyncNode;  // sync node on whose behalf the timer runs
SSyncTimer* pTimer;  // the SSyncTimer this context was created for
SRaftId destId;  // peer the heartbeat is sent to (copied from the timer's destId)
uint64_t logicClock;  // timer's logicClock at arm time; the callback compares it with the live value
} SSyncHbTimerData;
// State of one per-peer heartbeat timer. SSyncNode keeps an array of these
// (peerHeartbeatTimerArr); they are set up by syncHbTimerInit().
typedef struct SSyncTimer {
void* pTimer;  // handle passed to taosTmrReset()/taosTmrStop(); NULL after init and after stop
TAOS_TMR_CALLBACK timerCb;  // callback given to taosTmrReset() when the timer is started
uint64_t logicClock;  // incremented by syncHbTimerStop(); lets a late callback detect that it is stale
uint64_t counter;  // set to 0 by syncHbTimerInit(); not otherwise touched in the code shown here
int32_t timerMS;  // timeout passed to taosTmrReset(); presumably milliseconds -- TODO confirm
SRaftId destId;  // peer this timer sends heartbeats to
void *pData;  // SSyncHbTimerData most recently allocated by syncHbTimerStart()
} SSyncTimer;
// Puts *pSyncTimer into its initial, un-armed state with destId as the target peer.
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
// Arms *pSyncTimer through taosTmrReset() if the sync environment is started;
// otherwise only logs an error.
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
// Bumps the timer's logic clock and stops the underlying timer.
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
typedef struct SSyncNode {
// init by SSyncInfo
SyncGroupId vgId;
@ -139,6 +162,9 @@ typedef struct SSyncNode {
TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp
uint64_t heartbeatTimerCounter;
// peer heartbeat timer
SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA];
// callback
FpOnPingCb FpOnPing;
FpOnPingReplyCb FpOnPingReply;
@ -256,6 +282,7 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);

View File

@ -61,6 +61,8 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg);
#ifdef __cplusplus
}
#endif

View File

@ -46,6 +46,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
// process message ----
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
@ -918,6 +919,43 @@ _END:
return ret;
}
// Puts a per-peer heartbeat timer into its initial, un-armed state.
//
// pSyncNode   node that owns the timer; its hbBaseLine becomes the timer interval.
// pSyncTimer  timer slot to initialize (must not be NULL).
// destId      peer this timer will send heartbeats to.
//
// Always returns 0.
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->pTimer = NULL;
  // No callback context exists until syncHbTimerStart() allocates one; make that
  // explicit instead of leaving the field with whatever the memory contained.
  pSyncTimer->pData = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->destId = destId;
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}
// Arms the heartbeat timer for one peer.
//
// A fresh SSyncHbTimerData is allocated, filled with the owning node, the timer,
// the destination peer and a snapshot of the timer's current logic clock, and is
// handed to taosTmrReset() as the callback parameter.
//
// pSyncNode   node that owns the timer.
// pSyncTimer  timer to arm; may be NULL when the lookup by peer id found nothing.
//
// Returns 0 when the timer was armed, and also when the sync environment is
// stopped (an error is logged and nothing is armed). Returns -1 when pSyncTimer
// is NULL or the callback context cannot be allocated.
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (pSyncTimer == NULL) {
    sError("vgId:%d, start ctrl hb timer error, timer not found", pSyncNode->vgId);
    return -1;
  }

  if (!syncEnvIsStart()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
  if (pData == NULL) {
    sError("vgId:%d, start ctrl hb timer error, out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  // NOTE(review): a context left over from an earlier start is overwritten here
  // without being released. It is not freed at this point because a callback that
  // is still pending may hold it -- confirm the intended ownership.
  pSyncTimer->pData = pData;

  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}
// Stops the heartbeat timer for one peer.
//
// The logic clock is incremented before the timer is stopped, so a callback that
// was armed earlier and still fires finds its snapshot out of date and neither
// sends a heartbeat nor re-arms itself.
//
// pSyncNode   node that owns the timer (not used by this function).
// pSyncTimer  timer to stop; may be NULL when the lookup by peer id found nothing.
//
// Returns 0, or -1 when pSyncTimer is NULL.
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (pSyncTimer == NULL) {
    return -1;
  }

  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // pSyncTimer->pData is deliberately left allocated, as in the original code,
  // which had the free commented out.
  // NOTE(review): presumably because a pending callback may still dereference
  // it -- confirm, and decide who releases it.
  return 0;
}
// open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
@ -1130,6 +1168,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
pSyncNode->heartbeatTimerCounter = 0;
// init peer heartbeat timer
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
}
// init callback
pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
@ -2135,6 +2178,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode);
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
syncHbTimerStop(pSyncNode, pSyncTimer);
}
// reset elect timer
syncNodeResetElectTimer(pSyncNode);
@ -2234,6 +2281,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode);
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
syncHbTimerStart(pSyncNode, pSyncTimer);
}
// call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
@ -2595,6 +2646,62 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
}
}
// Timer callback for a single peer's heartbeat timer.
//
// param  SSyncHbTimerData* installed by syncHbTimerStart(): the owning node, the
//        SSyncTimer being serviced, the destination peer and the logic-clock
//        snapshot taken when the timer was armed.
// tmrId  handle supplied by the timer framework; not used here.
//
// When the node has more than one replica and the snapshot still equals the
// timer's current logic clock, a SyncHeartbeat is built, sent to the peer through
// syncNodeHeartbeat(), and the timer is re-armed with the same context.
// syncHbTimerStop() bumps the logic clock, so a callback firing after a stop sees
// a mismatch, only traces it, and does not re-arm.
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

  syncNodeEventLog(pSyncNode, "eq peer hb timer");

  // Both clocks are declared uint64_t; keep the locals unsigned so they match the
  // PRIu64 conversions in the trace below.
  uint64_t timerLogicClock = (uint64_t)atomic_load_64(&pSyncTimer->logicClock);
  uint64_t msgLogicClock = (uint64_t)atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum <= 1) {
    return;
  }

  if (timerLogicClock != msgLogicClock) {
    sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRIu64 ", msgLogicClock:%" PRIu64 "", timerLogicClock,
           msgLogicClock);
    return;
  }

  SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
  pSyncMsg->srcId = pSyncNode->myRaftId;
  pSyncMsg->destId = pData->destId;
  pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
  pSyncMsg->commitIndex = pSyncNode->commitIndex;
  pSyncMsg->privateTerm = 0;

  // eq msg
  // The RPC conversion lives inside the disabled block: syncNodeHeartbeat() below
  // builds its own SRpcMsg, so converting here as well produced a message that
  // was never sent or released on any tick.
#if 0
  SRpcMsg rpcMsg;
  syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);
  if (pSyncNode->FpEqCtrlMsg != NULL) {
    int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      syncHeartbeatDestroy(pSyncMsg);
      return;
    }
  } else {
    sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
  }
#endif

  // send msg
  syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
  syncHeartbeatDestroy(pSyncMsg);

  // Re-arm with the same context so the next tick carries the same snapshot.
  if (syncEnvIsStart()) {
    taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager,
                 &pSyncTimer->pTimer);
  } else {
    sError("sync env is stop, syncNodeEqPeerHeartbeatTimer");
  }
}
static int32_t syncNodeEqNoop(SSyncNode* ths) {
int32_t ret = 0;
ASSERT(ths->state == TAOS_SYNC_STATE_LEADER);
@ -3198,6 +3305,16 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
return pSender;
}
// Looks up the heartbeat timer that belongs to the replica identified by pDestId.
//
// Every entry of ths->replicasId[0 .. replicaNum) is compared with pDestId; the
// timer at the index of the last matching entry is returned. Returns NULL when no
// replica id matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int         idx = 0;

  while (idx < ths->replicaNum) {
    SRaftId* pCandidate = &(ths->replicasId[idx]);
    if (syncUtilSameId(pDestId, pCandidate)) {
      pFound = &(ths->peerHeartbeatTimerArr[idx]);
    }
    ++idx;
  }

  return pFound;
}
bool syncNodeCanChange(SSyncNode* pSyncNode) {
if (pSyncNode->changing) {
sError("sync cannot change");

View File

@ -497,3 +497,13 @@ int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaft
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return 0;
}
// Sends a heartbeat message to one peer.
//
// pSyncNode   sending node.
// destRaftId  peer to deliver the message to.
// pMsg        heartbeat to send; it is logged and converted into an SRpcMsg here.
//
// The message is routed to destRaftId, matching syncNodeAppendEntriesBatch(),
// rather than to the destination stored inside pMsg (the two are the same for the
// timer callback, which passes &pMsg->destId).
//
// Always returns 0.
int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) {
  syncLogSendHeartbeat(pSyncNode, pMsg, "");

  SRpcMsg rpcMsg;
  syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
  return 0;
}

View File

@ -40,8 +40,7 @@ void test2() {
syncHeartbeatReplySerialize(pMsg, serialized, len);
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyBuild(1000);
syncHeartbeatReplyDeserialize(serialized, len, pMsg2);
syncHeartbeatReplyLog2((char *)"test2: syncHeartbeatReplySerialize -> syncHeartbeatReplyDeserialize ",
pMsg2);
syncHeartbeatReplyLog2((char *)"test2: syncHeartbeatReplySerialize -> syncHeartbeatReplyDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncHeartbeatReplyDestroy(pMsg);
@ -53,8 +52,7 @@ void test3() {
uint32_t len;
char * serialized = syncHeartbeatReplySerialize2(pMsg, &len);
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyDeserialize2(serialized, len);
syncHeartbeatReplyLog2((char *)"test3: syncHeartbeatReplySerialize3 -> syncHeartbeatReplyDeserialize2 ",
pMsg2);
syncHeartbeatReplyLog2((char *)"test3: syncHeartbeatReplySerialize3 -> syncHeartbeatReplyDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncHeartbeatReplyDestroy(pMsg);
@ -67,8 +65,7 @@ void test4() {
syncHeartbeatReply2RpcMsg(pMsg, &rpcMsg);
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyBuild(1000);
syncHeartbeatReplyFromRpcMsg(&rpcMsg, pMsg2);
syncHeartbeatReplyLog2((char *)"test4: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg ",
pMsg2);
syncHeartbeatReplyLog2((char *)"test4: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncHeartbeatReplyDestroy(pMsg);
@ -80,8 +77,7 @@ void test5() {
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsg, &rpcMsg);
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyFromRpcMsg2(&rpcMsg);
syncHeartbeatReplyLog2((char *)"test5: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg2 ",
pMsg2);
syncHeartbeatReplyLog2((char *)"test5: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncHeartbeatReplyDestroy(pMsg);

View File

@ -0,0 +1,153 @@
# Stop any running dnodes, deploy four dnodes and start them all.
# dnode1 gets supportVnodes 0 (presumably so it hosts no vnodes -- confirm).
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
# Connect and register three more dnodes by port
# (presumably dnode2..dnode4 -- confirm against the deploy scripts).
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
# Poll ins_dnodes until all four dnodes report "ready" in column 4.
# Each pass sleeps 1000 first; the script fails when the counter reaches 10.
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
# Anything other than exactly four dnode rows is an immediate failure, not a retry.
if $rows != 4 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
if $data(4)[4] != ready then
goto step1
endi
# Create a database with 3 replicas and 1 vgroup, then poll ins_databases until
# column 15 of row 2 reads "ready". Each pass sleeps 200 first; the script fails
# when the counter reaches 100.
$replica = 3
$vgroups = 1
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql select * from information_schema.ins_databases
print ===> rows: $rows
# Debug dump of columns 0..19 of row 2 (column 10 was previously printed as a second copy of column 6).
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][10] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
# Three rows are expected here; any other count is an immediate failure, not a retry.
if $rows != 3 then
return -1
endi
if $data[2][15] != ready then
goto check_db_ready
endi
# Poll "show vgroups" until one of the three replica status columns (4, 6, 8)
# reads "leader". Each pass sleeps 200 first; the script fails when the counter
# reaches 300.
sql use db
$loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
if $rows != $vgroups then
return -1
endi
# NOTE(review): only the "no leader in any column" case retries. When a leader is
# present but the other two columns are not both "follower", nothing is printed
# and the script still falls through to the next step -- confirm this is intended.
if $data[0][4] == leader then
if $data[0][6] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi
endi
elif $data[0][6] == leader then
if $data[0][4] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi
endi
elif $data[0][8] == leader then
if $data[0][4] == follower then
if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi
endi
else
goto check_vg_ready
endi
# Workload: create a super table and one child table, then insert two batches of
# 1000 rows with fixed sleeps in between.
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
sql create table ct1 using stb tags(1000)
print ====> step1 insert 1000 records
$N = 1000
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
# NOTE(review): steps 2, 3 and 5 only sleep. The script itself performs no data
# check and does not stop any dnode; presumably the leader is killed and the data
# inspected by hand during these pauses -- confirm.
print ====> step2 sleep 20s, checking data
sleep 20000
print ====> step3 sleep 30s, kill leader
sleep 30000
print ====> step4 insert 1000 records
$N = 1000
$count = 0
while $count < $N
# The second batch starts 1000000 after the first batch's base timestamp, so the
# two batches do not share timestamps.
$ms = 1591201000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
print ====> step5 sleep 20s, checking data
sleep 20000