From 79b0d1fcbdb6735f52de0dc1fb037a5df17e4e3b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Sep 2022 15:02:23 +0800 Subject: [PATCH 1/6] fix(query): fix memory leak. --- source/libs/executor/src/executor.c | 3 ++- source/libs/qworker/src/qworker.c | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 95415e1113..ea2180124a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -480,6 +480,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { if (ret != TSDB_CODE_SUCCESS) { pTaskInfo->code = ret; cleanUpUdfs(); + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); atomic_store_64(&pTaskInfo->owner, 0); @@ -512,8 +513,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { } cleanUpUdfs(); - uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; + uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 61c38f59db..fbda8ee37f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -9,6 +9,7 @@ #include "tcommon.h" #include "tmsg.h" #include "tname.h" +#include "tdatablock.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; +static void freeBlock(void* param) { + SSDataBlock* pBlock = *(SSDataBlock**)param; + blockDataDestroy(pBlock); +} + int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; @@ -71,7 +77,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } - +\ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; @@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { qwDbgSimulateSleep(); + code = qExecTaskOpt(taskHandle, pResList, &useconds); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { @@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } _return: - - taosArrayDestroy(pResList); + taosArrayDestroyEx(pResList, freeBlock); QW_RET(code); } @@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { void *pIter = taosHashIterate(mgmt->schHash, NULL); while (pIter) { - SQWSchStatus *sch = (SQWSchStatus *)pIter; - if (NULL == sch->hbConnInfo.handle) { + SQWSchStatus *sch1 = (SQWSchStatus *)pIter; + if (NULL == sch1->hbConnInfo.handle) { uint64_t *sId = taosHashGetKey(pIter, NULL); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); - if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && - taosHashGetSize(sch->tasksHash) <= 0) { + if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && + taosHashGetSize(sch1->tasksHash) <= 0) { taosArrayPush(pExpiredSch, sId); } From 5ba41111b4466a04f4a5edf898382dc17331bc93 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Sep 2022 16:17:39 +0800 Subject: [PATCH 2/6] refactor: add some logs. --- source/dnode/vnode/src/inc/tsdb.h | 4 ++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 4 ++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a836fa2bc5..4264a3232b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -299,8 +299,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap); -void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); +int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap, const char* id); +void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap, const char* id); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8da783a5bd..8452d14c86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs tb_uid_t suid = getTableSuidByUid(uid, pTsdb); - tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap); + tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL); STbData *pMem = NULL; if (pIter->pReadSnap->pMem) { @@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } - tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); + tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL); _err: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index eafc5b3213..34737bb7fd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3360,7 +3360,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); + code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3384,7 +3384,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl STsdbReader* pPrevReader = pReader->innerReader[0]; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; - code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap); + code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3441,7 +3441,7 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } - tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -3863,7 +3863,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { +int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) { int32_t code = 0; // alloc @@ -3906,12 +3906,12 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { goto _exit; } - tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); + tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); _exit: return code; } -void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { +void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) { if (pSnap) { if (pSnap->pMem) { tsdbUnrefMemTable(pSnap->pMem); @@ -3925,5 +3925,5 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { taosMemoryFree(pSnap); } - tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); + tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); } \ No newline at end of file From 8e62ebab64e87eaeef1a54a52326652c801ff41c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Sep 2022 16:18:44 +0800 Subject: [PATCH 3/6] other: remove a typo. --- source/libs/qworker/src/qworker.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fbda8ee37f..4dcc06b470 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -77,7 +77,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } -\ + int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; From 0c71d957735c221d8c231e632e908f033e5035f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Sep 2022 17:44:28 +0800 Subject: [PATCH 4/6] fix(query):set correct block load condition. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 34737bb7fd..6a8de672f1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2096,8 +2096,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // it is a clean block, load it directly if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) { - copyBlockDataToSDataBlock(pReader, pBlockScanInfo); - goto _end; + if (pReader->order == TSDB_ORDER_ASC || (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) { + copyBlockDataToSDataBlock(pReader, pBlockScanInfo); + goto _end; + } } } else { // file blocks not exist pBlockScanInfo = pReader->status.pTableIter; From 0e308d29ebb8427e2a51f5a9ac10a0858407e904 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Sep 2022 12:06:37 +0800 Subject: [PATCH 5/6] fix(query): check the duplicate before adding table into query list --- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 51d83d8eed..e2641aad86 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -885,6 +885,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq if (NULL != submitBlkRsp.pMeta) { vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); } + + taosArrayPush(newTbUids, &createTbReq.uid); } taosArrayPush(newTbUids, &createTbReq.uid); From 15f656e683691d5cc0e385390d7d2795856b489d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Sep 2022 13:06:57 +0800 Subject: [PATCH 6/6] refactor(query): add some logs. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 7 ++++++- source/libs/executor/src/executor.c | 25 ++++++++++++++++++++----- source/libs/executor/src/scanoperator.c | 13 +++++++++++-- 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 22727a4311..14dbfbd63e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3719,8 +3719,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { } } - tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", - pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%"PRId64", query range:%" PRId64 " - %" PRId64 " in query %s", + pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e2641aad86..6e9eba306a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -533,6 +533,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR taosArrayPush(rsp.pArray, &cRsp); } + vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); tqUpdateTbUidList(pVnode->pTq, tbUids, true); if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) { goto _exit; @@ -888,7 +889,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq taosArrayPush(newTbUids, &createTbReq.uid); } - taosArrayPush(newTbUids, &createTbReq.uid); submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); @@ -919,6 +919,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq submitRsp.affectedRows += submitBlkRsp.affectedRows; taosArrayPush(submitRsp.pArray, &submitBlkRsp); } + + if (taosArrayGetSize(newTbUids) > 0) { + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); + } + tqUpdateTbUidList(pVnode->pTq, newTbUids, true); _exit: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ea2180124a..a2bcca9545 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -265,6 +265,15 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; + + if (isAdd) { + qDebug("add %d tables id into query list, %s", (int32_t) taosArrayGetSize(tableIdList), pTaskInfo->id.str); + } + + if (pListInfo->map == NULL) { + pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + } // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; @@ -311,13 +320,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); - if (pTaskInfo->tableqinfoList.map == NULL) { - pTaskInfo->tableqinfoList.map = - taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + bool exists = false; + for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) { + STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k); + if (pKeyInfo->uid == keyInfo.uid) { + qWarn("ignore duplicated query table uid:%" PRIu64 " added, %s", pKeyInfo->uid, pTaskInfo->id.str); + exists = true; + } } - taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + if (!exists) { + taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); + taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + } } if (keyBuf != NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ad9cd1ffe7..a6d718bafa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -617,19 +617,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - if (pInfo->noTable) return NULL; + if (pInfo->noTable) { + return NULL; + } + + int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + while (1) { SSDataBlock* result = doTableScanGroup(pOperator); if (result) { return result; } + // if no data, switch to next table and continue scan pInfo->currentTable++; - if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) { + if (pInfo->currentTable >= numOfTables) { return NULL; } + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); + qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); + tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; }