From 2176fa3b1ad3e9dc2ab2651de71999d3b5cc1572 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 Jul 2024 15:49:00 +0800 Subject: [PATCH] fix:[TD-30883]send hb before close in tmq --- source/client/src/clientTmq.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a42b0f75dd..10d5140a0b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -100,6 +100,7 @@ struct tmq_t { int64_t totalRows; // timer + tmr_h hbLiveTimer; tmr_h epTimer; tmr_h commitTimer; STscObj* pTscObj; // connection @@ -878,7 +879,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); if(tmrId != NULL){ - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId); + taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); } taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1163,7 +1164,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};