TD-2428
This commit is contained in:
parent
e0d113b175
commit
456ef978cc
|
@ -35,7 +35,9 @@ typedef enum {
|
|||
TAOS_SMSG_STATUS_RSP = 10,
|
||||
TAOS_SMSG_SETUP = 11,
|
||||
TAOS_SMSG_SETUP_RSP = 12,
|
||||
TAOS_SMSG_END = 13,
|
||||
TAOS_SMSG_SYNC_FILE = 13,
|
||||
TAOS_SMSG_SYNC_FILE_RSP = 14,
|
||||
TAOS_SMSG_END = 15,
|
||||
} ESyncMsgType;
|
||||
|
||||
typedef enum {
|
||||
|
@ -116,7 +118,7 @@ typedef struct {
|
|||
|
||||
#pragma pack(pop)
|
||||
|
||||
#define SYNC_PROTOCOL_VERSION 0
|
||||
#define SYNC_PROTOCOL_VERSION 1
|
||||
#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
|
||||
|
||||
extern char *statusType[];
|
||||
|
@ -131,6 +133,9 @@ void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
|
|||
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
|
||||
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
|
||||
|
||||
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
|
||||
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tglobal.h"
|
||||
#include "tchecksum.h"
|
||||
#include "syncInt.h"
|
||||
|
||||
|
@ -92,3 +93,19 @@ void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
|
|||
pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead);
|
||||
syncBuildHead(&pMsg->head);
|
||||
}
|
||||
|
||||
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) {
|
||||
memset(pMsg, 0, sizeof(SFileAck));
|
||||
pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP;
|
||||
pMsg->head.vgId = vgId;
|
||||
pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead);
|
||||
syncBuildHead(&pMsg->head);
|
||||
}
|
||||
|
||||
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) {
|
||||
memset(pMsg, 0, sizeof(SFileInfo));
|
||||
pMsg->head.type = TAOS_SMSG_SYNC_FILE;
|
||||
pMsg->head.vgId = vgId;
|
||||
pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead);
|
||||
syncBuildHead(&pMsg->head);
|
||||
}
|
|
@ -73,6 +73,12 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
break;
|
||||
}
|
||||
|
||||
ret = syncCheckHead((SSyncHead*)(&minfo));
|
||||
if (ret != 0) {
|
||||
sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret));
|
||||
break;
|
||||
}
|
||||
|
||||
// if no more file from master, break;
|
||||
if (minfo.name[0] == 0 || minfo.magic == 0) {
|
||||
sDebug("%s, no more files to restore", pPeer->id);
|
||||
|
@ -94,12 +100,12 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
&sinfo.fversion);
|
||||
|
||||
// if file not there or magic is not the same, file shall be synced
|
||||
memset(&fileAck, 0, sizeof(fileAck));
|
||||
syncBuildFileAck(&fileAck, pNode->vgId);
|
||||
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
|
||||
|
||||
// send file ack
|
||||
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
||||
if (ret != sizeof(fileAck)) {
|
||||
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
|
||||
if (ret != sizeof(SFileAck)) {
|
||||
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -99,6 +99,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
|
||||
while (1) {
|
||||
// retrieve file info
|
||||
syncBuildFileInfo(&fileInfo, pNode->vgId);
|
||||
fileInfo.name[0] = 0;
|
||||
fileInfo.size = 0;
|
||||
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
||||
|
@ -106,8 +107,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
||||
|
||||
// send the file info
|
||||
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
||||
if (ret != sizeof(fileInfo)) {
|
||||
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
|
||||
if (ret != sizeof(SFileInfo)) {
|
||||
code = -1;
|
||||
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
|
@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
break;
|
||||
}
|
||||
|
||||
ret = syncCheckHead((SSyncHead*)(&fileAck));
|
||||
if (ret != 0) {
|
||||
code = -1;
|
||||
sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret));
|
||||
break;
|
||||
}
|
||||
|
||||
// set the peer sync version
|
||||
pPeer->sversion = fileInfo.fversion;
|
||||
|
||||
|
|
Loading…
Reference in New Issue