diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index baef39144c..64249207f5 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -527,7 +527,10 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + + if (pScanLogicNode->pVgroupList) { + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + } return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode); } @@ -538,8 +541,9 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - + if (pScanLogicNode->pVgroupList) { + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + } pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx; return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); @@ -563,8 +567,9 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu pScan->groupSort = pScanLogicNode->groupSort; pScan->ignoreNull = pScanLogicNode->igLastNull; - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - + if (pScanLogicNode->pVgroupList) { + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + } int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) { @@ -609,8 +614,9 @@ static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* } pScan->groupSort = pScanLogicNode->groupSort; - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - + if (pScanLogicNode->pVgroupList) { + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + } return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); } @@ -680,7 +686,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) || 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) { - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + if (pScanLogicNode->pVgroupList) { + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + } } else { pSubplan->execNode.nodeId = MNODE_HANDLE; pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet; @@ -2216,11 +2224,12 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod pInserter->stableId = pModify->stableId; pInserter->tableType = pModify->tableType; strcpy(pInserter->tableName, pModify->tableName); - pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; - pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false); - vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); - + if (pModify->pVgroupList) { + pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; + pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; + vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); + } int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols, &pInserter->pCols); if (TSDB_CODE_SUCCESS == code) { diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 2712caed3c..45c5284ab3 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -423,9 +423,36 @@ class TDTestCase: consumer.close() + def consume_ts_4551(self): + tdSql.execute(f'use d1') + + tdSql.execute(f'create topic topic_stable as stable stt where tbname like "t%"') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_stable"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + finally: + consumer.close() + print("consume_ts_4551 ok") + def run(self): self.consumeTest() self.consume_ts_4544() + self.consume_ts_4551() self.consume_TS_4540_Test() tdSql.prepare()