fix:save offset if latest
This commit is contained in:
parent
a3e214b9e8
commit
5eb1c559e5
|
@ -377,7 +377,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,7 +410,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
while (tqIsHandleExec(pHandle)) {
|
while (tqIsHandleExec(pHandle)) {
|
||||||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle);
|
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
if (pHandle->pRef) {
|
if (pHandle->pRef) {
|
||||||
|
|
|
@ -121,6 +121,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
|
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
|
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
|
||||||
|
STqOffset offset = {0};
|
||||||
|
strcpy(offset.subKey, pRequest->subKey);
|
||||||
|
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||||
|
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||||
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
|
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
|
||||||
pHandle->subKey, consumerId, vgId, pRequest->subKey);
|
pHandle->subKey, consumerId, vgId, pRequest->subKey);
|
||||||
|
|
Loading…
Reference in New Issue