diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 08d32b2b81..68d966add6 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -133,7 +133,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); int32_t tqMetaGetHandle(STQ* pTq, const char* key); -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer); STqOffsetStore* tqOffsetOpen(STQ* pTq); int32_t tqMetaTransform(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 18442f182b..925f8feb05 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,6 +50,9 @@ void tqDestroyTqHandle(void* data) { if (pData->block != NULL) { blockDataDestroy(pData->block); } + if (pData->pRef) { + walCloseRef(pData->pRef->pWal, pData->pRef->refId); + } } static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { @@ -571,9 +574,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosMsleep(10); continue; } - if (pHandle->pRef) { - walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); - } tqUnregisterPushHandle(pTq, pHandle); @@ -660,7 +660,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg goto end; } STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle); + ret = tqCreateHandle(pTq, &req, &handle, walGetCommittedVer(pTq->pVnode->pWal)); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; @@ -689,7 +689,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // update handle to avoid req->qmsg changed if spilt vnode is failed STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle); + ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; @@ -701,7 +701,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } -end: + end: tDecoderClear(&dc); return ret; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index cb64c9033a..5162136591 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -346,7 +346,7 @@ end: return code; } -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer){ int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -364,7 +364,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); } - handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); + handle->snapshotVer = snapshotVer; if(buildHandle(pTq, handle) < 0){ return -1;