diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a887db301c..c38e206a9b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1430,6 +1430,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.committedOffset = offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; + clientVg.seekUpdated = false; + clientVg.receivedInfoFromVnode = false; + taosArrayPush(pTopic->vgs, &clientVg); } } @@ -1858,9 +1861,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate + if (!pVg->seekUpdated) { pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset; } + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1878,9 +1882,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate + if (!pVg->seekUpdated) { // if offset is validate pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; } + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b71b33d72a..2447a23edc 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -110,12 +110,6 @@ typedef struct { tq_handle_status status; } STqHandle; -//typedef struct { -// SMqDataRsp* pDataRsp; -// char subKey[TSDB_SUBSCRIBE_KEY_LEN]; -// SRpcHandleInfo info; -//} STqPushEntry; - struct STQ { SVnode* pVnode; char* path; @@ -195,7 +189,6 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); -bool tqIsHandleExecuting(STqHandle* pHandle); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e7add137bd..6d700c5c9f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -307,13 +307,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - if (pOffset->val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); - if (pHandle && (walRefVer(pHandle->pRef, pOffset->val.version) < 0)) { - return -1; - } - } - return 0; } @@ -381,16 +374,6 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) pOffset->val.version = ever; } - ASSERT(0); - if (offset.val.type == TMQ_OFFSET__LOG) { - taosWLockLatch(&pTq->lock); - STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) { - taosWUnLockLatch(&pTq->lock); - return -1; - } - taosWUnLockLatch(&pTq->lock); - // save the new offset value if (pSavedOffset != NULL) { tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, @@ -404,7 +387,6 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) return -1; } - walReaderSeekVer(pHandle->execHandle.pTqReader->pWalReader, vgOffset.offset.val.version); tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); @@ -575,6 +557,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; @@ -586,15 +569,11 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } + if (pHandle->pRef) { walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); } - while (tqIsHandleExecuting(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); - taosMsleep(5); - } - code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);