diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0a64a02f20..0ec47a1cb3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -442,7 +442,6 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; -// taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -513,14 +512,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; + int64_t transporterId = 0; + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); + if(code != 0){ + return code; + } atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1); - - SEp* pEp = GET_ACTIVE_EP(epSet); - - - int64_t transporterId = 0; - return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); + return code; } static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { @@ -538,7 +537,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ +static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { return NULL; @@ -548,13 +547,11 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p pParamSet->epoch = tmq->epoch; pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; - pParamSet->waitingRspNum = rspNum; + pParamSet->waitingRspNum = 0; return pParamSet; } - - static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){ SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { @@ -575,11 +572,10 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient } static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { - int32_t code = 0; tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); taosRLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; - code = getClientVg(tmq, pTopicName, vgId, &pVg); + int32_t code = getClientVg(tmq, pTopicName, vgId, &pVg); if(code != 0){ goto end; } @@ -597,7 +593,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -661,8 +657,7 @@ end: static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; - // init as 1 to prevent concurrency issue - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -709,10 +704,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us // request is sent if (pParamSet->totalRspNum != 0) { - // count down since waiting rsp num init as 1 - commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } + code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; end: taosMemoryFree(pParamSet); @@ -2580,8 +2574,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } taosMemoryFree(pParamSet); -// tmq->needReportOffsetRows = true; - taosReleaseRef(tmqMgmt.rsetId, refId); return 0; } diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index fbfacd9eda..6b774b3eff 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -696,7 +696,7 @@ static int32_t g_once_commit_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { + if (0 == g_once_commit_flag && code == 0) { g_once_commit_flag = 1; notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); }