[TD-98]fix memory leaks during query and fix query bugs.

This commit is contained in:
hjxilinx 2020-04-06 16:29:11 +08:00
parent 5316ca3281
commit 72add021b7
8 changed files with 75 additions and 27 deletions

View File

@ -213,8 +213,6 @@ typedef struct SDataBlockList {
int32_t idx; int32_t idx;
uint32_t nSize; uint32_t nSize;
uint32_t nAlloc; uint32_t nAlloc;
char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */
STableDataBlocks **pData; STableDataBlocks **pData;
} SDataBlockList; } SDataBlockList;
@ -451,7 +449,6 @@ void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);

View File

@ -77,7 +77,7 @@ int32_t main(int32_t argc, char *argv[]) {
} }
/* Set termination handler. */ /* Set termination handler. */
struct sigaction act; struct sigaction act = {0};
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
act.sa_sigaction = signal_handler; act.sa_sigaction = signal_handler;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);

View File

@ -290,7 +290,8 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg); qDestroyQueryInfo(pQInfo);
// dnodeProcessReadResult(pVnode, pMsg);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
} }
@ -305,5 +306,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle); dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
vnodeRelease(pVnode);
} }

View File

@ -198,6 +198,12 @@ typedef struct SQInfo {
*/ */
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* destroy the query info struct
* @param pQInfo
*/
void qDestroyQueryInfo(SQInfo* pQInfo);
/** /**
* query on single table * query on single table
* @param pReadMsg * @param pReadMsg

View File

@ -1566,6 +1566,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
} }
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tfree(pRuntimeEnv->pQuery);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
@ -4202,8 +4205,9 @@ int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTable
} }
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
taosArrayDestroy(cols);
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param; pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1; pRuntimeEnv->cur.vnodeIndex = -1;
if (param != NULL) { if (param != NULL) {
@ -5444,13 +5448,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
*tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); *tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
} }
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " dTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, "
"timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey,
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType, pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols,
pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols,
pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen,
pQueryMsg->limit, pQueryMsg->offset); pQueryMsg->limit, pQueryMsg->offset);
@ -5972,7 +5974,7 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
tfree(pQuery); taosArrayDestroy(pQInfo->pTableIdList);
dTrace("QInfo:%p QInfo is freed", pQInfo); dTrace("QInfo:%p QInfo is freed", pQInfo);
@ -6120,6 +6122,10 @@ _query_over:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qDestroyQueryInfo(SQInfo* pQInfo) {
freeQInfo(pQInfo);
}
void qTableQuery(SQInfo *pQInfo) { void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo); dTrace("%p freed abort query", pQInfo);
@ -6133,6 +6139,9 @@ void qTableQuery(SQInfo *pQInfo) {
dTrace("QInfo:%p query task is launched", pQInfo); dTrace("QInfo:%p query task is launched", pQInfo);
// sem_post(&pQInfo->dataReady);
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) { if (numOfTables == 1) {
singleTableQueryImpl(pQInfo); singleTableQueryImpl(pQInfo);
@ -6212,7 +6221,6 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
(*pRsp)->completed = 1; // notify no more result to client (*pRsp)->completed = 1; // notify no more result to client
freeQInfo(pQInfo);
} }
return code; return code;

View File

@ -347,7 +347,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
va_start(argpointer, format); va_start(argpointer, format);
int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer); int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer);
if (writeLen <= 0) { if (writeLen <= 0) {
char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE]; char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE] = {0};
writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer); writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer);
strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE); strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE);
len += MAX_LOGLINE_CONTENT_SIZE; len += MAX_LOGLINE_CONTENT_SIZE;

View File

@ -328,6 +328,12 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
*/ */
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len); SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -282,7 +282,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->pColumns = pColumnInfo;
pQueryHandle->loadDataAfterSeek = false; pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true; pQueryHandle->isFirstSlot = true;
@ -331,9 +330,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
int32_t vnodeId = 1;
vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo);
return (tsdb_query_handle_t)pQueryHandle; return (tsdb_query_handle_t)pQueryHandle;
} }
@ -468,6 +464,7 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
} }
} }
taosArrayDestroy(sa);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
@ -751,7 +748,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
int32_t tid = pCheckInfo->tableId.tid; int32_t tid = pCheckInfo->tableId.tid;
while (pCheckInfo->pFileGroup != NULL) { while (pCheckInfo->pFileGroup != NULL) {
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { if (getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup) != TSDB_CODE_SUCCESS) {
break; break;
} }
@ -761,7 +758,6 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pCheckInfo->pFileGroup->fileId, tid); pCheckInfo->pFileGroup->fileId, tid);
pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter); pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter);
continue; continue;
} }
@ -790,7 +786,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks); assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks);
// load first data block into memory failed, caused by disk block error // load first data block into memory failed, caused by disk block error
bool blockLoaded = false; bool blockLoaded = false;
SArray *sa = getDefaultLoadColumns(pQueryHandle, true); SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
// todo no need to loaded at all // todo no need to loaded at all
@ -810,8 +806,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pFile->fd = open(pFile->fname, O_RDONLY); pFile->fd = open(pFile->fname, O_RDONLY);
} }
if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1, if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
pCheckInfo->pDataCols, data) == 0) {
blockLoaded = true; blockLoaded = true;
} }
@ -825,12 +820,19 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
// todo search qualified points in blk, according to primary key (timestamp) column // todo search qualified points in blk, according to primary key (timestamp) column
SDataCols* pDataCols = pCheckInfo->pDataCols; SDataCols* pDataCols = pCheckInfo->pDataCols;
TSKEY* d = (TSKEY*) pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order); cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order);
cur->fid = pCheckInfo->pFileGroup->fileId; cur->fid = pCheckInfo->pFileGroup->fileId;
assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
taosArrayDestroy(sa);
tfree(data);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
@ -838,8 +840,6 @@ static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
// SQueryFilePos* cur = &pHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
if (!pCheckInfo->checkFirstFileBlock) { if (!pCheckInfo->checkFirstFileBlock) {
@ -1351,3 +1351,34 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo
return result; return result;
} }
} }
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
tfree(pTableCheckInfo->pDataCols->buf);
tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo);
tfree(pTableCheckInfo->compIndex);
}
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < cols; ++i) {
SColumnInfoEx *pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
}
taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->unzipBuffer);
tfree(pQueryHandle->secondaryUnzipBuffer);
tfree(pQueryHandle);
}