From 48b6bd438d5d10529a97f73b0656ad55f567a275 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 22 Dec 2022 17:30:02 +0800 Subject: [PATCH 1/9] fix: restart snapshot sender on receiver is restart --- include/util/taoserror.h | 1 + source/dnode/vnode/src/vnd/vnodeCommit.c | 7 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/sync/src/syncSnapshot.c | 137 ++++++++++++--------- source/libs/sync/test/syncRaftLogTest2.cpp | 2 +- source/libs/sync/test/syncRaftLogTest3.cpp | 2 +- source/util/src/terror.c | 1 + 8 files changed, 90 insertions(+), 66 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 7cc1a47404..2d7b15ebda 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -517,6 +517,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912) #define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914) +#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index be977e7cbd..f78956a431 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -234,10 +234,11 @@ int vnodeAsyncCommit(SVnode *pVnode) { _exit: if (code) { - vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), + vError("vgId:%d, vnode async commit failed since %s, commitId:%" PRId64, TD_VID(pVnode), tstrerror(code), pVnode->state.commitID); } else { - vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__); + vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode), + pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied); } return code; } @@ -256,7 +257,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { char dir[TSDB_FILENAME_LEN] = {0}; SVnode *pVnode = pInfo->pVnode; - vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), + vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm); // persist wal before starting diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index fcfacd1ca9..dbd06d6ec0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -423,7 +423,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ASSERT(pHdr->index == pWriter->index + 1); pWriter->index = pHdr->index; - vInfo("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, + vInfo("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index, pHdr->type, nData); switch (pHdr->type) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 5caaae502f..0437703c92 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -465,9 +465,9 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { SVnode *pVnode = pFsm->data; - vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); + vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len); int32_t code = vnodeSnapWrite(pWriter, pBuf, len); - vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len); + vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len); return code; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 30324c1113..7a35f90165 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -294,7 +294,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { } if (snapshotSenderIsStart(pSender)) { - sSError(pSender, "snapshot sender already start, ignore"); + sSInfo(pSender, "snapshot sender already start, ignore"); return 0; } @@ -523,7 +523,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { if (pMsg->seq != pReceiver->ack + 1) { sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG; return -1; } @@ -721,8 +721,12 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS timeNow = taosGetTimestampMs(); } + int32_t code = 0; if (snapshotReceiverGotData(pReceiver, pMsg) != 0) { - return -1; + code = terrno; + if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + } } // build msg @@ -740,7 +744,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pReceiver->startTime; pRspMsg->ack = pReceiver->ack; // receiver maybe already closed - pRspMsg->code = 0; + pRspMsg->code = code; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; // send msg @@ -861,7 +865,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); snapshotReceiverForceStop(pReceiver); } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq"); + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); syncNodeOnSnapshotTransfering(pSyncNode, pMsg); } else { // error log @@ -982,68 +986,85 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } // state, term, seq/ack - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { - // prepare , send begin msg - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); - syncNodeOnSnapshotReplyPre(pSyncNode, pMsg); - return 0; - } + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + sSError(pSender, "snapshot sender not leader"); + return -1; + } - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); - if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { - return -1; - } + if (pMsg->term != pSyncNode->pRaftStore->currentTerm) { + sSError(pSender, "snapshot sender term not equal"); + return -1; + } - if (snapshotSend(pSender) != 0) { - return -1; - } - return 0; - } + if (pMsg->code != 0) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); + sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code); + snapshotSenderStop(pSender, true); + SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); + if (pMgr) { + syncLogReplMgrReset(pMgr); + } - // receive ack is finish, close sender - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); - snapshotSenderStop(pSender, true); - SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); - if (pMgr) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr"); - syncLogReplMgrReset(pMgr); - } - return 0; - } + return -1; + } - // send next msg - if (pMsg->ack == pSender->seq) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq"); - // update sender ack - if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { - return -1; - } - if (snapshotSend(pSender) != 0) { - return -1; - } + // prepare , send begin msg + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); + syncNodeOnSnapshotReplyPre(pSyncNode, pMsg); + return 0; + } - } else if (pMsg->ack == pSender->seq - 1) { - // maybe resend - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend"); - snapshotReSend(pSender); - - } else { - // error log - sSError(pSender, "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq); - return -1; - } - } else { - // error log - sSError(pSender, "snapshot sender term not equal"); + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); + if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { return -1; } + + if (snapshotSend(pSender) != 0) { + return -1; + } + return 0; + } + + // receive ack is finish, close sender + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); + snapshotSenderStop(pSender, true); + SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); + if (pMgr) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr"); + syncLogReplMgrReset(pMgr); + } + return 0; + } + + // send next msg + if (pMsg->ack == pSender->seq) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data"); + // update sender ack + if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { + return -1; + } + if (snapshotSend(pSender) != 0) { + return -1; + } + + } else if (pMsg->ack == pSender->seq - 1) { + // maybe resend + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend"); + snapshotReSend(pSender); + } else { // error log - sSError(pSender, "snapshot sender not leader"); + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack"); + sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq); + snapshotSenderStop(pSender, true); + SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); + if (pMgr) { + syncLogReplMgrReset(pMgr); + } + return -1; } diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index a7752dcb8b..de9137fbe1 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -47,7 +47,7 @@ void init() { pSyncNode->pWal = pWal; pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); - pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; + // pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; } void cleanup() { diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index 31c06625aa..1eed08102f 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -47,7 +47,7 @@ void init() { pSyncNode->pWal = pWal; pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); - pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; + // pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; } void cleanup() { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d73c8661fc..e59b1daa05 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -405,6 +405,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for pr TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq From 99bc54dfd9dc66ffd952a74da6c68883ca9c1221 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 22 Dec 2022 20:29:42 +0800 Subject: [PATCH 2/9] tdb/pager: debug logs for pager restore --- source/libs/tdb/src/db/tdbPager.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 2a2a6f8bbd..e0c3397d63 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -880,6 +880,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbDebug("pager/restore: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId); + for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { // read pgno & the page from journal SPgno pgno; @@ -890,6 +892,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbTrace("pager/restore: restore pgno:%d,", pgno); + ret = tdbOsRead(jfd, pageBuf, pPager->pageSize); if (ret < 0) { tdbOsFree(pageBuf); From 78af4f54a3108246b397d5859a6f3f4795b0e440 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 22 Dec 2022 20:29:42 +0800 Subject: [PATCH 3/9] tdb/pager: debug logs for pager restore --- source/libs/tdb/src/db/tdbPager.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 2a2a6f8bbd..eb7bcf9385 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -880,6 +880,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName); + for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { // read pgno & the page from journal SPgno pgno; @@ -890,6 +892,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbTrace("pager/restore: restore pgno:%d,", pgno); + ret = tdbOsRead(jfd, pageBuf, pPager->pageSize); if (ret < 0) { tdbOsFree(pageBuf); @@ -949,7 +953,12 @@ int tdbPagerRestoreJournals(SPager *pPager) { while ((pDirEntry = tdbReadDir(pDir)) != NULL) { char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { - if (tdbPagerRestore(pPager, name) < 0) { + char jname[TD_PATH_MAX] = {0}; + int dirLen = strlen(pPager->pEnv->dbName); + memcpy(jname, pPager->pEnv->dbName, dirLen); + jname[dirLen] = '/'; + memcpy(jname + dirLen + 1, name, strlen(name)); + if (tdbPagerRestore(pPager, jname) < 0) { tdbCloseDir(&pDir); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name); From 7a38465c760eedb9f0893c8579c33cab3676879a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 22 Dec 2022 20:29:42 +0800 Subject: [PATCH 4/9] tdb/pager: debug logs for pager restore --- source/libs/tdb/src/db/tdbPager.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 2a2a6f8bbd..ce20f6808f 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -880,6 +880,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName); + for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { // read pgno & the page from journal SPgno pgno; @@ -890,6 +892,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbTrace("pager/restore: restore pgno:%d,", pgno); + ret = tdbOsRead(jfd, pageBuf, pPager->pageSize); if (ret < 0) { tdbOsFree(pageBuf); @@ -929,7 +933,7 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } - if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + if (tdbOsRemove(jFileName) < 0 && errno != ENOENT) { tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -949,7 +953,12 @@ int tdbPagerRestoreJournals(SPager *pPager) { while ((pDirEntry = tdbReadDir(pDir)) != NULL) { char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { - if (tdbPagerRestore(pPager, name) < 0) { + char jname[TD_PATH_MAX] = {0}; + int dirLen = strlen(pPager->pEnv->dbName); + memcpy(jname, pPager->pEnv->dbName, dirLen); + jname[dirLen] = '/'; + memcpy(jname + dirLen + 1, name, strlen(name)); + if (tdbPagerRestore(pPager, jname) < 0) { tdbCloseDir(&pDir); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name); From 773423a6b64e8f5951affa2f7fe7b156ad7cbf1f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 23 Dec 2022 09:11:45 +0800 Subject: [PATCH 5/9] fix: remove some logs --- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 4 ++-- source/libs/sync/src/syncSnapshot.c | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index dbd06d6ec0..0362d4af2a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -423,8 +423,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ASSERT(pHdr->index == pWriter->index + 1); pWriter->index = pHdr->index; - vInfo("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index, - pHdr->type, nData); + vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index, + pHdr->type, nData); switch (pHdr->type) { case SNAP_DATA_CFG: { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 7a35f90165..9e1c6b36f1 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -49,7 +49,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; - sDebug("vgId:%d, snapshot sender create", pSender->pSyncNode->vgId); return pSender; } From 0c64cfc309e41de75bb10ef2ae7c126413ad949b Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 22 Dec 2022 20:29:42 +0800 Subject: [PATCH 6/9] tdb/pager: debug logs for pager restore --- source/libs/tdb/src/db/tdbPager.c | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 2a2a6f8bbd..62d82edeb1 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -880,6 +880,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName); + for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { // read pgno & the page from journal SPgno pgno; @@ -890,6 +892,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } + tdbTrace("pager/restore: restore pgno:%d,", pgno); + ret = tdbOsRead(jfd, pageBuf, pPager->pageSize); if (ret < 0) { tdbOsFree(pageBuf); @@ -929,7 +933,7 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { return -1; } - if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + if (tdbOsRemove(jFileName) < 0 && errno != ENOENT) { tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -949,7 +953,12 @@ int tdbPagerRestoreJournals(SPager *pPager) { while ((pDirEntry = tdbReadDir(pDir)) != NULL) { char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { - if (tdbPagerRestore(pPager, name) < 0) { + char jname[TD_PATH_MAX] = {0}; + int dirLen = strlen(pPager->pEnv->dbName); + memcpy(jname, pPager->pEnv->dbName, dirLen); + jname[dirLen] = '/'; + memcpy(jname + dirLen + 1, name, strlen(name)); + if (tdbPagerRestore(pPager, jname) < 0) { tdbCloseDir(&pDir); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name); @@ -975,7 +984,12 @@ int tdbPagerRollback(SPager *pPager) { char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { - if (tdbOsRemove(name) < 0 && errno != ENOENT) { + char jname[TD_PATH_MAX] = {0}; + int dirLen = strlen(pPager->pEnv->dbName); + memcpy(jname, pPager->pEnv->dbName, dirLen); + jname[dirLen] = '/'; + memcpy(jname + dirLen + 1, name, strlen(name)); + if (tdbOsRemove(jname) < 0 && errno != ENOENT) { tdbCloseDir(&pDir); tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), name); From b59bee6696b159ad6bb8fed620de3f70057174f1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 23 Dec 2022 11:03:50 +0800 Subject: [PATCH 7/9] fix: restart snapshot sender on receiver is restart --- source/libs/sync/src/syncPipeline.c | 2 +- source/libs/sync/src/syncRaftLog.c | 24 ++++++++--------- source/libs/sync/src/syncSnapshot.c | 41 ++++++++++++++++++----------- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index f438856ace..225de30755 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -112,7 +112,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S return prevLogTerm; } - sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex); + sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 018ac5bb7d..3f9f397ef5 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -115,8 +115,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI const char* sysErrStr = strerror(errno); sNError(pData->pSyncNode, - "wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - snapshotIndex, err, err, errStr, sysErr, sysErrStr); + "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, + err, errStr, sysErr, sysErrStr); return -1; } @@ -212,8 +212,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pEntry->index, err, err, errStr, sysErr, sysErrStr); + sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", + pEntry->index, err, errStr, sysErr, sysErrStr); return -1; } @@ -257,11 +257,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR const char* sysErrStr = strerror(errno); if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, - err, err, errStr, sysErr, sysErrStr); + sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, + err, errStr, sysErr, sysErrStr); } else { - sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, - err, err, errStr, sysErr, sysErrStr); + sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, err, + errStr, sysErr, sysErrStr); } /* @@ -341,8 +341,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr); + sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", + pData->pSyncNode->vgId, fromIndex, err, errStr, sysErr, sysErrStr); } // event log @@ -392,8 +392,8 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); - sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr); + sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", + pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr); return -1; } return 0; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 9e1c6b36f1..54c11a503b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -747,7 +747,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; // send msg - syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver receiving"); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received"); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr()); return -1; @@ -979,32 +979,31 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return -1; } - if (pMsg->startTime != pSender->startTime) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender:% " PRId64 " receiver:%" PRId64 " time not match"); - return -1; - } - // state, term, seq/ack if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader"); sSError(pSender, "snapshot sender not leader"); - return -1; + goto _ERROR; + } + + if (pMsg->startTime != pSender->startTime) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match"); + sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime, + pSender->startTime, pMsg->code); + goto _ERROR; } if (pMsg->term != pSyncNode->pRaftStore->currentTerm) { - sSError(pSender, "snapshot sender term not equal"); - return -1; + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); + sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, + pSyncNode->pRaftStore->currentTerm); + goto _ERROR; } if (pMsg->code != 0) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code); - snapshotSenderStop(pSender, true); - SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); - if (pMgr) { - syncLogReplMgrReset(pMgr); - } - - return -1; + goto _ERROR; } // prepare , send begin msg @@ -1068,4 +1067,14 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } return 0; + +_ERROR: + snapshotSenderStop(pSender, true); + SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); + if (pMgr) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr"); + syncLogReplMgrReset(pMgr); + } + + return -1; } From 90e44ced3eba30d57a6700a4b23da1e5f2841bf8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 23 Dec 2022 11:15:45 +0800 Subject: [PATCH 8/9] test: add logs --- tests/script/sh/deploy.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 662c4a1a6c..217bd66ef6 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -134,6 +134,7 @@ echo "mDebugFlag 143" >> $TAOS_CFG echo "wDebugFlag 143" >> $TAOS_CFG echo "sDebugFlag 143" >> $TAOS_CFG echo "tsdbDebugFlag 143" >> $TAOS_CFG +echo "tdbDebugFlag 143" >> $TAOS_CFG echo "tqDebugFlag 143" >> $TAOS_CFG echo "fsDebugFlag 143" >> $TAOS_CFG echo "idxDebugFlag 143" >> $TAOS_CFG From 4f33119d461b0f9a8fbdc2fa1eda92555be062ab Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 23 Dec 2022 13:07:10 +0800 Subject: [PATCH 9/9] test: adjust test.sh --- tests/script/test.sh | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/script/test.sh b/tests/script/test.sh index a7a5d34fbe..19180382fd 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -10,13 +10,11 @@ set +e #set -x FILE_NAME= -RELEASE=0 -ASYNC=0 VALGRIND=0 -UNIQUE=0 +TEST=0 UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` -while getopts "f:agvum" arg +while getopts "f:tgv" arg do case $arg in f) @@ -25,8 +23,8 @@ do v) VALGRIND=1 ;; - u) - UNIQUE=1 + t) + TEST=1 ;; g) VALGRIND=2 @@ -140,6 +138,11 @@ if [ -n "$FILE_NAME" ]; then result=$? echo "Execute result:" $result + if [ $TEST -eq 1 ]; then + echo "Exit without check asan errors" + exit 1 + fi + if [ $result -eq 0 ]; then $CODE_DIR/sh/sigint_stop_dnodes.sh $CODE_DIR/sh/checkAsan.sh