fix mem leak

This commit is contained in:
yihaoDeng 2024-09-05 14:40:14 +08:00
parent e07c85989d
commit 2e3e5d0a99
6 changed files with 68 additions and 71 deletions

View File

@ -339,8 +339,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg));
// int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
(void)tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS;
}
@ -396,8 +396,8 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
// int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
if (code) {
doRequestCallback(pRequest, code);
}

View File

@ -296,9 +296,9 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) {
// int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
tscWarn("failed to async send msg to server");
}
releaseTscObj(connId);
@ -860,9 +860,9 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
return pResInfo->pCol[columnIndex].offset;
}
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 ||
columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA;
}
@ -875,22 +875,22 @@ int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *r
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
SResultColumn *pCol = &pResInfo->pCol[columnIndex];
if (*rows > pResInfo->numOfRows){
if (*rows > pResInfo->numOfRows) {
*rows = pResInfo->numOfRows;
}
if (IS_VAR_DATA_TYPE(pField->type)) {
for(int i = 0; i < *rows; i++){
if(pCol->offset[i] == -1){
for (int i = 0; i < *rows; i++) {
if (pCol->offset[i] == -1) {
result[i] = true;
}else{
} else {
result[i] = false;
}
}
}else{
for(int i = 0; i < *rows; i++){
if (colDataIsNull_f(pCol->nullbitmap, i)){
} else {
for (int i = 0; i < *rows; i++) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
result[i] = true;
}else{
} else {
result[i] = false;
}
}

View File

@ -113,15 +113,15 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
}
MonitorSlowLogData tmp = {.clusterId = p->clusterId,
.type = p->type,
.fileName = p->fileName,
.pFile = p->pFile,
.offset = p->offset,
.data = NULL};
.type = p->type,
.fileName = p->fileName,
.pFile = p->pFile,
.offset = p->offset,
.data = NULL};
if (monitorPutData2MonitorQueue(tmp) == 0) {
p->fileName = NULL;
} else {
if(taosCloseFile(&(p->pFile)) != 0) {
if (taosCloseFile(&(p->pFile)) != 0) {
tscError("failed to close file:%p", p->pFile);
}
}
@ -165,8 +165,8 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0;
int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
// int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
FAILED:
if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) {
@ -286,7 +286,7 @@ void monitorCreateClient(int64_t clusterId) {
return;
fail:
fail:
destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock);
}
@ -302,7 +302,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL) return;
MonitorClient* pMonitor = *ppMonitor;
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter);
goto end;
@ -315,7 +315,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
newCounter);
end:
end:
taosWUnLockLatch(&monitorLock);
}
@ -338,13 +338,13 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName);
goto end;
}
if (taos_counter_inc(*ppCounter, label_values) != 0){
if (taos_counter_inc(*ppCounter, label_values) != 0) {
tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName);
goto end;
}
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end:
end:
taosWUnLockLatch(&monitorLock);
}
@ -413,7 +413,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
return NULL;
}
if((size <= *offset)){
if ((size <= *offset)) {
tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return NULL;
@ -510,13 +510,13 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size);
if(data == NULL) return terrno;
if (data == NULL) return terrno;
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
pInst->pTransporter, &ep);
}
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
if (fileName == NULL){
if (fileName == NULL) {
return;
}
int64_t size = getFileSize(*fileName);
@ -525,10 +525,11 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
} else {
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
if (code == 0){
if (code == 0) {
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
}else{
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code);
} else {
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId,
code);
}
*fileName = NULL;
}

View File

@ -552,9 +552,9 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
int64_t transporterId = 0;
// int64_t transporterId = 0;
(void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
if (code != 0) {
(void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code;
@ -955,8 +955,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
tscError("tmqSendHbReq asyncSendMsgToServer failed");
}
@ -1436,8 +1435,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
goto FAIL;
}
@ -2044,10 +2042,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
// int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) {
@ -3221,8 +3219,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
sendInfo->fp = tmCommittedCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
if (code != 0) {
(void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam);
@ -3498,13 +3495,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
// int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
if (code != 0) {
goto end;
}
@ -3668,8 +3665,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->fp = tmqSeekCb;
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
(void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam);

View File

@ -61,14 +61,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
if (code) {
pInserter->submitRes.code = code;
}
if (code == TSDB_CODE_SUCCESS) {
pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
if (NULL == pInserter->submitRes.pRsp) {
pInserter->submitRes.code = terrno;
goto _return;
}
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
@ -108,7 +108,7 @@ _return:
(void)tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
return TSDB_CODE_SUCCESS;
}
@ -136,8 +136,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback;
int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
return asyncSendMsgToServer(pTransporter, pEpset, NULL, pMsgSendInfo);
}
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
@ -166,7 +165,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
} else {
taosMemoryFree(pBuf);
}
return code;
}
@ -228,14 +227,15 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end;
}
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
if (pColInfoData->info.type != pCol->type) {
qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, pCol->type);
qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
pCol->type);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end;
}
@ -331,11 +331,11 @@ _end:
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return terrno;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
}
@ -462,7 +462,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->explain = pInserterNode->explain;
int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
&inserter->pSchema, &suid);
if (code) {
terrno = code;
goto _return;
@ -484,9 +485,9 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
false, HASH_NO_LOCK);
if (NULL == inserter->pCols) {
goto _return;
goto _return;
}
SNode* pNode = NULL;
int32_t i = 0;
FOREACH(pNode, pInserterNode->pCols) {

View File

@ -1575,8 +1575,8 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, p,
numOfRows, GET_TASKID(pTaskInfo));
code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId,
p, numOfRows, GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&mr);
QUERY_CHECK_CODE(code, lino, _end);
@ -2170,8 +2170,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0;
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
@ -2880,7 +2879,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
code = tableListGetSize(pTableListInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
void* pList = tableListGetInfo(pTableListInfo, 0);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);