Merge pull request #27682 from taosdata/fix/fixMemleakInterface

fix mem leak
This commit is contained in:
Hongze Cheng 2024-09-05 17:02:53 +08:00 committed by GitHub
commit 46a58a0eb8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 67 additions and 71 deletions

View File

@ -349,8 +349,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; // int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg)); TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
(void)tsem_wait(&pRequest->body.rspSem); (void)tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -406,8 +406,8 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SAppInstInfo* pAppInfo = getAppInfo(pRequest); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; // int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
if (code) { if (code) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} }

View File

@ -296,9 +296,8 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
pSendInfo->fp = fetchWhiteListCallbackFn; pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST; pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) { if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
tscWarn("failed to async send msg to server"); tscWarn("failed to async send msg to server");
} }
releaseTscObj(connId); releaseTscObj(connId);
@ -861,8 +860,8 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
} }
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) { int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -165,8 +165,8 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
pInfo->requestId = tGenIdPI64(); pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0; pInfo->requestObjRefId = 0;
int64_t transporterId = 0; // int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
FAILED: FAILED:
if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) { if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) {
@ -528,7 +528,8 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td
if (code == 0) { if (code == 0) {
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId); tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
} else { } else {
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code); tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId,
code);
} }
*fileName = NULL; *fileName = NULL;
} }

View File

@ -552,9 +552,9 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
int64_t transporterId = 0; // int64_t transporterId = 0;
(void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
if (code != 0) { if (code != 0) {
(void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code; return code;
@ -955,8 +955,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
tscError("tmqSendHbReq asyncSendMsgToServer failed"); tscError("tmqSendHbReq asyncSendMsgToServer failed");
} }
@ -1436,8 +1435,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
goto FAIL; goto FAIL;
} }
@ -2044,10 +2042,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0; // int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) { if (code != 0) {
@ -3221,8 +3219,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
sendInfo->fp = tmCommittedCb; sendInfo->fp = tmCommittedCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
(void)tsem2_destroy(&pParam->sem); (void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
@ -3498,13 +3495,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->fp = tmqGetWalInfoCb; sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0; // int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
if (code != 0) { if (code != 0) {
goto end; goto end;
} }
@ -3668,8 +3665,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->fp = tmqSeekCb; sendInfo->fp = tmqSeekCb;
sendInfo->msgType = TDMT_VND_TMQ_SEEK; sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
(void)tsem2_destroy(&pParam->sem); (void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam); taosMemoryFree(pParam);

View File

@ -136,8 +136,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback; pMsgSendInfo->fp = inserterCallback;
int64_t transporterId = 0; return asyncSendMsgToServer(pTransporter, pEpset, NULL, pMsgSendInfo);
return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
} }
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) { static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
@ -235,7 +234,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
if (pColInfoData->info.type != pCol->type) { if (pColInfoData->info.type != pCol->type) {
qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, pCol->type); qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
pCol->type);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end; goto _end;
} }
@ -462,7 +462,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->explain = pInserterNode->explain; inserter->explain = pInserterNode->explain;
int64_t suid = 0; int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
&inserter->pSchema, &suid);
if (code) { if (code) {
terrno = code; terrno = code;
goto _return; goto _return;

View File

@ -1575,8 +1575,8 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SMetaReader mr = {0}; SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, p, code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId,
numOfRows, GET_TASKID(pTaskInfo)); p, numOfRows, GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&mr); pAPI->metaReaderFn.clearReader(&mr);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -2170,8 +2170,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId; pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0; code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo);
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code; pTaskInfo->code = code;