diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 3bfbb85958..2b45a5d206 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -164,6 +164,14 @@ typedef enum EStreamType { STREAM_FILL_OVER, } EStreamType; +typedef struct { + SArray* pGroupList; + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + bool needSortTableByGroupId; + uint64_t suid; +} STableListInfo; + #pragma pack(push, 1) typedef struct SColumnDataAgg { int16_t colId; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e5b42f6c27..bed6e93e5a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -152,9 +152,10 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num); -int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - STsdbReader **ppReader, const char *idstr); +int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); +int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, + const char *idstr); + void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); @@ -169,8 +170,8 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); -int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, - int32_t numOfCols, uint64_t suid, void** pReader); +int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid, + void **pReader); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d1626cb700..e5b8a1f327 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -716,10 +716,7 @@ typedef struct SCacheRowsReader { int32_t numOfCols; int32_t type; int32_t tableIndex; // currently returned result tables - - STableKeyInfo *pTableList; // table id list - int32_t numOfTables; - + SArray *pTableList; // table id list SSttBlockLoadInfo *pLoadInfo; STsdbReadSnap *pReadSnap; SDataFReader *pDataFReader; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 94178727be..a5dc4431ab 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -162,7 +162,10 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); +STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + void* pMemRef); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); +int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); // tq int tqInit(); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 9b9a03c478..19a0fbd629 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -713,13 +713,13 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, output->info.uid, pItem->level, terrstr()); + smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; } - smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, - SMA_VID(pSma), suid, output->info.uid, pItem->level, output->info.version, htonl(pReq->header.contLen)); + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, + SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen)); taosMemoryFreeClear(pReq); } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 222d396eaa..b8f49f38e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -97,9 +97,10 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } } -int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void** pReader) { +int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid, + void** pReader) { *pReader = NULL; + SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -110,15 +111,14 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->numOfCols = numOfCols; p->suid = suid; - if (numOfTables == 0) { + if (taosArrayGetSize(pTableIdList) == 0) { *pReader = p; return TSDB_CODE_SUCCESS; } - STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0]; + STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1); p->pTableList = pTableIdList; - p->numOfTables = numOfTables; p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); if (p->transferBuf == NULL) { @@ -205,6 +205,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; LRUHandle* h = NULL; SArray* pRow = NULL; + size_t numOfTables = taosArrayGetSize(pr->pTableList); bool hasRes = false; SArray* pLastCols = NULL; @@ -242,8 +243,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { - for (int32_t i = 0; i < pr->numOfTables; ++i) { - STableKeyInfo* pKeyInfo = &pr->pTableList[i]; + for (int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { @@ -307,8 +308,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { - for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { - STableKeyInfo* pKeyInfo = &pr->pTableList[i]; + for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4f56a5d702..5a495f263e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -270,27 +270,24 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { } } -static void clearBlockScanInfo(STableBlockScanInfo* p) { - p->iterInit = false; - p->iiter.hasVal = false; - - if (p->iter.iter != NULL) { - p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); - } - - if (p->iiter.iter != NULL) { - p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); - } - - p->delSkyline = taosArrayDestroy(p->delSkyline); - p->pBlockList = taosArrayDestroy(p->pBlockList); - tMapDataClear(&p->mapData); -} - static void destroyBlockScanInfo(SHashObj* pTableMap) { STableBlockScanInfo* p = NULL; + while ((p = taosHashIterate(pTableMap, p)) != NULL) { - clearBlockScanInfo(p); + p->iterInit = false; + p->iiter.hasVal = false; + + if (p->iter.iter != NULL) { + p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + } + + if (p->iiter.iter != NULL) { + p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); + } + + p->delSkyline = taosArrayDestroy(p->delSkyline); + p->pBlockList = taosArrayDestroy(p->pBlockList); + tMapDataClear(&p->mapData); } taosHashCleanup(pTableMap); @@ -3455,23 +3452,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return TSDB_CODE_SUCCESS; } -// TODO refactor: with createDataBlockScanInfo -int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { +// todo refactor, use arraylist instead +int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { ASSERT(pReader != NULL); - - STableBlockScanInfo* p = NULL; - while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) { - clearBlockScanInfo(p); - } - taosHashClear(pReader->status.pTableMap); - STableKeyInfo* pList = (STableKeyInfo*) pTableList; - for(int32_t i = 0; i < num; ++i) { - STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid}; - taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); - } - + STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; + taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); return TDB_CODE_SUCCESS; } @@ -3507,8 +3494,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } // ====================================== EXPOSED APIs ====================================== -int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - STsdbReader** ppReader, const char* idstr) { +int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, + const char* idstr) { STimeWindow window = pCond->twindows; if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { pCond->twindows.skey += 1; @@ -3567,8 +3554,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr); } - } else if (numOfTables > 0) { - STableKeyInfo* pKey = pTableList; + } else if (taosArrayGetSize(pTableList) > 0) { + STableKeyInfo* pKey = taosArrayGet(pTableList, 0); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); @@ -3577,7 +3564,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader; - pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables); + int32_t numOfTables = taosArrayGetSize(pTableList); + pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(pReader); *ppReader = NULL; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 1965a8cbf6..0cfef7dc24 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -28,7 +28,7 @@ do { \ ASSERT((_c) != -1); \ longjmp((_obj), (_c)); \ - } while (0) + } while (0); #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ @@ -95,25 +95,6 @@ typedef struct SColMatchInfo { int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; -// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly -// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups -typedef struct STableListInfo { - bool oneTableForEachGroup; - int32_t numOfOuputGroups; // the data block will be generated one by one - int32_t* groupOffset; // keep the offset value for each group in the tableList - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - uint64_t suid; -} STableListInfo; - -void destroyTableList(STableListInfo* pTableList); -int32_t getNumOfOutputGroups(const STableListInfo* pTableList); -bool oneTableForEachGroup(const STableListInfo* pTableList); -uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); -int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); -int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); -uint64_t getTotalTables(const STableListInfo* pTableList); - struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 50c62b9017..8769e8ac2f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -174,12 +174,13 @@ typedef struct { } SSchemaInfo; typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - int32_t code; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + int64_t version; // used for stream to record wal version SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; @@ -1076,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); bool groupbyTbname(SNodeList* pGroupList); -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort); +int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 7c0b52c86a..f645e71c6e 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -48,10 +48,6 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe int32_t numOfCols = 0; code = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - removeRedundantTsCol(pScanNode, &pInfo->matchInfo); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); @@ -65,15 +61,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); - // partition by tbname, todo opt perf - if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) { + // partition by tbname + if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); - - STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0); - size_t num = taosArrayGetSize(pTableList->pTableList); - - code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, + code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -175,7 +167,16 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid); + if (pTableList->map != NULL) { + int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); + if (groupId != NULL) { + pInfo->pRes->info.groupId = *groupId; + } + } else { + ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); + STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); + pInfo->pRes->info.groupId = pKeyInfo->groupId; + } pInfo->indexOfBufferedRes += 1; return pInfo->pRes; @@ -184,25 +185,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } } else { - size_t totalGroups = getNumOfOutputGroups(pTableList); + size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); while (pInfo->currentGroupIndex < totalGroups) { + SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); - STableKeyInfo* pList = NULL; - int32_t num = 0; - - int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, + tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); - code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + longjmp(pTaskInfo->env, code); } pInfo->currentGroupIndex += 1; @@ -212,7 +206,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0]; + STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); pInfo->pRes->info.groupId = pKeyInfo->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index dbef032041..971b28eb09 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -544,7 +544,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - ctx.index = 0; ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); if (ctx.cInfoList == NULL) { @@ -607,7 +606,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } else { void* tag = taosHashGet(tags, uid, sizeof(int64_t)); ASSERT(tag); - STagVal tagVal = {0}; tagVal.cid = pColInfo->info.colId; const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); @@ -638,7 +636,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } } } - pResBlock->info.rows = rows; // int64_t st1 = taosGetTimestampUs(); @@ -664,12 +661,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } break; } - default: code = TSDB_CODE_OPS_NOT_SUPPORT; goto end; } - if (nodeType(pNode) == QUERY_NODE_COLUMN) { SColumnNode* pSColumnNode = (SColumnNode*)pNode; SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId); @@ -679,12 +674,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } else { code = scalarCalculate(pNode, pBlockList, &output); } - if (code != TSDB_CODE_SUCCESS) { releaseColInfoData(output.columnData); goto end; } - taosArrayPush(groupData, &output.columnData); } @@ -703,7 +696,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - for (int i = 0; i < rows; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); @@ -825,86 +817,38 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) { taosArrayDestroy(validUid); return 0; } - -static int32_t nameComparFn(const void* p1, const void* p2) { - const char* pName1 = *(const char**) p1; - const char* pName2 = *(const char**) p2; - - int32_t ret = strcmp(pName1, pName2); - if (ret == 0) { - return 0; - } else { - return (ret > 0)? 1:-1; - } -} - -static SArray* getTableNameList(const SNodeListNode* pList) { - int32_t len = LIST_LENGTH(pList->pNodeList); - SListCell* cell = pList->pNodeList->pHead; - - SArray* pTbList = taosArrayInit(len, POINTER_BYTES); - for (int i = 0; i < pList->pNodeList->length; i++) { - SValueNode* valueNode = (SValueNode*) cell->pNode; - if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { - terrno = TSDB_CODE_INVALID_PARA; - taosArrayDestroy(pTbList); - return NULL; - } - - char* name = varDataVal(valueNode->datum.p); - taosArrayPush(pTbList, &name); - cell = cell->pNext; - } - - size_t numOfTables = taosArrayGetSize(pTbList); - - // order the name - taosArraySort(pTbList, nameComparFn); - - // remove the duplicates - SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); - taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); - - for (int32_t i = 1; i < numOfTables; ++i) { - char** name = taosArrayGetLast(pNewList); - char** nameInOldList = taosArrayGet(pTbList, i); - if (strcmp(*name, *nameInOldList) == 0) { - continue; - } - - taosArrayPush(pNewList, nameInOldList); - } - - taosArrayDestroy(pTbList); - return pNewList; -} - static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) { if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) { return -1; } - SOperatorNode* pNode = (SOperatorNode*)pTagCond; if (pNode->opType != OP_TYPE_IN) { return -1; } - if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) && (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) { SNodeListNode* pList = (SNodeListNode*)pNode->pRight; int32_t len = LIST_LENGTH(pList->pNodeList); - if (len <= 0) { - return -1; + if (len <= 0) return -1; + + SListCell* cell = pList->pNodeList->pHead; + + SArray* pTbList = taosArrayInit(len, sizeof(void*)); + for (int i = 0; i < pList->pNodeList->length; i++) { + SValueNode* valueNode = (SValueNode*)cell->pNode; + if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { + taosArrayDestroy(pTbList); + return -1; + } + char* name = varDataVal(valueNode->datum.p); + taosArrayPush(pTbList, &name); + cell = cell->pNext; } - SArray* pTbList = getTableNameList(pList); - int32_t numOfTables = taosArrayGetSize(pTbList); - - for (int i = 0; i < numOfTables; i++) { - char* name = taosArrayGetP(pTbList, i); - + for (int i = 0; i < taosArrayGetSize(pTbList); i++) { + char* name = taosArrayGetP(pTbList, i); uint64_t uid = 0; if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { ETableType tbType = TSDB_TABLE_MAX; @@ -919,14 +863,11 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* terrno = 0; } } - taosArrayDestroy(pTbList); return 0; } - return -1; } - int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1005,6 +946,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, } taosArrayDestroy(res); + + pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES); + if (pListInfo->pGroupList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + // put into list as default group, remove it if grouping sorting is required later + taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList); return code; } @@ -1655,81 +1604,3 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainGroupOffset = slimit.offset; } - -uint64_t getTotalTables(const STableListInfo* pTableList) { - if (pTableList->map != NULL) { - ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); - } - - return taosArrayGetSize(pTableList->pTableList); -} - -uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { - if (pTableList->oneTableForEachGroup) { - return tableUid; - } - - uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); - if (groupId != NULL) { - return *groupId; - } else { - return 0; - } -} - -int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { - STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; - - taosArrayPush(pTableList->pTableList, &keyInfo); - if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) { - taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); - } - return TSDB_CODE_SUCCESS; -} - -int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) { - int32_t total = getNumOfOutputGroups(pTableList); - if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { - return TSDB_CODE_INVALID_PARA; - } - - // here handle two special cases: - // 1. only one group exists, and 2. one table exists for each group. - if (total == 1) { - *size = getTotalTables(pTableList); - *pKeyInfo = taosArrayGet(pTableList->pTableList, 0); - return TSDB_CODE_SUCCESS; - } else if (total == getTotalTables(pTableList)) { - *size = 1; - *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); - return TSDB_CODE_SUCCESS; - } - - int32_t offset = pTableList->groupOffset[ordinalGroupIndex]; - if (ordinalGroupIndex < total - 1) { - *size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset]; - } else { - *size = total - pTableList->groupOffset[offset] - 1; - } - - *pKeyInfo = taosArrayGet(pTableList->pTableList, offset); - return TSDB_CODE_SUCCESS; -} - -int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { - return pTableList->numOfOuputGroups; -} - -bool oneTableForEachGroup(const STableListInfo* pTableList) { - return pTableList->oneTableForEachGroup; -} - -void destroyTableList(STableListInfo* pTableqinfoList) { - pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList); - taosMemoryFreeClear(pTableqinfoList->groupOffset); - - taosHashCleanup(pTableqinfoList->map); - - pTableqinfoList->pTableList = NULL; - pTableqinfoList->map = NULL; -} \ No newline at end of file diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5aa9669681..9546c3895e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -293,7 +293,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); } - + if (pListInfo->map == NULL) { + pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + } // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; @@ -305,10 +307,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); - int32_t numOfQualifiedTables = taosArrayGetSize(qa); - - qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables); + qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(qa); @@ -328,9 +328,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList; - - for (int32_t i = 0; i < numOfQualifiedTables; ++i) { + for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; @@ -360,7 +358,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif - addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId); + taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); + taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } if (keyBuf != NULL) { @@ -936,7 +935,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList); + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); #ifndef NDEBUG qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, @@ -945,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT #endif bool found = false; - for (int32_t i = 0; i < numOfTables; i++) { + for (int32_t i = 0; i < tableSz; i++) { STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); if (pTableInfo->uid == uid) { found = true; @@ -958,17 +957,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(found); if (pTableScanInfo->dataReader == NULL) { - STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - int32_t num = getTotalTables(&pTaskInfo->tableqinfoList); - - if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, - &pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) { + if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, + pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || + pTableScanInfo->dataReader == NULL) { ASSERT(0); } } - STableKeyInfo tki = {.uid = uid}; - tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1); + tsdbSetTableId(pTableScanInfo->dataReader, uid); int64_t oldSkey = pTableScanInfo->cond.twindows.skey; pTableScanInfo->cond.twindows.skey = ts + 1; tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -976,7 +972,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTableScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, - ts, pTableScanInfo->currentTable, numOfTables); + ts, pTableScanInfo->currentTable, tableSz); /*}*/ } else { ASSERT(0); @@ -998,15 +994,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; - - STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; - - pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); - - STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0); - - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList), + pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c6a22cd28e..75db42fccc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1739,6 +1739,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t return TSDB_CODE_SUCCESS; } +static void doDestroyTableList(STableListInfo* pTableqinfoList); + typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; int32_t sourceIndex; @@ -3364,116 +3366,62 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } -static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { - STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; - STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; - - if (pInfo1->groupId == pInfo2->groupId) { - return 0; - } else { - return pInfo1->groupId < pInfo2->groupId? -1:1; - } -} - static int32_t sortTableGroup(STableListInfo* pTableListInfo) { - int32_t code = TSDB_CODE_SUCCESS; - - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); - int32_t size = getTotalTables(pTableListInfo); - - SArray* pList = taosArrayInit(4, sizeof(int32_t)); - - STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); - uint64_t gid = pInfo->groupId; - - int32_t start = 0; - taosArrayPush(pList, &start); - - for(int32_t i = 1; i < size; ++i) { - pInfo = taosArrayGet(pTableListInfo->pTableList, i); - if (pInfo->groupId != gid) { - taosArrayPush(pList, &i); - gid = pInfo->groupId; - } - } - - pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); - pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - taosArrayDestroy(pList); - -# if 0 - + taosArrayClear(pTableListInfo->pGroupList); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); - if (sortSupport == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - size_t num = taosArrayGetSize(pTableListInfo->pTableList); - for (int32_t i = 0; i < num; i++) { + if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; + for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t)); int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ); if (index == -1) { void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT); - SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo)); if (tGroup == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_OUT_OF_MEMORY; } - if (taosArrayPush(tGroup, info) == NULL) { qError("taos push info array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_QRY_APP_ERROR; } - if (p == NULL) { if (taosArrayPush(sortSupport, groupId) == NULL) { qError("taos push support array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_QRY_APP_ERROR; } - if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) { qError("taos push group array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_QRY_APP_ERROR; } } else { int32_t pos = TARRAY_ELEM_IDX(sortSupport, p); if (taosArrayInsert(sortSupport, pos, groupId) == NULL) { qError("taos insert support array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_QRY_APP_ERROR; } - if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) { qError("taos insert group array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_QRY_APP_ERROR; } } } else { SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index); if (taosArrayPush(tGroup, info) == NULL) { qError("taos push uid array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + taosArrayDestroy(sortSupport); + return TSDB_CODE_QRY_APP_ERROR; } } } - taosArrayDestroy(sortSupport); -#endif - return TDB_CODE_SUCCESS; - - _error: -// taosArrayDestroy(sortSupport); - return code; } bool groupbyTbname(SNodeList* pGroupList) { @@ -3489,44 +3437,38 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { - int32_t code = TSDB_CODE_SUCCESS; +int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { if (group == NULL) { - return code; + return TDB_CODE_SUCCESS; } pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (pTableListInfo->map == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; + return TSDB_CODE_OUT_OF_MEMORY; } - bool assignUid = groupbyTbname(group); + bool assignUid = groupbyTbname(group); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if (assignUid) { // in case of group/partition by tbname, the group id is equalled to the uid of table + if (assignUid) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } - - pTableListInfo->oneTableForEachGroup = true; - if (groupSort) { - pTableListInfo->numOfOuputGroups = numOfTables; - } } else { - code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); + int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - - if (groupSort) { - code = sortTableGroup(pTableListInfo); - } } - return code; + if (pTableListInfo->needSortTableByGroupId) { + return sortTableGroup(pTableListInfo); + } + + return TDB_CODE_SUCCESS; } static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { @@ -3563,12 +3505,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - // NOTE: this is an patch to fix the physical plan - // TODO remove it later - if (pTableScanNode->scan.node.pLimit != NULL) { - pTableScanNode->groupSort = true; - } - int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); @@ -3627,10 +3563,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); for (int32_t i = 0; i < sz; i++) { STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); - qDebug("creating stream task: add table uid:%" PRIu64, pKeyInfo->uid); + qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid); } - - qDebug("table in hashmap, %d", (int32_t) getTotalTables(pTableListInfo)); #endif } @@ -3665,20 +3599,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SQueryTableDataCond cond = {0}; - - int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); + int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); if (code != TSDB_CODE_SUCCESS) { return NULL; } - size_t num = getTotalTables(pTableListInfo); - void* pList = NULL; - if (num > 0) { - pList = taosArrayGet(pTableListInfo->pTableList, 0); - } - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, ""); + tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, ""); cleanupQueryTableDataCond(&cond); pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); @@ -3712,8 +3639,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } - size_t size = LIST_LENGTH(pPhyNode->pChildren); - + size_t size = LIST_LENGTH(pPhyNode->pChildren); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); @@ -4049,10 +3975,28 @@ _complete: return code; } +void doDestroyTableList(STableListInfo* pTableqinfoList) { + taosArrayDestroy(pTableqinfoList->pTableList); + taosHashCleanup(pTableqinfoList->map); + if (pTableqinfoList->needSortTableByGroupId) { + for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { + SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); + if (tmp == pTableqinfoList->pTableList) { + continue; + } + taosArrayDestroy(tmp); + } + } + taosArrayDestroy(pTableqinfoList->pGroupList); + + pTableqinfoList->pTableList = NULL; + pTableqinfoList->map = NULL; +} + void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - destroyTableList(&pTaskInfo->tableqinfoList); + doDestroyTableList(&pTaskInfo->tableqinfoList); destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index e7d4a14098..4e4c33d4c3 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -159,7 +159,6 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S // reset the value for a new group data // existing rows that belongs to previous group. - // TODO refactor with doTableScan pLimitInfo->numOfOutputRows = 0; pLimitInfo->remainOffset = pLimitInfo->limit.offset; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cebf2da56b..e2f3b1c6c4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -377,7 +377,9 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo int32_t keep = pBlock->info.rows - overflowRows; blockDataKeepFirstNRows(pBlock, keep); - qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + + // setTaskStatus(pTaskInfo, TASK_COMPLETED); pOperator->status = OP_EXEC_DONE; } } @@ -681,7 +683,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo)); + qDebug( + "%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks " + "due to query func required", + GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -708,7 +713,8 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo)); + qDebug("%s start to repeat descending order scan data blocks due to query func required", + GET_TASKID(pTaskInfo)); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); } } @@ -721,7 +727,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - // scan table one by one sequentially + // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); @@ -738,63 +744,54 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); - tsdbSetTableList(pInfo->dataReader, pTableInfo, 1); + tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; } - } else { // scan table group by group sequentially - if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { - doSetOperatorCompleted(pOperator); - return NULL; - } + } - int32_t num = 0; - STableKeyInfo* pList = NULL; - getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num); - ASSERT(pInfo->dataReader == NULL); - - int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader, - GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - } - - SSDataBlock* result = doTableScanGroup(pOperator); - if (result != NULL) { - return result; - } - - if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { - doSetOperatorCompleted(pOperator); + if (pInfo->currentGroupId == -1) { + pInfo->currentGroupId++; + if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { + setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; } - // reset value for the next group data output - pOperator->status = OP_OPENED; - pInfo->limitInfo.numOfOutputRows = 0; - pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; + SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); + tsdbReaderClose(pInfo->dataReader); - int32_t num = 0; - STableKeyInfo* pList = NULL; - getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num); - - tsdbSetTableList(pInfo->dataReader, pList, num); - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); - pInfo->scanTimes = 0; - - result = doTableScanGroup(pOperator); - if (result != NULL) { - return result; + int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, + GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + return NULL; } + } - doSetOperatorCompleted(pOperator); + SSDataBlock* result = doTableScanGroup(pOperator); + if (result) { + return result; + } + + pInfo->currentGroupId++; + if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { + setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; } + + tsdbReaderReset(pInfo->dataReader, &pInfo->cond); + pInfo->scanTimes = 0; + + result = doTableScanGroup(pOperator); + if (result) { + return result; + } + + setTaskStatus(pTaskInfo, TASK_COMPLETED); + return NULL; } static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { @@ -822,7 +819,7 @@ static void destroyTableScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -840,6 +837,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); + code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1079,55 +1077,39 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { pTableScanInfo->cond.twindows = *pWin; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; +} + +static void freeArray(void* array) { taosArrayDestroy(array); } + +static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { + STableScanInfo* pTableScanInfo = pTableScanOp->info; + pTableScanInfo->cond.startVersion = -1; + pTableScanInfo->cond.endVersion = -1; + SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; + SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList; + taosArrayClearP(gpTbls, freeArray); + taosArrayPush(gpTbls, &allTbls); + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; + resetTableScanInfo(pTableScanOp->info, &win); } static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) { + SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; + taosArrayClear(gpTbls); STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; + SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(tbls, &tblInfo); + taosArrayPush(gpTbls, &tbls); - STableScanInfo* pTableScanInfo = pTableScanOp->info; - SQueryTableDataCond cond = pTableScanInfo->cond; - - cond.startVersion = -1; - cond.endVersion = maxVersion; - cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs}; - - SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo; - - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - blockDataCleanup(pBlock); - - STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, - GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - bool hasBlock = tsdbNextDataBlock(pReader); - if (hasBlock) { - SDataBlockInfo binfo = {0}; - tsdbRetrieveDataBlockInfo(pReader, &binfo); - - SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); - blockDataEnsureCapacity(pBlock, binfo.rows); - - pBlock->info.window = binfo.window; - pBlock->info.uid = binfo.uid; - pBlock->info.rows = binfo.rows; - - relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); - } - - tsdbReaderClose(pReader); - qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 - ", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid); - - return pBlock->info.rows > 0 ? pBlock : NULL; + STimeWindow win = {.skey = startTs, .ekey = endTs}; + STableScanInfo* pTableScanInfo = pTableScanOp->info; + pTableScanInfo->cond.startVersion = -1; + pTableScanInfo->cond.endVersion = maxVersion; + resetTableScanInfo(pTableScanOp->info, &win); + SSDataBlock* pRes = doTableScan(pTableScanOp); + resetTableScanOperator(pTableScanOp); + return pRes; } static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -2353,14 +2335,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pTSInfo->cond.endVersion = pHandle->version; } - STableKeyInfo* pList = NULL; - int32_t num = 0; - getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num); - + SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->dataReader = NULL; - if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL) < 0) { + if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; } @@ -2388,7 +2367,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); - code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); + int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; @@ -4174,9 +4153,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { @@ -4234,8 +4210,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return TSDB_CODE_SUCCESS; } - pTableListInfo->numOfOuputGroups = 1; - code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort); + pTableListInfo->needSortTableByGroupId = groupSort; + code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4249,10 +4225,14 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { - STableKeyInfo* pList = taosArrayGet(pTableListInfo->pTableList, i); - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr); + SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i)); + + STsdbReader* pReader = NULL; + tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr); taosArrayPush(arrayReader, &pReader); + + taosArrayDestroy(subTableList); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index d65e49c4fc..26f1932b12 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -654,7 +654,6 @@ _retry: SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } - pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; pDataBlock->info.groupId = pInfo->groupId; diff --git a/tests/script/tsim/parser/limit1_stb.sim b/tests/script/tsim/parser/limit1_stb.sim index 077e310354..be0963c0fd 100644 --- a/tests/script/tsim/parser/limit1_stb.sim +++ b/tests/script/tsim/parser/limit1_stb.sim @@ -39,7 +39,6 @@ endi $val = $totalNum - 1 sql select * from $stb limit $totalNum offset 1 if $rows != $val then - print expect $val , actual: $rows return -1 endi if $data01 != 1 then @@ -493,9 +492,3 @@ sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), if $rows != 6 then return -1 endi - -sql select * from $stb partition by tbname limit 1 -if $rows != 10 then - return -1 -endi - diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 74ddfa5cf0..6950df9ee1 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -722,7 +722,6 @@ sql select bottom(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu limit 5 offse if $rows != 0 then return -1 endi - sql select bottom(c1, 5) from $stb where ts >= $ts0 and ts <= $tsu limit 3 offset 5 if $rows != 0 then return -1