Merge pull request #26342 from taosdata/fix/TD-30725

fix:[TD-30725]modify error code & fix potential memory leaks
This commit is contained in:
dapan1121 2024-07-04 11:00:49 +08:00 committed by GitHub
commit a8a725936e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 40 additions and 38 deletions

View File

@ -919,6 +919,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -1540,6 +1540,9 @@ int32_t doProcessMsgFromServer(void* param) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
if (pMsg->info.ahandle == NULL) {
tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont);
taosMemoryFree(arg);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
STscObj* pTscObj = NULL;
@ -1557,6 +1560,12 @@ int32_t doProcessMsgFromServer(void* param) {
if (pRequest->self != pSendInfo->requestObjRefId) {
tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
pRequest->self, pSendInfo->requestObjRefId);
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
pTscObj = pRequest->pTscObj;

View File

@ -258,7 +258,7 @@ static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
SMqCommitCbParamSet* pParamSet);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -1364,11 +1364,15 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) {
}
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL;
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pParam == NULL || pMsg == NULL) {
goto FAIL;
}
int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto FAIL;
@ -1479,7 +1483,6 @@ END:
FAIL:
if (tmq) tsem2_post(&tmq->rspSem);
taosMemoryFree(pParam);
if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
@ -1788,26 +1791,26 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAIL;
return code;
}
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
return code;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
code = TSDB_CODE_INVALID_MSG;
taosMemoryFreeClear(msg);
goto FAIL;
return code;
}
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(msg);
goto FAIL;
return code;
}
pParam->refId = pTmq->refId;
@ -1819,13 +1822,14 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(msg);
goto FAIL;
return code;
}
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
@ -1836,7 +1840,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) {
goto FAIL;
return code;
}
pVg->pollCnt++;
@ -1844,8 +1848,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pTmq->pollCnt++;
return 0;
FAIL:
return tmqPollCb(pParam, NULL, code);
}
// broadcast the poll request to all related vnodes
@ -2628,7 +2630,6 @@ FAIL:
taosMemoryFree(pMsg->pData);
}
taosMemoryFree(pParam);
return code;
}
@ -2637,16 +2638,18 @@ int32_t syncAskEp(tmq_t* pTmq) {
if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY;
tsem_init(&pInfo->sem, 0, 0);
askEp(pTmq, pInfo, true, false);
tsem_wait(&pInfo->sem);
int32_t code = askEp(pTmq, pInfo, true, false);
if(code == 0){
tsem_wait(&pInfo->sem);
code = pInfo->code;
}
int32_t code = pInfo->code;
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
return code;
}
void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMqAskEpReq req = {0};
req.consumerId = pTmq->consumerId;
req.epoch = updateEpSet ? -1 : pTmq->epoch;
@ -2658,30 +2661,26 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
code = TSDB_CODE_INVALID_PARA;
goto FAIL;
return TSDB_CODE_INVALID_PARA;
}
pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
code = TSDB_CODE_INVALID_PARA;
taosMemoryFree(pReq);
goto FAIL;
return TSDB_CODE_INVALID_PARA;
}
pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pReq);
goto FAIL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pParam->refId = pTmq->refId;
@ -2690,30 +2689,22 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pReq);
goto FAIL;
taosMemoryFree(pParam);
return TSDB_CODE_OUT_OF_MEMORY;
}
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->fp = askEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0;
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code == 0) {
return;
}
FAIL:
askEpCb(pParam, NULL, code);
return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
}
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {

View File

@ -1196,7 +1196,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else {
taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
terrno = TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
return -1;
}
}
@ -1217,7 +1217,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
terrno = TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
return -1;
}

View File

@ -766,6 +766,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of ra
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")