feat:[TD-30270] opti close logic in tmq

This commit is contained in:
wangmm0220 2024-09-02 17:55:16 +08:00
parent 9d1ded192c
commit fb5cd43fdc
1 changed files with 144 additions and 161 deletions

View File

@ -28,6 +28,9 @@
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000
#define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_HEARTBEAT_INTERVAL 3000
#define DEFAULT_ASKEP_INTERVAL 1000 #define DEFAULT_ASKEP_INTERVAL 1000
#define DEFAULT_COMMIT_CNT 1
#define SUBSCRIBE_RETRY_MAX_COUNT 240
#define SUBSCRIBE_RETRY_INTERVAL 500
struct SMqMgmt { struct SMqMgmt {
tmr_h timer; tmr_h timer;
@ -458,6 +461,9 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container; SArray* container = &list->container;
if (src == NULL || src[0] == 0) return TSDB_CODE_INVALID_PARA; if (src == NULL || src[0] == 0) return TSDB_CODE_INVALID_PARA;
char* topic = taosStrdup(src); char* topic = taosStrdup(src);
if (topic == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
if (taosArrayPush(container, &topic) == NULL) return TSDB_CODE_INVALID_PARA; if (taosArrayPush(container, &topic) == NULL) return TSDB_CODE_INVALID_PARA;
return 0; return 0;
} }
@ -469,7 +475,7 @@ void tmq_list_destroy(tmq_list_t* list) {
} }
int32_t tmq_list_get_size(const tmq_list_t* list) { int32_t tmq_list_get_size(const tmq_list_t* list) {
if (list == NULL) return -1; if (list == NULL) return TSDB_CODE_INVALID_PARA;
const SArray* container = &list->container; const SArray* container = &list->container;
return taosArrayGetSize(container); return taosArrayGetSize(container);
} }
@ -481,12 +487,16 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
} }
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
if (pBuf){
taosMemoryFreeClear(pBuf->pData);
taosMemoryFreeClear(pBuf->pEpSet);
}
if(param == NULL){
return TSDB_CODE_INVALID_PARA;
}
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId); return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
} }
@ -557,7 +567,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
if (code != 0) { if (code != 0) {
(void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code;
} }
return code; return code;
} }
@ -613,22 +622,15 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient
return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS; return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
} }
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0;
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
taosRLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, pTopicName, vgId, &pVg);
if (code != 0) {
goto end;
}
if (offsetVal->type <= 0) { if (offsetVal->type <= 0) {
code = TSDB_CODE_TMQ_INVALID_MSG; code = TSDB_CODE_TMQ_INVALID_MSG;
goto end; return code;
} }
if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
goto end; return code;
} }
char offsetBuf[TSDB_OFFSET_LEN] = {0}; char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
@ -636,26 +638,39 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
char commitBuf[TSDB_OFFSET_LEN] = {0}; char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
SMqCommitCbParamSet* pParamSet = NULL;
code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
if (code != 0) {
goto end;
}
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
taosMemoryFree(pParamSet); return code;
goto end;
} }
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal); tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
return code;
}
end: static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
tmq_commit_cb* pCommitFp, void* userParam) {
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
SMqCommitCbParamSet* pParamSet = NULL;
int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
if (code != 0){
return code;
}
taosRLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
code = getClientVg(tmq, pTopicName, vgId, &pVg);
if (code == 0) {
code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
}
taosRUnLockLatch(&tmq->lock); taosRUnLockLatch(&tmq->lock);
if (code != 0){
taosMemoryFree(pParamSet);
}
return code; return code;
} }
@ -704,14 +719,8 @@ end:
} }
} }
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
int32_t code = 0; int32_t code = 0;
// init as 1 to prevent concurrency issue
SMqCommitCbParamSet* pParamSet = NULL;
code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1, &pParamSet);
if (code != 0) {
goto end;
}
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
@ -720,67 +729,50 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic == NULL) { if (pTopic == NULL) {
code = TSDB_CODE_TMQ_INVALID_TOPIC; code = TSDB_CODE_TMQ_INVALID_TOPIC;
taosRUnLockLatch(&tmq->lock); goto END;
goto end;
} }
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
numOfVgroups);
for (int32_t j = 0; j < numOfVgroups; j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg == NULL) { if (pVg == NULL) {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
taosRUnLockLatch(&tmq->lock); goto END;
goto end;
}
if (pVg->offsetInfo.endOffset.type > 0 &&
!tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) {
char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset);
char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64
" topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1,
numOfVgroups);
continue;
} }
tscDebug("consumer:0x%" PRIx64 code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", if (code != 0){
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); goto END;
tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset);
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
} }
} }
} }
taosRUnLockLatch(&tmq->lock); tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
numOfTopics); numOfTopics);
END:
taosRUnLockLatch(&tmq->lock);
return code;
}
// request is sent static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
if (pParamSet->waitingRspNum != 1) { int32_t code = 0;
// count down since waiting rsp num init as 1 SMqCommitCbParamSet* pParamSet = NULL;
code = commitRspCountDown(pParamSet, tmq->consumerId, "", 0); // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
if (code != 0) { if (code != 0) {
tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); tscError("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
pParamSet = NULL; if (pCommitFp != NULL) {
goto end; pCommitFp(tmq, code, userParam);
} }
return; return;
} }
code = innerCommitAll(tmq, pParamSet);
if (code != 0){
tscError("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
}
end: code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
taosMemoryFree(pParamSet); if (code != 0) {
if (pCommitFp != NULL) { tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
pCommitFp(tmq, code, userParam);
} }
return; return;
} }
@ -824,18 +816,18 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
} }
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0) {
goto _return;
}
if (pMsg == NULL || param == NULL) { if (pMsg == NULL || param == NULL) {
code = TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
goto _return; }
if (code != 0) {
goto END;
} }
SMqHbRsp rsp = {0}; SMqHbRsp rsp = {0};
code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
if (code != 0) { if (code != 0) {
goto _return; goto END;
} }
int64_t refId = (int64_t)param; int64_t refId = (int64_t)param;
@ -861,8 +853,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
tDestroySMqHbRsp(&rsp); tDestroySMqHbRsp(&rsp);
_return: END:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
return code; return code;
@ -882,7 +873,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
req.pollFlag = atomic_load_8(&pollFlag); req.pollFlag = atomic_load_8(&pollFlag);
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
if (req.topics == NULL) { if (req.topics == NULL) {
return; goto END;
} }
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
@ -924,25 +915,25 @@ void tmqSendHbReq(void* param, void* tmrId) {
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
if (tlen < 0) { if (tlen < 0) {
tscError("tSerializeSMqHbReq failed"); tscError("tSerializeSMqHbReq failed");
goto OVER; goto END;
} }
void* pReq = taosMemoryCalloc(1, tlen); void* pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) { if (tlen < 0) {
tscError("failed to malloc MqHbReq msg, size:%d", tlen); tscError("failed to malloc MqHbReq msg, size:%d", tlen);
goto OVER; goto END;
} }
if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) { if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqHbReq %d failed", tlen); tscError("tSerializeSMqHbReq %d failed", tlen);
taosMemoryFree(pReq); taosMemoryFree(pReq);
goto OVER; goto END;
} }
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
goto OVER; goto END;
} }
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
@ -960,9 +951,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
if (code != 0) { if (code != 0) {
tscError("tmqSendHbReq asyncSendMsgToServer failed"); tscError("tmqSendHbReq asyncSendMsgToServer failed");
} }
(void)atomic_val_compare_exchange_8(&pollFlag, 1, 0); (void)atomic_val_compare_exchange_8(&pollFlag, 1, 0);
OVER:
END:
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
if (tmrId != NULL) { if (tmrId != NULL) {
(void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); (void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
@ -1000,21 +991,18 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
while (pTaskType != NULL) { while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tscDebug("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
code = askEp(pTmq, NULL, false, false); code = askEp(pTmq, NULL, false, false);
if (code != 0) { if (code != 0) {
tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
continue;
} }
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer);
(void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0); pTmq->autoCommitInterval / 1000.0);
(void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer);
&pTmq->commitTimer);
} else { } else {
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
} }
@ -1076,6 +1064,10 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
} }
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFreeClear(pMsg->pEpSet);
}
if (param == NULL) { if (param == NULL) {
return code; return code;
} }
@ -1083,9 +1075,6 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
if (pMsg) {
taosMemoryFree(pMsg->pEpSet);
}
(void)tsem2_post(&pParam->rspSem); (void)tsem2_post(&pParam->rspSem);
return 0; return 0;
} }
@ -1161,7 +1150,7 @@ void tmqFreeImpl(void* handle) {
} }
taosMemoryFree(tmq); taosMemoryFree(tmq);
tscDebug("consumer:0x%" PRIx64 " closed", id); tscInfo("consumer:0x%" PRIx64 " closed", id);
} }
static void tmqMgmtInit(void) { static void tmqMgmtInit(void) {
@ -1263,8 +1252,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->epoch = 0; pTmq->epoch = 0;
// set conf // set conf
(void)strcpy(pTmq->clientId, conf->clientId); tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
(void)strcpy(pTmq->groupId, conf->groupId); tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
pTmq->withTbName = conf->withTbName; pTmq->withTbName = conf->withTbName;
pTmq->useSnapshot = conf->snapEnable; pTmq->useSnapshot = conf->snapEnable;
pTmq->autoCommit = conf->autoCommit; pTmq->autoCommit = conf->autoCommit;
@ -1280,7 +1269,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->enableBatchMeta = conf->enableBatchMeta; pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN); tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) { if (taosGetFqdn(pTmq->fqdn) != 0) {
(void)strcpy(pTmq->fqdn, "localhost"); tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
} }
if (conf->replayEnable) { if (conf->replayEnable) {
pTmq->autoCommit = false; pTmq->autoCommit = false;
@ -1303,7 +1292,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (code) { if (code) {
terrno = code; terrno = code;
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
(void)tsem2_destroy(&pTmq->rspSem);
SET_ERROR_MSG_TMQ("init tscObj failed") SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed; goto _failed;
} }
@ -1336,7 +1324,6 @@ _failed:
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA; if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
@ -1355,7 +1342,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) { if (req.topicNames == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto END;
} }
req.withTbName = tmq->withTbName; req.withTbName = tmq->withTbName;
@ -1370,20 +1357,20 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i); char* topic = taosArrayGetP(container, i);
if (topic == NULL) { if (topic == NULL) {
code = TSDB_CODE_INVALID_PARA; code = terrno;
goto FAIL; goto END;
} }
SName name = {0}; SName name = {0};
code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
if (code) { if (code) {
tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
code); code);
goto FAIL; goto END;
} }
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFName == NULL) { if (topicFName == NULL) {
code = terrno; code = terrno;
goto FAIL; goto END;
} }
code = tNameExtractFullName(&name, topicFName); code = tNameExtractFullName(&name, topicFName);
@ -1391,13 +1378,13 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
code); code);
taosMemoryFree(topicFName); taosMemoryFree(topicFName);
goto FAIL; goto END;
} }
if (taosArrayPush(req.topicNames, &topicFName) == NULL) { if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
taosMemoryFree(topicFName); taosMemoryFree(topicFName);
goto FAIL; goto END;
} }
tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
} }
@ -1405,8 +1392,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req); int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
buf = taosMemoryMalloc(tlen); buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto FAIL; goto END;
} }
void* abuf = buf; void* abuf = buf;
@ -1416,7 +1403,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (sendInfo == NULL) { if (sendInfo == NULL) {
code = terrno; code = terrno;
taosMemoryFree(buf); taosMemoryFree(buf);
goto FAIL; goto END;
} }
SMqSubscribeCbParam param = {.rspErr = 0}; SMqSubscribeCbParam param = {.rspErr = 0};
@ -1424,7 +1411,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code = TSDB_CODE_TSC_INTERNAL_ERROR; code = TSDB_CODE_TSC_INTERNAL_ERROR;
taosMemoryFree(buf); taosMemoryFree(buf);
taosMemoryFree(sendInfo); taosMemoryFree(sendInfo);
goto FAIL; goto END;
} }
sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
@ -1439,7 +1426,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
goto FAIL; goto END;
} }
(void)tsem2_wait(&param.rspSem); (void)tsem2_wait(&param.rspSem);
@ -1447,22 +1434,22 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (param.rspErr != 0) { if (param.rspErr != 0) {
code = param.rspErr; code = param.rspErr;
goto FAIL; goto END;
} }
int32_t retryCnt = 0; int32_t retryCnt = 0;
while ((code = syncAskEp(tmq)) != 0) { while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
tmq->consumerId, tstrerror(code)); tmq->consumerId, tstrerror(code));
if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
code = 0; code = 0;
} }
goto FAIL; goto END;
} }
tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500); taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
} }
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
@ -1470,10 +1457,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
code = TSDB_CODE_TSC_INTERNAL_ERROR; code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL; goto END;
} }
FAIL: END:
taosArrayDestroyP(req.topicNames, taosMemoryFree); taosArrayDestroyP(req.topicNames, taosMemoryFree);
return code; return code;
@ -2495,53 +2482,46 @@ static void displayConsumeStatistics(tmq_t* pTmq) {
} }
static int32_t innerClose(tmq_t* tmq) { static int32_t innerClose(tmq_t* tmq) {
if (tmq->status != TMQ_CONSUMER_STATUS__READY) { int32_t code = 0;
tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId); int8_t status = atomic_load_8(&tmq->status);
return 0; if (status == TMQ_CONSUMER_STATUS__CLOSED || status != TMQ_CONSUMER_STATUS__READY) {
tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, close it directly", tmq->consumerId, status);
goto END;
} }
if (tmq->autoCommit) { if (tmq->autoCommit) {
int32_t code = tmq_commit_sync(tmq, NULL); code = tmq_commit_sync(tmq, NULL);
if (code != 0) { if (code != 0) {
return code; goto END;
} }
} }
tmqSendHbReq((void*)(tmq->refId), NULL); tmqSendHbReq((void*)(tmq->refId), NULL);
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
if (lst == NULL) { if (lst == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
} }
int32_t code = tmq_subscribe(tmq, lst); code = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst); tmq_list_destroy(lst);
if(code != 0){
goto END;
}
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
END:
return code; return code;
} }
int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_unsubscribe(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA; if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, tmq->status); tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, tmq->status);
int32_t code = 0; return innerClose(tmq);
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) {
code = innerClose(tmq);
if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
}
}
return code;
} }
int32_t tmq_consumer_close(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA; if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq); displayConsumeStatistics(tmq);
int32_t code = 0; int32_t code = innerClose(tmq);
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) {
code = innerClose(tmq);
if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
}
}
if (code == 0) { if (code == 0) {
(void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
} }
@ -2715,7 +2695,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
tscError("failed to allocate memory for sync commit"); tscError("failed to allocate memory for sync commit");
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
if (tsem2_init(&pInfo->sem, 0, 0) != 0) { if (tsem2_init(&pInfo->sem, 0, 0) != 0) {
tscError("failed to init sem for sync commit"); tscError("failed to init sem for sync commit");
@ -2856,11 +2836,11 @@ end:
} }
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (param == NULL) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
if (pParam == NULL) {
goto FAIL; goto FAIL;
} }
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) { if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED; code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
@ -2906,7 +2886,7 @@ END:
(void)taosReleaseRef(tmqMgmt.rsetId, pParam->refId); (void)taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
FAIL: FAIL:
if (pParam->sync) { if (pParam && pParam->sync) {
SAskEpInfo* pInfo = pParam->pParam; SAskEpInfo* pInfo = pParam->pParam;
if (pInfo) { if (pInfo) {
pInfo->code = code; pInfo->code = code;
@ -2945,7 +2925,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMqAskEpReq req = {0}; SMqAskEpReq req = {0};
req.consumerId = pTmq->consumerId; req.consumerId = pTmq->consumerId;
req.epoch = updateEpSet ? -1 : pTmq->epoch; req.epoch = updateEpSet ? -1 : pTmq->epoch;
(void)strcpy(req.cgroup, pTmq->groupId); tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
int code = 0; int code = 0;
SMqAskEpCbParam* pParam = NULL; SMqAskEpCbParam* pParam = NULL;
void* pReq = NULL; void* pReq = NULL;
@ -3001,11 +2981,10 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
int64_t refId = pParamSet->refId; int64_t refId = pParamSet->refId;
int32_t code = 0;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
taosMemoryFree(pParamSet); code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return TSDB_CODE_TMQ_CONSUMER_CLOSED;
} }
// if no more waiting rsp // if no more waiting rsp
@ -3014,7 +2993,11 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
} }
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
return taosReleaseRef(tmqMgmt.rsetId, refId); if (tmq != NULL) {
code = taosReleaseRef(tmqMgmt.rsetId, refId);
}
return code;
} }
int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {