diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 382c0e6218..3011350aa4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -27,20 +27,20 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 -#define SYNC_SPEED_UP_HB_TIMER 400 -#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) -#define SYNC_SLOW_DOWN_RANGE 100 -#define SYNC_MAX_READ_RANGE 2 -#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 2 +#define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) -#define SYNC_MAX_RECV_TIME_RANGE_MS 1200 -#define SYNC_ADD_QUORUM_COUNT 3 +#define SYNC_MAX_RECV_TIME_RANGE_MS 1200 +#define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_MAX_BATCH_SIZE 1 -#define SYNC_INDEX_BEGIN 0 -#define SYNC_INDEX_INVALID -1 -#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF +#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_INVALID -1 +#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF typedef enum { SYNC_STRATEGY_NO_SNAPSHOT = 0, @@ -132,7 +132,7 @@ typedef struct SSyncFSM { void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); - void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta *cbMeta); + void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index ae763efdb4..c241a69190 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -219,6 +219,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg); break; + case SYNC_CTRL_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg); + taosWriteQitem(pVnode->pSyncCtrlQ, pMsg); + break; case APPLY_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pApplyQ, pMsg); diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index e6d2bd4920..2a37e000e2 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -28,13 +28,13 @@ extern "C" { #include "trpc.h" #include "ttimer.h" -#define TIMER_MAX_MS 0x7FFFFFFF -#define ENV_TICK_TIMER_MS 1000 -#define PING_TIMER_MS 5000 -#define ELECT_TIMER_MS_MIN 5000 -#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) +#define TIMER_MAX_MS 0x7FFFFFFF +#define ENV_TICK_TIMER_MS 1000 +#define PING_TIMER_MS 5000 +#define ELECT_TIMER_MS_MIN 5000 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 900 +#define HEARTBEAT_TIMER_MS 900 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index fb85b89419..e8f17537b4 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -45,8 +45,8 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); +char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e699079a3f..e6f802f986 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -57,9 +57,32 @@ typedef struct SRaftCfg SRaftCfg; typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; +typedef struct SSyncTimer SSyncTimer; +typedef struct SSyncHbTimerData SSyncHbTimerData; extern bool gRaftDetailLog; +typedef struct SSyncHbTimerData { + SSyncNode* pSyncNode; + SSyncTimer* pTimer; + SRaftId destId; + uint64_t logicClock; +} SSyncHbTimerData; + +typedef struct SSyncTimer { + void* pTimer; + TAOS_TMR_CALLBACK timerCb; + uint64_t logicClock; + uint64_t counter; + int32_t timerMS; + SRaftId destId; + void *pData; +} SSyncTimer; + +int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId); +int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); +int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); + typedef struct SSyncNode { // init by SSyncInfo SyncGroupId vgId; @@ -139,6 +162,9 @@ typedef struct SSyncNode { TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp uint64_t heartbeatTimerCounter; + // peer heartbeat timer + SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA]; + // callback FpOnPingCb FpOnPing; FpOnPingReplyCb FpOnPingReply; @@ -256,6 +282,7 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); +SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index ba0f973815..e193e16c02 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -45,8 +45,8 @@ int32_t raftCfgIndexClose(SRaftCfgIndex *pRaftCfgIndex); int32_t raftCfgIndexPersist(SRaftCfgIndex *pRaftCfgIndex); int32_t raftCfgIndexAddConfigIndex(SRaftCfgIndex *pRaftCfgIndex, SyncIndex configIndex); -cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex); -char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex); +cJSON * raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex); +char * raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex); int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex); int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex); @@ -73,14 +73,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); -char *syncCfg2Str(SSyncCfg *pSyncCfg); -char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); +cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); +char * syncCfg2Str(SSyncCfg *pSyncCfg); +char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); -char *raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); +char * raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index edce124ee5..e1bb889393 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -61,6 +61,8 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); +int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRespMgr.h b/source/libs/sync/inc/syncRespMgr.h index 28978af77e..22e1005e5c 100644 --- a/source/libs/sync/inc/syncRespMgr.h +++ b/source/libs/sync/inc/syncRespMgr.h @@ -32,9 +32,9 @@ typedef struct SRespStub { } SRespStub; typedef struct SSyncRespMgr { - SHashObj *pRespHash; + SHashObj * pRespHash; int64_t ttl; - void *data; + void * data; TdThreadMutex mutex; uint64_t seqNum; } SSyncRespMgr; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 6fb558e45c..1ec9e9f2f1 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -28,10 +28,10 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" -#define SYNC_SNAPSHOT_SEQ_INVALID -1 +#define SYNC_SNAPSHOT_SEQ_INVALID -1 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2 -#define SYNC_SNAPSHOT_SEQ_BEGIN 0 -#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF +#define SYNC_SNAPSHOT_SEQ_BEGIN 0 +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_RETRY_MS 5000 @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; } SSyncSnapshotReceiver; @@ -89,8 +89,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 28b5313ac5..aead51b0b5 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -136,7 +136,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 13cf5b4995..00b1ceb376 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -46,15 +46,16 @@ static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodeEqNoop(SSyncNode* ths); static int32_t syncNodeAppendNoop(SSyncNode* ths); +static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); // process message ---- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); // --------------------------------- -static void syncNodeFreeCb(void *param) { - syncNodeClose(param); - param = NULL; +static void syncNodeFreeCb(void* param) { + syncNodeClose(param); + param = NULL; } int32_t syncInit() { @@ -918,6 +919,43 @@ _END: return ret; } +int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) { + pSyncTimer->pTimer = NULL; + pSyncTimer->counter = 0; + pSyncTimer->timerMS = pSyncNode->hbBaseLine; + pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer; + pSyncTimer->destId = destId; + atomic_store_64(&pSyncTimer->logicClock, 0); + return 0; +} + +int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { + int32_t ret = 0; + if (syncEnvIsStart()) { + + SSyncHbTimerData *pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); + pData->pSyncNode = pSyncNode; + pData->pTimer = pSyncTimer; + pData->destId = pSyncTimer->destId; + pData->logicClock = pSyncTimer->logicClock; + + pSyncTimer->pData = pData; + taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer); + } else { + sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); + } + return ret; +} + +int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { + int32_t ret = 0; + atomic_add_fetch_64(&pSyncTimer->logicClock, 1); + taosTmrStop(pSyncTimer->pTimer); + pSyncTimer->pTimer = NULL; + //taosMemoryFree(pSyncTimer->pData); + return ret; +} + // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; @@ -947,15 +985,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { meta.batchSize = pSyncInfo->batchSize; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); if (ret != 0) { - sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath); - goto _error; + sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath); + goto _error; } } else { // update syncCfg by raft_config.json pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); if (pSyncNode->pRaftCfg == NULL) { - sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); - goto _error; + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; } pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; @@ -986,8 +1024,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { - sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId); - goto _error; + sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId); + goto _error; } // init peersNum, peers, peersId @@ -1001,17 +1039,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } for (int i = 0; i < pSyncNode->peersNum; ++i) { if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { - sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i); - goto _error; + sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i); + goto _error; } } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { - sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i); - goto _error; + if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { + sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i); + goto _error; } } @@ -1091,8 +1129,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSnapshot snapshot = {0}; int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (code != 0) { - sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code); - goto _error; + sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code); + goto _error; } if (snapshot.lastApplyIndex > commitIndex) { commitIndex = snapshot.lastApplyIndex; @@ -1130,6 +1168,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; pSyncNode->heartbeatTimerCounter = 0; + // init peer heartbeat timer + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { + syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]); + } + // init callback pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; @@ -1192,8 +1235,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { _error: if (pSyncInfo->pFsm) { - taosMemoryFree(pSyncInfo->pFsm); - pSyncInfo->pFsm = NULL; + taosMemoryFree(pSyncInfo->pFsm); + pSyncInfo->pFsm = NULL; } syncNodeClose(pSyncNode); pSyncNode = NULL; @@ -2135,6 +2178,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncNodeStopHeartbeatTimer(pSyncNode); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); + syncHbTimerStop(pSyncNode, pSyncTimer); + } // reset elect timer syncNodeResetElectTimer(pSyncNode); @@ -2234,6 +2281,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // start heartbeat timer syncNodeStartHeartbeatTimer(pSyncNode); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); + syncHbTimerStart(pSyncNode, pSyncTimer); + } // call back if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) { @@ -2595,6 +2646,62 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } } +static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { + SSyncHbTimerData* pData = (SSyncHbTimerData*)param; + SSyncNode* pSyncNode = pData->pSyncNode; + SSyncTimer* pSyncTimer = pData->pTimer; + + syncNodeEventLog(pSyncNode, "eq peer hb timer"); + + int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock); + int64_t msgLogicClock = atomic_load_64(&pData->logicClock); + + if (pSyncNode->replicaNum > 1) { + if (timerLogicClock == msgLogicClock) { + SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId); + pSyncMsg->srcId = pSyncNode->myRaftId; + pSyncMsg->destId = pData->destId; + pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; + pSyncMsg->commitIndex = pSyncNode->commitIndex; + pSyncMsg->privateTerm = 0; + + SRpcMsg rpcMsg; + syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg); + +// eq msg +#if 0 + if (pSyncNode->FpEqCtrlMsg != NULL) { + int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); + if (code != 0) { + sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code); + rpcFreeCont(rpcMsg.pCont); + syncHeartbeatDestroy(pSyncMsg); + return; + } + } else { + sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); + } +#endif + + // send msg + syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); + + syncHeartbeatDestroy(pSyncMsg); + + if (syncEnvIsStart()) { + taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, + &pSyncTimer->pTimer); + } else { + sError("sync env is stop, syncNodeEqHeartbeatTimer"); + } + + } else { + sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRIu64 ", msgLogicClock:%" PRIu64 "", timerLogicClock, + msgLogicClock); + } + } +} + static int32_t syncNodeEqNoop(SSyncNode* ths) { int32_t ret = 0; ASSERT(ths->state == TAOS_SYNC_STATE_LEADER); @@ -3198,6 +3305,16 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) return pSender; } +SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) { + SSyncTimer* pTimer = NULL; + for (int i = 0; i < ths->replicaNum; ++i) { + if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) { + pTimer = &((ths->peerHeartbeatTimerArr)[i]); + } + } + return pTimer; +} + bool syncNodeCanChange(SSyncNode* pSyncNode) { if (pSyncNode->changing) { sError("sync cannot change"); diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 57126d0871..4491122a20 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -29,7 +29,7 @@ SRaftCfgIndex *raftCfgIndexOpen(const char *path) { taosLSeekFile(pRaftCfgIndex->pFile, 0, SEEK_SET); int32_t bufLen = MAX_CONFIG_INDEX_COUNT * 16; - char *pBuf = taosMemoryMalloc(bufLen); + char * pBuf = taosMemoryMalloc(bufLen); memset(pBuf, 0, bufLen); int64_t len = taosReadFile(pRaftCfgIndex->pFile, pBuf, bufLen); ASSERT(len > 0); @@ -93,7 +93,7 @@ cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex) { char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex) { cJSON *pJson = raftCfgIndex2Json(pRaftCfgIndex); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -151,7 +151,7 @@ int32_t raftCfgIndexCreateFile(const char *path) { raftCfgIndex.configIndexCount = 1; raftCfgIndex.configIndexArr[0] = -1; - char *s = raftCfgIndex2Str(&raftCfgIndex); + char * s = raftCfgIndex2Str(&raftCfgIndex); int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); ASSERT(ret == strlen(s) + 1); @@ -244,7 +244,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -252,7 +252,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{r-num:%d, my:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -349,7 +349,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -426,7 +426,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 886f7ad199..8a3940a363 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -496,4 +496,14 @@ int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaft syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); return 0; +} + +int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) { + int32_t ret = 0; + syncLogSendHeartbeat(pSyncNode, pMsg, ""); + + SRpcMsg rpcMsg; + syncHeartbeat2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &rpcMsg); + return ret; } \ No newline at end of file diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 103c225476..e6d367b912 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -136,7 +136,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { while (pStub) { size_t len; - void *key = taosHashGetKey(pStub, &len); + void * key = taosHashGetKey(pStub, &len); uint64_t *pSeqNum = (uint64_t *)key; sum++; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 68d81813ac..39d10462cc 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -36,8 +36,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI if (condition) { pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); if (pSender == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } memset(pSender, 0, sizeof(*pSender)); @@ -376,14 +376,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -655,7 +655,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -688,14 +688,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/syncClientRequestBatchTest.cpp b/source/libs/sync/test/syncClientRequestBatchTest.cpp index 84d037be01..5586b7a6ce 100644 --- a/source/libs/sync/test/syncClientRequestBatchTest.cpp +++ b/source/libs/sync/test/syncClientRequestBatchTest.cpp @@ -33,7 +33,7 @@ SyncClientRequestBatch *createMsg() { for (int32_t i = 0; i < 5; ++i) { SRpcMsg *pRpcMsg = createRpcMsg(i, 20); rpcMsgPArr[i] = pRpcMsg; - //taosMemoryFree(pRpcMsg); + // taosMemoryFree(pRpcMsg); } SRaftMeta raftArr[5]; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index fbfc4cda8e..c523fbc1c3 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -149,7 +149,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } -void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta *cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index cfab3b6ae3..c04ab9b000 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -80,7 +80,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } -void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta *cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term); } diff --git a/source/libs/sync/test/syncHeartbeatReplyTest.cpp b/source/libs/sync/test/syncHeartbeatReplyTest.cpp index 0ccd7b70bb..1fac03652b 100644 --- a/source/libs/sync/test/syncHeartbeatReplyTest.cpp +++ b/source/libs/sync/test/syncHeartbeatReplyTest.cpp @@ -35,13 +35,12 @@ void test1() { void test2() { SyncHeartbeatReply *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)taosMemoryMalloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); syncHeartbeatReplySerialize(pMsg, serialized, len); SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyBuild(1000); syncHeartbeatReplyDeserialize(serialized, len, pMsg2); - syncHeartbeatReplyLog2((char *)"test2: syncHeartbeatReplySerialize -> syncHeartbeatReplyDeserialize ", - pMsg2); + syncHeartbeatReplyLog2((char *)"test2: syncHeartbeatReplySerialize -> syncHeartbeatReplyDeserialize ", pMsg2); taosMemoryFree(serialized); syncHeartbeatReplyDestroy(pMsg); @@ -50,11 +49,10 @@ void test2() { void test3() { SyncHeartbeatReply *pMsg = createMsg(); - uint32_t len; - char * serialized = syncHeartbeatReplySerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncHeartbeatReplySerialize2(pMsg, &len); SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyDeserialize2(serialized, len); - syncHeartbeatReplyLog2((char *)"test3: syncHeartbeatReplySerialize3 -> syncHeartbeatReplyDeserialize2 ", - pMsg2); + syncHeartbeatReplyLog2((char *)"test3: syncHeartbeatReplySerialize3 -> syncHeartbeatReplyDeserialize2 ", pMsg2); taosMemoryFree(serialized); syncHeartbeatReplyDestroy(pMsg); @@ -63,12 +61,11 @@ void test3() { void test4() { SyncHeartbeatReply *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncHeartbeatReply2RpcMsg(pMsg, &rpcMsg); SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyBuild(1000); syncHeartbeatReplyFromRpcMsg(&rpcMsg, pMsg2); - syncHeartbeatReplyLog2((char *)"test4: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg ", - pMsg2); + syncHeartbeatReplyLog2((char *)"test4: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg ", pMsg2); rpcFreeCont(rpcMsg.pCont); syncHeartbeatReplyDestroy(pMsg); @@ -77,11 +74,10 @@ void test4() { void test5() { SyncHeartbeatReply *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncHeartbeatReply2RpcMsg(pMsg, &rpcMsg); SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyFromRpcMsg2(&rpcMsg); - syncHeartbeatReplyLog2((char *)"test5: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg2 ", - pMsg2); + syncHeartbeatReplyLog2((char *)"test5: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg2 ", pMsg2); rpcFreeCont(rpcMsg.pCont); syncHeartbeatReplyDestroy(pMsg); diff --git a/source/libs/sync/test/syncHeartbeatTest.cpp b/source/libs/sync/test/syncHeartbeatTest.cpp index d910c828f1..b0c0554355 100644 --- a/source/libs/sync/test/syncHeartbeatTest.cpp +++ b/source/libs/sync/test/syncHeartbeatTest.cpp @@ -19,7 +19,7 @@ SyncHeartbeat *createMsg() { pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); - pMsg->destId.vgId = 100; + pMsg->destId.vgId = 100; pMsg->term = 8; pMsg->commitIndex = 33; pMsg->privateTerm = 44; @@ -34,8 +34,8 @@ void test1() { void test2() { SyncHeartbeat *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)taosMemoryMalloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); syncHeartbeatSerialize(pMsg, serialized, len); SyncHeartbeat *pMsg2 = syncHeartbeatBuild(789); syncHeartbeatDeserialize(serialized, len, pMsg2); @@ -48,8 +48,8 @@ void test2() { void test3() { SyncHeartbeat *pMsg = createMsg(); - uint32_t len; - char * serialized = syncHeartbeatSerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncHeartbeatSerialize2(pMsg, &len); SyncHeartbeat *pMsg2 = syncHeartbeatDeserialize2(serialized, len); syncHeartbeatLog2((char *)"test3: syncHeartbeatSerialize2 -> syncHeartbeatDeserialize2 ", pMsg2); @@ -60,7 +60,7 @@ void test3() { void test4() { SyncHeartbeat *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncHeartbeat2RpcMsg(pMsg, &rpcMsg); SyncHeartbeat *pMsg2 = (SyncHeartbeat *)taosMemoryMalloc(rpcMsg.contLen); syncHeartbeatFromRpcMsg(&rpcMsg, pMsg2); @@ -73,9 +73,9 @@ void test4() { void test5() { SyncHeartbeat *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncHeartbeat2RpcMsg(pMsg, &rpcMsg); - SyncHeartbeat *pMsg2 =syncHeartbeatFromRpcMsg2(&rpcMsg); + SyncHeartbeat *pMsg2 = syncHeartbeatFromRpcMsg2(&rpcMsg); syncHeartbeatLog2((char *)"test5: syncHeartbeat2RpcMsg -> syncHeartbeatFromRpcMsg2 ", pMsg2); rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/test/syncRaftCfgIndexTest.cpp b/source/libs/sync/test/syncRaftCfgIndexTest.cpp index 6338383f92..bd8ffc45b6 100644 --- a/source/libs/sync/test/syncRaftCfgIndexTest.cpp +++ b/source/libs/sync/test/syncRaftCfgIndexTest.cpp @@ -53,20 +53,20 @@ SSyncCfg* createSyncCfg() { return pCfg; } -const char *pFile = "./raft_config_index.json"; +const char* pFile = "./raft_config_index.json"; void test1() { int32_t code = raftCfgIndexCreateFile(pFile); ASSERT(code == 0); - SRaftCfgIndex *pRaftCfgIndex = raftCfgIndexOpen(pFile); + SRaftCfgIndex* pRaftCfgIndex = raftCfgIndexOpen(pFile); raftCfgIndexLog2((char*)"==test1==", pRaftCfgIndex); raftCfgIndexClose(pRaftCfgIndex); } void test2() { - SRaftCfgIndex *pRaftCfgIndex = raftCfgIndexOpen(pFile); + SRaftCfgIndex* pRaftCfgIndex = raftCfgIndexOpen(pFile); for (int i = 0; i < 500; ++i) { raftCfgIndexAddConfigIndex(pRaftCfgIndex, i); } @@ -77,7 +77,7 @@ void test2() { } void test3() { - SRaftCfgIndex *pRaftCfgIndex = raftCfgIndexOpen(pFile); + SRaftCfgIndex* pRaftCfgIndex = raftCfgIndexOpen(pFile); raftCfgIndexLog2((char*)"==test3==", pRaftCfgIndex); raftCfgIndexClose(pRaftCfgIndex); diff --git a/tests/script/tsim/sync/sync2-test.sim b/tests/script/tsim/sync/sync2-test.sim new file mode 100644 index 0000000000..0419381d3c --- /dev/null +++ b/tests/script/tsim/sync/sync2-test.sim @@ -0,0 +1,153 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql select * from information_schema.ins_dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +print ===> $data30 $data31 $data32 $data33 $data34 $data35 +if $rows != 4 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi +if $data(3)[4] != ready then + goto step1 +endi +if $data(4)[4] != ready then + goto step1 +endi + +$replica = 3 +$vgroups = 1 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 100 then + print ====> db not ready! + return -1 +endi +sql select * from information_schema.ins_databases +print ===> rows: $rows +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19] +if $rows != 3 then + return -1 +endi +if $data[2][15] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 300 then + print ====> vgroups not ready! + return -1 +endi + +sql show vgroups +print ===> rows: $rows +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11] + +if $rows != $vgroups then + return -1 +endi + +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + endi + endi +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + endi + endi +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + endi + endi +else + goto check_vg_ready +endi + + +vg_ready: +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +sql create table ct1 using stb tags(1000) + + +print ====> step1 insert 1000 records +$N = 1000 +$count = 0 +while $count < $N + $ms = 1591200000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + +print ====> step2 sleep 20s, checking data +sleep 20000 + + +print ====> step3 sleep 30s, kill leader +sleep 30000 + +print ====> step4 insert 1000 records +$N = 1000 +$count = 0 +while $count < $N + $ms = 1591201000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + +print ====> step5 sleep 20s, checking data +sleep 20000 +