From 407a3453caebfcdff329e083f42bac727ce11ccd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Oct 2022 14:45:35 +0800 Subject: [PATCH] fix(query): set fetch block info. --- source/libs/executor/inc/executil.h | 11 ++++--- source/libs/executor/src/cachescanoperator.c | 4 +-- source/libs/executor/src/executil.c | 21 +++++++++---- source/libs/executor/src/executorimpl.c | 24 ++++++++------- source/libs/executor/src/scanoperator.c | 31 ++++++++++++++------ source/libs/executor/src/sortoperator.c | 1 + 6 files changed, 61 insertions(+), 31 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 0941f9db15..1965a8cbf6 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -95,17 +95,20 @@ typedef struct SColMatchInfo { int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; +// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly +// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups typedef struct STableListInfo { bool oneTableForEachGroup; - int32_t numOfGroups; - int32_t* groupOffset; // keep the offset value for each group in the tableList + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid + SHashObj* map; // speedup acquire the tableQueryInfo by table uid uint64_t suid; } STableListInfo; void destroyTableList(STableListInfo* pTableList); -int32_t getNumOfGroups(const STableListInfo* pTableList); +int32_t getNumOfOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 7ffb66a785..817b6c1826 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); // partition by tbname, todo opt perf - if (getNumOfGroups(pTableList) == getTotalTables(pTableList)) { + if (oneTableForEachGroup(pTableList)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); @@ -189,7 +189,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } } else { - size_t totalGroups = getNumOfGroups(pTableList); + size_t totalGroups = getNumOfOutputGroups(pTableList); while (pInfo->currentGroupIndex < totalGroups) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bd8e4acc85..05c9f8a9b0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1665,6 +1665,11 @@ uint64_t getTotalTables(const STableListInfo* pTableList) { } uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { + if (pTableList->oneTableForEachGroup) { + ASSERT(pTableList->map == NULL); + return tableUid; + } + uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); if (groupId != NULL) { return *groupId; @@ -1677,25 +1682,25 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; taosArrayPush(pTableList->pTableList, &keyInfo); - if (pTableList->oneTableForEachGroup || pTableList->numOfGroups > 1) { + if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) { taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } return TSDB_CODE_SUCCESS; } int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) { - int32_t total = getNumOfGroups(pTableList); + int32_t total = getNumOfOutputGroups(pTableList); if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { return TSDB_CODE_INVALID_PARA; } // here handle two special cases: // 1. only one group exists, and 2. one table exists for each group. - if (pTableList->numOfGroups == 1) { + if (total == 1) { *size = getTotalTables(pTableList); *pKeyInfo = taosArrayGet(pTableList->pTableList, 0); return TSDB_CODE_SUCCESS; - } else if (pTableList->numOfGroups == getTotalTables(pTableList)) { + } else if (total == getTotalTables(pTableList)) { *size = 1; *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); return TSDB_CODE_SUCCESS; @@ -1712,8 +1717,12 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI return TSDB_CODE_SUCCESS; } -int32_t getNumOfGroups(const STableListInfo* pTableList) { - return pTableList->numOfGroups; +int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { + return pTableList->numOfOuputGroups; +} + +bool oneTableForEachGroup(const STableListInfo* pTableList) { + return pTableList->oneTableForEachGroup; } void destroyTableList(STableListInfo* pTableqinfoList) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fb554520e5..c6a22cd28e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3397,9 +3397,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { } } - pTableListInfo->numOfGroups = taosArrayGetSize(pList); - pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfGroups); - memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfGroups); + pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); + pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); taosArrayDestroy(pList); # if 0 @@ -3490,13 +3490,15 @@ bool groupbyTbname(SNodeList* pGroupList) { } int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { + int32_t code = TSDB_CODE_SUCCESS; if (group == NULL) { - return TDB_CODE_SUCCESS; + return code; } pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (pTableListInfo->map == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } bool assignUid = groupbyTbname(group); @@ -3508,21 +3510,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, info->groupId = info->uid; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } + pTableListInfo->oneTableForEachGroup = true; - pTableListInfo->numOfGroups = numOfTables; + if (groupSort) { + pTableListInfo->numOfOuputGroups = numOfTables; + } } else { - int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); + code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } if (groupSort) { - return sortTableGroup(pTableListInfo); + code = sortTableGroup(pTableListInfo); } } - qDebug("-------------------, %d", (int) taosHashGetSize(pTableListInfo->map)); - return TDB_CODE_SUCCESS; + return code; } static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 187eae3cf1..5f470ea793 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -746,7 +746,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } else { // scan table group by group sequentially if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= getNumOfGroups(&pTaskInfo->tableqinfoList)) { + if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { doSetOperatorCompleted(pOperator); return NULL; } @@ -768,7 +768,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - if ((++pInfo->currentGroupId) >= getNumOfGroups(&pTaskInfo->tableqinfoList)) { + if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { doSetOperatorCompleted(pOperator); return NULL; } @@ -1094,7 +1094,9 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs}; SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo; - blockDataCleanup(pTableScanInfo->pResBlock); + + SSDataBlock* pBlock = pTableScanInfo->pResBlock; + blockDataCleanup(pBlock); STsdbReader* pReader = NULL; int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, @@ -1106,14 +1108,25 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU bool hasBlock = tsdbNextDataBlock(pReader); if (hasBlock) { - SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - relocateColumnData(pTableScanInfo->pResBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pTableScanInfo->pResBlock, pTaskInfo); + SDataBlockInfo binfo = {0}; + tsdbRetrieveDataBlockInfo(pReader, &binfo); + + SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); + blockDataEnsureCapacity(pBlock, binfo.rows); + + pBlock->info.window = binfo.window; + pBlock->info.uid = binfo.uid; + pBlock->info.rows = binfo.rows; + + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); } tsdbReaderClose(pReader); + qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 + ", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid); - return pTableScanInfo->pResBlock->info.rows > 0? pTableScanInfo->pResBlock:NULL; + return pBlock->info.rows > 0 ? pBlock : NULL; } static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -2371,7 +2384,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); - int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); + code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; @@ -4152,7 +4165,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return TSDB_CODE_SUCCESS; } - pTableListInfo->numOfGroups = 1; + pTableListInfo->numOfOuputGroups = 1; code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 16d853ac7e..4311d7f293 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -654,6 +654,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } + pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; pDataBlock->info.groupId = pInfo->groupId; }