diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a42b0f75dd..10d5140a0b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -100,6 +100,7 @@ struct tmq_t { int64_t totalRows; // timer + tmr_h hbLiveTimer; tmr_h epTimer; tmr_h commitTimer; STscObj* pTscObj; // connection @@ -878,7 +879,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); if(tmrId != NULL){ - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId); + taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); } taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1163,7 +1164,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};