From 629de12bb1cdf8c39295f2dee38c371a613e4b77 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 6 Sep 2023 18:29:28 +0800 Subject: [PATCH] fix:do not return error if commit nothing --- source/client/src/clientTmq.c | 132 ++++++++---------- tests/system-test/7-tmq/subscribeDb3.py | 4 +- .../tmqConsFromTsdb1-1ctb-funcNFilter.py | 2 +- .../7-tmq/tmqConsFromTsdb1-1ctb.py | 2 +- .../tmqConsFromTsdb1-mutilVg-mutilCtb.py | 2 +- .../7-tmq/tmqConsFromTsdb1-mutilVg.py | 2 +- tests/system-test/7-tmq/tmqConsFromTsdb1.py | 2 +- 7 files changed, 63 insertions(+), 83 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d76aa2456e..ed83e41427 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -190,7 +190,6 @@ typedef struct { typedef struct { int64_t refId; - int32_t epoch; void* pParam; __tmq_askep_fn_t pUserFn; } SMqAskEpCbParam; @@ -708,7 +707,6 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } - code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; end: taosMemoryFree(pParamSet); @@ -743,20 +741,6 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { taosMemoryFree(param); } -//void tmqAssignDelayedReportTask(void* param, void* tmrId) { -// int64_t refId = *(int64_t*)param; -// tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); -// if (tmq != NULL) { -// int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); -// *pTaskType = TMQ_DELAYED_TASK__REPORT; -// taosWriteQitem(tmq->delayedTask, pTaskType); -// tsem_post(&tmq->rspSem); -// } -// -// taosReleaseRef(tmqMgmt.rsetId, refId); -// taosMemoryFree(param); -//} - int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { if (pMsg) { taosMemoryFree(pMsg->pData); @@ -984,7 +968,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { } if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) { + if (rsp != 0) { return rsp; } } @@ -1085,7 +1069,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; -// pTmq->needReportOffsetRows = true; // set conf strcpy(pTmq->clientId, conf->clientId); @@ -1146,7 +1129,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { - if(tmq == NULL) return TSDB_CODE_INVALID_PARA; + if(tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA; const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); @@ -1222,7 +1205,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if(code != 0){ + goto FAIL; + } // avoid double free if msg is sent buf = NULL; @@ -1239,7 +1225,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { - tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); code = TSDB_CODE_MND_CONSUMER_NOT_READY; goto FAIL; } @@ -1512,10 +1498,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) taosWLockLatch(&tmq->lock); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0}; tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); - // todo extract method for (int32_t i = 0; i < topicNumCur; i++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); @@ -1566,32 +1551,17 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); - if (tmq == NULL) { - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; -// pParam->pUserFn(tmq, terrno, NULL, pParam->pParam); - - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); - return terrno; + code = TSDB_CODE_TMQ_CONSUMER_CLOSED; + goto END; } if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); - pParam->pUserFn(tmq, code, NULL, pParam->pParam); - taosReleaseRef(tmqMgmt.rsetId, pParam->refId); - - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); - return code; + goto END; } - // tmq's epoch is monotonically increase, - // so it's safe to discard any old epoch msg. - // Epoch will only increase when received newer epoch ep msg SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); if (head->epoch <= epoch) { @@ -1610,10 +1580,10 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, head->epoch, epoch); } - - pParam->pUserFn(tmq, code, pMsg, pParam->pParam); taosReleaseRef(tmqMgmt.rsetId, pParam->refId); +END: + pParam->pUserFn(tmq, code, pMsg, pParam->pParam); taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); taosMemoryFree(pParam); @@ -1925,9 +1895,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { // todo handle the wal range and epset for each vgroup @@ -1957,9 +1927,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -2015,9 +1985,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); @@ -2124,7 +2094,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { // if auto commit is set, commit before close consumer. Otherwise, do nothing. if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) { + if (rsp != 0) { return rsp; } } @@ -2440,23 +2410,29 @@ end: } } -void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { +void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { SAskEpInfo* pInfo = param; pInfo->code = code; - if (code == TSDB_CODE_SUCCESS) { - SMqRspHead* head = pDataBuf->pData; - - SMqAskEpRsp rsp; - tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); - doUpdateLocalEp(pTmq, head->epoch, &rsp); - tDeleteSMqAskEpRsp(&rsp); + if (pTmq == NULL || code != TSDB_CODE_SUCCESS){ + goto END; } + SMqRspHead* head = pDataBuf->pData; + SMqAskEpRsp rsp; + tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); + doUpdateLocalEp(pTmq, head->epoch, &rsp); + tDeleteSMqAskEpRsp(&rsp); + +END: tsem_post(&pInfo->sem); } void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { + if (pTmq == NULL){ + terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; + return; + } if (code != TSDB_CODE_SUCCESS) { terrno = code; return; @@ -2482,7 +2458,7 @@ int32_t doAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); tsem_init(&pInfo->sem, 0, 0); - asyncAskEp(pTmq, updateEpCallbackFn, pInfo); + asyncAskEp(pTmq, defaultAskEpCb, pInfo); tsem_wait(&pInfo->sem); int32_t code = pInfo->code; @@ -2496,49 +2472,45 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { req.consumerId = pTmq->consumerId; req.epoch = pTmq->epoch; strcpy(req.cgroup, pTmq->groupId); + int code = 0; + SMqAskEpCbParam* pParam = NULL; + void* pReq = NULL; int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); - askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param); - return; + code = TSDB_CODE_INVALID_PARA; + goto FAIL; } - void* pReq = taosMemoryCalloc(1, tlen); + pReq = taosMemoryCalloc(1, tlen); if (pReq == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); - askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); - return; + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); - taosMemoryFree(pReq); - - askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param); - return; + code = TSDB_CODE_INVALID_PARA; + goto FAIL; } - SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); + pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); - taosMemoryFree(pReq); - - askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); - return; + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } pParam->refId = pTmq->refId; - pParam->epoch = pTmq->epoch; pParam->pUserFn = askEpFn; pParam->pParam = param; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - taosMemoryFree(pParam); - taosMemoryFree(pReq); - askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); - return; + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; @@ -2553,7 +2525,15 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); int64_t transporterId = 0; - asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if(code == 0){ + return; + } + +FAIL: + taosMemoryFreeClear(pParam); + taosMemoryFreeClear(pReq); + askEpFn(pTmq, code, NULL, param); } int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index b66334a6a6..9e8ca99930 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -94,7 +94,7 @@ class TDTestCase: resultList=[] while 1: tdSql.query("select * from %s.consumeresult"%cdbName) - #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))) if tdSql.getRows() == expectRows: break else: @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows < 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py index 6a03f0f751..117c3ce637 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py @@ -218,7 +218,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py index c11159c6e5..2864240441 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py index 439845aa54..d8606efe58 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py index 53ff020b08..05aa82c929 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index 4bb6cf463f..dcaa6ceb7c 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -217,7 +217,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)): + if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10)