fix:[TD-30915]send hb before close in tmq
This commit is contained in:
parent
ea7733f9ce
commit
322c0633aa
|
@ -27,6 +27,7 @@
|
|||
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
|
||||
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
|
||||
#define DEFAULT_HEARTBEAT_INTERVAL 3000
|
||||
#define DEFAULT_ASKEP_INTERVAL 1000
|
||||
|
||||
struct SMqMgmt {
|
||||
int8_t inited;
|
||||
|
@ -99,7 +100,6 @@ struct tmq_t {
|
|||
int64_t totalRows;
|
||||
|
||||
// timer
|
||||
tmr_h hbLiveTimer;
|
||||
tmr_h epTimer;
|
||||
tmr_h commitTimer;
|
||||
STscObj* pTscObj; // connection
|
||||
|
@ -737,35 +737,33 @@ static void generateTimedTask(int64_t refId, int32_t type) {
|
|||
if (tmq == NULL) return;
|
||||
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
|
||||
if (pTaskType == NULL) return;
|
||||
|
||||
if (pTaskType != NULL){
|
||||
*pTaskType = type;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0){
|
||||
tsem2_post(&tmq->rspSem);
|
||||
}
|
||||
}
|
||||
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
}
|
||||
|
||||
void tmqAssignAskEpTask(void* param, void* tmrId) {
|
||||
int64_t refId = *(int64_t*)param;
|
||||
int64_t refId = (int64_t)param;
|
||||
generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
|
||||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
void tmqReplayTask(void* param, void* tmrId) {
|
||||
int64_t refId = *(int64_t*)param;
|
||||
int64_t refId = (int64_t)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq == NULL) goto END;
|
||||
if (tmq == NULL) return;
|
||||
|
||||
tsem2_post(&tmq->rspSem);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
END:
|
||||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||
int64_t refId = *(int64_t*)param;
|
||||
int64_t refId = (int64_t)param;
|
||||
generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
|
||||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
|
@ -802,11 +800,10 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
void tmqSendHbReq(void* param, void* tmrId) {
|
||||
int64_t refId = *(int64_t*)param;
|
||||
int64_t refId = (int64_t)param;
|
||||
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq == NULL) {
|
||||
taosMemoryFree(param);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -880,7 +877,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
|
||||
OVER:
|
||||
tDestroySMqHbReq(&req);
|
||||
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
||||
if(tmrId != NULL){
|
||||
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId);
|
||||
}
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
}
|
||||
|
||||
|
@ -908,21 +907,14 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
|
||||
askEp(pTmq, NULL, false, false);
|
||||
|
||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
|
||||
taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||
tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;
|
||||
|
||||
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
|
||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
|
||||
pTmq->autoCommitInterval / 1000.0);
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer);
|
||||
} else {
|
||||
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
|
||||
}
|
||||
|
@ -1171,9 +1163,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
goto _failed;
|
||||
}
|
||||
|
||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer);
|
||||
taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer);
|
||||
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
|
||||
|
@ -1301,18 +1291,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
}
|
||||
|
||||
// init ep timer
|
||||
if (tmq->epTimer == NULL) {
|
||||
int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId1 = tmq->refId;
|
||||
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
|
||||
}
|
||||
|
||||
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
|
||||
// init auto commit timer
|
||||
if (tmq->autoCommit && tmq->commitTimer == NULL) {
|
||||
int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId2 = tmq->refId;
|
||||
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
|
||||
}
|
||||
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
|
||||
|
||||
FAIL:
|
||||
taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||
|
@ -2015,9 +1996,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
pVg->blockReceiveTs = taosGetTimestampMs();
|
||||
pVg->blockSleepForReplay = pRsp->rsp.sleepTime;
|
||||
if (pVg->blockSleepForReplay > 0) {
|
||||
int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId1 = tmq->refId;
|
||||
taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer);
|
||||
taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer);
|
||||
}
|
||||
}
|
||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||
|
@ -2274,7 +2253,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
|||
return code;
|
||||
}
|
||||
}
|
||||
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
|
||||
tmqSendHbReq((void*)(tmq->refId), NULL);
|
||||
|
||||
tmq_list_t* lst = tmq_list_new();
|
||||
int32_t code = tmq_subscribe(tmq, lst);
|
||||
|
|
Loading…
Reference in New Issue