From ea7733f9ce1d823ea67113ec80bcadc6d11774fa Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 4 Jul 2024 18:10:10 +0800 Subject: [PATCH 1/4] fix[TD-30883] tmqParamsTest.py failed in some times --- tests/system-test/7-tmq/tmqParamsTest.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index b25f23ef11..3d5fb52da5 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -123,6 +123,7 @@ class TDTestCase: tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) stop_flag = 1 finally: + time.sleep(5) #wait for send heartbeat to update subscription info. consumer.unsubscribe() consumer.close() tdSql.checkEqual(consumer_info, expected_parameters) @@ -134,6 +135,8 @@ class TDTestCase: if offset_value != "earliest" and offset_value != "": if offset_value == "latest": offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) + if None in offset_value_list: + continue offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list)) offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list)) tdSql.checkEqual(offset_value_list1 == offset_value_list2, True) @@ -142,6 +145,8 @@ class TDTestCase: tdSql.checkEqual(sum(rows_value_list), expected_res) elif offset_value == "none": offset_value_list = list(map(lambda x: x[-2], subscription_info)) + if None in offset_value_list: + continue offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) @@ -155,6 +160,8 @@ class TDTestCase: # tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) + if None in offset_value_list: + continue offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) tdSql.checkEqual(offset_value_list1, [None]*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) @@ -162,6 +169,8 @@ class TDTestCase: else: if offset_value != "none": offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) + if None in offset_value_list: + continue offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list)) offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list)) tdSql.checkEqual(offset_value_list1 <= offset_value_list2, True) @@ -170,6 +179,8 @@ class TDTestCase: tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) + if None in offset_value_list: + continue offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) From 322c0633aa0d2c07f85b9dc2e443333678f0a872 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 Jul 2024 10:46:14 +0800 Subject: [PATCH 2/4] fix:[TD-30915]send hb before close in tmq --- source/client/src/clientTmq.c | 65 ++++++++++++----------------------- 1 file changed, 22 insertions(+), 43 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 21d1a528da..a42b0f75dd 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -27,6 +27,7 @@ #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 +#define DEFAULT_ASKEP_INTERVAL 1000 struct SMqMgmt { int8_t inited; @@ -99,7 +100,6 @@ struct tmq_t { int64_t totalRows; // timer - tmr_h hbLiveTimer; tmr_h epTimer; tmr_h commitTimer; STscObj* pTscObj; // connection @@ -737,35 +737,33 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (tmq == NULL) return; int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); - if (pTaskType == NULL) return; + if (pTaskType != NULL){ + *pTaskType = type; + if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0){ + tsem2_post(&tmq->rspSem); + } + } - *pTaskType = type; - taosWriteQitem(tmq->delayedTask, pTaskType); - tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); } void tmqAssignAskEpTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP); - taosMemoryFree(param); } void tmqReplayTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); - if (tmq == NULL) goto END; + if (tmq == NULL) return; tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); -END: - taosMemoryFree(param); } void tmqAssignDelayedCommitTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT); - taosMemoryFree(param); } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { @@ -802,11 +800,10 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { } void tmqSendHbReq(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - taosMemoryFree(param); return; } @@ -880,7 +877,9 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); + if(tmrId != NULL){ + taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId); + } taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -908,21 +907,14 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { askEp(pTmq, NULL, false, false); - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); - taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); + taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; - asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); - taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); + taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } @@ -1171,9 +1163,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer); + taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; @@ -1301,18 +1291,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } // init ep timer - if (tmq->epTimer == NULL) { - int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); - *pRefId1 = tmq->refId; - tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer); - } - + tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); // init auto commit timer - if (tmq->autoCommit && tmq->commitTimer == NULL) { - int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t)); - *pRefId2 = tmq->refId; - tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer); - } + tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); @@ -2015,9 +1996,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRsp->rsp.sleepTime; if (pVg->blockSleepForReplay > 0) { - int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); - *pRefId1 = tmq->refId; - taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer); + taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer); } } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 @@ -2274,7 +2253,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { return code; } } - taosSsleep(2); // sleep 2s for hb to send offset and rows to server + tmqSendHbReq((void*)(tmq->refId), NULL); tmq_list_t* lst = tmq_list_new(); int32_t code = tmq_subscribe(tmq, lst); From 7ff7ef1d73d67eda3f1ed500d4cdb74925b59f35 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 Jul 2024 13:51:30 +0800 Subject: [PATCH 3/4] fix:[TD-30883]send hb before close in tmq --- tests/system-test/7-tmq/tmqParamsTest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index 3d5fb52da5..a323dff19e 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -123,7 +123,6 @@ class TDTestCase: tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) stop_flag = 1 finally: - time.sleep(5) #wait for send heartbeat to update subscription info. consumer.unsubscribe() consumer.close() tdSql.checkEqual(consumer_info, expected_parameters) From 2176fa3b1ad3e9dc2ab2651de71999d3b5cc1572 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 Jul 2024 15:49:00 +0800 Subject: [PATCH 4/4] fix:[TD-30883]send hb before close in tmq --- source/client/src/clientTmq.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a42b0f75dd..10d5140a0b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -100,6 +100,7 @@ struct tmq_t { int64_t totalRows; // timer + tmr_h hbLiveTimer; tmr_h epTimer; tmr_h commitTimer; STscObj* pTscObj; // connection @@ -878,7 +879,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); if(tmrId != NULL){ - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmrId); + taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); } taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1163,7 +1164,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};