From 322c0633aa0d2c07f85b9dc2e443333678f0a872 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 Jul 2024 10:46:14 +0800 Subject: [PATCH] fix:[TD-30915]send hb before close in tmq --- source/client/src/clientTmq.c | 65 ++++++++++++----------------------- 1 file changed, 22 insertions(+), 43 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 21d1a528da..a42b0f75dd 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -27,6 +27,7 @@ #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 +#define DEFAULT_ASKEP_INTERVAL 1000 struct SMqMgmt { int8_t inited; @@ -99,7 +100,6 @@ struct tmq_t { int64_t totalRows; // timer - tmr_h hbLiveTimer; tmr_h epTimer; tmr_h commitTimer; STscObj* pTscObj; // connection @@ -737,35 +737,33 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (tmq == NULL) return; int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); - if (pTaskType == NULL) return; + if (pTaskType != NULL){ + *pTaskType = type; + if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0){ + tsem2_post(&tmq->rspSem); + } + } - *pTaskType = type; - taosWriteQitem(tmq->delayedTask, pTaskType); - tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); } void tmqAssignAskEpTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP); - taosMemoryFree(param); } void tmqReplayTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); - if (tmq == NULL) goto END; + if (tmq == NULL) return; tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); -END: - taosMemoryFree(param); } void tmqAssignDelayedCommitTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT); - taosMemoryFree(param); } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { @@ -802,11 +800,10 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { } void tmqSendHbReq(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - taosMemoryFree(param); return; } @@ -880,7 +877,9 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); + if(tmrId != NULL){ + taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId); + } taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -908,21 +907,14 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { askEp(pTmq, NULL, false, false); - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); - taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); + taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; - asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); - taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); + taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } @@ -1171,9 +1163,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer); + taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; @@ -1301,18 +1291,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } // init ep timer - if (tmq->epTimer == NULL) { - int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); - *pRefId1 = tmq->refId; - tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer); - } - + tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); // init auto commit timer - if (tmq->autoCommit && tmq->commitTimer == NULL) { - int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t)); - *pRefId2 = tmq->refId; - tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer); - } + tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); @@ -2015,9 +1996,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRsp->rsp.sleepTime; if (pVg->blockSleepForReplay > 0) { - int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); - *pRefId1 = tmq->refId; - taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer); + taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer); } } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 @@ -2274,7 +2253,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { return code; } } - taosSsleep(2); // sleep 2s for hb to send offset and rows to server + tmqSendHbReq((void*)(tmq->refId), NULL); tmq_list_t* lst = tmq_list_new(); int32_t code = tmq_subscribe(tmq, lst);