diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 17eac7d096..7557ddb4e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -377,7 +377,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } taosWUnLockLatch(&pTq->lock); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosMsleep(10); } @@ -410,7 +410,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); taosMsleep(10); } if (pHandle->pRef) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5eaf7b240b..e73aed8966 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -121,6 +121,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); + STqOffset offset = {0}; + strcpy(offset.subKey, pRequest->subKey); + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { + terrno = TSDB_CODE_PAR_INTERNAL_ERROR; + return -1; + } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey);