From fb5cd43fdcc2b2d37fb0b257ba2dff3a7b9ad47b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 2 Sep 2024 17:55:16 +0800 Subject: [PATCH] feat:[TD-30270] opti close logic in tmq --- source/client/src/clientTmq.c | 305 ++++++++++++++++------------------ 1 file changed, 144 insertions(+), 161 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3927172b61..5fd0377522 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -28,6 +28,9 @@ #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_ASKEP_INTERVAL 1000 +#define DEFAULT_COMMIT_CNT 1 +#define SUBSCRIBE_RETRY_MAX_COUNT 240 +#define SUBSCRIBE_RETRY_INTERVAL 500 struct SMqMgmt { tmr_h timer; @@ -458,6 +461,9 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { SArray* container = &list->container; if (src == NULL || src[0] == 0) return TSDB_CODE_INVALID_PARA; char* topic = taosStrdup(src); + if (topic == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } if (taosArrayPush(container, &topic) == NULL) return TSDB_CODE_INVALID_PARA; return 0; } @@ -469,7 +475,7 @@ void tmq_list_destroy(tmq_list_t* list) { } int32_t tmq_list_get_size(const tmq_list_t* list) { - if (list == NULL) return -1; + if (list == NULL) return TSDB_CODE_INVALID_PARA; const SArray* container = &list->container; return taosArrayGetSize(container); } @@ -481,12 +487,16 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { } static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { + if (pBuf){ + taosMemoryFreeClear(pBuf->pData); + taosMemoryFreeClear(pBuf->pEpSet); + } + if(param == NULL){ + return TSDB_CODE_INVALID_PARA; + } SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; - taosMemoryFree(pBuf->pData); - taosMemoryFree(pBuf->pEpSet); - return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId); } @@ -557,7 +567,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); if (code != 0) { (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); - return code; } return code; } @@ -613,22 +622,15 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS; } -static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, - tmq_commit_cb* pCommitFp, void* userParam) { - tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); - taosRLockLatch(&tmq->lock); - SMqClientVg* pVg = NULL; - int32_t code = getClientVg(tmq, pTopicName, vgId, &pVg); - if (code != 0) { - goto end; - } +static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){ + int32_t code = 0; if (offsetVal->type <= 0) { code = TSDB_CODE_TMQ_INVALID_MSG; - goto end; + return code; } if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; - goto end; + return code; } char offsetBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); @@ -636,26 +638,39 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - SMqCommitCbParamSet* pParamSet = NULL; - code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet); - if (code != 0) { - goto end; - } - code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); - taosMemoryFree(pParamSet); - goto end; + return code; } tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal); + return code; +} -end: +static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, + tmq_commit_cb* pCommitFp, void* userParam) { + tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + SMqCommitCbParamSet* pParamSet = NULL; + int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet); + if (code != 0){ + return code; + } + + taosRLockLatch(&tmq->lock); + SMqClientVg* pVg = NULL; + code = getClientVg(tmq, pTopicName, vgId, &pVg); + if (code == 0) { + code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet); + } taosRUnLockLatch(&tmq->lock); + + if (code != 0){ + taosMemoryFree(pParamSet); + } return code; } @@ -704,14 +719,8 @@ end: } } -static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { +static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ int32_t code = 0; - // init as 1 to prevent concurrency issue - SMqCommitCbParamSet* pParamSet = NULL; - code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1, &pParamSet); - if (code != 0) { - goto end; - } taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); @@ -720,67 +729,50 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); if (pTopic == NULL) { code = TSDB_CODE_TMQ_INVALID_TOPIC; - taosRUnLockLatch(&tmq->lock); - goto end; + goto END; } int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, - numOfVgroups); + tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg == NULL) { code = TSDB_CODE_INVALID_PARA; - taosRUnLockLatch(&tmq->lock); - goto end; + goto END; } - if (pVg->offsetInfo.endOffset.type > 0 && - !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) { - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); - char commitBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - - code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet); - if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 - " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1, - numOfVgroups); - continue; - } - - tscDebug("consumer:0x%" PRIx64 - " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); - tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset); - } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); + code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet); + if (code != 0){ + goto END; } } } - taosRUnLockLatch(&tmq->lock); - - tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, + tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT, numOfTopics); +END: + taosRUnLockLatch(&tmq->lock); + return code; +} - // request is sent - if (pParamSet->waitingRspNum != 1) { - // count down since waiting rsp num init as 1 - code = commitRspCountDown(pParamSet, tmq->consumerId, "", 0); - if (code != 0) { - tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); - pParamSet = NULL; - goto end; +static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { + int32_t code = 0; + SMqCommitCbParamSet* pParamSet = NULL; + // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue + code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code)); + if (pCommitFp != NULL) { + pCommitFp(tmq, code, userParam); } return; } + code = innerCommitAll(tmq, pParamSet); + if (code != 0){ + tscError("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code)); + } -end: - taosMemoryFree(pParamSet); - if (pCommitFp != NULL) { - pCommitFp(tmq, code, userParam); + code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); } return; } @@ -824,18 +816,18 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (code != 0) { - goto _return; - } if (pMsg == NULL || param == NULL) { - code = TSDB_CODE_INVALID_PARA; - goto _return; + return TSDB_CODE_INVALID_PARA; + } + + if (code != 0) { + goto END; } SMqHbRsp rsp = {0}; code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); if (code != 0) { - goto _return; + goto END; } int64_t refId = (int64_t)param; @@ -861,8 +853,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { tDestroySMqHbRsp(&rsp); -_return: - +END: taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); return code; @@ -882,7 +873,7 @@ void tmqSendHbReq(void* param, void* tmrId) { req.pollFlag = atomic_load_8(&pollFlag); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); if (req.topics == NULL) { - return; + goto END; } taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -924,25 +915,25 @@ void tmqSendHbReq(void* param, void* tmrId) { int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { tscError("tSerializeSMqHbReq failed"); - goto OVER; + goto END; } void* pReq = taosMemoryCalloc(1, tlen); if (tlen < 0) { tscError("failed to malloc MqHbReq msg, size:%d", tlen); - goto OVER; + goto END; } if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) { tscError("tSerializeSMqHbReq %d failed", tlen); taosMemoryFree(pReq); - goto OVER; + goto END; } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pReq); - goto OVER; + goto END; } sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; @@ -960,9 +951,9 @@ void tmqSendHbReq(void* param, void* tmrId) { if (code != 0) { tscError("tmqSendHbReq asyncSendMsgToServer failed"); } - (void)atomic_val_compare_exchange_8(&pollFlag, 1, 0); -OVER: + +END: tDestroySMqHbReq(&req); if (tmrId != NULL) { (void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); @@ -1000,21 +991,18 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { + tscDebug("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId); code = askEp(pTmq, NULL, false, false); if (code != 0) { tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); - continue; } - tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); - (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, - &pTmq->epTimer); + (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; + tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn; asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); - (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, - &pTmq->commitTimer); + (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } @@ -1076,6 +1064,10 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { + if (pMsg) { + taosMemoryFreeClear(pMsg->pEpSet); + } + if (param == NULL) { return code; } @@ -1083,9 +1075,6 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; - if (pMsg) { - taosMemoryFree(pMsg->pEpSet); - } (void)tsem2_post(&pParam->rspSem); return 0; } @@ -1161,7 +1150,7 @@ void tmqFreeImpl(void* handle) { } taosMemoryFree(tmq); - tscDebug("consumer:0x%" PRIx64 " closed", id); + tscInfo("consumer:0x%" PRIx64 " closed", id); } static void tmqMgmtInit(void) { @@ -1263,8 +1252,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->epoch = 0; // set conf - (void)strcpy(pTmq->clientId, conf->clientId); - (void)strcpy(pTmq->groupId, conf->groupId); + tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN); + tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN); pTmq->withTbName = conf->withTbName; pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; @@ -1280,7 +1269,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (taosGetFqdn(pTmq->fqdn) != 0) { - (void)strcpy(pTmq->fqdn, "localhost"); + tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN); } if (conf->replayEnable) { pTmq->autoCommit = false; @@ -1303,7 +1292,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (code) { terrno = code; tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - (void)tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1336,7 +1324,6 @@ _failed: int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA; - const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; @@ -1355,7 +1342,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } req.withTbName = tmq->withTbName; @@ -1370,20 +1357,20 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); if (topic == NULL) { - code = TSDB_CODE_INVALID_PARA; - goto FAIL; + code = terrno; + goto END; } SName name = {0}; code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); if (code) { tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, code); - goto FAIL; + goto END; } char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFName == NULL) { code = terrno; - goto FAIL; + goto END; } code = tNameExtractFullName(&name, topicFName); @@ -1391,13 +1378,13 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, code); taosMemoryFree(topicFName); - goto FAIL; + goto END; } if (taosArrayPush(req.topicNames, &topicFName) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; taosMemoryFree(topicFName); - goto FAIL; + goto END; } tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); } @@ -1405,8 +1392,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req); buf = taosMemoryMalloc(tlen); if (buf == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + code = terrno; + goto END; } void* abuf = buf; @@ -1416,7 +1403,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (sendInfo == NULL) { code = terrno; taosMemoryFree(buf); - goto FAIL; + goto END; } SMqSubscribeCbParam param = {.rspErr = 0}; @@ -1424,7 +1411,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { code = TSDB_CODE_TSC_INTERNAL_ERROR; taosMemoryFree(buf); taosMemoryFree(sendInfo); - goto FAIL; + goto END; } sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; @@ -1439,7 +1426,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (code != 0) { - goto FAIL; + goto END; } (void)tsem2_wait(¶m.rspSem); @@ -1447,22 +1434,22 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (param.rspErr != 0) { code = param.rspErr; - goto FAIL; + goto END; } int32_t retryCnt = 0; while ((code = syncAskEp(tmq)) != 0) { - if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { + if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, tstrerror(code)); if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { code = 0; } - goto FAIL; + goto END; } tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); - taosMsleep(500); + taosMsleep(SUBSCRIBE_RETRY_INTERVAL); } tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); @@ -1470,10 +1457,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { code = TSDB_CODE_TSC_INTERNAL_ERROR; - goto FAIL; + goto END; } -FAIL: +END: taosArrayDestroyP(req.topicNames, taosMemoryFree); return code; @@ -2495,53 +2482,46 @@ static void displayConsumeStatistics(tmq_t* pTmq) { } static int32_t innerClose(tmq_t* tmq) { - if (tmq->status != TMQ_CONSUMER_STATUS__READY) { - tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId); - return 0; + int32_t code = 0; + int8_t status = atomic_load_8(&tmq->status); + if (status == TMQ_CONSUMER_STATUS__CLOSED || status != TMQ_CONSUMER_STATUS__READY) { + tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, close it directly", tmq->consumerId, status); + goto END; } if (tmq->autoCommit) { - int32_t code = tmq_commit_sync(tmq, NULL); + code = tmq_commit_sync(tmq, NULL); if (code != 0) { - return code; + goto END; } } tmqSendHbReq((void*)(tmq->refId), NULL); tmq_list_t* lst = tmq_list_new(); if (lst == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } - int32_t code = tmq_subscribe(tmq, lst); + code = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); + if(code != 0){ + goto END; + } + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); + +END: return code; } int32_t tmq_unsubscribe(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; - tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, tmq->status); - int32_t code = 0; - if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { - code = innerClose(tmq); - if (code == 0) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); - } - } - return code; + return innerClose(tmq); } int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; - tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); - int32_t code = 0; - if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { - code = innerClose(tmq); - if (code == 0) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); - } - } - + int32_t code = innerClose(tmq); if (code == 0) { (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); } @@ -2715,7 +2695,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) { tscError("failed to allocate memory for sync commit"); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (tsem2_init(&pInfo->sem, 0, 0) != 0) { tscError("failed to init sem for sync commit"); @@ -2856,12 +2836,12 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { - if (param == NULL) { + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; + if (pParam == NULL) { goto FAIL; } - SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { code = TSDB_CODE_TMQ_CONSUMER_CLOSED; goto FAIL; @@ -2906,7 +2886,7 @@ END: (void)taosReleaseRef(tmqMgmt.rsetId, pParam->refId); FAIL: - if (pParam->sync) { + if (pParam && pParam->sync) { SAskEpInfo* pInfo = pParam->pParam; if (pInfo) { pInfo->code = code; @@ -2945,7 +2925,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMqAskEpReq req = {0}; req.consumerId = pTmq->consumerId; req.epoch = updateEpSet ? -1 : pTmq->epoch; - (void)strcpy(req.cgroup, pTmq->groupId); + tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN); int code = 0; SMqAskEpCbParam* pParam = NULL; void* pReq = NULL; @@ -3001,11 +2981,10 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { int64_t refId = pParamSet->refId; - + int32_t code = 0; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - taosMemoryFree(pParamSet); - return TSDB_CODE_TMQ_CONSUMER_CLOSED; + code = TSDB_CODE_TMQ_CONSUMER_CLOSED; } // if no more waiting rsp @@ -3014,7 +2993,11 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } taosMemoryFree(pParamSet); - return taosReleaseRef(tmqMgmt.rsetId, refId); + if (tmq != NULL) { + code = taosReleaseRef(tmqMgmt.rsetId, refId); + } + + return code; } int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {