Merge pull request #3861 from taosdata/feature/sync
[TD-1652]: Possible loss of data when the synchronization state is switched
This commit is contained in:
commit
e94b5f1f2f
|
@ -311,6 +311,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
|||
|
||||
if (pNode == NULL) return 0;
|
||||
|
||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
|
||||
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
|
||||
pWalHead->version, nodeVersion);
|
||||
for (int i = 0; i < pNode->replica; ++i) {
|
||||
pPeer = pNode->peerInfo[i];
|
||||
syncRestartConnection(pPeer);
|
||||
}
|
||||
return TSDB_CODE_SYN_INVALID_VERSION;
|
||||
}
|
||||
|
||||
// always update version
|
||||
nodeVersion = pWalHead->version;
|
||||
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
|
||||
|
|
|
@ -77,8 +77,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
|||
}
|
||||
|
||||
// assign version
|
||||
pVnode->version++;
|
||||
pHead->version = pVnode->version;
|
||||
pHead->version = pVnode->version + 1;
|
||||
if (pVnode->delay) usleep(pVnode->delay * 1000);
|
||||
|
||||
} else { // from wal or forward
|
||||
|
@ -86,16 +85,16 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
|||
if (pHead->version <= pVnode->version) return 0;
|
||||
}
|
||||
|
||||
pVnode->version = pHead->version;
|
||||
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
||||
int32_t syncCode = 0;
|
||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
|
||||
if (syncCode < 0) return syncCode;
|
||||
|
||||
// write into WAL
|
||||
code = walWrite(pVnode->wal, pHead);
|
||||
if (code < 0) return code;
|
||||
|
||||
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
||||
int32_t syncCode = 0;
|
||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
|
||||
if (syncCode < 0) return syncCode;
|
||||
pVnode->version = pHead->version;
|
||||
|
||||
// write data locally
|
||||
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
|
||||
|
|
Loading…
Reference in New Issue