Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
Hongze Cheng 2024-03-12 09:48:47 +08:00
commit 3d64b10350
2 changed files with 48 additions and 12 deletions

View File

@ -527,7 +527,10 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
if (NULL == pScan) { if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
}
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
} }
@ -538,8 +541,9 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
if (NULL == pScan) { if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
}
pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx; pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
@ -563,8 +567,9 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
pScan->groupSort = pScanLogicNode->groupSort; pScan->groupSort = pScanLogicNode->groupSort;
pScan->ignoreNull = pScanLogicNode->igLastNull; pScan->ignoreNull = pScanLogicNode->igLastNull;
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
}
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) { if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
@ -609,8 +614,9 @@ static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
} }
pScan->groupSort = pScanLogicNode->groupSort; pScan->groupSort = pScanLogicNode->groupSort;
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
}
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
} }
@ -680,7 +686,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) || if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) { 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) {
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
}
} else { } else {
pSubplan->execNode.nodeId = MNODE_HANDLE; pSubplan->execNode.nodeId = MNODE_HANDLE;
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet; pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
@ -2216,11 +2224,12 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
pInserter->stableId = pModify->stableId; pInserter->stableId = pModify->stableId;
pInserter->tableType = pModify->tableType; pInserter->tableType = pModify->tableType;
strcpy(pInserter->tableName, pModify->tableName); strcpy(pInserter->tableName, pModify->tableName);
pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
if (pModify->pVgroupList) {
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
}
int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols, int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
&pInserter->pCols); &pInserter->pCols);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -423,9 +423,36 @@ class TDTestCase:
consumer.close() consumer.close()
def consume_ts_4551(self):
tdSql.execute(f'use d1')
tdSql.execute(f'create topic topic_stable as stable stt where tbname like "t%"')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["topic_stable"])
except TmqError:
tdLog.exit(f"subscribe error")
try:
while True:
res = consumer.poll(1)
if not res:
break
finally:
consumer.close()
print("consume_ts_4551 ok")
def run(self): def run(self):
self.consumeTest() self.consumeTest()
self.consume_ts_4544() self.consume_ts_4544()
self.consume_ts_4551()
self.consume_TS_4540_Test() self.consume_TS_4540_Test()
tdSql.prepare() tdSql.prepare()