diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a12a635837..f4890206fe 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -250,8 +250,8 @@ enum { TD_NEW_MSG_SEG(TDMT_SYNC_MSG) TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ec4f42c847..a322f76800 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -183,8 +183,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 2265f58060..e15d7ac3df 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -450,8 +450,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 46bbb14421..d9eefab33f 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -67,8 +67,6 @@ extern "C" { typedef struct SyncTimeout SyncTimeout; typedef struct SyncClientRequest SyncClientRequest; -typedef struct SyncPing SyncPing; -typedef struct SyncPingReply SyncPingReply; typedef struct SyncRequestVote SyncRequestVote; typedef struct SyncRequestVoteReply SyncRequestVoteReply; typedef struct SyncAppendEntries SyncAppendEntries; @@ -93,17 +91,6 @@ typedef struct SyncHeartbeatReply SyncHeartbeatReply; typedef struct SyncHeartbeat SyncHeartbeat; typedef struct SyncPreSnapshot SyncPreSnapshot; -typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); -typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); -typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); -typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); -typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); -typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); -typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); -typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); -typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); -typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); - extern bool gRaftDetailLog; typedef struct SRaftId { @@ -220,18 +207,6 @@ typedef struct SSyncNode { // peer heartbeat timer SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA]; - // callback - FpOnPingCb FpOnPing; - FpOnPingReplyCb FpOnPingReply; - FpOnClientRequestCb FpOnClientRequest; - FpOnTimeoutCb FpOnTimeout; - FpOnRequestVoteCb FpOnRequestVote; - FpOnRequestVoteReplyCb FpOnRequestVoteReply; - FpOnAppendEntriesCb FpOnAppendEntries; - FpOnAppendEntriesReplyCb FpOnAppendEntriesReply; - FpOnSnapshotCb FpOnSnapshot; - FpOnSnapshotReplyCb FpOnSnapshotReply; - // tools SSyncRespMgr* pSyncRespMgr; @@ -269,12 +244,6 @@ bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex); -// ping -------------- -int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); -int32_t syncNodePingSelf(SSyncNode* pSyncNode); -int32_t syncNodePingPeers(SSyncNode* pSyncNode); -int32_t syncNodePingAll(SSyncNode* pSyncNode); - // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 7dc04d0b86..2110c80df3 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -22,60 +22,6 @@ extern "C" { #include "syncInt.h" -// --------------------------------------------- -typedef struct SyncPing { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - uint32_t dataLen; - char data[]; -} SyncPing; - - -void syncPingDestroy(SyncPing* pMsg); -void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); -SyncPing* syncPingDeserialize2(const char* buf, uint32_t len); -SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg); - -// --------------------------------------------- -typedef struct SyncPingReply { - uint32_t bytes; - int32_t vgId; - uint32_t msgType; - SRaftId srcId; - SRaftId destId; - // private data - uint32_t dataLen; - char data[]; -} SyncPingReply; - -SyncPingReply* syncPingReplyBuild(uint32_t dataLen); -SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); -SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); -void syncPingReplyDestroy(SyncPingReply* pMsg); -void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); -void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); -char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); -SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); -int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen); -SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen); -void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); -void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); -SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPingReply2Json(const SyncPingReply* pMsg); -char* syncPingReply2Str(const SyncPingReply* pMsg); - -// for debug ---------------------- -void syncPingReplyPrint(const SyncPingReply* pMsg); -void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg); -void syncPingReplyLog(const SyncPingReply* pMsg); -void syncPingReplyLog2(char* s, const SyncPingReply* pMsg); - -// --------------------------------------------- typedef enum ESyncTimeoutType { SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_ELECTION, @@ -656,8 +602,6 @@ void syncLocalCmdLog(const SyncLocalCmd* pMsg); void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); // on message ---------------------- -int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 25f6c5e6f1..c9e6885e50 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -145,14 +145,6 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); code = syncNodeOnTimer(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing* pSyncMsg = syncPingFromRpcMsg2(pMsg); - code = syncNodeOnPing(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply* pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - code = syncNodeOnPingReply(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { @@ -906,18 +898,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]); } - // init callback - pSyncNode->FpOnPing = syncNodeOnPing; - pSyncNode->FpOnPingReply = syncNodeOnPingReply; - pSyncNode->FpOnClientRequest = syncNodeOnClientRequest; - pSyncNode->FpOnTimeout = syncNodeOnTimer; - pSyncNode->FpOnSnapshot = syncNodeOnSnapshot; - pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply; - pSyncNode->FpOnRequestVote = syncNodeOnRequestVote; - pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReply; - pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntries; - pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReply; - // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); if (pSyncNode->pSyncRespMgr == NULL) { @@ -2062,33 +2042,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { return ret; } -// on message ---- -int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) { - sTrace("vgId:%d, recv sync-ping", ths->vgId); - - SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId); - SRpcMsg rpcMsg; - syncPingReply2RpcMsg(pMsgReply, &rpcMsg); - - /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); - */ - - syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); - syncPingReplyDestroy(pMsgReply); - - return 0; -} - -int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) { - int32_t ret = 0; - sTrace("vgId:%d, recv sync-ping-reply", ths->vgId); - return ret; -} - int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { syncLogRecvHeartbeat(ths, pMsg, ""); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 565b21eb55..74f4a19f81 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -152,309 +152,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { } -void syncPingDestroy(SyncPing* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); - ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); -} - -SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncPing* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncPingDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - - -SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -// ---- message process SyncPingReply---- -SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { - uint32_t bytes = sizeof(SyncPingReply) + dataLen; - SyncPingReply* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->msgType = TDMT_SYNC_PING_REPLY; - pMsg->dataLen = dataLen; - return pMsg; -} - -SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) { - uint32_t dataLen = strlen(str) + 1; - SyncPingReply* pMsg = syncPingReplyBuild(dataLen); - pMsg->vgId = vgId; - pMsg->srcId = *srcId; - pMsg->destId = *destId; - snprintf(pMsg->data, pMsg->dataLen, "%s", str); - return pMsg; -} - -SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) { - SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, vgId, "pang"); - return pMsg; -} - -void syncPingReplyDestroy(SyncPingReply* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); - ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen); -} - -char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncPingReplySerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncPingReply* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncPingReplyDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) { - return -1; - } - - if (tEncodeU32(&encoder, pMsg->bytes) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->vgId) < 0) { - return -1; - } - if (tEncodeU32(&encoder, pMsg->msgType) < 0) { - return -1; - } - if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) { - return -1; - } - if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) { - return -1; - } - if (tEncodeU32(&encoder, pMsg->dataLen) < 0) { - return -1; - } - if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) { - return -1; - } - - tEndEncode(&encoder); - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) { - return NULL; - } - - SyncPingReply* pMsg = NULL; - uint32_t bytes; - if (tDecodeU32(&decoder, &bytes) < 0) { - return NULL; - } - - pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - pMsg->bytes = bytes; - - if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - uint32_t len; - char* data = NULL; - if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - ASSERT(len == pMsg->dataLen); - memcpy(pMsg->data, data, len); - - tEndDecode(&decoder); - tDecoderClear(&decoder); - return pMsg; -} - -void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { - syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); - return pJson; -} - -char* syncPingReply2Str(const SyncPingReply* pMsg) { - cJSON* pJson = syncPingReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncPingReplyPrint(const SyncPingReply* pMsg) { - char* serialized = syncPingReply2Str(pMsg); - printf("syncPingReplyPrint | len:%zu | %s \n", strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) { - char* serialized = syncPingReply2Str(pMsg); - printf("syncPingReplyPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPingReplyLog(const SyncPingReply* pMsg) { - char* serialized = syncPingReply2Str(pMsg); - sTrace("syncPingReplyLog | len:%zu | %s", strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncPingReply2Str(pMsg); - sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - // ---- message process SyncClientRequest---- SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { uint32_t bytes = sizeof(SyncClientRequest) + dataLen; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index c29def9ca3..057f2ea6dd 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -223,18 +223,18 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot; - gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply; + // gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot; + // gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply; gSyncIO->pSyncNode = pSyncNode; syncNodeRelease(pSyncNode); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 23acadc0cc..bab3d2236f 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -146,16 +146,16 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; gSyncIO->pSyncNode = pSyncNode; syncNodeRelease(pSyncNode); diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index 59d731c823..58c0e7d13a 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -59,15 +59,15 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; syncNodeStart(pSyncNode); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 54bfb1b387..d2ae46a443 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -38,15 +38,15 @@ SSyncNode* syncNodeInit() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index a082f9373a..ae74929b7b 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -38,15 +38,15 @@ SSyncNode* syncNodeInit() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index b15a767db5..1e0ff54514 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -38,14 +38,14 @@ SSyncNode* syncNodeInit() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index 19f377e037..975565770a 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index ded7e22eac..8aa4e9dada 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index 4f1bb64798..e77358f375 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 2419ec0974..4a82bba15d 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -120,16 +120,16 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; gSyncIO->pSyncNode = pSyncNode; syncNodeRelease(pSyncNode); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 40eac0b55f..15854eeca6 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -116,14 +116,14 @@ SSyncNode *syncNodeInit() { SSyncNode *pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; syncNodeStart(pSyncNode); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index e8d40d5054..8c486df118 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -257,16 +257,16 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot; - gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot; + // gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply; gSyncIO->pSyncNode = pSyncNode; syncNodeRelease(pSyncNode); diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 2d2a12c656..67573a6a37 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -39,14 +39,14 @@ SSyncNode* syncNodeInit() { pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 712e97edc9..3a0dac98b0 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -40,14 +40,14 @@ SSyncNode* syncNodeInit() { pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index 56dd0a5845..aae1862681 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -94,14 +94,14 @@ SSyncNode *syncNodeInit() { SSyncNode *pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; syncNodeStart(pSyncNode); diff --git a/source/libs/sync/test/sync_test_lib/inc/syncIO.h b/source/libs/sync/test/sync_test_lib/inc/syncIO.h index 955a832b68..2c5e19d863 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncIO.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncIO.h @@ -25,6 +25,7 @@ extern "C" { #include #include "os.h" #include "syncInt.h" +#include "syncTest.h" #include "taosdef.h" #include "tqueue.h" #include "trpc.h" @@ -32,6 +33,9 @@ extern "C" { #define TICK_Q_TIMER_MS 1000 #define TICK_Ping_TIMER_MS 1000 +typedef struct SyncPing SyncPing; +typedef struct SyncPingReply SyncPingReply; + typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset *pQset; diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 93da2df5a2..f531304178 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -42,6 +42,20 @@ extern "C" { extern void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); +typedef struct SyncPing SyncPing; +typedef struct SyncPingReply SyncPingReply; + +typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); +typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); +typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); +typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); +typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); +typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); +typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); +typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); +typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); + cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); void syncEntryPrint(const SSyncRaftEntry* pObj); @@ -127,6 +141,18 @@ void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); // origin syncMessage +typedef struct SyncPing { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + uint32_t dataLen; + char data[]; +} SyncPing; + + SyncPing* syncPingBuild(uint32_t dataLen); SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); @@ -141,6 +167,51 @@ void syncPingPrint(const SyncPing* pMsg); void syncPingPrint2(char* s, const SyncPing* pMsg); void syncPingLog(const SyncPing* pMsg); void syncPingLog2(char* s, const SyncPing* pMsg); +void syncPingDestroy(SyncPing* pMsg); +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); +SyncPing* syncPingDeserialize2(const char* buf, uint32_t len); +SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg); + +typedef struct SyncPingReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + uint32_t dataLen; + char data[]; +} SyncPingReply; + +SyncPingReply* syncPingReplyBuild(uint32_t dataLen); +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); +void syncPingReplyDestroy(SyncPingReply* pMsg); +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); +char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); +SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); +int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen); +SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen); +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); +SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); +char* syncPingReply2Str(const SyncPingReply* pMsg); + +// for debug ---------------------- +void syncPingReplyPrint(const SyncPingReply* pMsg); +void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg); +void syncPingReplyLog(const SyncPingReply* pMsg); +void syncPingReplyLog2(char* s, const SyncPingReply* pMsg); + +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); +int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); +int32_t syncNodePingSelf(SSyncNode* pSyncNode); +int32_t syncNodePingPeers(SSyncNode* pSyncNode); +int32_t syncNodePingAll(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c index 5e1a9be164..6b461da0e5 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c @@ -138,20 +138,20 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); // callback - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing); - cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply); - cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote); - cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply); - cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries); - cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply); - cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); - cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing); + // cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply); + // cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote); + // cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply); + // cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries); + // cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply); + // cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); + // cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); // restoreFinish cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish); @@ -253,3 +253,29 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { return ret; } +// on message ---- +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) { + sTrace("vgId:%d, recv sync-ping", ths->vgId); + + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsgReply, &rpcMsg); + + /* + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + */ + + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncPingReplyDestroy(pMsgReply); + + return 0; +} + +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) { + int32_t ret = 0; + sTrace("vgId:%d, recv sync-ping-reply", ths->vgId); + return ret; +} diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index cdd2ae045d..c56d983e6e 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -254,6 +254,308 @@ void syncPingLog2(char* s, const SyncPing* pMsg) { } } +void syncPingDestroy(SyncPing* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); +} + +SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPing* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncPingDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +// ---- message process SyncPingReply---- +SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncPingReply) + dataLen; + SyncPingReply* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = TDMT_SYNC_PING_REPLY; + pMsg->dataLen = dataLen; + return pMsg; +} + +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPingReply* pMsg = syncPingReplyBuild(dataLen); + pMsg->vgId = vgId; + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) { + SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, vgId, "pang"); + return pMsg; +} + +void syncPingReplyDestroy(SyncPingReply* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen); +} + +char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPingReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPingReply* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncPingReplyDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) { + return -1; + } + + if (tEncodeU32(&encoder, pMsg->bytes) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->vgId) < 0) { + return -1; + } + if (tEncodeU32(&encoder, pMsg->msgType) < 0) { + return -1; + } + if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) { + return -1; + } + if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) { + return -1; + } + if (tEncodeU32(&encoder, pMsg->dataLen) < 0) { + return -1; + } + if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) { + return -1; + } + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + if (tStartDecode(&decoder) < 0) { + return NULL; + } + + SyncPingReply* pMsg = NULL; + uint32_t bytes; + if (tDecodeU32(&decoder, &bytes) < 0) { + return NULL; + } + + pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + pMsg->bytes = bytes; + + if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + uint32_t len; + char* data = NULL; + if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + ASSERT(len == pMsg->dataLen); + memcpy(pMsg->data, data, len); + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return pMsg; +} + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { + syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); + return pJson; +} + +char* syncPingReply2Str(const SyncPingReply* pMsg) { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncPingReplyPrint(const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + printf("syncPingReplyPrint | len:%zu | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + printf("syncPingReplyPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPingReplyLog(const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + sTrace("syncPingReplyLog | len:%zu | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPingReply2Str(pMsg); + sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot;