fix:heap use after free of pParamSet since the first commitCb is called before send another commit message

This commit is contained in:
wangmm0220 2023-09-06 14:45:07 +08:00
parent 897fd5b8b5
commit cc62961337
1 changed files with 16 additions and 9 deletions

View File

@ -234,7 +234,6 @@ typedef struct {
int64_t refId; int64_t refId;
int32_t epoch; int32_t epoch;
int32_t waitingRspNum; int32_t waitingRspNum;
int32_t totalRspNum;
int32_t code; int32_t code;
tmq_commit_cb* callbackFn; tmq_commit_cb* callbackFn;
/*SArray* successfulOffsets;*/ /*SArray* successfulOffsets;*/
@ -513,12 +512,12 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
int64_t transporterId = 0; int64_t transporterId = 0;
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
if(code != 0){ if(code != 0){
atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code; return code;
} }
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
return code; return code;
} }
@ -537,7 +536,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
return NULL; return NULL;
} }
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam){ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) { if (pParamSet == NULL) {
return NULL; return NULL;
@ -547,7 +546,7 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p
pParamSet->epoch = tmq->epoch; pParamSet->epoch = tmq->epoch;
pParamSet->callbackFn = pCommitFp; pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
pParamSet->waitingRspNum = 0; pParamSet->waitingRspNum = rspNum;
return pParamSet; return pParamSet;
} }
@ -593,7 +592,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
char commitBuf[TSDB_OFFSET_LEN] = {0}; char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
if (pParamSet == NULL) { if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
@ -657,7 +656,8 @@ end:
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0; int32_t code = 0;
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); // init as 1 to prevent concurrency issue
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1);
if (pParamSet == NULL) { if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
@ -703,7 +703,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics);
// request is sent // request is sent
if (pParamSet->totalRspNum != 0) { if (pParamSet->waitingRspNum != 1) {
// count down since waiting rsp num init as 1
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
return; return;
} }
code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
@ -3005,7 +3007,12 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
if(code != 0){
taosMemoryFree(pParam);
taosMemoryFree(msg);
goto end;
}
} }
tsem_wait(&pCommon->rsp); tsem_wait(&pCommon->rsp);