Merge pull request #17945 from taosdata/enh/TD-20043
reafct: adjust sync propose
This commit is contained in:
commit
4a0468aa45
|
@ -111,10 +111,6 @@ typedef struct SElectTimer {
|
|||
void* pData;
|
||||
} SElectTimer;
|
||||
|
||||
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
|
||||
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||
|
||||
typedef struct SPeerState {
|
||||
SyncIndex lastSendIndex;
|
||||
int64_t lastSendTime;
|
||||
|
|
|
@ -182,9 +182,8 @@ typedef struct SyncClientRequest {
|
|||
char data[]; // origin RpcMsg.pCont
|
||||
} SyncClientRequest;
|
||||
|
||||
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak,
|
||||
int32_t vgId); // step 1
|
||||
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen);
|
||||
SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pMsg, uint64_t seqNum, bool isWeak, int32_t vgId); // step 1
|
||||
void syncClientRequestDestroy(SyncClientRequest* pMsg);
|
||||
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
|
||||
|
|
|
@ -42,6 +42,9 @@ static int32_t syncNodeEqNoop(SSyncNode* ths);
|
|||
static int32_t syncNodeAppendNoop(SSyncNode* ths);
|
||||
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
|
||||
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
|
||||
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
|
||||
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo) {
|
||||
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
|
||||
|
@ -219,7 +222,6 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
|
|||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("sync begin snapshot error");
|
||||
return -1;
|
||||
}
|
||||
|
@ -525,7 +527,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
|
|||
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
|
||||
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
|
||||
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
|
||||
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
|
||||
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
||||
|
@ -544,7 +546,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
|
|||
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
|
||||
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
|
||||
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
|
||||
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotLastApplyIndex) {
|
||||
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
||||
|
@ -613,7 +615,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
|||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
pEpSet->numOfEps = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||
(pEpSet->numOfEps)++;
|
||||
|
@ -634,7 +636,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
|||
}
|
||||
|
||||
pEpSet->numOfEps = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||
(pEpSet->numOfEps)++;
|
||||
|
@ -672,73 +674,61 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|||
}
|
||||
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t ret = 0;
|
||||
sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType));
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
// not restored, vnode enable
|
||||
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
|
||||
pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
|
||||
goto _END;
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
|
||||
return -1;
|
||||
}
|
||||
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = *pMsg;
|
||||
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
||||
// not restored, vnode enable
|
||||
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
|
||||
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg;
|
||||
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
int32_t ret = 0;
|
||||
SyncClientRequest* pSyncMsg;
|
||||
|
||||
// optimized one replica
|
||||
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
|
||||
pSyncMsg = syncClientRequestBuild(pMsg, 0, isWeak, pSyncNode->vgId);
|
||||
|
||||
SyncIndex retIndex;
|
||||
int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex);
|
||||
if (code == 0) {
|
||||
pMsg->info.conn.applyIndex = retIndex;
|
||||
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||
ret = 1;
|
||||
sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
sTrace("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType));
|
||||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
}
|
||||
} else {
|
||||
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
|
||||
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
||||
|
||||
} else {
|
||||
if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
|
||||
ret = 0;
|
||||
} else {
|
||||
ret = -1;
|
||||
pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
|
||||
sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType));
|
||||
ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
|
||||
if (ret != 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);
|
||||
sError("vgId:%d, failed to enqueue msg since %s", pSyncNode->vgId, terrstr());
|
||||
}
|
||||
}
|
||||
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
goto _END;
|
||||
|
||||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state),
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
goto _END;
|
||||
}
|
||||
|
||||
_END:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
|
||||
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
|
||||
pSyncTimer->pTimer = NULL;
|
||||
pSyncTimer->counter = 0;
|
||||
pSyncTimer->timerMS = pSyncNode->hbBaseLine;
|
||||
|
@ -748,7 +738,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
int32_t ret = 0;
|
||||
if (syncIsInit()) {
|
||||
SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
|
||||
|
@ -765,7 +755,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
int32_t ret = 0;
|
||||
atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
|
||||
taosTmrStop(pSyncTimer->pTimer);
|
||||
|
@ -865,14 +855,14 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
|
||||
int j = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
|
||||
pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
|
||||
j++;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
|
||||
sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
|
||||
goto _error;
|
||||
|
@ -881,7 +871,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
|
||||
// init replicaNum, replicasId
|
||||
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
|
||||
sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
|
||||
goto _error;
|
||||
|
@ -1030,7 +1020,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
pSyncNode->restoreFinish = false;
|
||||
|
||||
// snapshot senders
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
|
||||
// ASSERT(pSender != NULL);
|
||||
(pSyncNode->senders)[i] = pSender;
|
||||
|
@ -1161,7 +1151,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
taosMemoryFree(pSyncNode->pFsm);
|
||||
}
|
||||
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if ((pSyncNode->senders)[i] != NULL) {
|
||||
snapshotSenderDestroy((pSyncNode->senders)[i]);
|
||||
(pSyncNode->senders)[i] = NULL;
|
||||
|
@ -1206,7 +1196,7 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
|
|||
|
||||
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SRaftId* destId = &(pSyncNode->peersId[i]);
|
||||
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);
|
||||
ret = syncNodePing(pSyncNode, destId, pMsg);
|
||||
|
@ -1218,7 +1208,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
|
|||
|
||||
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
SRaftId* destId = &(pSyncNode->replicasId[i]);
|
||||
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);
|
||||
ret = syncNodePing(pSyncNode, destId, pMsg);
|
||||
|
@ -1322,7 +1312,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
|
||||
#endif
|
||||
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
|
||||
if (pSyncTimer != NULL) {
|
||||
syncHbTimerStart(pSyncNode, pSyncTimer);
|
||||
|
@ -1341,7 +1331,7 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
pSyncNode->pHeartbeatTimer = NULL;
|
||||
#endif
|
||||
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
|
||||
if (pSyncTimer != NULL) {
|
||||
syncHbTimerStop(pSyncNode, pSyncTimer);
|
||||
|
@ -1424,19 +1414,19 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
|||
cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
|
||||
cJSON* pPeers = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
|
||||
}
|
||||
cJSON* pPeersId = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
|
||||
}
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
|
||||
cJSON* pReplicasId = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
|
||||
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
|
||||
}
|
||||
|
||||
|
@ -1533,7 +1523,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
|||
// snapshot senders
|
||||
cJSON* pSenders = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "senders", pSenders);
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
|
||||
}
|
||||
|
||||
|
@ -1558,7 +1548,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
||||
int len = 256;
|
||||
int32_t len = 256;
|
||||
char* s = (char*)taosMemoryMalloc(len);
|
||||
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||
|
@ -1584,7 +1574,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
|
|||
bool b1 = false;
|
||||
bool b2 = false;
|
||||
|
||||
for (int i = 0; i < config->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < config->replicaNum; ++i) {
|
||||
if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
|
||||
(config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
|
||||
b1 = true;
|
||||
|
@ -1592,7 +1582,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
|
|||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < config->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < config->replicaNum; ++i) {
|
||||
SRaftId raftId;
|
||||
raftId.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort);
|
||||
raftId.vgId = pSyncNode->vgId;
|
||||
|
@ -1674,7 +1664,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
|
||||
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
|
||||
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
oldSenders[i] = (pSyncNode->senders)[i];
|
||||
sSTrace(oldSenders[i], "snapshot sender save old");
|
||||
}
|
||||
|
@ -1685,20 +1675,20 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
|
||||
int j = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
|
||||
pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
|
||||
j++;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
|
||||
}
|
||||
|
||||
// init replicaNum, replicasId
|
||||
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
||||
}
|
||||
|
||||
|
@ -1713,15 +1703,15 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
// reset snapshot senders
|
||||
|
||||
// clear new
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
(pSyncNode->senders)[i] = NULL;
|
||||
}
|
||||
|
||||
// reset new
|
||||
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
// reset sender
|
||||
bool reset = false;
|
||||
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
||||
for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
||||
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
|
@ -1744,7 +1734,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
}
|
||||
|
||||
// create new
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if ((pSyncNode->senders)[i] == NULL) {
|
||||
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
|
||||
sSTrace((pSyncNode->senders)[i], "snapshot sender create new");
|
||||
|
@ -1752,7 +1742,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
}
|
||||
|
||||
// free old
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (oldSenders[i] != NULL) {
|
||||
snapshotSenderDestroy(oldSenders[i]);
|
||||
sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
|
||||
|
@ -1893,7 +1883,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// set leader cache
|
||||
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
||||
|
||||
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
||||
// maybe overwrite myself, no harm
|
||||
// just do it!
|
||||
|
||||
|
@ -1907,7 +1897,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->pNextIndex->index[i] = lastIndex + 1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||
// maybe overwrite myself, no harm
|
||||
// just do it!
|
||||
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
|
||||
|
@ -1920,7 +1910,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// update sender private term
|
||||
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
|
||||
if (pMySender != NULL) {
|
||||
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||
if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
|
||||
pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
|
||||
}
|
||||
|
@ -1974,7 +1964,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
|||
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
|
||||
|
||||
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID;
|
||||
pSyncNode->peerStates[i].lastSendTime = 0;
|
||||
}
|
||||
|
@ -2338,7 +2328,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
|
|||
|
||||
uint32_t entryLen;
|
||||
char* serialized = syncEntrySerialize(pEntry, &entryLen);
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestAlloc(entryLen);
|
||||
ASSERT(pSyncMsg->dataLen == entryLen);
|
||||
memcpy(pSyncMsg->data, serialized, entryLen);
|
||||
|
||||
|
@ -2360,8 +2350,8 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
|
|||
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
|
||||
|
||||
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
|
||||
int code = 0;
|
||||
int entryLen = sizeof(*pEntry) + pEntry->dataLen;
|
||||
int32_t code = 0;
|
||||
int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen;
|
||||
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
|
||||
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
|
@ -2543,7 +2533,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
|
|||
|
||||
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
||||
SyncTerm term = ths->pRaftStore->currentTerm;
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild2(pMsg, term, index);
|
||||
ASSERT(pEntry != NULL);
|
||||
|
||||
LRUHandle* h = NULL;
|
||||
|
@ -2679,7 +2669,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
|
|||
}
|
||||
|
||||
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
|
||||
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) {
|
||||
SRaftId raftId;
|
||||
raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
|
||||
raftId.vgId = ths->vgId;
|
||||
|
@ -2810,7 +2800,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
|||
}
|
||||
|
||||
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
|
||||
for (int i = 0; i < ths->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < ths->replicaNum; ++i) {
|
||||
if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
|
||||
return true;
|
||||
}
|
||||
|
@ -2820,7 +2810,7 @@ bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
|
|||
|
||||
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
|
||||
SSyncSnapshotSender* pSender = NULL;
|
||||
for (int i = 0; i < ths->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < ths->replicaNum; ++i) {
|
||||
if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
|
||||
pSender = (ths->senders)[i];
|
||||
}
|
||||
|
@ -2830,7 +2820,7 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
|
|||
|
||||
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
|
||||
SSyncTimer* pTimer = NULL;
|
||||
for (int i = 0; i < ths->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < ths->replicaNum; ++i) {
|
||||
if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
|
||||
pTimer = &((ths->peerHeartbeatTimerArr)[i]);
|
||||
}
|
||||
|
@ -2840,7 +2830,7 @@ SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
|
|||
|
||||
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
|
||||
SPeerState* pState = NULL;
|
||||
for (int i = 0; i < ths->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < ths->replicaNum; ++i) {
|
||||
if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
|
||||
pState = &((ths->peerStates)[i]);
|
||||
}
|
||||
|
@ -2879,7 +2869,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
|
||||
if (pSender != NULL && pSender->start) {
|
||||
sError("sync cannot change3");
|
||||
|
|
|
@ -831,21 +831,18 @@ void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
|
|||
}
|
||||
|
||||
// ---- message process SyncClientRequest----
|
||||
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
|
||||
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
|
||||
uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
|
||||
SyncClientRequest* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||
pMsg->seqNum = 0;
|
||||
pMsg->isWeak = false;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
// step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) {
|
||||
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen);
|
||||
SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) {
|
||||
SyncClientRequest* pMsg = syncClientRequestAlloc(pOriginalRpcMsg->contLen);
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
|
||||
pMsg->seqNum = seqNum;
|
||||
|
@ -891,7 +888,6 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len)
|
|||
|
||||
// step 2. SyncClientRequest => RpcMsg, to queue
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
|
|
|
@ -142,7 +142,7 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) {
|
|||
memset(s, 0, len + 1);
|
||||
memcpy(s, ptr, len);
|
||||
|
||||
for (int i = 0; i < len; ++i) {
|
||||
for (int32_t i = 0; i < len; ++i) {
|
||||
if (!syncUtilCanPrint(s[i])) {
|
||||
s[i] = '.';
|
||||
}
|
||||
|
@ -157,8 +157,8 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
|
|||
memset(s, 0, len2);
|
||||
|
||||
char* p = s;
|
||||
for (int i = 0; i < len; ++i) {
|
||||
int n = sprintf(p, "%d,", ptr[i]);
|
||||
for (int32_t i = 0; i < len; ++i) {
|
||||
int32_t n = sprintf(p, "%d,", ptr[i]);
|
||||
p += n;
|
||||
}
|
||||
return s;
|
||||
|
@ -178,29 +178,11 @@ void syncUtilMsgNtoH(void* msg) {
|
|||
pHead->vgId = ntohl(pHead->vgId);
|
||||
}
|
||||
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
return true;
|
||||
}
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||
|
||||
return false;
|
||||
}
|
||||
bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||
|
||||
bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserRollback(tmsg_t msgType) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||
|
||||
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
||||
int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex);
|
||||
|
|
|
@ -74,8 +74,8 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
|
|||
|
||||
ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
|
||||
|
||||
int j = -1;
|
||||
for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||
int32_t j = -1;
|
||||
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||
if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) {
|
||||
j = i;
|
||||
break;
|
||||
|
@ -105,11 +105,11 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
|
|||
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
|
||||
cJSON *pReplicas = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
||||
for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
|
||||
}
|
||||
int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesGranted->replicaNum);
|
||||
for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||
int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum);
|
||||
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||
arr[i] = pVotesGranted->isGranted[i];
|
||||
}
|
||||
cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
|
||||
|
@ -168,7 +168,7 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
|
|||
|
||||
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
|
||||
bool ret = false;
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
|
||||
ret = true;
|
||||
break;
|
||||
|
@ -183,7 +183,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
|
|||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
|
||||
// ASSERT(pVotesRespond->isRespond[i] == false);
|
||||
pVotesRespond->isRespond[i] = true;
|
||||
|
@ -197,7 +197,7 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
|
|||
pVotesRespond->term = term;
|
||||
memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond));
|
||||
/*
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
pVotesRespond->isRespond[i] = false;
|
||||
}
|
||||
*/
|
||||
|
@ -211,12 +211,12 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
|
|||
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
|
||||
cJSON *pReplicas = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
|
||||
}
|
||||
int respondNum = 0;
|
||||
int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesRespond->replicaNum);
|
||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
int32_t respondNum = 0;
|
||||
int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum);
|
||||
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||
arr[i] = pVotesRespond->isRespond[i];
|
||||
if (pVotesRespond->isRespond[i]) {
|
||||
respondNum++;
|
||||
|
|
|
@ -81,7 +81,7 @@ void test4() {
|
|||
|
||||
void test5() {
|
||||
SyncApplyMsg *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncApplyMsg2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncApplyMsg *pMsg2 = syncApplyMsgFromRpcMsg2(&rpcMsg);
|
||||
syncApplyMsgLog2((char *)"test5: syncClientRequest2RpcMsg -> syncApplyMsgFromRpcMsg2 ", pMsg2);
|
||||
|
|
|
@ -59,7 +59,7 @@ void test2() {
|
|||
uint32_t len = pMsg->bytes;
|
||||
char * serialized = (char *)taosMemoryMalloc(len);
|
||||
syncClientRequestSerialize(pMsg, serialized, len);
|
||||
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen);
|
||||
SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen);
|
||||
syncClientRequestDeserialize(serialized, len, pMsg2);
|
||||
syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ SyncClientRequest *createMsg() {
|
|||
rpcMsg.contLen = 20;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
strcpy((char *)rpcMsg.pCont, "hello rpc");
|
||||
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000);
|
||||
SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
return pMsg;
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ void test2() {
|
|||
uint32_t len = pMsg->bytes;
|
||||
char *serialized = (char *)taosMemoryMalloc(len);
|
||||
syncClientRequestSerialize(pMsg, serialized, len);
|
||||
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen);
|
||||
SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen);
|
||||
syncClientRequestDeserialize(serialized, len, pMsg2);
|
||||
syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
|
||||
|
||||
|
@ -60,7 +60,7 @@ void test3() {
|
|||
|
||||
void test4() {
|
||||
SyncClientRequest *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen);
|
||||
syncClientRequestFromRpcMsg(&rpcMsg, pMsg2);
|
||||
|
@ -73,7 +73,7 @@ void test4() {
|
|||
|
||||
void test5() {
|
||||
SyncClientRequest *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg);
|
||||
syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2);
|
||||
|
|
|
@ -102,12 +102,12 @@ SRpcMsg *step0() {
|
|||
}
|
||||
|
||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000);
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SRpcMsg *step2(const SyncClientRequest *pMsg) {
|
||||
SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
|
||||
SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryCalloc(sizeof(SRpcMsg), 1);
|
||||
syncClientRequest2RpcMsg(pMsg, pRetMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ void test1() {
|
|||
}
|
||||
|
||||
void test2() {
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10);
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10);
|
||||
pSyncMsg->originalRpcType = 33;
|
||||
pSyncMsg->seqNum = 11;
|
||||
pSyncMsg->isWeak = 1;
|
||||
|
@ -46,7 +46,7 @@ void test2() {
|
|||
}
|
||||
|
||||
void test3() {
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10);
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10);
|
||||
pSyncMsg->originalRpcType = 33;
|
||||
pSyncMsg->seqNum = 11;
|
||||
pSyncMsg->isWeak = 1;
|
||||
|
|
|
@ -47,7 +47,7 @@ SyncClientRequest *createSyncClientRequest() {
|
|||
rpcMsg.contLen = 20;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
strcpy((char *)rpcMsg.pCont, "hello rpc");
|
||||
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000);
|
||||
SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
@ -156,7 +156,7 @@ void test7() {
|
|||
|
||||
void test8() {
|
||||
SyncClientRequest *pMsg = createSyncClientRequest();
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char *)"test8", &rpcMsg);
|
||||
syncClientRequestDestroy(pMsg);
|
||||
|
|
|
@ -162,7 +162,7 @@ SRpcMsg *step0() {
|
|||
}
|
||||
|
||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000);
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
|
@ -206,7 +206,7 @@ int main(int argc, char **argv) {
|
|||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
SyncClientRequest *pSyncClientRequest = pMsg1;
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
|
||||
gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
|
||||
|
||||
|
|
|
@ -140,7 +140,7 @@ SRpcMsg *step0() {
|
|||
}
|
||||
|
||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000);
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ int main(int argc, char **argv) {
|
|||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
SyncClientRequest *pSyncClientRequest = pMsg1;
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
|
||||
gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
|
||||
|
||||
|
|
Loading…
Reference in New Issue