Merge pull request #26423 from taosdata/fix/TD-30883-3.0

fix[TD-30883] tmqParamsTest.py failed in some times
This commit is contained in:
dapan1121 2024-07-09 17:12:00 +08:00 committed by GitHub
commit 3852c55a75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 42 deletions

View File

@ -27,6 +27,7 @@
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000
#define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_HEARTBEAT_INTERVAL 3000
#define DEFAULT_ASKEP_INTERVAL 1000
struct SMqMgmt { struct SMqMgmt {
int8_t inited; int8_t inited;
@ -737,35 +738,33 @@ static void generateTimedTask(int64_t refId, int32_t type) {
if (tmq == NULL) return; if (tmq == NULL) return;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
if (pTaskType == NULL) return; if (pTaskType != NULL){
*pTaskType = type;
if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0){
tsem2_post(&tmq->rspSem);
}
}
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
} }
void tmqAssignAskEpTask(void* param, void* tmrId) { void tmqAssignAskEpTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = (int64_t)param;
generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP); generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
taosMemoryFree(param);
} }
void tmqReplayTask(void* param, void* tmrId) { void tmqReplayTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = (int64_t)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) goto END; if (tmq == NULL) return;
tsem2_post(&tmq->rspSem); tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
END:
taosMemoryFree(param);
} }
void tmqAssignDelayedCommitTask(void* param, void* tmrId) { void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = (int64_t)param;
generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT); generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
taosMemoryFree(param);
} }
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
@ -802,11 +801,10 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
void tmqSendHbReq(void* param, void* tmrId) { void tmqSendHbReq(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = (int64_t)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
taosMemoryFree(param);
return; return;
} }
@ -880,7 +878,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER: OVER:
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); if(tmrId != NULL){
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
}
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
} }
@ -908,21 +908,14 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
askEp(pTmq, NULL, false, false); askEp(pTmq, NULL, false, false);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0); pTmq->autoCommitInterval / 1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer);
} else { } else {
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
} }
@ -1171,9 +1164,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto _failed; goto _failed;
} }
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer);
*pRefId = pTmq->refId;
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
@ -1301,18 +1292,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
// init ep timer // init ep timer
if (tmq->epTimer == NULL) { tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
*pRefId1 = tmq->refId;
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
}
// init auto commit timer // init auto commit timer
if (tmq->autoCommit && tmq->commitTimer == NULL) { tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
*pRefId2 = tmq->refId;
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
}
FAIL: FAIL:
taosArrayDestroyP(req.topicNames, taosMemoryFree); taosArrayDestroyP(req.topicNames, taosMemoryFree);
@ -2015,9 +1997,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pRsp->rsp.sleepTime; pVg->blockSleepForReplay = pRsp->rsp.sleepTime;
if (pVg->blockSleepForReplay > 0) { if (pVg->blockSleepForReplay > 0) {
int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer);
*pRefId1 = tmq->refId;
taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer);
} }
} }
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
@ -2274,7 +2254,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
return code; return code;
} }
} }
taosSsleep(2); // sleep 2s for hb to send offset and rows to server tmqSendHbReq((void*)(tmq->refId), NULL);
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
int32_t code = tmq_subscribe(tmq, lst); int32_t code = tmq_subscribe(tmq, lst);

View File

@ -134,6 +134,8 @@ class TDTestCase:
if offset_value != "earliest" and offset_value != "": if offset_value != "earliest" and offset_value != "":
if offset_value == "latest": if offset_value == "latest":
offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
if None in offset_value_list:
continue
offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list)) offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list)) offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
tdSql.checkEqual(offset_value_list1 == offset_value_list2, True) tdSql.checkEqual(offset_value_list1 == offset_value_list2, True)
@ -142,6 +144,8 @@ class TDTestCase:
tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.checkEqual(sum(rows_value_list), expected_res)
elif offset_value == "none": elif offset_value == "none":
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
if None in offset_value_list:
continue
offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info)) tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
@ -155,6 +159,8 @@ class TDTestCase:
# tdSql.checkEqual(sum(rows_value_list), expected_res) # tdSql.checkEqual(sum(rows_value_list), expected_res)
else: else:
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
if None in offset_value_list:
continue
offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
tdSql.checkEqual(offset_value_list1, [None]*len(subscription_info)) tdSql.checkEqual(offset_value_list1, [None]*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
@ -162,6 +168,8 @@ class TDTestCase:
else: else:
if offset_value != "none": if offset_value != "none":
offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
if None in offset_value_list:
continue
offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list)) offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list)) offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
tdSql.checkEqual(offset_value_list1 <= offset_value_list2, True) tdSql.checkEqual(offset_value_list1 <= offset_value_list2, True)
@ -170,6 +178,8 @@ class TDTestCase:
tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.checkEqual(sum(rows_value_list), expected_res)
else: else:
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
if None in offset_value_list:
continue
offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info)) tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))