From b71a9e34831b607c283fe8eec08f32fcad624b1d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 27 Sep 2022 19:58:19 +0800 Subject: [PATCH 01/12] enh: improve error handling in syncNodeOpen --- source/dnode/vnode/src/vnd/vnodeOpen.c | 1 - source/libs/sync/inc/syncUtil.h | 2 +- source/libs/sync/src/syncIndexMgr.c | 7 +- source/libs/sync/src/syncMain.c | 141 +++++++++++++++++-------- source/libs/sync/src/syncRaftCfg.c | 2 - source/libs/sync/src/syncRaftStore.c | 6 +- source/libs/sync/src/syncRespMgr.c | 4 + source/libs/sync/src/syncSnapshot.c | 5 +- source/libs/sync/src/syncUtil.c | 14 ++- source/libs/sync/src/syncVoteMgr.c | 5 +- 10 files changed, 131 insertions(+), 56 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4ccfea4051..69785570f7 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 7ecff7ae97..96e22720e8 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port); void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); bool syncUtilEmptyId(const SRaftId* pId); diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 3bda9bcd51..28b5313ac5 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -20,7 +20,10 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); - ASSERT(pSyncIndexMgr != NULL); + if (pSyncIndexMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); pSyncIndexMgr->replicas = &(pSyncNode->replicasId); @@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI } ASSERT(0); return -1; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 17157fbd23..3d79c6c464 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -// life cycle -static void syncFreeNode(void* param); // --------------------------------- +static void syncNodeFreeCb(void *param) { + syncNodeClose(param); + param = NULL; +} int32_t syncInit() { int32_t ret = 0; if (!syncEnvIsStart()) { - tsNodeRefId = taosOpenRef(200, syncFreeNode); + tsNodeRefId = taosOpenRef(200, syncNodeFreeCb); if (tsNodeRefId < 0) { sError("failed to init node ref"); syncCleanUp(); @@ -86,11 +88,15 @@ void syncCleanUp() { int64_t syncOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - ASSERT(pSyncNode != NULL); + if (pSyncNode == NULL) { + sError("failed to open sync node. vgId:%d", pSyncInfo->vgId); + return -1; + } pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); if (pSyncNode->rid < 0) { - syncFreeNode(pSyncNode); + syncNodeClose(pSyncNode); + pSyncNode = NULL; return -1; } @@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) { void syncStop(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) return; - int32_t vgId = pSyncNode->vgId; - syncNodeClose(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + taosRemoveRef(tsNodeRefId, rid); sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId); } @@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("syncNodeCheckNewConfig error"); + sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; } @@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("syncNodeCheckNewConfig error"); + sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; } @@ -941,16 +945,18 @@ _END: SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); - ASSERT(pSyncNode != NULL); - memset(pSyncNode, 0, sizeof(SSyncNode)); + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode)); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } int32_t ret = 0; if (!taosDirExist((char*)(pSyncInfo->path))) { if (taosMkDir(pSyncInfo->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); - return NULL; + goto _error; } } @@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.batchSize = pSyncInfo->batchSize; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); - ASSERT(ret == 0); - + if (ret != 0) { + sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath); + goto _error; + } } else { // update syncCfg by raft_config.json pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - ASSERT(pSyncNode->pRaftCfg != NULL); + if (pSyncNode->pRaftCfg == NULL) { + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; + } pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; raftCfgClose(pSyncNode->pRaftCfg); + pSyncNode->pRaftCfg = NULL; } // init by SSyncInfo @@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - ASSERT(pSyncNode->pRaftCfg != NULL); + if (pSyncNode->pRaftCfg == NULL) { + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; + } // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; - syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { + sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId); + goto _error; + } // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; @@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } } for (int i = 0; i < pSyncNode->peersNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { + sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i); + goto _error; + } } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { + sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i); + goto _error; + } } // init raft algorithm pSyncNode->pFsm = pSyncInfo->pFsm; + pSyncInfo->pFsm = NULL; pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->leaderCache = EMPTY_RAFT_ID; @@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); - ASSERT(pSyncNode->pRaftStore != NULL); + if (pSyncNode->pRaftStore == NULL) { + sError("failed to open raft store. path: %s", pSyncNode->raftStorePath); + goto _error; + } // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); - ASSERT(pSyncNode->pVotesGranted != NULL); + if (pSyncNode->pVotesGranted == NULL) { + sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId); + goto _error; + } pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); - ASSERT(pSyncNode->pVotesRespond != NULL); + if (pSyncNode->pVotesRespond == NULL) { + sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId); + goto _error; + } // init TLA+ leader vars pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); - ASSERT(pSyncNode->pNextIndex != NULL); + if (pSyncNode->pNextIndex == NULL) { + sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); - ASSERT(pSyncNode->pMatchIndex != NULL); + if (pSyncNode->pMatchIndex == NULL) { + sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); - ASSERT(pSyncNode->pLogStore != NULL); + if (pSyncNode->pLogStore == NULL) { + sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId); + goto _error; + } SyncIndex commitIndex = SYNC_INDEX_INVALID; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot = {0}; int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - ASSERT(code == 0); + if (code != 0) { + sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code); + goto _error; + } if (snapshot.lastApplyIndex > commitIndex) { commitIndex = snapshot.lastApplyIndex; syncNodeEventLog(pSyncNode, "reset commit index by snapshot"); @@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); - ASSERT(pSyncNode->pSyncRespMgr != NULL); + if (pSyncNode->pSyncRespMgr == NULL) { + sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } // restore state pSyncNode->restoreFinish = false; @@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { syncNodeEventLog(pSyncNode, "sync open"); return pSyncNode; + +_error: + if (pSyncInfo->pFsm) { + taosMemoryFree(pSyncInfo->pFsm); + pSyncInfo->pFsm = NULL; + } + syncNodeClose(pSyncNode); + pSyncNode = NULL; + return NULL; } void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { @@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); - + if (pSyncNode == NULL) { + return; + } int32_t ret; - ASSERT(pSyncNode != NULL); ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); + pSyncNode->pSyncRespMgr = NULL; voteGrantedDestroy(pSyncNode->pVotesGranted); + pSyncNode->pVotesGranted = NULL; votesRespondDestory(pSyncNode->pVotesRespond); + pSyncNode->pVotesRespond = NULL; syncIndexMgrDestroy(pSyncNode->pNextIndex); + pSyncNode->pNextIndex = NULL; syncIndexMgrDestroy(pSyncNode->pMatchIndex); + pSyncNode->pMatchIndex = NULL; logStoreDestory(pSyncNode->pLogStore); + pSyncNode->pLogStore = NULL; raftCfgClose(pSyncNode->pRaftCfg); + pSyncNode->pRaftCfg = NULL; syncNodeStopPingTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode); @@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { pSyncNode->pNewNodeReceiver = NULL; } - // free memory in syncFreeNode - // taosMemoryFree(pSyncNode); + taosMemoryFree(pSyncNode); } // option @@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL"); + sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); } syncTimeoutDestroy(pSyncMsg); @@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p return 0; } -static void syncFreeNode(void* param) { - SSyncNode* pNode = param; - // inner object already free - // syncNodePrint2((char*)"==syncFreeNode==", pNode); - - taosMemoryFree(pNode); -} - const char* syncStr(ESyncState state) { switch (state) { case TAOS_SYNC_STATE_FOLLOWER: diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index ab404d1b9a..57126d0871 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { int32_t sysErr = errno; const char *sysErrStr = strerror(errno); sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr); - ASSERT(0); - return -1; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 29f78b582f..22b47a2c45 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) { SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore)); if (pRaftStore == NULL) { - sError("raftStoreOpen malloc error"); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } memset(pRaftStore, 0, sizeof(*pRaftStore)); @@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) { } int32_t raftStoreClose(SRaftStore *pRaftStore) { - ASSERT(pRaftStore != NULL); + if (pRaftStore == NULL) { + return 0; + } taosCloseFile(&pRaftStore->pFile); taosMemoryFree(pRaftStore); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index d7ed864180..103c225476 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -19,6 +19,10 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr)); + if (pObj == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pObj, 0, sizeof(SSyncRespMgr)); pObj->pRespHash = diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 0be3392a9a..68d81813ac 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI SSyncSnapshotSender *pSender = NULL; if (condition) { pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); - ASSERT(pSender != NULL); + if (pSender == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pSender, 0, sizeof(*pSender)); pSender->start = false; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 325073f366..6f234631da 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint32_t hostU32 = taosGetIpv4FromFqdn(host); if (hostU32 == (uint32_t)-1) { - sError("Get IP address error"); + sError("failed to resolve ipv4 addr. host:%s", host); + terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } @@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { addEpIntoEpSet(pEpSet, host, port); } -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { +bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); - ASSERT(ipv4 != 0xFFFFFFFF); + if (ipv4 == 0xFFFFFFFF || ipv4 == 1) { + sError("failed to resolve ipv4 addr. fqdn: %s", pNodeInfo->nodeFqdn); + terrno = TSDB_CODE_TSC_INVALID_FQDN; + return false; + } char ipbuf[128] = {0}; tinet_ntoa(ipbuf, ipv4); raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); raftId->vgId = vgId; + return true; } bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { @@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) { q++; } } -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 1d46d71a05..641bb32d2d 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); - ASSERT(pVotesGranted != NULL); + if (pVotesGranted == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pVotesGranted, 0, sizeof(SVotesGranted)); pVotesGranted->replicas = &(pSyncNode->replicasId); From 4b4f7a44f400465c46e41c81f641bbe6374a299a Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 28 Sep 2022 17:39:22 +0800 Subject: [PATCH 02/12] fix: resolve memory leak in vnodeOpen when vnodeSyncOpen failure --- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 ++ source/dnode/vnode/src/vnd/vnodeBufPool.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeOpen.c | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index ec760e3c57..fcbcff9248 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -85,6 +85,8 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + tsdbMemTableDestroy((*pTsdb)->mem); + (*pTsdb)->mem = NULL; tsdbFSClose(*pTsdb); tsdbCloseCache(*pTsdb); taosMemoryFreeClear(*pTsdb); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 5a22114ab4..6e02425b55 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -53,6 +53,10 @@ int vnodeCloseBufPool(SVnode *pVnode) { vnodeBufPoolDestroy(pPool); } + if (pVnode->inUse) { + vnodeBufPoolDestroy(pVnode->inUse); + pVnode->inUse = NULL; + } vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); return 0; @@ -177,4 +181,4 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { taosThreadMutexUnlock(&pVnode->mutex); } -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 69785570f7..b5307cecf2 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -173,6 +173,7 @@ _err: if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); + if (pVnode->pPool) vnodeCloseBufPool(pVnode); tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); From 45fb658cec54b1c377c31d04fb70f90b98d5b9ee Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 28 Sep 2022 19:06:43 +0800 Subject: [PATCH 03/12] fix: fix rows missed in query result issue --- source/dnode/vnode/src/tsdb/tsdbRead.c | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2ea6a57b9e..c3cb5f9eb8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -486,7 +486,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->suid = pCond->suid; pReader->order = pCond->order; - pReader->capacity = 4096; + pReader->capacity = capacity; pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL; pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; @@ -841,14 +841,18 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; - if (asc && pReader->window.skey <= pBlock->minKey.ts) { - pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { - pDumpInfo->rowIndex = pBlock->nRow - 1; - } else { - int32_t pos = asc ? pBlock->nRow - 1 : 0; - int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order); + + if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) { + if (asc && pReader->window.skey <= pBlock->minKey.ts) { + //pDumpInfo->rowIndex = 0; + } else + if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { + //pDumpInfo->rowIndex = pBlock->nRow - 1; + } else { + int32_t pos = asc ? pBlock->nRow - 1 : 0; + int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order); + } } // time window check @@ -932,8 +936,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn pDumpInfo->rowIndex += step * remain; if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) { - int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - setBlockAllDumped(pDumpInfo, ts, pReader->order); +// int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; +// setBlockAllDumped(pDumpInfo, ts, pReader->order); } else { int64_t k = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; setBlockAllDumped(pDumpInfo, k, pReader->order); From 8a74613a1342a03292560c4b5bf2961bcf4d41ba Mon Sep 17 00:00:00 2001 From: robotspace Date: Thu, 29 Sep 2022 10:02:53 +0800 Subject: [PATCH 04/12] Drop stream when test case is over. Add commands to disable built-in lualib. (#17134) --- examples/lua/README.md | 6 ++++++ examples/lua/lua51/lua_connector51.c | 24 +++++++++++++++--------- examples/lua/lua_connector.c | 16 ++++++++-------- examples/lua/test.lua | 10 ++++++++-- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/examples/lua/README.md b/examples/lua/README.md index 32d6a4cace..5abf0c1aab 100644 --- a/examples/lua/README.md +++ b/examples/lua/README.md @@ -1,6 +1,12 @@ # TDengine driver connector for Lua It's a Lua implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. You may need to install Lua5.3 . +As TDengine is built with lua-enable with default configure, the built-in lua lib conflicts with external lua lib. The following commands require TDengine built with lua-disable. +To disable built-in lua: +``` +mkdir debug && cd debug +cmake .. -DBUILD_LUA=false && cmake --build . +``` ## Lua Dependencies - Lua: diff --git a/examples/lua/lua51/lua_connector51.c b/examples/lua/lua51/lua_connector51.c index 4c702b2aae..8a9051dd0c 100644 --- a/examples/lua/lua51/lua_connector51.c +++ b/examples/lua/lua51/lua_connector51.c @@ -29,7 +29,7 @@ static int l_connect(lua_State *L){ luaL_checktype(L, 1, LUA_TTABLE); lua_getfield(L, 1,"host"); - if (lua_isstring(L,-1)){ + if (lua_isstring(L, -1)){ host = lua_tostring(L, -1); // printf("host = %s\n", host); } @@ -58,8 +58,9 @@ static int l_connect(lua_State *L){ //printf("password = %s\n", password); } - lua_settop(L,0); + lua_settop(L, 0); + taos_init(); lua_newtable(L); int table_index = lua_gettop(L); @@ -125,7 +126,7 @@ static int l_query(lua_State *L){ //printf("row index:%d\n",rows); rows++; - lua_pushnumber(L,rows); + lua_pushnumber(L, rows); lua_newtable(L); for (int i = 0; i < num_fields; ++i) { @@ -136,15 +137,19 @@ static int l_query(lua_State *L){ lua_pushstring(L,fields[i].name); int32_t* length = taos_fetch_lengths(result); switch (fields[i].type) { + case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: lua_pushinteger(L,*((char *)row[i])); break; + case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_SMALLINT: lua_pushinteger(L,*((short *)row[i])); break; + case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: lua_pushinteger(L,*((int *)row[i])); break; + case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: lua_pushinteger(L,*((int64_t *)row[i])); break; @@ -154,6 +159,7 @@ static int l_query(lua_State *L){ case TSDB_DATA_TYPE_DOUBLE: lua_pushnumber(L,*((double *)row[i])); break; + case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: //printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]); @@ -197,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ printf("failed, reason:%s\n", taos_errstr(result)); lua_pushinteger(L, -1); lua_setfield(L, table_index, "code"); - lua_pushstring(L,"something is wrong");// taos_errstr(taos)); + lua_pushstring(L, taos_errstr(result)); lua_setfield(L, table_index, "error"); }else{ //printf("success to async query.\n"); @@ -214,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ static int l_async_query(lua_State *L){ int r = luaL_ref(L, LUA_REGISTRYINDEX); - TAOS * taos = (TAOS*)lua_topointer(L,1); - const char * sqlstr = lua_tostring(L,2); - // int stime = luaL_checknumber(L,3); + TAOS * taos = (TAOS*)lua_topointer(L, 1); + const char * sqlstr = lua_tostring(L, 2); + // int stime = luaL_checknumber(L, 3); lua_newtable(L); int table_index = lua_gettop(L); @@ -224,7 +230,7 @@ static int l_async_query(lua_State *L){ struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param)); p->state = L; p->callback=r; - // printf("r:%d, L:%d\n",r,L); + // printf("r:%d, L:%d\n", r, L); taos_query_a(taos,sqlstr,async_query_callback,p); lua_pushnumber(L, 0); @@ -267,7 +273,7 @@ static const struct luaL_Reg lib[] = { extern int luaopen_luaconnector51(lua_State* L) { // luaL_register(L, "luaconnector51", lib); - lua_newtable (L); + lua_newtable(L); luaL_setfuncs(L,lib,0); return 1; } diff --git a/examples/lua/lua_connector.c b/examples/lua/lua_connector.c index ce13ab3829..c3d8bcb995 100644 --- a/examples/lua/lua_connector.c +++ b/examples/lua/lua_connector.c @@ -29,7 +29,7 @@ static int l_connect(lua_State *L){ luaL_checktype(L, 1, LUA_TTABLE); lua_getfield(L, 1,"host"); - if (lua_isstring(L,-1)){ + if (lua_isstring(L, -1)){ host = lua_tostring(L, -1); // printf("host = %s\n", host); } @@ -58,7 +58,7 @@ static int l_connect(lua_State *L){ //printf("password = %s\n", password); } - lua_settop(L,0); + lua_settop(L, 0); taos_init(); @@ -126,7 +126,7 @@ static int l_query(lua_State *L){ //printf("row index:%d\n",rows); rows++; - lua_pushnumber(L,rows); + lua_pushnumber(L, rows); lua_newtable(L); for (int i = 0; i < num_fields; ++i) { @@ -203,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ printf("failed, reason:%s\n", taos_errstr(result)); lua_pushinteger(L, -1); lua_setfield(L, table_index, "code"); - lua_pushstring(L,"something is wrong");// taos_errstr(taos)); + lua_pushstring(L, taos_errstr(result)); lua_setfield(L, table_index, "error"); }else{ //printf("success to async query.\n"); @@ -220,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ static int l_async_query(lua_State *L){ int r = luaL_ref(L, LUA_REGISTRYINDEX); - TAOS * taos = (TAOS*)lua_topointer(L,1); - const char * sqlstr = lua_tostring(L,2); - // int stime = luaL_checknumber(L,3); + TAOS * taos = (TAOS*)lua_topointer(L, 1); + const char * sqlstr = lua_tostring(L, 2); + // int stime = luaL_checknumber(L, 3); lua_newtable(L); int table_index = lua_gettop(L); @@ -230,7 +230,7 @@ static int l_async_query(lua_State *L){ struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param)); p->state = L; p->callback=r; - // printf("r:%d, L:%d\n",r,L); + // printf("r:%d, L:%d\n", r, L); taos_query_a(taos,sqlstr,async_query_callback,p); lua_pushnumber(L, 0); diff --git a/examples/lua/test.lua b/examples/lua/test.lua index 3d725cc6a3..94415982e7 100644 --- a/examples/lua/test.lua +++ b/examples/lua/test.lua @@ -176,8 +176,14 @@ end driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)") +if res.code ~=0 then + print("create stream--- failed:"..res.error) + return +else + print("create stream--- pass") +end -print("From now on we start continous insert in an definite loop, pls wait for about 10 seconds and check stream table for result.") +print("From now on we start continous insertion in an definite loop, please wait for about 10 seconds and check stream table avg_degree for result.") local loop_index = 0 while loop_index < 10 do local t = os.time()*1000 @@ -193,5 +199,5 @@ while loop_index < 10 do os.execute("sleep " .. 1) loop_index = loop_index + 1 end -driver.query(conn,"DROP STREAM IF EXISTS avg_therm_s") +driver.query(conn,"DROP STREAM IF EXISTS stream_avg_degree") driver.close(conn) From 0e734f7aee14dcef638d1a4d0c3464a0c3ff3bdf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Sep 2022 10:39:22 +0800 Subject: [PATCH 05/12] fix:timestamp out of range --- utils/test/c/sml_test.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index ca3d464da7..56ba622a9c 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -879,7 +879,7 @@ int sml_16960_Test() { "{" "\"timestamp\":" "" - "{ \"value\": 1349020800000, \"type\": \"ms\" }" + "{ \"value\": 1664418955000, \"type\": \"ms\" }" "," "\"value\":" "" @@ -916,7 +916,7 @@ int sml_16960_Test() { "{" "\"timestamp\":" "" - "{ \"value\": 1349020800001, \"type\": \"ms\" }" + "{ \"value\": 1664418955001, \"type\": \"ms\" }" "," "\"value\":" "" @@ -953,7 +953,7 @@ int sml_16960_Test() { "{" "\"timestamp\":" "" - "{ \"value\": 1349020800002, \"type\": \"ms\" }" + "{ \"value\": 1664418955002, \"type\": \"ms\" }" "," "\"value\":" "" @@ -990,7 +990,7 @@ int sml_16960_Test() { "{" "\"timestamp\":" "" - "{ \"value\": 1349020800003, \"type\": \"ms\" }" + "{ \"value\": 1664418955003, \"type\": \"ms\" }" "," "\"value\":" "" @@ -1027,7 +1027,7 @@ int sml_16960_Test() { "{" "\"timestamp\":" "" - "{ \"value\": 1349020800004, \"type\": \"ms\" }" + "{ \"value\": 1664418955004, \"type\": \"ms\" }" "," "\"value\":" "" From 7aa2ffc4a8d7ac63a66ed8b99b843b35a9150244 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 29 Sep 2022 11:09:23 +0800 Subject: [PATCH 06/12] docs: remove influx_max_line_bytes for 3.0 (#17137) --- docs/en/14-reference/_telegraf.mdx | 1 - docs/en/25-application/01-telegraf.md | 1 - docs/zh/14-reference/_telegraf.mdx | 1 - docs/zh/25-application/01-telegraf.md | 1 - 4 files changed, 4 deletions(-) diff --git a/docs/en/14-reference/_telegraf.mdx b/docs/en/14-reference/_telegraf.mdx index e32fb25693..bcf1a0893f 100644 --- a/docs/en/14-reference/_telegraf.mdx +++ b/docs/en/14-reference/_telegraf.mdx @@ -22,5 +22,4 @@ An example is as follows. username = "root" password = "taosdata" data_format = "influx" - influx_max_line_bytes = 250 ``` diff --git a/docs/en/25-application/01-telegraf.md b/docs/en/25-application/01-telegraf.md index 59491152bc..f700326449 100644 --- a/docs/en/25-application/01-telegraf.md +++ b/docs/en/25-application/01-telegraf.md @@ -60,7 +60,6 @@ For the configuration method, add the following text to `/etc/telegraf/telegraf. username = "" password = "" data_format = "influx" - influx_max_line_bytes = 250 ``` Then restart telegraf: diff --git a/docs/zh/14-reference/_telegraf.mdx b/docs/zh/14-reference/_telegraf.mdx index bae46d6606..3f92e5dde0 100644 --- a/docs/zh/14-reference/_telegraf.mdx +++ b/docs/zh/14-reference/_telegraf.mdx @@ -22,6 +22,5 @@ username = "root" password = "taosdata" data_format = "influx" - influx_max_line_bytes = 250 ``` diff --git a/docs/zh/25-application/01-telegraf.md b/docs/zh/25-application/01-telegraf.md index 4e9597f964..6338264d17 100644 --- a/docs/zh/25-application/01-telegraf.md +++ b/docs/zh/25-application/01-telegraf.md @@ -61,7 +61,6 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如 username = "" password = "" data_format = "influx" - influx_max_line_bytes = 250 ``` 然后重启 Telegraf: From 4c00225e9fe24c07f59048e7762fe7eb53d04753 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Sep 2022 11:26:58 +0800 Subject: [PATCH 07/12] fix: fix coverity scan issue in tudf.c and udfd.c --- source/libs/function/src/tudf.c | 2 ++ source/libs/function/src/udfd.c | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 62427e0488..060a92f864 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1656,6 +1656,8 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); if (errCode != 0) { fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName); + taosMemoryFree(task->session); + taosMemoryFree(task); return TSDB_CODE_UDF_PIPE_CONNECT_ERR; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 636f006d6e..8ff0dc15a5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -960,7 +960,7 @@ int32_t udfdInitResidentFuncs() { char* token; while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { char func[TSDB_FUNC_NAME_LEN] = {0}; - strncpy(func, token, strlen(token)); + strncpy(func, token, sizeof(func)); taosArrayPush(global.residentFuncs, func); } From aaf325489be5b4386d749c06f3e90217a08ff259 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 29 Sep 2022 12:53:06 +0800 Subject: [PATCH 08/12] docs: remove influx_max_line_bytes for 3.0 (#17139) From ee484470f3946ab9768759e217acb959979cfb8d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:13:44 +0800 Subject: [PATCH 09/12] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2c3808a703..fe13ba303c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -240,7 +240,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); if (!pVnode->restored) { - vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg); + vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg); terrno = TSDB_CODE_APP_NOT_READY; vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY); rpcFreeCont(pMsg->pCont); @@ -805,6 +805,12 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { return true; } + if (pVnode->restored) { + vDebug("vgId:%d, vnode not restore finished", pVnode->config.vgId); + terrno = TSDB_CODE_APP_NOT_READY; + return false; + } + vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); return false; From 4223be379a98facb73063daf4296d576db3fea7d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:15:26 +0800 Subject: [PATCH 10/12] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index fe13ba303c..f334c47b1b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -806,7 +806,7 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { } if (pVnode->restored) { - vDebug("vgId:%d, vnode not restore finished", pVnode->config.vgId); + vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; } From 7e85a15c31cd8b5ae7875cf8d6c26f41f86397c9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:18:20 +0800 Subject: [PATCH 11/12] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f334c47b1b..119fa02e05 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -805,7 +805,7 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { return true; } - if (pVnode->restored) { + if (!pVnode->restored) { vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; From f14d00a5d68074f7c105e45b6263766445bf8316 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Sep 2022 14:19:43 +0800 Subject: [PATCH 12/12] fix: do not process query and fetch msg until vnode restore finished --- source/dnode/vnode/src/vnd/vnodeSync.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 119fa02e05..e15783d92b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -797,6 +797,12 @@ bool vnodeIsLeader(SVnode *pVnode) { } bool vnodeIsReadyForRead(SVnode *pVnode) { + if (!pVnode->restored) { + vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); + terrno = TSDB_CODE_APP_NOT_READY; + return false; + } + if (syncIsReady(pVnode->sync)) { return true; } @@ -805,12 +811,6 @@ bool vnodeIsReadyForRead(SVnode *pVnode) { return true; } - if (!pVnode->restored) { - vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId); - terrno = TSDB_CODE_APP_NOT_READY; - return false; - } - vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); return false;