From 6b066bcf1a809b5d65873ed9929fed5b5ace80ba Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 8 Nov 2020 14:46:12 +0000 Subject: [PATCH 1/7] TD-1919 --- src/inc/tsdb.h | 4 ++-- src/mnode/src/mnodeSdb.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 02ca99e4b8..3ef618ab36 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -126,7 +126,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ // the TSDB repository info typedef struct STsdbRepoInfo { STsdbCfg tsdbCfg; - int64_t version; // version of the repository + uint64_t version; // version of the repository int64_t tsdbTotalDataSize; // the original inserted data size int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository // TODO: Other informations to add @@ -136,7 +136,7 @@ STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo); // the meter information report structure typedef struct { STableCfg tableCfg; - int64_t version; + uint64_t version; int64_t tableTotalDataSize; // In bytes int64_t tableTotalDiskSize; // In bytes } STableInfo; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 6b6a49db93..e5c8b752b5 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -71,7 +71,7 @@ typedef struct _SSdbTable { typedef struct { ESyncRole role; ESdbStatus status; - int64_t version; + uint64_t version; int64_t sync; void * wal; SSyncCfg cfg; From 68078a6cab5970e4858b384d81cd16162bef3b74 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 Nov 2020 08:12:11 +0000 Subject: [PATCH 2/7] TD-2086 --- src/sync/src/syncMain.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 8d90315c48..11fddfa3b5 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -498,7 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { int32_t checkMs = 100 + (pNode->vgId * 10) % 100; - if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs; + if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs; sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer); } @@ -575,6 +575,15 @@ static void syncChooseMaster(SSyncNode *pNode) { if (index == pNode->selfIndex) { sInfo("vgId:%d, start to work as master", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; + + for (int32_t i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer->version == nodeVersion) { + pPeer->role = TAOS_SYNC_ROLE_SLAVE; + sInfo("%s, it shall work as slave", pPeer->id); + } + } + syncResetFlowCtrl(pNode); (*pNode->notifyRole)(pNode->ahandle, nodeRole); } else { @@ -1209,7 +1218,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t fwdLen; int32_t code = 0; - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version > nodeVersion + 1) { sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, pWalHead->version, nodeVersion); for (int32_t i = 0; i < pNode->replica; ++i) { From 141cc3d1888cd9dfba4eb5250c714d93822059da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 Nov 2020 08:23:32 +0000 Subject: [PATCH 3/7] TD-1948 --- src/dnode/src/dnodeVWrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index e0345eb1f6..bd807556e0 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -212,7 +212,7 @@ static void *dnodeProcessVWriteQueue(void *param) { pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; - if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; + if (pWrite->code == 0 && pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); } From 4e42b8cf5e49dfffebbf28c284cf2d1d3038f7ae Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 Nov 2020 08:46:02 +0000 Subject: [PATCH 4/7] TD-1948 --- src/sync/src/syncMain.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 11fddfa3b5..ae177fb938 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -580,6 +580,7 @@ static void syncChooseMaster(SSyncNode *pNode) { pPeer = pNode->peerInfo[i]; if (pPeer->version == nodeVersion) { pPeer->role = TAOS_SYNC_ROLE_SLAVE; + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; sInfo("%s, it shall work as slave", pPeer->id); } } From 3a9b5392134dfa7bc9f6d96a3ef4ac8dc1896e51 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 Nov 2020 09:44:22 +0000 Subject: [PATCH 5/7] TD-1948 --- src/sync/src/syncMain.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 8d90315c48..766dbaf174 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1209,13 +1209,16 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t fwdLen; int32_t code = 0; - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { - sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, - pWalHead->version, nodeVersion); - for (int32_t i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - syncRestartConnection(pPeer); + if (pWalHead->version > nodeVersion + 1) { + sError("vgId:%d, hver:%" PRIu64 ", inconsistent with ver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion); + if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { + sInfo("vgId:%d, restart connection", pNode->vgId); + for (int32_t i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } } + return TSDB_CODE_SYN_INVALID_VERSION; } From d5cd0ae9cb3210f5a8fd5e5520c19cb873cbe220 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 Nov 2020 10:03:24 +0000 Subject: [PATCH 6/7] TD-1948 --- src/sync/src/syncMain.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index adc420f376..44f66bc70c 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -576,6 +576,7 @@ static void syncChooseMaster(SSyncNode *pNode) { sInfo("vgId:%d, start to work as master", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; +#if 0 for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; if (pPeer->version == nodeVersion) { @@ -584,7 +585,7 @@ static void syncChooseMaster(SSyncNode *pNode) { sInfo("%s, it shall work as slave", pPeer->id); } } - +#endif syncResetFlowCtrl(pNode); (*pNode->notifyRole)(pNode->ahandle, nodeRole); } else { From 66f104bbfa09298ac1cf8e3f8517ee854141bdf6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 Nov 2020 11:08:59 +0000 Subject: [PATCH 7/7] TD-1982 definite lost while forward msg --- src/dnode/src/dnodeVWrite.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index bd807556e0..9d1d16e51e 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -225,11 +225,13 @@ static void *dnodeProcessVWriteQueue(void *param) { taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); if (qtype == TAOS_QTYPE_RPC) { dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); - } else if (qtype == TAOS_QTYPE_FWD) { - vnodeConfirmForward(pVnode, pWrite->pHead->version, 0); - taosFreeQitem(pWrite); - vnodeRelease(pVnode); } else { + if (qtype == TAOS_QTYPE_FWD) { + vnodeConfirmForward(pVnode, pWrite->pHead->version, 0); + } + if (pWrite->rspRet.rsp) { + rpcFreeCont(pWrite->rspRet.rsp); + } taosFreeQitem(pWrite); vnodeRelease(pVnode); }