commit
d3ed3cda40
|
@ -43,8 +43,8 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode);
|
||||||
void mnodeDecMnodeRef(struct SMnodeObj *pMnode);
|
void mnodeDecMnodeRef(struct SMnodeObj *pMnode);
|
||||||
|
|
||||||
char * mnodeGetMnodeRoleStr();
|
char * mnodeGetMnodeRoleStr();
|
||||||
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet);
|
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect);
|
||||||
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet);
|
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect);
|
||||||
char* mnodeGetMnodeMasterEp();
|
char* mnodeGetMnodeMasterEp();
|
||||||
|
|
||||||
void mnodeGetMnodeInfos(void *mnodes);
|
void mnodeGetMnodeInfos(void *mnodes);
|
||||||
|
|
|
@ -273,14 +273,14 @@ void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) {
|
||||||
mnodeMnodeUnLock();
|
mnodeMnodeUnLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
|
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) {
|
||||||
mnodeMnodeRdLock();
|
mnodeMnodeRdLock();
|
||||||
*epSet = tsMEpForPeer;
|
*epSet = tsMEpForPeer;
|
||||||
mnodeMnodeUnLock();
|
mnodeMnodeUnLock();
|
||||||
|
|
||||||
mTrace("vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d", tsMEpForPeer.numOfEps, tsMEpForPeer.inUse);
|
mTrace("vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d", tsMEpForPeer.numOfEps, tsMEpForPeer.inUse);
|
||||||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||||
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
|
if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
|
||||||
epSet->inUse = (i + 1) % epSet->numOfEps;
|
epSet->inUse = (i + 1) % epSet->numOfEps;
|
||||||
mTrace("vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
mTrace("vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
||||||
} else {
|
} else {
|
||||||
|
@ -289,14 +289,14 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
|
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {
|
||||||
mnodeMnodeRdLock();
|
mnodeMnodeRdLock();
|
||||||
*epSet = tsMEpForShell;
|
*epSet = tsMEpForShell;
|
||||||
mnodeMnodeUnLock();
|
mnodeMnodeUnLock();
|
||||||
|
|
||||||
mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse);
|
mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse);
|
||||||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||||
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
|
if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
|
||||||
epSet->inUse = (i + 1) % epSet->numOfEps;
|
epSet->inUse = (i + 1) % epSet->numOfEps;
|
||||||
mTrace("vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
mTrace("vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -54,7 +54,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
|
||||||
if (!sdbIsMaster()) {
|
if (!sdbIsMaster()) {
|
||||||
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
||||||
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
||||||
mnodeGetMnodeEpSetForPeer(epSet);
|
mnodeGetMnodeEpSetForPeer(epSet, true);
|
||||||
rpcRsp->rsp = epSet;
|
rpcRsp->rsp = epSet;
|
||||||
rpcRsp->len = sizeof(SRpcEpSet);
|
rpcRsp->len = sizeof(SRpcEpSet);
|
||||||
|
|
||||||
|
|
|
@ -284,8 +284,9 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
|
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
|
||||||
pConn->numOfQueries = 0;
|
pConn->numOfQueries = 0;
|
||||||
pConn->numOfStreams = 0;
|
pConn->numOfStreams = 0;
|
||||||
|
|
||||||
int32_t numOfQueries = htonl(pHBMsg->numOfQueries);
|
int32_t numOfQueries = htonl(pHBMsg->numOfQueries);
|
||||||
|
int32_t numOfStreams = htonl(pHBMsg->numOfStreams);
|
||||||
|
|
||||||
if (numOfQueries > 0) {
|
if (numOfQueries > 0) {
|
||||||
if (pConn->pQueries == NULL) {
|
if (pConn->pQueries == NULL) {
|
||||||
pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_STREAM_SAVE_SIZE);
|
pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_STREAM_SAVE_SIZE);
|
||||||
|
@ -299,7 +300,6 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfStreams = htonl(pHBMsg->numOfStreams);
|
|
||||||
if (numOfStreams > 0) {
|
if (numOfStreams > 0) {
|
||||||
if (pConn->pStreams == NULL) {
|
if (pConn->pStreams == NULL) {
|
||||||
pConn->pStreams = calloc(sizeof(SStreamDesc), QUERY_STREAM_SAVE_SIZE);
|
pConn->pStreams = calloc(sizeof(SStreamDesc), QUERY_STREAM_SAVE_SIZE);
|
||||||
|
@ -309,7 +309,7 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
|
||||||
|
|
||||||
int32_t saveSize = pConn->numOfStreams * sizeof(SStreamDesc);
|
int32_t saveSize = pConn->numOfStreams * sizeof(SStreamDesc);
|
||||||
if (saveSize > 0 && pConn->pStreams != NULL) {
|
if (saveSize > 0 && pConn->pStreams != NULL) {
|
||||||
memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc), saveSize);
|
memcpy(pConn->pStreams, pHBMsg->pData + numOfQueries * sizeof(SQueryDesc), saveSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
|
||||||
if (!sdbIsMaster()) {
|
if (!sdbIsMaster()) {
|
||||||
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
||||||
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
||||||
mnodeGetMnodeEpSetForShell(epSet);
|
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||||
rpcRsp->rsp = epSet;
|
rpcRsp->rsp = epSet;
|
||||||
rpcRsp->len = sizeof(SRpcEpSet);
|
rpcRsp->len = sizeof(SRpcEpSet);
|
||||||
|
|
||||||
|
|
|
@ -282,7 +282,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
|
pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
|
||||||
pRsp->totalDnodes = htonl(mnodeGetDnodesNum());
|
pRsp->totalDnodes = htonl(mnodeGetDnodesNum());
|
||||||
mnodeGetMnodeEpSetForShell(&pRsp->epSet);
|
mnodeGetMnodeEpSetForShell(&pRsp->epSet, false);
|
||||||
|
|
||||||
pMsg->rpcRsp.rsp = pRsp;
|
pMsg->rpcRsp.rsp = pRsp;
|
||||||
pMsg->rpcRsp.len = sizeof(SHeartBeatRsp);
|
pMsg->rpcRsp.len = sizeof(SHeartBeatRsp);
|
||||||
|
@ -349,7 +349,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
|
||||||
pConnectRsp->writeAuth = pUser->writeAuth;
|
pConnectRsp->writeAuth = pUser->writeAuth;
|
||||||
pConnectRsp->superAuth = pUser->superAuth;
|
pConnectRsp->superAuth = pUser->superAuth;
|
||||||
|
|
||||||
mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet);
|
mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false);
|
||||||
|
|
||||||
connect_over:
|
connect_over:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -50,7 +50,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
|
||||||
if (!sdbIsMaster()) {
|
if (!sdbIsMaster()) {
|
||||||
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
||||||
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
||||||
mnodeGetMnodeEpSetForShell(epSet);
|
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||||
rpcRsp->rsp = epSet;
|
rpcRsp->rsp = epSet;
|
||||||
rpcRsp->len = sizeof(SRpcEpSet);
|
rpcRsp->len = sizeof(SRpcEpSet);
|
||||||
|
|
||||||
|
|
|
@ -62,12 +62,15 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SSyncHead syncHead;
|
SSyncHead syncHead;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
|
uint16_t tranId;
|
||||||
char fqdn[TSDB_FQDN_LEN];
|
char fqdn[TSDB_FQDN_LEN];
|
||||||
int32_t sourceId; // only for arbitrator
|
int32_t sourceId; // only for arbitrator
|
||||||
} SFirstPkt;
|
} SFirstPkt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t sync;
|
int8_t sync;
|
||||||
|
int8_t reserved;
|
||||||
|
uint16_t tranId;
|
||||||
} SFirstPktRsp;
|
} SFirstPktRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer);
|
||||||
void syncBroadcastStatus(SSyncNode *pNode);
|
void syncBroadcastStatus(SSyncNode *pNode);
|
||||||
void syncAddPeerRef(SSyncPeer *pPeer);
|
void syncAddPeerRef(SSyncPeer *pPeer);
|
||||||
int32_t syncDecPeerRef(SSyncPeer *pPeer);
|
int32_t syncDecPeerRef(SSyncPeer *pPeer);
|
||||||
|
uint16_t syncGenTranId();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
|
||||||
pFwdRsp->code = code;
|
pFwdRsp->code = code;
|
||||||
|
|
||||||
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
|
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
|
||||||
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen);
|
if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) {
|
||||||
|
|
||||||
if (retLen == msgLen) {
|
|
||||||
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
|
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
|
||||||
} else {
|
} else {
|
||||||
sDebug("%s, failed to send forward ack, restart", pPeer->id);
|
sDebug("%s, failed to send forward ack, restart", pPeer->id);
|
||||||
|
@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
|
||||||
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
|
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
|
||||||
firstPkt.syncHead.vgId = pNode->vgId;
|
firstPkt.syncHead.vgId = pNode->vgId;
|
||||||
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
|
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
|
||||||
|
firstPkt.tranId = syncGenTranId();
|
||||||
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
||||||
firstPkt.port = tsSyncPort;
|
firstPkt.port = tsSyncPort;
|
||||||
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
|
||||||
|
@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
|
||||||
sError("%s, failed to send sync-req to peer", pPeer->id);
|
sError("%s, failed to send sync-req to peer", pPeer->id);
|
||||||
} else {
|
} else {
|
||||||
nodeSStatus = TAOS_SYNC_STATUS_START;
|
nodeSStatus = TAOS_SYNC_STATUS_START;
|
||||||
sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
sInfo("%s, sync-req is sent to peer, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
|
||||||
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
|
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen);
|
if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) {
|
||||||
if (retLen == statusMsgLen) {
|
|
||||||
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
|
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
|
||||||
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
|
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
|
||||||
syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId,
|
syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId,
|
||||||
|
@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
|
||||||
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
|
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
|
||||||
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
||||||
firstPkt.port = tsSyncPort;
|
firstPkt.port = tsSyncPort;
|
||||||
|
firstPkt.tranId = syncGenTranId();
|
||||||
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
|
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
|
||||||
|
|
||||||
if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
|
if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
|
||||||
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd);
|
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId);
|
||||||
pPeer->peerFd = connFd;
|
pPeer->peerFd = connFd;
|
||||||
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
|
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
|
||||||
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
|
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
|
||||||
|
@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId);
|
||||||
|
|
||||||
SSyncNode *pNode = *ppNode;
|
SSyncNode *pNode = *ppNode;
|
||||||
pthread_mutex_lock(&pNode->mutex);
|
pthread_mutex_lock(&pNode->mutex);
|
||||||
|
|
||||||
|
|
|
@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
||||||
sinfo.index = 0;
|
sinfo.index = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
// read file info
|
// read file info
|
||||||
int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
|
int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(SFileInfo));
|
||||||
if (ret < 0) {
|
if (ret != sizeof(SFileInfo)) {
|
||||||
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
||||||
|
|
||||||
// send file ack
|
// send file ack
|
||||||
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
||||||
if (ret < 0) {
|
if (ret != sizeof(fileAck)) {
|
||||||
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
|
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
|
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
|
||||||
if (ret < 0) {
|
if (ret != sizeof(SWalHead)) {
|
||||||
sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
|
||||||
} // wal sync over
|
} // wal sync over
|
||||||
|
|
||||||
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
|
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
|
||||||
if (ret < 0) {
|
if (ret != pHead->len) {
|
||||||
sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno));
|
sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
|
||||||
uint64_t fversion = 0;
|
uint64_t fversion = 0;
|
||||||
|
|
||||||
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
||||||
SFirstPktRsp firstPktRsp = {.sync = 1};
|
SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()};
|
||||||
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) {
|
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
|
||||||
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId);
|
||||||
|
|
||||||
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
||||||
int32_t code = syncRestoreFile(pPeer, &fversion);
|
int32_t code = syncRestoreFile(pPeer, &fversion);
|
||||||
|
|
|
@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
|
||||||
uint64_t fver, wver;
|
uint64_t fver, wver;
|
||||||
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
|
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
|
|
||||||
if (syncGetFileVersion(pNode, pPeer) < 0) return -1;
|
if (syncGetFileVersion(pNode, pPeer) < 0) {
|
||||||
|
pPeer->fileChanged = 1;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// retrieve file info
|
// retrieve file info
|
||||||
|
@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
fileInfo.size = 0;
|
fileInfo.size = 0;
|
||||||
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
||||||
&fileInfo.size, &fileInfo.fversion);
|
&fileInfo.size, &fileInfo.fversion);
|
||||||
// fileInfo.size = htonl(size);
|
|
||||||
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
||||||
|
|
||||||
// send the file info
|
// send the file info
|
||||||
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
||||||
if (ret < 0) {
|
if (ret != sizeof(fileInfo)) {
|
||||||
code = -1;
|
code = -1;
|
||||||
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
|
@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the ack from peer
|
// wait for the ack from peer
|
||||||
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
|
||||||
if (ret < 0) {
|
if (ret != sizeof(SFileAck)) {
|
||||||
code = -1;
|
code = -1;
|
||||||
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
|
@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
|
|
||||||
sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
|
||||||
|
|
||||||
SWalHead walHead;
|
SWalHead walHead;
|
||||||
memset(&walHead, 0, sizeof(walHead));
|
memset(&walHead, 0, sizeof(walHead));
|
||||||
taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
|
if (taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)) == sizeof(walHead)) {
|
||||||
|
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
|
||||||
|
sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
||||||
|
} else {
|
||||||
|
sError("%s, failed to send last wal record since %s", pPeer->id, strerror(errno));
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
|
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
|
||||||
}
|
}
|
||||||
|
@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
|
||||||
memset(&firstPkt, 0, sizeof(firstPkt));
|
memset(&firstPkt, 0, sizeof(firstPkt));
|
||||||
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
|
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
|
||||||
firstPkt.syncHead.vgId = pNode->vgId;
|
firstPkt.syncHead.vgId = pNode->vgId;
|
||||||
|
firstPkt.tranId = syncGenTranId();
|
||||||
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
||||||
firstPkt.port = tsSyncPort;
|
firstPkt.port = tsSyncPort;
|
||||||
|
|
||||||
if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) {
|
if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
|
||||||
sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
sDebug("%s, send firstPkt to peer, tranId:%u", pPeer->id, firstPkt.tranId);
|
||||||
|
|
||||||
SFirstPktRsp firstPktRsp;
|
SFirstPktRsp firstPktRsp;
|
||||||
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) {
|
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
|
||||||
sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue