diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e1f7160d46..3f370dd1b1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -13467,6 +13467,93 @@ static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOp return code; } +static int32_t buildQueryTableColIdList(SSelectStmt *pSelect, SArray** ppRes) { + STableNode* pTable = (STableNode*)pSelect->pFromTable; + SNodeList* pColList = NULL; + SNode* pCol = NULL; + int32_t code = 0; + PAR_ERR_RET(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pTable->tableAlias, COLLECT_COL_TYPE_COL, &pColList)); + *ppRes = taosArrayInit(pColList->length, sizeof(int16_t)); + if (NULL == *ppRes) { + code = terrno; + parserError("taosArrayInit %d colId failed, errno:0x%x", *ppRes, code); + goto _return; + } + + FOREACH(pCol, pColList) { + if (NULL == taosArrayPush(*ppRes, &((SColumnNode*)pCol)->colId)) { + code = terrno; + parserError("taosArrayPush colId failed, errno:0x%x", *ppRes, code); + goto _return; + } + } + +_return: + + nodesDestroyList(pColList); + if (code) { + taosArrayDestroy(*ppRes); + *ppRes = NULL; + } + + return code; +} + +static int32_t modifyVtableSrcNumBasedOnCols(SVCTableRefCols* pTb, SArray* pColIdList, SSHashObj* pTbHash) { + tSimpleHashClear(pTbHash); + + char tbFName[TSDB_TABLE_FNAME_LEN]; + int32_t colNum = taosArrayGetSize(pColIdList); + for (int32_t i = 0; i < colNum; ++i) { + int16_t *colId = taosArrayGet(pColIdList, i); + for (int32_t m = 0; m < pTb->numOfColRefs; ++m) { + if (*colId == pTb->refCols[m].colId) { + snprintf(tbFName, sizeof(tbFName), "%s.%s", pTb->refCols[m].refDbName, pTb->refCols[m].refTableName); + PAR_ERR_RET(tSimpleHashPut(pTbHash, tbFName, strlen(tbFName) + 1, &colNum, sizeof(colNum))); + } + } + } + + pTb->numOfSrcTbls = tSimpleHashGetSize(pTbHash); + + return TSDB_CODE_SUCCESS; +} + +static int32_t modifyVtableSrcNumBasedOnQuery(SArray* pVSubTables, SNode* pStmt) { + SSelectStmt *pSelect = (SSelectStmt*)pStmt; + SArray* pColIdList = NULL; + SSHashObj* pTbHash = NULL; + int32_t code = 0; + int32_t colNum = 0; + int32_t vgNum = taosArrayGetSize(pVSubTables); + if (vgNum > 0) { + PAR_ERR_JRET(buildQueryTableColIdList(pSelect, &pColIdList)); + colNum = taosArrayGetSize(pColIdList); + pTbHash = tSimpleHashInit(colNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (NULL == pTbHash) { + code = terrno; + parserError("tSimpleHashInit %d failed, errno:0x%x", code); + PAR_ERR_JRET(code); + } + } + + for (int32_t i = 0; i < vgNum; ++i) { + SVSubTablesRsp* pVg = (SVSubTablesRsp*)taosArrayGet(pVSubTables, i); + int32_t vtbNum = taosArrayGetSize(pVg->pTables); + for (int32_t m = 0; m < vtbNum; ++m) { + SVCTableRefCols* pTb = (SVCTableRefCols*)taosArrayGetP(pVg->pTables, m); + PAR_ERR_JRET(modifyVtableSrcNumBasedOnCols(pTb, pColIdList, pTbHash)); + } + } + +_return: + + taosArrayDestroy(pColIdList); + tSimpleHashCleanup(pTbHash); + + return code; +} + static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { pReq->igExists = pStmt->ignoreExists; @@ -13522,8 +13609,11 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* if (TSDB_CODE_SUCCESS == code) { code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq); } - if (TSDB_CODE_SUCCESS == code && pCxt->pMetaCache != NULL) { - TSWAP(pReq->pVSubTables, pCxt->pMetaCache->pVSubTables); + if (TSDB_CODE_SUCCESS == code && pCxt->pMetaCache != NULL && pCxt->pMetaCache->pVSubTables != NULL) { + code = modifyVtableSrcNumBasedOnQuery(pCxt->pMetaCache->pVSubTables, pStmt->pQuery); + if (TSDB_CODE_SUCCESS == code) { + TSWAP(pReq->pVSubTables, pCxt->pMetaCache->pVSubTables); + } } return code; }