From 1c26f1b53e4c274c6b31d1d4a0602b88352a4934 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Oct 2022 16:13:55 +0800 Subject: [PATCH 1/2] fix(wal): reference --- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/libs/wal/src/walRef.c | 1 + source/libs/wal/src/walWrite.c | 21 ++++++++++++++++----- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 739b8bbf01..eea79c5335 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -550,7 +550,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) { - ASSERT(0); + mError("failed to prepare trans rebalance since %s", terrstr()); goto REB_FAIL; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 2c45fbbdaf..119d0575d8 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -42,6 +42,7 @@ void walCloseRef(SWal *pWal, int64_t refId) { int32_t walRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; + wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId); if (pRef->refVer != ver) { taosThreadMutexLock(&pWal->mutex); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 3354308c49..e6b4e9d8ff 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -257,6 +257,8 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { pWal->vers.verInSnapshotting = ver; + wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, + pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling if (pWal->cfg.retentionPeriod == 0) { taosThreadMutexLock(&pWal->mutex); @@ -273,6 +275,10 @@ int32_t walEndSnapshot(SWal *pWal) { int32_t code = 0; taosThreadMutexLock(&pWal->mutex); int64_t ver = pWal->vers.verInSnapshotting; + + wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, + ver, pWal->vers.firstVer, pWal->vers.lastVer); + if (ver == -1) { code = -1; goto END; @@ -287,7 +293,8 @@ int32_t walEndSnapshot(SWal *pWal) { if (pIter == NULL) break; SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; - ver = TMIN(ver, pRef->refVer); + ver = TMIN(ver, pRef->refVer - 1); + wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); } int deleteCnt = 0; @@ -298,8 +305,9 @@ int32_t walEndSnapshot(SWal *pWal) { SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); if (pInfo) { if (ver >= pInfo->lastVer) { - pInfo++; + pInfo--; } + wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) || @@ -315,10 +323,12 @@ int32_t walEndSnapshot(SWal *pWal) { for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); + wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0) { goto UPDATE_META; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); + wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0) { ASSERT(0); } @@ -409,7 +419,7 @@ END: } static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { - SWalIdxEntry entry = {.ver = ver, .offset = offset}; + SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); ASSERT(pFileInfo != NULL); ASSERT(pFileInfo->firstVer >= 0); @@ -424,7 +434,8 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { return -1; } - ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); + ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) && + "Offset of idx entries misaligned"); return 0; } @@ -432,7 +443,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy const void *body, int32_t bodyLen) { int64_t code = 0; - int64_t offset = walGetCurFileOffset(pWal); + int64_t offset = walGetCurFileOffset(pWal); SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); ASSERT(pFileInfo != NULL); From f49d91ac1a53c4c2f0582454db2d9e09f9dae892 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Oct 2022 17:11:22 +0800 Subject: [PATCH 2/2] fix(stream): decode bloom filter --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 +- source/libs/stream/src/streamUpdate.c | 8 ++++++-- source/util/src/tbloomfilter.c | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b4a7ce7737..92e5f8df7a 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -167,7 +167,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { if (rollback) { ASSERT(0); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 80410568e5..199892c241 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -175,11 +175,15 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr maxTs = TMAX(maxTs, ts); SScalableBf *pSBf = getSBf(pInfo, ts); if (pSBf) { - tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + SUpdateKey updateKey = { + .tbUid = tbUid, + .ts = ts, + }; + tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); } } TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); - if (pMaxTs == NULL || *pMaxTs > tbUid) { + if (pMaxTs == NULL || *pMaxTs > maxTs) { taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); } } diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index b9c96dd606..84a78f3477 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -137,8 +137,10 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) { if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; } if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; - pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); + /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/ + /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/ + pBF->hashFn1 = taosFastHash; + pBF->hashFn2 = taosDJB2Hash; return pBF; _error: