From c46769ef3789d02f42c588b72ac895fe6b446891 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 26 Jan 2024 14:10:02 +0800 Subject: [PATCH] fix: reader can be in memory or not --- source/libs/executor/inc/executorInt.h | 5 ++ source/libs/executor/src/scanoperator.c | 69 ++++++++++++++++++------- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index f59f0b9c57..6418654ed5 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -291,6 +291,9 @@ typedef enum ESubTableInputType { typedef struct STmsSubTableInput { STsdbReader* pReader; + SQueryTableDataCond tblCond; + STableKeyInfo* pKeyInfo; + bool bInMemReader; ESubTableInputType type; SSDataBlock* pReaderBlock; @@ -313,6 +316,8 @@ typedef struct STmsSubTablesMergeInfo { int32_t numTableBlocksInMem; SDiskbasedBuf* pBlocksBuf; + + int32_t numInMemReaders; } STmsSubTablesMergeInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a8fa42c6c6..031ac9a5e7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3344,31 +3344,57 @@ static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* p return ret; } +int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { + memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); + dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); + for (int i = 0; i < src->numOfCols; i++) { + dst->colList[i] = src->colList[i]; + } + return 0; +} + static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) { + int32_t code = 0; + STableMergeScanInfo* pInfo = pOperator->info; + SReadHandle* pHandle = &pInfo->base.readHandle; STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; const SStorageAPI* pAPI= &pTaskInfo->storageAPI; blockDataCleanup(pInput->pReaderBlock); + if (!pInput->bInMemReader) { + code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock, + (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); + if (code != 0) { + T_LONG_JMP(pTaskInfo->env, code); + } + } pInfo->base.dataReader = pInput->pReader; while (true) { bool hasNext = false; - int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext); + int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext); if (code != 0) { - pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader); + pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader); pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); } if (!hasNext || isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) { - pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader); + pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader); } + *pSubTableHasBlock = false; break; } + if (pInput->tblCond.order == TSDB_ORDER_ASC) { + pInput->tblCond.twindows.skey = pInput->pReaderBlock->info.window.ekey + 1; + } else { + pInput->tblCond.twindows.ekey = pInput->pReaderBlock->info.window.skey - 1; + } + uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status); if (code != 0) { @@ -3376,6 +3402,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu T_LONG_JMP(pTaskInfo->env, code); } if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { + *pSubTableHasBlock = false; break; } if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) { @@ -3383,13 +3410,19 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu } *pSubTableHasBlock = true; + break; + } + + if (*pSubTableHasBlock) { pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid); pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows; - pInfo->base.dataReader = NULL; - return TSDB_CODE_SUCCESS; } + if (!pInput->bInMemReader || !*pSubTableHasBlock) { + pAPI->tsdReader.tsdReaderClose(pInput->pReader); + pInput->pReader = NULL; + } + pInfo->base.dataReader = NULL; - *pSubTableHasBlock = false; return TSDB_CODE_SUCCESS; } @@ -3483,7 +3516,8 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); - pSubTblsInfo->numTableBlocksInMem = 0; + pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables; + pSubTblsInfo->numInMemReaders = pSubTblsInfo->numSubTables; return TSDB_CODE_SUCCESS; } @@ -3497,11 +3531,19 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; pInput->type = SUB_TABLE_MEM_BLOCK; + dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond); pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false); STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); - pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pReaderBlock, + pInput->pKeyInfo = keyInfo; + if (i + 1 < pSubTblsInfo->numInMemReaders) { + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, keyInfo, 1, pInput->pReaderBlock, (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); + pInput->bInMemReader = true; + } else { + pInput->pReader = NULL; + pInput->bInMemReader = false; + } bool hasNext = true; fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); if (!hasNext) { @@ -3656,7 +3698,7 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; - + taosMemoryFree(pInput->tblCond.colList); blockDataDestroy(pInput->pReaderBlock); blockDataDestroy(pInput->pPageBlock); taosArrayDestroy(pInput->aBlockPages); @@ -3903,15 +3945,6 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { return pList; } -int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { - memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); - dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); - for (int i = 0; i < src->numOfCols; i++) { - dst->colList[i] = src->colList[i]; - } - return 0; -} - void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { STableMergeScanInfo* pTmsInfo = param; if (type == TSD_READER_NOTIFY_DURATION_START) {