fix:[TD-30306]error in converity scan
This commit is contained in:
parent
7182e0948e
commit
d85d3aa70e
|
@ -858,7 +858,10 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
tscError("tmqSendHbReq asyncSendMsgToServer failed");
|
||||
}
|
||||
|
||||
OVER:
|
||||
tDestroySMqHbReq(&req);
|
||||
|
@ -1220,12 +1223,15 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(buf);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
SMqSubscribeCbParam param = {.rspErr = 0};
|
||||
if (tsem_init(¶m.rspSem, 0, 0) != 0) {
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(sendInfo);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1245,10 +1251,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
// avoid double free if msg is sent
|
||||
buf = NULL;
|
||||
sendInfo = NULL;
|
||||
|
||||
tsem_wait(¶m.rspSem);
|
||||
tsem_destroy(¶m.rspSem);
|
||||
|
||||
|
@ -1284,8 +1286,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
|
||||
FAIL:
|
||||
taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(sendInfo);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1745,6 +1745,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||
if (pParam == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(msg);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1756,6 +1757,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(msg);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1782,7 +1784,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
|
||||
return 0;
|
||||
FAIL:
|
||||
taosMemoryFreeClear(msg);
|
||||
return tmqPollCb(pParam, NULL, code);
|
||||
}
|
||||
|
||||
|
@ -2560,6 +2561,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
|
||||
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
taosMemoryFree(pReq);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -2567,6 +2569,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
if (pParam == NULL) {
|
||||
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pReq);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -2577,6 +2580,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pReq);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -2598,8 +2602,6 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
}
|
||||
|
||||
FAIL:
|
||||
taosMemoryFreeClear(pParam);
|
||||
taosMemoryFreeClear(pReq);
|
||||
askEpCb(pParam, NULL, code);
|
||||
}
|
||||
|
||||
|
@ -2706,7 +2708,6 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pParam);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -3066,6 +3067,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
sendInfo->requestId = req.reqId;
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->paramFreeFp = taosMemoryFree;
|
||||
sendInfo->fp = tmqGetWalInfoCb;
|
||||
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
|
||||
|
||||
|
@ -3077,8 +3079,6 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pParam);
|
||||
taosMemoryFree(msg);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue