diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index daa6b3aff6..b9c4b9e307 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3463,6 +3463,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) { clearBlockScanInfo(p); } + taosHashClear(pReader->status.pTableMap); STableKeyInfo* pList = (STableKeyInfo*) pTableList; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 891af95073..0941f9db15 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -105,10 +105,11 @@ typedef struct STableListInfo { } STableListInfo; void destroyTableList(STableListInfo* pTableList); -int32_t getNumOfGroups(STableListInfo* pTableList); -uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid); -int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); -uint64_t getTotalTables(STableListInfo* pTableList); +int32_t getNumOfGroups(const STableListInfo* pTableList); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); +uint64_t getTotalTables(const STableListInfo* pTableList); struct SqlFunctionCtx; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 49e4566cc9..bd8e4acc85 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -825,38 +825,86 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) { taosArrayDestroy(validUid); return 0; } + +static int32_t nameComparFn(const void* p1, const void* p2) { + const char* pName1 = *(const char**) p1; + const char* pName2 = *(const char**) p2; + + int32_t ret = strcmp(pName1, pName2); + if (ret == 0) { + return 0; + } else { + return (ret > 0)? 1:-1; + } +} + +static SArray* getTableNameList(const SNodeListNode* pList) { + int32_t len = LIST_LENGTH(pList->pNodeList); + SListCell* cell = pList->pNodeList->pHead; + + SArray* pTbList = taosArrayInit(len, POINTER_BYTES); + for (int i = 0; i < pList->pNodeList->length; i++) { + SValueNode* valueNode = (SValueNode*) cell->pNode; + if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { + terrno = TSDB_CODE_INVALID_PARA; + taosArrayDestroy(pTbList); + return NULL; + } + + char* name = varDataVal(valueNode->datum.p); + taosArrayPush(pTbList, &name); + cell = cell->pNext; + } + + size_t numOfTables = taosArrayGetSize(pTbList); + + // order the name + taosArraySort(pTbList, nameComparFn); + + // remove the duplicates + SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); + taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); + + for (int32_t i = 1; i < numOfTables; ++i) { + char** name = taosArrayGetLast(pNewList); + char** nameInOldList = taosArrayGet(pTbList, i); + if (strcmp(*name, *nameInOldList) == 0) { + continue; + } + + taosArrayPush(pNewList, nameInOldList); + } + + taosArrayDestroy(pTbList); + return pNewList; +} + static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) { if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) { return -1; } + SOperatorNode* pNode = (SOperatorNode*)pTagCond; if (pNode->opType != OP_TYPE_IN) { return -1; } + if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) && (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) { SNodeListNode* pList = (SNodeListNode*)pNode->pRight; int32_t len = LIST_LENGTH(pList->pNodeList); - if (len <= 0) return -1; - - SListCell* cell = pList->pNodeList->pHead; - - SArray* pTbList = taosArrayInit(len, sizeof(void*)); - for (int i = 0; i < pList->pNodeList->length; i++) { - SValueNode* valueNode = (SValueNode*)cell->pNode; - if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { - taosArrayDestroy(pTbList); - return -1; - } - char* name = varDataVal(valueNode->datum.p); - taosArrayPush(pTbList, &name); - cell = cell->pNext; + if (len <= 0) { + return -1; } - for (int i = 0; i < taosArrayGetSize(pTbList); i++) { - char* name = taosArrayGetP(pTbList, i); + SArray* pTbList = getTableNameList(pList); + int32_t numOfTables = taosArrayGetSize(pTbList); + + for (int i = 0; i < numOfTables; i++) { + char* name = taosArrayGetP(pTbList, i); + uint64_t uid = 0; if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { ETableType tbType = TSDB_TABLE_MAX; @@ -871,11 +919,14 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* terrno = 0; } } + taosArrayDestroy(pTbList); return 0; } + return -1; } + int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1605,7 +1656,7 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainGroupOffset = slimit.offset; } -uint64_t getTotalTables(STableListInfo* pTableList) { +uint64_t getTotalTables(const STableListInfo* pTableList) { if (pTableList->map != NULL) { ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); } @@ -1613,7 +1664,7 @@ uint64_t getTotalTables(STableListInfo* pTableList) { return taosArrayGetSize(pTableList->pTableList); } -uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid) { +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); if (groupId != NULL) { return *groupId; @@ -1622,7 +1673,17 @@ uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid) { } } -int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) { +int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { + STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; + + taosArrayPush(pTableList->pTableList, &keyInfo); + if (pTableList->oneTableForEachGroup || pTableList->numOfGroups > 1) { + taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + } + return TSDB_CODE_SUCCESS; +} + +int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) { int32_t total = getNumOfGroups(pTableList); if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { return TSDB_CODE_INVALID_PARA; @@ -1651,7 +1712,7 @@ int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalGroupIndex, return TSDB_CODE_SUCCESS; } -int32_t getNumOfGroups(STableListInfo* pTableList) { +int32_t getNumOfGroups(const STableListInfo* pTableList) { return pTableList->numOfGroups; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c51439ee74..5aa9669681 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -293,9 +293,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); } - if (pListInfo->map == NULL) { - pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - } + // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; @@ -307,8 +305,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); + int32_t numOfQualifiedTables = taosArrayGetSize(qa); + + qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables); - qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(qa); @@ -328,7 +328,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { + STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList; + + for (int32_t i = 0; i < numOfQualifiedTables; ++i) { uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; @@ -358,8 +360,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif - taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); - taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId); } if (keyBuf != NULL) { @@ -935,7 +936,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList); #ifndef NDEBUG qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, @@ -944,7 +945,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT #endif bool found = false; - for (int32_t i = 0; i < tableSz; i++) { + for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); if (pTableInfo->uid == uid) { found = true; @@ -957,12 +958,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(found); if (pTableScanInfo->dataReader == NULL) { - STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - int32_t num = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + int32_t num = getTotalTables(&pTaskInfo->tableqinfoList); + if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, - &pTableScanInfo->dataReader, NULL) < 0 || - pTableScanInfo->dataReader == NULL) { + &pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) { ASSERT(0); } } @@ -976,7 +976,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTableScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, - ts, pTableScanInfo->currentTable, tableSz); + ts, pTableScanInfo->currentTable, numOfTables); /*}*/ } else { ASSERT(0); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 42767aa74a..fb554520e5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3521,6 +3521,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } } + qDebug("-------------------, %d", (int) taosHashGetSize(pTableListInfo->map)); return TDB_CODE_SUCCESS; } @@ -3622,8 +3623,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); for (int32_t i = 0; i < sz; i++) { STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); - qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid); + qDebug("creating stream task: add table uid:%" PRIu64, pKeyInfo->uid); } + + qDebug("table in hashmap, %d", (int32_t) getTotalTables(pTableListInfo)); #endif } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 23b7a9e1f8..187eae3cf1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1078,22 +1078,10 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { pTableScanInfo->cond.twindows = *pWin; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; + tsdbReaderClose(pTableScanInfo->dataReader); + pTableScanInfo->dataReader = NULL; } -//static void freeArray(void* array) { taosArrayDestroy(array); } -// -//static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { -// STableScanInfo* pTableScanInfo = pTableScanOp->info; -// pTableScanInfo->cond.startVersion = -1; -// pTableScanInfo->cond.endVersion = -1; -//// SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; -//// SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList; -//// taosArrayClearP(gpTbls, freeArray); -//// taosArrayPush(gpTbls, &allTbls); -// STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; -// resetTableScanInfo(pTableScanOp->info, &win); -//} - static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) { STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; @@ -4104,6 +4092,9 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) {