diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 54cacfe156..825f7ef182 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -147,7 +147,6 @@ typedef struct { } SMqClientVg; typedef struct { - // subscribe info char topicName[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; SArray* vgs; // SArray @@ -381,7 +380,7 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { +static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } @@ -424,7 +423,7 @@ static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) { } } -int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { +static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; @@ -478,7 +477,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { return 0; } -static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) { +static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet) { STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); if (pOffset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -490,7 +489,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT int32_t groupLen = strlen(tmq->groupId); memcpy(pOffset->subKey, tmq->groupId, groupLen); pOffset->subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName); + strcpy(pOffset->subKey + groupLen + 1, pTopicName); int32_t len; int32_t code; @@ -527,7 +526,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pParam->vgId = pVg->vgId; pParam->pTmq = tmq; - tstrncpy(pParam->topicName, pTopic->topicName, tListLen(pParam->topicName)); + tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -544,7 +543,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT .handle = NULL, }; - SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse]; + SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d", tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port); @@ -567,7 +566,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT return 0; } -int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) { +static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) { char* topic; int32_t vgId; if (TD_RES_TMQ(msg)) { @@ -591,6 +590,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; pParamSet->automatic = 0; @@ -601,15 +601,18 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm int32_t code = -1; + taosThreadMutexLock(&tmq->lock); for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); if (strcmp(pTopic->topicName, topic) != 0) continue; for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->vgId != vgId) continue; + if (pVg->vgId != vgId) { + continue; + } if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { + if (tmqSendCommitReq(tmq, pVg, pTopic->topicName, pParamSet) < 0) { tsem_destroy(&pParamSet->rspSem); taosMemoryFree(pParamSet); goto FAIL; @@ -623,6 +626,7 @@ HANDLE_RSP: if (pParamSet->totalRspNum == 0) { tsem_destroy(&pParamSet->rspSem); taosMemoryFree(pParamSet); + taosThreadMutexUnlock(&tmq->lock); return 0; } @@ -631,15 +635,18 @@ HANDLE_RSP: code = pParamSet->rspErr; tsem_destroy(&pParamSet->rspSem); taosMemoryFree(pParamSet); + taosThreadMutexUnlock(&tmq->lock); return code; } else { code = 0; } FAIL: + taosThreadMutexUnlock(&tmq->lock); if (code != 0 && async) { userCb(tmq, code, userParam); } + return 0; } @@ -672,7 +679,9 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, // init as 1 to prevent concurrency issue pParamSet->waitingRspNum = 1; + taosThreadMutexLock(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); + tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { @@ -681,18 +690,20 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, // todo race condition: fix it int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < numOfVgroups; j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { + SMqClientVg clientVg = *(SMqClientVg*)taosArrayGet(pTopic->vgs, j); + if (clientVg.currentOffset.type > 0 && !tOffsetEqual(&clientVg.currentOffset, &clientVg.committedOffset)) { + if (tmqSendCommitReq(tmq, &clientVg, pTopic->topicName, pParamSet) < 0) { continue; } } else { tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups); + tmq->consumerId, pTopic->topicName, clientVg.vgId, clientVg.currentOffset.version, j + 1, numOfVgroups); } } } + taosThreadMutexUnlock(&tmq->lock); + // no request is sent if (pParamSet->totalRspNum == 0) { tsem_destroy(&pParamSet->rspSem); @@ -1369,8 +1380,10 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic for (int32_t j = 0; j < vgNumGet; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId); + + makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId); STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); + STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; if (pOffset != NULL) { offsetNew = *pOffset; @@ -1428,7 +1441,8 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); - sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); + makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); + char buf[80]; tFormatOffset(buf, 80, &pVgCur->currentOffset); tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, @@ -1454,7 +1468,6 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { } tmq->clientTopics = newTopics; - taosThreadMutexUnlock(&tmq->lock); int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY; @@ -1465,7 +1478,7 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { return set; } -int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { +static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; int8_t async = pParam->async; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); @@ -1757,7 +1770,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p } // broadcast the poll request to all related vnodes -int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { +static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); @@ -1793,7 +1806,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { return 0; } -int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { +static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { @@ -1813,7 +1826,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) return 0; } -void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { +static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); while (1) { @@ -2117,11 +2130,9 @@ const char* tmq_get_table_name(TAOS_RES* res) { } void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { - // tmqCommitInner(tmq, msg, 0, 1, cb, param); } int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { - // return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL); }