From 6b5daf6c9fd06497d7f9d8ee01f70bd99b0c7ea5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 4 Jul 2023 14:59:36 +0800 Subject: [PATCH] fix:add lock for tmq->clientTopic --- source/client/src/clientTmq.c | 168 +++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 65 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 83550aa15d..daa5c2d2ab 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -636,6 +636,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; + taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); @@ -646,6 +647,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm pTopicName, numOfTopics); taosMemoryFree(pParamSet); pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + taosRUnLockLatch(&tmq->lock); return; } @@ -663,6 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm vgId, numOfVgroups, pTopicName); taosMemoryFree(pParamSet); pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + taosRUnLockLatch(&tmq->lock); return; } @@ -679,6 +682,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm taosMemoryFree(pParamSet); pCommitFp(tmq, code, userParam); } + taosRUnLockLatch(&tmq->lock); } static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { @@ -696,6 +700,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us // init as 1 to prevent concurrency issue pParamSet->waitingRspNum = 1; + taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); @@ -725,6 +730,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us } } } + taosRUnLockLatch(&tmq->lock); tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); @@ -799,6 +805,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; + taosRLockLatch(&tmq->lock); // if(tmq->needReportOffsetRows){ req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ @@ -820,6 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) { } // tmq->needReportOffsetRows = false; // } + taosRUnLockLatch(&tmq->lock); int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { @@ -986,10 +994,12 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { if (*topics == NULL) { *topics = tmq_list_new(); } + taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); } + taosRUnLockLatch(&tmq->lock); return 0; } @@ -1527,12 +1537,7 @@ static void freeClientVgInfo(void* param) { static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; - int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics); - - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", - tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); if (epoch <= tmq->epoch) { return false; } @@ -1548,6 +1553,12 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) return false; } + taosWLockLatch(&tmq->lock); + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); // todo extract method for (int32_t i = 0; i < topicNumCur; i++) { // find old topic @@ -1579,7 +1590,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) taosHashCleanup(pVgOffsetHashMap); - taosWLockLatch(&tmq->lock); // destroy current buffered existed topics info if (tmq->clientTopics) { taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); @@ -1807,6 +1817,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){ return 0; } + int32_t code = 0; + + taosWLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); @@ -1816,7 +1829,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms + if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; @@ -1831,15 +1844,17 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } atomic_store_32(&pVg->vgSkipCnt, 0); - int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout); + code = doTmqPollImpl(tmq, pTopic, pVg, timeout); if (code != TSDB_CODE_SUCCESS) { - return code; + goto end; } } } - tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId); - return 0; +end: + taosWUnLockLatch(&tmq->lock); + tscDebug("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code); + return code; } static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { @@ -1891,12 +1906,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; if (pDataRsp->head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + taosWUnLockLatch(&tmq->lock); return NULL; } // update the epset @@ -1944,8 +1961,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); return pRsp; } + taosWUnLockLatch(&tmq->lock); } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); @@ -1960,12 +1979,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + taosWUnLockLatch(&tmq->lock); return NULL; } @@ -1977,6 +1998,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", @@ -1989,12 +2011,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + taosWUnLockLatch(&tmq->lock); return NULL; } @@ -2017,32 +2041,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->emptyBlockReceiveTs = taosGetTimestampMs(); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - continue; } else { pVg->emptyBlockReceiveTs = 0; // reset the ts + // build rsp + void* pRsp = NULL; + int64_t numOfRows = 0; + if (pollRspWrapper->taosxRsp.createTableNum == 0) { + pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + } else { + pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + } + + tmq->totalRows += numOfRows; + + char buf[TSDB_OFFSET_LEN]; + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); + tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, + tmq->totalRows, pollRspWrapper->reqId); + + taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); + return pRsp; } - - // build rsp - void* pRsp = NULL; - int64_t numOfRows = 0; - if (pollRspWrapper->taosxRsp.createTableNum == 0) { - pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - } else { - pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - } - - tmq->totalRows += numOfRows; - - char buf[TSDB_OFFSET_LEN]; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, - tmq->totalRows, pollRspWrapper->reqId); - - taosFreeQitem(pollRspWrapper); - return pRsp; - + taosWUnLockLatch(&tmq->lock); } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); @@ -2121,7 +2144,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } -static void displayConsumeStatistics(const tmq_t* pTmq) { +static void displayConsumeStatistics(tmq_t* pTmq) { + taosRLockLatch(&pTmq->lock); int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); @@ -2137,7 +2161,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) { tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); } } - + taosRUnLockLatch(&pTmq->lock); tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); } @@ -2544,14 +2568,18 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t* numOfAssignment) { *numOfAssignment = 0; *assignment = NULL; + SMqVgCommon* pCommon = NULL; int32_t accId = tmq->pTscObj->acctId; char tname[128] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); + int32_t code = TSDB_CODE_SUCCESS; + taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto end; } // in case of snapshot is opened, no valid offset will return @@ -2561,7 +2589,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a if (*assignment == NULL) { tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId, (*numOfAssignment) * sizeof(tmq_topic_assignment)); - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; } bool needFetch = false; @@ -2586,10 +2615,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } if (needFetch) { - SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon)); + pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon)); if (pCommon == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + code = terrno; + goto end; } pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment)); @@ -2604,8 +2634,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); if (pParam == NULL) { - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } pParam->epoch = tmq->epoch; @@ -2619,30 +2649,30 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { taosMemoryFree(pParam); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { taosMemoryFree(pParam); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { taosMemoryFree(msg); taosMemoryFree(pParam); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pParam); taosMemoryFree(msg); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; @@ -2662,20 +2692,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } tsem_wait(&pCommon->rsp); - int32_t code = pCommon->code; + code = pCommon->code; terrno = code; if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(*assignment); - *assignment = NULL; - *numOfAssignment = 0; - } else { - int32_t num = taosArrayGetSize(pCommon->pList); - for(int32_t i = 0; i < num; ++i) { - (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); - } - *numOfAssignment = num; + goto end; } + int32_t num = taosArrayGetSize(pCommon->pList); + for(int32_t i = 0; i < num; ++i) { + (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + } + *numOfAssignment = num; for (int32_t j = 0; j < (*numOfAssignment); ++j) { tmq_topic_assignment* p = &(*assignment)[j]; @@ -2701,12 +2728,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pOffsetInfo->committedOffset.version = p->currentOffset; } } - - destroyCommonInfo(pCommon); - return code; - } else { - return TSDB_CODE_SUCCESS; } + +end: + if(code != TSDB_CODE_SUCCESS){ + taosMemoryFree(*assignment); + *assignment = NULL; + *numOfAssignment = 0; + } + destroyCommonInfo(pCommon); + taosWUnLockLatch(&tmq->lock); + return code; } void tmq_free_assignment(tmq_topic_assignment* pAssignment) { @@ -2727,9 +2759,11 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ char tname[128] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); + taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } @@ -2745,6 +2779,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pVg == NULL) { tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } @@ -2753,12 +2788,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int32_t type = pOffsetInfo->currentOffset.type; if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } @@ -2773,6 +2810,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); + taosWUnLockLatch(&tmq->lock); SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) {