Merge pull request #5668 from taosdata/feature/linux
[TD-3458]<fix>: Fix the bug of losing tables in multiple replicas
This commit is contained in:
commit
3bb7384a61
|
@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
||||||
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
||||||
} else {
|
} else {
|
||||||
if (qtype == TAOS_QTYPE_FWD) {
|
if (qtype == TAOS_QTYPE_FWD) {
|
||||||
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||||
}
|
}
|
||||||
if (pWrite->rspRet.rsp) {
|
if (pWrite->rspRet.rsp) {
|
||||||
rpcFreeCont(pWrite->rspRet.rsp);
|
rpcFreeCont(pWrite->rspRet.rsp);
|
||||||
|
|
|
@ -35,7 +35,7 @@ extern "C" {
|
||||||
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
|
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
|
||||||
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
|
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
|
||||||
|
|
||||||
#define SYNC_MAX_FWDS 512
|
#define SYNC_MAX_FWDS 1024
|
||||||
#define SYNC_FWD_TIMER 300
|
#define SYNC_FWD_TIMER 300
|
||||||
#define SYNC_ROLE_TIMER 15000 // ms
|
#define SYNC_ROLE_TIMER 15000 // ms
|
||||||
#define SYNC_CHECK_INTERVAL 1000 // ms
|
#define SYNC_CHECK_INTERVAL 1000 // ms
|
||||||
|
|
|
@ -1459,7 +1459,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
|
||||||
|
|
||||||
if ((pNode->quorum > 1 || force) && code == 0) {
|
if ((pNode->quorum > 1 || force) && code == 0) {
|
||||||
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
|
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
|
||||||
if (code >= 0) code = 1;
|
if (code >= 0) {
|
||||||
|
code = 1;
|
||||||
|
} else {
|
||||||
|
pthread_mutex_unlock(&pNode->mutex);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
|
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
|
||||||
|
|
|
@ -91,13 +91,17 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
||||||
int32_t syncCode = 0;
|
int32_t syncCode = 0;
|
||||||
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
|
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
|
||||||
if (syncCode < 0) return syncCode;
|
if (syncCode < 0) {
|
||||||
|
pHead->version = 0;
|
||||||
|
return syncCode;
|
||||||
|
}
|
||||||
|
|
||||||
// write into WAL
|
// write into WAL
|
||||||
code = walWrite(pVnode->wal, pHead);
|
code = walWrite(pVnode->wal, pHead);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
|
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
|
||||||
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
|
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
|
||||||
|
pHead->version = 0;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue