From cc62961337e692be232988eb57d6cddbfeff840e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 6 Sep 2023 14:45:07 +0800 Subject: [PATCH] fix:heap use after free of pParamSet since the first commitCb is called before send another commit message --- source/client/src/clientTmq.c | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3969a68fd1..d76aa2456e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -234,7 +234,6 @@ typedef struct { int64_t refId; int32_t epoch; int32_t waitingRspNum; - int32_t totalRspNum; int32_t code; tmq_commit_cb* callbackFn; /*SArray* successfulOffsets;*/ @@ -513,12 +512,12 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; int64_t transporterId = 0; + atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); if(code != 0){ + atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); return code; } - atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); - atomic_add_fetch_32(&pParamSet->totalRspNum, 1); return code; } @@ -537,7 +536,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam){ +static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { return NULL; @@ -547,7 +546,7 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p pParamSet->epoch = tmq->epoch; pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; - pParamSet->waitingRspNum = 0; + pParamSet->waitingRspNum = rspNum; return pParamSet; } @@ -593,7 +592,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -657,7 +656,8 @@ end: static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); + // init as 1 to prevent concurrency issue + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -703,7 +703,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); // request is sent - if (pParamSet->totalRspNum != 0) { + if (pParamSet->waitingRspNum != 1) { + // count down since waiting rsp num init as 1 + commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; @@ -3005,7 +3007,12 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); + if(code != 0){ + taosMemoryFree(pParam); + taosMemoryFree(msg); + goto end; + } } tsem_wait(&pCommon->rsp);