diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 4275e5626b..f0da479700 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -165,7 +165,6 @@ struct SStreamTaskWriter { STQ* pTq; int64_t sver; int64_t ever; - TXN* txn; }; int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) { @@ -182,12 +181,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa pWriter->sver = sver; pWriter->ever = ever; - if (tdbBegin(pTq->pStreamMeta->db, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; - taosMemoryFree(pWriter); - goto _err; - } - *ppWriter = pWriter; tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); return code; @@ -204,29 +197,31 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode)); + + taosWLockLatch(&pTq->pStreamMeta->lock); if (rollback) { - tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); + tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); } else { - code = tdbCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn); + code = tdbCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn); + code = tdbPostCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); if (code) goto _err; } + if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + code = -1; + goto _err; + } + + taosWUnLockLatch(&pTq->pStreamMeta->lock); taosMemoryFree(pWriter); - - // restore from metastore - // if (tqMetaRestoreHandle(pTq) < 0) { - // goto _err; - // } - return code; _err: tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + taosWUnLockLatch(&pTq->pStreamMeta->lock); return code; - return 0; } int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { @@ -251,7 +246,7 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t taosWLockLatch(&pTq->pStreamMeta->lock); if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), - nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { + nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 733f073ce9..6df2044612 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -414,20 +414,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (pWriter->pStreamTaskWriter) { code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); if (code) goto _exit; - } - - if (pWriter->pStreamStateWriter) { - code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); - if (code) goto _exit; - - code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0); - pWriter->pStreamStateWriter = NULL; - if (code) goto _exit; - } - - if (pWriter->pStreamTaskWriter) { - code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); - if (code) goto _exit; + pWriter->pStreamTaskWriter = NULL; } if (pWriter->pStreamStateWriter) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a11f4a8b26..837222c55e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -129,7 +129,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { goto _err; } - if (streamMetaBegin(pMeta) < 0) { goto _err; } @@ -543,10 +542,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } int32_t streamMetaBegin(SStreamMeta* pMeta) { + taosWLockLatch(&pMeta->lock); if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + taosWUnLockLatch(&pMeta->lock); return -1; } + taosWUnLockLatch(&pMeta->lock); return 0; }