Merge pull request #18070 from taosdata/fix/TD-20052
refact: remove sync ping and pingreply
This commit is contained in:
commit
0597c01107
|
@ -250,8 +250,8 @@ enum {
|
|||
|
||||
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL)
|
||||
|
|
|
@ -183,8 +183,6 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
|
|
|
@ -450,8 +450,6 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -67,8 +67,6 @@ extern "C" {
|
|||
|
||||
typedef struct SyncTimeout SyncTimeout;
|
||||
typedef struct SyncClientRequest SyncClientRequest;
|
||||
typedef struct SyncPing SyncPing;
|
||||
typedef struct SyncPingReply SyncPingReply;
|
||||
typedef struct SyncRequestVote SyncRequestVote;
|
||||
typedef struct SyncRequestVoteReply SyncRequestVoteReply;
|
||||
typedef struct SyncAppendEntries SyncAppendEntries;
|
||||
|
@ -93,17 +91,6 @@ typedef struct SyncHeartbeatReply SyncHeartbeatReply;
|
|||
typedef struct SyncHeartbeat SyncHeartbeat;
|
||||
typedef struct SyncPreSnapshot SyncPreSnapshot;
|
||||
|
||||
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
|
||||
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
||||
typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||
typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||
typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||
typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
|
||||
typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||
typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||
|
||||
extern bool gRaftDetailLog;
|
||||
|
||||
typedef struct SRaftId {
|
||||
|
@ -220,18 +207,6 @@ typedef struct SSyncNode {
|
|||
// peer heartbeat timer
|
||||
SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA];
|
||||
|
||||
// callback
|
||||
FpOnPingCb FpOnPing;
|
||||
FpOnPingReplyCb FpOnPingReply;
|
||||
FpOnClientRequestCb FpOnClientRequest;
|
||||
FpOnTimeoutCb FpOnTimeout;
|
||||
FpOnRequestVoteCb FpOnRequestVote;
|
||||
FpOnRequestVoteReplyCb FpOnRequestVoteReply;
|
||||
FpOnAppendEntriesCb FpOnAppendEntries;
|
||||
FpOnAppendEntriesReplyCb FpOnAppendEntriesReply;
|
||||
FpOnSnapshotCb FpOnSnapshot;
|
||||
FpOnSnapshotReplyCb FpOnSnapshotReply;
|
||||
|
||||
// tools
|
||||
SSyncRespMgr* pSyncRespMgr;
|
||||
|
||||
|
@ -269,12 +244,6 @@ bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
|||
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
|
||||
|
||||
// ping --------------
|
||||
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
|
||||
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePingAll(SSyncNode* pSyncNode);
|
||||
|
||||
// timer control --------------
|
||||
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
|
||||
|
|
|
@ -22,60 +22,6 @@ extern "C" {
|
|||
|
||||
#include "syncInt.h"
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncPing {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
// private data
|
||||
uint32_t dataLen;
|
||||
char data[];
|
||||
} SyncPing;
|
||||
|
||||
|
||||
void syncPingDestroy(SyncPing* pMsg);
|
||||
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
|
||||
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
|
||||
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncPingReply {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
// private data
|
||||
uint32_t dataLen;
|
||||
char data[];
|
||||
} SyncPingReply;
|
||||
|
||||
SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
|
||||
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
|
||||
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
|
||||
void syncPingReplyDestroy(SyncPingReply* pMsg);
|
||||
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
|
||||
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len);
|
||||
SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len);
|
||||
int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen);
|
||||
SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen);
|
||||
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
|
||||
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
|
||||
char* syncPingReply2Str(const SyncPingReply* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncPingReplyPrint(const SyncPingReply* pMsg);
|
||||
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg);
|
||||
void syncPingReplyLog(const SyncPingReply* pMsg);
|
||||
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef enum ESyncTimeoutType {
|
||||
SYNC_TIMEOUT_PING = 100,
|
||||
SYNC_TIMEOUT_ELECTION,
|
||||
|
@ -656,8 +602,6 @@ void syncLocalCmdLog(const SyncLocalCmd* pMsg);
|
|||
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
|
||||
|
||||
// on message ----------------------
|
||||
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
|
||||
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||
|
|
|
@ -145,14 +145,6 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
|||
SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing* pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPing(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply* pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
|
@ -906,18 +898,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
|
||||
}
|
||||
|
||||
// init callback
|
||||
pSyncNode->FpOnPing = syncNodeOnPing;
|
||||
pSyncNode->FpOnPingReply = syncNodeOnPingReply;
|
||||
pSyncNode->FpOnClientRequest = syncNodeOnClientRequest;
|
||||
pSyncNode->FpOnTimeout = syncNodeOnTimer;
|
||||
pSyncNode->FpOnSnapshot = syncNodeOnSnapshot;
|
||||
pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply;
|
||||
pSyncNode->FpOnRequestVote = syncNodeOnRequestVote;
|
||||
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReply;
|
||||
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntries;
|
||||
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReply;
|
||||
|
||||
// tools
|
||||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
|
||||
if (pSyncNode->pSyncRespMgr == NULL) {
|
||||
|
@ -2062,33 +2042,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
// on message ----
|
||||
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) {
|
||||
sTrace("vgId:%d, recv sync-ping", ths->vgId);
|
||||
|
||||
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
|
||||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||
|
||||
/*
|
||||
// htonl
|
||||
SMsgHead* pHead = rpcMsg.pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
*/
|
||||
|
||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||
syncPingReplyDestroy(pMsgReply);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) {
|
||||
int32_t ret = 0;
|
||||
sTrace("vgId:%d, recv sync-ping-reply", ths->vgId);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||
syncLogRecvHeartbeat(ths, pMsg, "");
|
||||
|
||||
|
|
|
@ -152,309 +152,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
|
|||
}
|
||||
|
||||
|
||||
void syncPingDestroy(SyncPing* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncPing* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncPingDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
||||
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
// ---- message process SyncPingReply----
|
||||
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
|
||||
uint32_t bytes = sizeof(SyncPingReply) + dataLen;
|
||||
SyncPingReply* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_SYNC_PING_REPLY;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
|
||||
uint32_t dataLen = strlen(str) + 1;
|
||||
SyncPingReply* pMsg = syncPingReplyBuild(dataLen);
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->srcId = *srcId;
|
||||
pMsg->destId = *destId;
|
||||
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
|
||||
SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, vgId, "pang");
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncPingReplyDestroy(SyncPingReply* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncPingReplySerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncPingReply* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncPingReplyDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
if (tStartDecode(&decoder) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SyncPingReply* pMsg = NULL;
|
||||
uint32_t bytes;
|
||||
if (tDecodeU32(&decoder, &bytes) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
pMsg->bytes = bytes;
|
||||
|
||||
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
uint32_t len;
|
||||
char* data = NULL;
|
||||
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
ASSERT(len == pMsg->dataLen);
|
||||
memcpy(pMsg->data, data, len);
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
|
||||
syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
char* s;
|
||||
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data", s);
|
||||
taosMemoryFree(s);
|
||||
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncPingReply2Str(const SyncPingReply* pMsg) {
|
||||
cJSON* pJson = syncPingReply2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncPingReplyPrint(const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
printf("syncPingReplyPrint | len:%zu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
printf("syncPingReplyPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyLog(const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
sTrace("syncPingReplyLog | len:%zu | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncClientRequest----
|
||||
SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
|
||||
uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
|
||||
|
|
|
@ -223,18 +223,18 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
|||
|
||||
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
|
||||
assert(pSyncNode != NULL);
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
|
||||
gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot;
|
||||
gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply;
|
||||
// gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot;
|
||||
// gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply;
|
||||
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
|
|
@ -146,16 +146,16 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
|||
|
||||
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
|
||||
assert(pSyncNode != NULL);
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
||||
|
|
|
@ -59,15 +59,15 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
syncNodeStart(pSyncNode);
|
||||
|
|
|
@ -38,15 +38,15 @@ SSyncNode* syncNodeInit() {
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -38,15 +38,15 @@ SSyncNode* syncNodeInit() {
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -38,14 +38,14 @@ SSyncNode* syncNodeInit() {
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() {
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() {
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() {
|
|||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -120,16 +120,16 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
|||
|
||||
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
|
||||
assert(pSyncNode != NULL);
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
||||
|
|
|
@ -116,14 +116,14 @@ SSyncNode *syncNodeInit() {
|
|||
SSyncNode *pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
syncNodeStart(pSyncNode);
|
||||
|
|
|
@ -257,16 +257,16 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
|||
|
||||
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
|
||||
assert(pSyncNode != NULL);
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot;
|
||||
gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot;
|
||||
// gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply;
|
||||
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
|
|
@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() {
|
|||
pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -40,14 +40,14 @@ SSyncNode* syncNodeInit() {
|
|||
pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
|
|
|
@ -94,14 +94,14 @@ SSyncNode *syncNodeInit() {
|
|||
SSyncNode *pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
// gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
// gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
// gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
// gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
// gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
// gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
// gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
// gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
syncNodeStart(pSyncNode);
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
#include <stdlib.h>
|
||||
#include "os.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncTest.h"
|
||||
#include "taosdef.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
|
@ -32,6 +33,9 @@ extern "C" {
|
|||
#define TICK_Q_TIMER_MS 1000
|
||||
#define TICK_Ping_TIMER_MS 1000
|
||||
|
||||
typedef struct SyncPing SyncPing;
|
||||
typedef struct SyncPingReply SyncPingReply;
|
||||
|
||||
typedef struct SSyncIO {
|
||||
STaosQueue *pMsgQ;
|
||||
STaosQset *pQset;
|
||||
|
|
|
@ -42,6 +42,20 @@ extern "C" {
|
|||
|
||||
extern void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
||||
|
||||
typedef struct SyncPing SyncPing;
|
||||
typedef struct SyncPingReply SyncPingReply;
|
||||
|
||||
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
|
||||
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
||||
typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||
typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||
typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||
typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
|
||||
typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||
typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||
|
||||
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
|
||||
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
|
||||
void syncEntryPrint(const SSyncRaftEntry* pObj);
|
||||
|
@ -127,6 +141,18 @@ void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
|
|||
|
||||
|
||||
// origin syncMessage
|
||||
typedef struct SyncPing {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
// private data
|
||||
uint32_t dataLen;
|
||||
char data[];
|
||||
} SyncPing;
|
||||
|
||||
|
||||
SyncPing* syncPingBuild(uint32_t dataLen);
|
||||
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
|
||||
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
|
||||
|
@ -141,6 +167,51 @@ void syncPingPrint(const SyncPing* pMsg);
|
|||
void syncPingPrint2(char* s, const SyncPing* pMsg);
|
||||
void syncPingLog(const SyncPing* pMsg);
|
||||
void syncPingLog2(char* s, const SyncPing* pMsg);
|
||||
void syncPingDestroy(SyncPing* pMsg);
|
||||
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
|
||||
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
|
||||
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
|
||||
typedef struct SyncPingReply {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
// private data
|
||||
uint32_t dataLen;
|
||||
char data[];
|
||||
} SyncPingReply;
|
||||
|
||||
SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
|
||||
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
|
||||
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
|
||||
void syncPingReplyDestroy(SyncPingReply* pMsg);
|
||||
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
|
||||
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len);
|
||||
SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len);
|
||||
int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen);
|
||||
SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen);
|
||||
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
|
||||
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
|
||||
char* syncPingReply2Str(const SyncPingReply* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncPingReplyPrint(const SyncPingReply* pMsg);
|
||||
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg);
|
||||
void syncPingReplyLog(const SyncPingReply* pMsg);
|
||||
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg);
|
||||
|
||||
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
|
||||
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePingAll(SSyncNode* pSyncNode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -138,20 +138,20 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
|||
cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);
|
||||
|
||||
// callback
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
|
||||
cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
|
||||
// cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
|
||||
|
||||
// restoreFinish
|
||||
cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);
|
||||
|
@ -253,3 +253,29 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
// on message ----
|
||||
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) {
|
||||
sTrace("vgId:%d, recv sync-ping", ths->vgId);
|
||||
|
||||
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
|
||||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||
|
||||
/*
|
||||
// htonl
|
||||
SMsgHead* pHead = rpcMsg.pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
*/
|
||||
|
||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||
syncPingReplyDestroy(pMsgReply);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) {
|
||||
int32_t ret = 0;
|
||||
sTrace("vgId:%d, recv sync-ping-reply", ths->vgId);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -254,6 +254,308 @@ void syncPingLog2(char* s, const SyncPing* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void syncPingDestroy(SyncPing* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncPing* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncPingDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
// ---- message process SyncPingReply----
|
||||
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
|
||||
uint32_t bytes = sizeof(SyncPingReply) + dataLen;
|
||||
SyncPingReply* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_SYNC_PING_REPLY;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
|
||||
uint32_t dataLen = strlen(str) + 1;
|
||||
SyncPingReply* pMsg = syncPingReplyBuild(dataLen);
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->srcId = *srcId;
|
||||
pMsg->destId = *destId;
|
||||
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
|
||||
SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, vgId, "pang");
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncPingReplyDestroy(SyncPingReply* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncPingReplySerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncPingReply* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncPingReplyDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
if (tStartDecode(&decoder) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SyncPingReply* pMsg = NULL;
|
||||
uint32_t bytes;
|
||||
if (tDecodeU32(&decoder, &bytes) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
pMsg->bytes = bytes;
|
||||
|
||||
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
uint32_t len;
|
||||
char* data = NULL;
|
||||
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
|
||||
taosMemoryFree(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
ASSERT(len == pMsg->dataLen);
|
||||
memcpy(pMsg->data, data, len);
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
|
||||
syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
char* s;
|
||||
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data", s);
|
||||
taosMemoryFree(s);
|
||||
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncPingReply2Str(const SyncPingReply* pMsg) {
|
||||
cJSON* pJson = syncPingReply2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncPingReplyPrint(const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
printf("syncPingReplyPrint | len:%zu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
printf("syncPingReplyPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyLog(const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
sTrace("syncPingReplyLog | len:%zu | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------
|
||||
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
||||
cJSON* pRoot;
|
||||
|
|
Loading…
Reference in New Issue