fix: memory leak issue
This commit is contained in:
parent
fe4e45647e
commit
cb70861986
|
@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg {
|
|||
uint32_t maxDataBlockNumPerQuery;
|
||||
} SDataSinkMgtCfg;
|
||||
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI);
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager);
|
||||
|
||||
typedef struct SInputData {
|
||||
const struct SSDataBlock* pData;
|
||||
|
@ -83,7 +83,7 @@ typedef struct SOutputData {
|
|||
* @param pHandle output
|
||||
* @return error code
|
||||
*/
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
|
||||
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
|
||||
|
||||
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);
|
||||
|
||||
|
|
|
@ -281,6 +281,8 @@ _end:
|
|||
if (deleter != NULL) {
|
||||
destroyDataSinker((SDataSinkHandle*)deleter);
|
||||
taosMemoryFree(deleter);
|
||||
} else {
|
||||
taosMemoryFree(pManager);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -241,7 +241,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
|
|||
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
|
||||
if (NULL == dispatcher) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
dispatcher->sink.fPut = putDataBlock;
|
||||
dispatcher->sink.fEndPut = endPut;
|
||||
|
@ -258,8 +258,13 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
|
|||
if (NULL == dispatcher->pDataBlocks) {
|
||||
taosMemoryFree(dispatcher);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
*pHandle = dispatcher;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(pManager);
|
||||
return terrno;
|
||||
}
|
||||
|
|
|
@ -413,7 +413,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
if (NULL == inserter) {
|
||||
taosMemoryFree(pParam);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
|
||||
|
@ -433,23 +433,18 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
int64_t suid = 0;
|
||||
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||
if (code) {
|
||||
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||
taosMemoryFree(inserter);
|
||||
return code;
|
||||
terrno = code;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (pInserterNode->stableId != suid) {
|
||||
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||
taosMemoryFree(inserter);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
return terrno;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||
taosThreadMutexInit(&inserter->mutex, NULL);
|
||||
if (NULL == inserter->pDataBlocks) {
|
||||
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||
taosMemoryFree(inserter);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -473,4 +468,15 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
|
||||
*pHandle = inserter;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (inserter) {
|
||||
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||
taosMemoryFree(inserter);
|
||||
} else {
|
||||
taosMemoryFree(pManager);
|
||||
}
|
||||
|
||||
return terrno;
|
||||
}
|
||||
|
|
|
@ -49,8 +49,11 @@ int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, D
|
|||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
|
||||
return createDataInserter(pManager, pDataSink, pHandle, pParam);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
taosMemoryFree(pSinkManager);
|
||||
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
|
|
@ -511,19 +511,20 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
|
||||
void* pSinkManager = NULL;
|
||||
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (handle) {
|
||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
|
||||
void* pSinkManager = NULL;
|
||||
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
void* pSinkParam = NULL;
|
||||
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
|
||||
taosMemoryFree(pSinkManager);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue