diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 825f7ef182..ba223984e3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -214,6 +214,8 @@ typedef struct { } SMqCommitCbParam; static int32_t tmqAskEp(tmq_t* tmq, bool async); +static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); +static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); @@ -380,42 +382,6 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { - return sprintf(dst, "%s:%d", topicName, vg); -} - -int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId); - if (tmq == NULL) { - if (!pParamSet->async) { - tsem_destroy(&pParamSet->rspSem); - } - taosMemoryFree(pParamSet); - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return -1; - } - - // if no more waiting rsp - if (pParamSet->async) { - // call async cb func - if (pParamSet->automatic && tmq->commitCb) { - tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam); - } else if (!pParamSet->automatic && pParamSet->userCb) { - // sem post - pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam); - } - taosMemoryFree(pParamSet); - } else { - tsem_post(&pParamSet->rspSem); - } - -#if 0 - taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); - taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); -#endif - return 0; -} - static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); if (waitingRspNum == 0) { @@ -728,8 +694,8 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, return code; } -int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, - void* userParam) { +static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, + void* userParam) { if (msg) { return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); } else { @@ -1550,90 +1516,6 @@ END: return code; } -int32_t tmqAskEp(tmq_t* tmq, bool async) { - int32_t code = TSDB_CODE_SUCCESS; -#if 0 - int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); - if (epStatus == 1) { - int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); - tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt); - if (epSkipCnt < 5000) return 0; - } - atomic_store_32(&tmq->epSkipCnt, 0); -#endif - - SMqAskEpReq req = {0}; - req.consumerId = tmq->consumerId; - req.epoch = tmq->epoch; - strcpy(req.cgroup, tmq->groupId); - - int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); - if (tlen < 0) { - tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId); - return -1; - } - - void* pReq = taosMemoryCalloc(1, tlen); - if (pReq == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { - tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen); - taosMemoryFree(pReq); - return -1; - } - - SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); - if (pParam == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId); - taosMemoryFree(pReq); - return -1; - } - - pParam->refId = tmq->refId; - pParam->epoch = tmq->epoch; - pParam->async = async; - tsem_init(&pParam->rspSem, 0, 0); - - SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) { - tsem_destroy(&pParam->rspSem); - taosMemoryFree(pParam); - taosMemoryFree(pReq); - return -1; - } - - sendInfo->msgInfo = (SDataBuf){ - .pData = pReq, - .len = tlen, - .handle = NULL, - }; - - sendInfo->requestId = generateRequestId(); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqAskEpCb; - sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; - - SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async, - sendInfo->requestId); - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - - if (!async) { - tsem_wait(&pParam->rspSem); - code = pParam->code; - taosMemoryFree(pParam); - } - - return code; -} - void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { int32_t groupLen = strlen(tmq->groupId); memcpy(pReq->subKey, tmq->groupId, groupLen); @@ -2136,3 +2018,123 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL); } + +int32_t tmqAskEp(tmq_t* tmq, bool async) { + int32_t code = TSDB_CODE_SUCCESS; +#if 0 + int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); + if (epStatus == 1) { + int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); + tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt); + if (epSkipCnt < 5000) return 0; + } + atomic_store_32(&tmq->epSkipCnt, 0); +#endif + + SMqAskEpReq req = {0}; + req.consumerId = tmq->consumerId; + req.epoch = tmq->epoch; + strcpy(req.cgroup, tmq->groupId); + + int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); + if (tlen < 0) { + tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId); + return -1; + } + + void* pReq = taosMemoryCalloc(1, tlen); + if (pReq == NULL) { + tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { + tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen); + taosMemoryFree(pReq); + return -1; + } + + SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); + if (pParam == NULL) { + tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId); + taosMemoryFree(pReq); + return -1; + } + + pParam->refId = tmq->refId; + pParam->epoch = tmq->epoch; + pParam->async = async; + tsem_init(&pParam->rspSem, 0, 0); + + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + tsem_destroy(&pParam->rspSem); + taosMemoryFree(pParam); + taosMemoryFree(pReq); + return -1; + } + + sendInfo->msgInfo = (SDataBuf){ + .pData = pReq, + .len = tlen, + .handle = NULL, + }; + + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqAskEpCb; + sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async, + sendInfo->requestId); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + if (!async) { + tsem_wait(&pParam->rspSem); + code = pParam->code; + taosMemoryFree(pParam); + } + + return code; +} + +int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { + return sprintf(dst, "%s:%d", topicName, vg); +} + +int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId); + if (tmq == NULL) { + if (!pParamSet->async) { + tsem_destroy(&pParamSet->rspSem); + } + taosMemoryFree(pParamSet); + terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; + return -1; + } + + // if no more waiting rsp + if (pParamSet->async) { + // call async cb func + if (pParamSet->automatic && tmq->commitCb) { + tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam); + } else if (!pParamSet->automatic && pParamSet->userCb) { + // sem post + pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam); + } + taosMemoryFree(pParamSet); + } else { + tsem_post(&pParamSet->rspSem); + } + +#if 0 + taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); + taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); +#endif + return 0; +} \ No newline at end of file