refact: remove sync pre snapshot
This commit is contained in:
parent
973fc99f75
commit
7bcda46c59
|
@ -274,8 +274,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
||||||
|
|
|
@ -195,7 +195,6 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,6 @@ typedef struct SyncAppendEntriesReply {
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
} SyncAppendEntriesReply;
|
} SyncAppendEntriesReply;
|
||||||
|
|
||||||
// ---------------------------------------------
|
|
||||||
typedef struct SyncHeartbeat {
|
typedef struct SyncHeartbeat {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -149,28 +148,8 @@ typedef struct SyncPreSnapshot {
|
||||||
|
|
||||||
// private data
|
// private data
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
|
|
||||||
} SyncPreSnapshot;
|
} SyncPreSnapshot;
|
||||||
|
|
||||||
SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId);
|
|
||||||
void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg);
|
|
||||||
void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen);
|
|
||||||
void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg);
|
|
||||||
char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len);
|
|
||||||
SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len);
|
|
||||||
void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg);
|
|
||||||
void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg);
|
|
||||||
SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
|
||||||
cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg);
|
|
||||||
char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg);
|
|
||||||
void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg);
|
|
||||||
void syncPreSnapshotLog(const SyncPreSnapshot* pMsg);
|
|
||||||
void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg);
|
|
||||||
|
|
||||||
// ---------------------------------------------
|
|
||||||
typedef struct SyncPreSnapshotReply {
|
typedef struct SyncPreSnapshotReply {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -181,28 +160,8 @@ typedef struct SyncPreSnapshotReply {
|
||||||
// private data
|
// private data
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncIndex snapStart;
|
SyncIndex snapStart;
|
||||||
|
|
||||||
} SyncPreSnapshotReply;
|
} SyncPreSnapshotReply;
|
||||||
|
|
||||||
SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId);
|
|
||||||
void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg);
|
|
||||||
void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen);
|
|
||||||
void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg);
|
|
||||||
char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len);
|
|
||||||
SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len);
|
|
||||||
void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg);
|
|
||||||
void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg);
|
|
||||||
SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
|
||||||
cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg);
|
|
||||||
char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg);
|
|
||||||
void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg);
|
|
||||||
void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg);
|
|
||||||
void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg);
|
|
||||||
|
|
||||||
// ---------------------------------------------
|
|
||||||
typedef struct SyncApplyMsg {
|
typedef struct SyncApplyMsg {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -375,9 +334,6 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
|
|
||||||
int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
|
|
||||||
|
|
||||||
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||||
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
|
@ -404,6 +360,8 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
||||||
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
|
||||||
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
|
||||||
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,301 +189,38 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncPreSnapshot----
|
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) {
|
int32_t bytes = sizeof(SyncPreSnapshot);
|
||||||
uint32_t bytes = sizeof(SyncPreSnapshot);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
memset(pMsg, 0, bytes);
|
|
||||||
pMsg->bytes = bytes;
|
|
||||||
pMsg->vgId = vgId;
|
|
||||||
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
|
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
|
||||||
return pMsg;
|
pMsg->contLen = bytes;
|
||||||
}
|
if (pMsg->pCont == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) {
|
return -1;
|
||||||
if (pMsg != NULL) {
|
|
||||||
taosMemoryFree(pMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) {
|
|
||||||
ASSERT(pMsg->bytes <= bufLen);
|
|
||||||
memcpy(buf, pMsg, pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) {
|
|
||||||
memcpy(pMsg, buf, len);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) {
|
|
||||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes);
|
|
||||||
if (len != NULL) {
|
|
||||||
*len = pMsg->bytes;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) {
|
|
||||||
uint32_t bytes = *((uint32_t*)buf);
|
|
||||||
SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
syncPreSnapshotDeserialize(buf, len, pMsg);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) {
|
|
||||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
|
||||||
pRpcMsg->msgType = pMsg->msgType;
|
|
||||||
pRpcMsg->contLen = pMsg->bytes;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) {
|
|
||||||
syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|
||||||
SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) {
|
|
||||||
char u64buf[128] = {0};
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
|
||||||
cJSON* pTmp = pSrcId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
|
||||||
cJSON* pTmp = pDestId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
SyncPreSnapshot* pPreSnapshot = pMsg->pCont;
|
||||||
cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot);
|
pPreSnapshot->bytes = bytes;
|
||||||
return pJson;
|
pPreSnapshot->msgType = TDMT_SYNC_PRE_SNAPSHOT;
|
||||||
|
pPreSnapshot->vgId = vgId;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) {
|
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
cJSON* pJson = syncPreSnapshot2Json(pMsg);
|
int32_t bytes = sizeof(SyncPreSnapshotReply);
|
||||||
char* serialized = cJSON_Print(pJson);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) {
|
|
||||||
char* serialized = syncPreSnapshot2Str(pMsg);
|
|
||||||
printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) {
|
|
||||||
char* serialized = syncPreSnapshot2Str(pMsg);
|
|
||||||
printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) {
|
|
||||||
char* serialized = syncPreSnapshot2Str(pMsg);
|
|
||||||
sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) {
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
char* serialized = syncPreSnapshot2Str(pMsg);
|
|
||||||
sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- message process SyncPreSnapshotReply----
|
|
||||||
SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId) {
|
|
||||||
uint32_t bytes = sizeof(SyncPreSnapshotReply);
|
|
||||||
SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
memset(pMsg, 0, bytes);
|
|
||||||
pMsg->bytes = bytes;
|
|
||||||
pMsg->vgId = vgId;
|
|
||||||
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
|
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
|
||||||
return pMsg;
|
pMsg->contLen = bytes;
|
||||||
}
|
if (pMsg->pCont == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg) {
|
return -1;
|
||||||
if (pMsg != NULL) {
|
|
||||||
taosMemoryFree(pMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen) {
|
|
||||||
ASSERT(pMsg->bytes <= bufLen);
|
|
||||||
memcpy(buf, pMsg, pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg) {
|
|
||||||
memcpy(pMsg, buf, len);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len) {
|
|
||||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
syncPreSnapshotReplySerialize(pMsg, buf, pMsg->bytes);
|
|
||||||
if (len != NULL) {
|
|
||||||
*len = pMsg->bytes;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len) {
|
|
||||||
uint32_t bytes = *((uint32_t*)buf);
|
|
||||||
SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
syncPreSnapshotReplyDeserialize(buf, len, pMsg);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg) {
|
|
||||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
|
||||||
pRpcMsg->msgType = pMsg->msgType;
|
|
||||||
pRpcMsg->contLen = pMsg->bytes;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
syncPreSnapshotReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg) {
|
|
||||||
syncPreSnapshotReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|
||||||
SyncPreSnapshotReply* pMsg = syncPreSnapshotReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) {
|
|
||||||
char u64buf[128] = {0};
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
|
||||||
cJSON* pTmp = pSrcId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
|
||||||
cJSON* pTmp = pDestId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart);
|
|
||||||
cJSON_AddStringToObject(pRoot, "snap-start", u64buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
SyncPreSnapshotReply* pPreSnapshotReply = pMsg->pCont;
|
||||||
cJSON_AddItemToObject(pJson, "SyncPreSnapshotReply", pRoot);
|
pPreSnapshotReply->bytes = bytes;
|
||||||
return pJson;
|
pPreSnapshotReply->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
|
||||||
}
|
pPreSnapshotReply->vgId = vgId;
|
||||||
|
return 0;
|
||||||
char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg) {
|
|
||||||
cJSON* pJson = syncPreSnapshotReply2Json(pMsg);
|
|
||||||
char* serialized = cJSON_Print(pJson);
|
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg) {
|
|
||||||
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
|
||||||
printf("syncPreSnapshotReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg) {
|
|
||||||
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
|
||||||
printf("syncPreSnapshotReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg) {
|
|
||||||
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
|
||||||
sTrace("syncPreSnapshotReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) {
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
|
||||||
sTrace("syncPreSnapshotReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncApplyMsg----
|
// ---- message process SyncApplyMsg----
|
||||||
|
|
|
@ -903,60 +903,3 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
|
|
||||||
syncLogRecvSyncPreSnapshot(ths, pMsg, "");
|
|
||||||
|
|
||||||
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
|
|
||||||
pMsgReply->srcId = ths->myRaftId;
|
|
||||||
pMsgReply->destId = pMsg->srcId;
|
|
||||||
pMsgReply->term = ths->pRaftStore->currentTerm;
|
|
||||||
|
|
||||||
SSyncLogStoreData *pData = ths->pLogStore->data;
|
|
||||||
SWal *pWal = pData->pWal;
|
|
||||||
|
|
||||||
if (syncNodeIsMnode(ths)) {
|
|
||||||
pMsgReply->snapStart = SYNC_INDEX_BEGIN;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
|
|
||||||
int64_t walCommitVer = walGetCommittedVer(pWal);
|
|
||||||
|
|
||||||
if (!isEmpty && ths->commitIndex != walCommitVer) {
|
|
||||||
sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
|
|
||||||
ths->commitIndex);
|
|
||||||
goto _IGNORE;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMsgReply->snapStart = ths->commitIndex + 1;
|
|
||||||
|
|
||||||
// make local log clean
|
|
||||||
int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart);
|
|
||||||
if (code != 0) {
|
|
||||||
sNError(ths, "truncate wal error");
|
|
||||||
goto _IGNORE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// can not write behind _RESPONSE
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
|
|
||||||
_RESPONSE:
|
|
||||||
syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg);
|
|
||||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
|
||||||
|
|
||||||
syncPreSnapshotReplyDestroy(pMsgReply);
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
_IGNORE:
|
|
||||||
syncPreSnapshotReplyDestroy(pMsgReply);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) {
|
|
||||||
syncLogRecvSyncPreSnapshotReply(ths, pMsg, "");
|
|
||||||
|
|
||||||
// start snapshot
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -349,6 +349,46 @@ void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg);
|
||||||
void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg);
|
void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg);
|
||||||
void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg);
|
void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg);
|
||||||
|
|
||||||
|
SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId);
|
||||||
|
void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg);
|
||||||
|
void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg);
|
||||||
|
char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len);
|
||||||
|
SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg);
|
||||||
|
SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg);
|
||||||
|
char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg);
|
||||||
|
void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg);
|
||||||
|
void syncPreSnapshotLog(const SyncPreSnapshot* pMsg);
|
||||||
|
void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg);
|
||||||
|
|
||||||
|
SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId);
|
||||||
|
void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg);
|
||||||
|
void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg);
|
||||||
|
char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len);
|
||||||
|
SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg);
|
||||||
|
SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg);
|
||||||
|
char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg);
|
||||||
|
void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg);
|
||||||
|
void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg);
|
||||||
|
void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg);
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
|
||||||
|
int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1942,3 +1942,300 @@ void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) {
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- message process SyncPreSnapshot----
|
||||||
|
SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncPreSnapshot);
|
||||||
|
SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
ASSERT(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
syncPreSnapshotDeserialize(buf, len, pMsg);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) {
|
||||||
|
syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) {
|
||||||
|
cJSON* pJson = syncPreSnapshot2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) {
|
||||||
|
char* serialized = syncPreSnapshot2Str(pMsg);
|
||||||
|
printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) {
|
||||||
|
char* serialized = syncPreSnapshot2Str(pMsg);
|
||||||
|
printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) {
|
||||||
|
char* serialized = syncPreSnapshot2Str(pMsg);
|
||||||
|
sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncPreSnapshot2Str(pMsg);
|
||||||
|
sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- message process SyncPreSnapshotReply----
|
||||||
|
SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncPreSnapshotReply);
|
||||||
|
SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
ASSERT(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
syncPreSnapshotReplySerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
syncPreSnapshotReplyDeserialize(buf, len, pMsg);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncPreSnapshotReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg) {
|
||||||
|
syncPreSnapshotReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncPreSnapshotReply* pMsg = syncPreSnapshotReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart);
|
||||||
|
cJSON_AddStringToObject(pRoot, "snap-start", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncPreSnapshotReply", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg) {
|
||||||
|
cJSON* pJson = syncPreSnapshotReply2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg) {
|
||||||
|
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
||||||
|
printf("syncPreSnapshotReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg) {
|
||||||
|
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
||||||
|
printf("syncPreSnapshotReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg) {
|
||||||
|
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
||||||
|
sTrace("syncPreSnapshotReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncPreSnapshotReply2Str(pMsg);
|
||||||
|
sTrace("syncPreSnapshotReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -130,3 +130,60 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
|
||||||
|
syncLogRecvSyncPreSnapshot(ths, pMsg, "");
|
||||||
|
|
||||||
|
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
|
||||||
|
pMsgReply->srcId = ths->myRaftId;
|
||||||
|
pMsgReply->destId = pMsg->srcId;
|
||||||
|
pMsgReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
|
||||||
|
SSyncLogStoreData *pData = ths->pLogStore->data;
|
||||||
|
SWal *pWal = pData->pWal;
|
||||||
|
|
||||||
|
if (syncNodeIsMnode(ths)) {
|
||||||
|
pMsgReply->snapStart = SYNC_INDEX_BEGIN;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
|
||||||
|
int64_t walCommitVer = walGetCommittedVer(pWal);
|
||||||
|
|
||||||
|
if (!isEmpty && ths->commitIndex != walCommitVer) {
|
||||||
|
sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
|
||||||
|
ths->commitIndex);
|
||||||
|
goto _IGNORE;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMsgReply->snapStart = ths->commitIndex + 1;
|
||||||
|
|
||||||
|
// make local log clean
|
||||||
|
int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart);
|
||||||
|
if (code != 0) {
|
||||||
|
sNError(ths, "truncate wal error");
|
||||||
|
goto _IGNORE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// can not write behind _RESPONSE
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
|
||||||
|
_RESPONSE:
|
||||||
|
syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||||
|
|
||||||
|
syncPreSnapshotReplyDestroy(pMsgReply);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_IGNORE:
|
||||||
|
syncPreSnapshotReplyDestroy(pMsgReply);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) {
|
||||||
|
syncLogRecvSyncPreSnapshotReply(ths, pMsg, "");
|
||||||
|
|
||||||
|
// start snapshot
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue