fix:[TD-30883]send hb before close in tmq

This commit is contained in:
wangmm0220 2024-07-08 15:49:00 +08:00
parent 7ff7ef1d73
commit 2176fa3b1a
1 changed files with 3 additions and 2 deletions

View File

@ -100,6 +100,7 @@ struct tmq_t {
int64_t totalRows;
// timer
tmr_h hbLiveTimer;
tmr_h epTimer;
tmr_h commitTimer;
STscObj* pTscObj; // connection
@ -878,7 +879,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER:
tDestroySMqHbReq(&req);
if(tmrId != NULL){
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId);
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
}
taosReleaseRef(tmqMgmt.rsetId, refId);
}
@ -1163,7 +1164,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto _failed;
}
taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer);
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer);
char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};