fix: modify vtable src table number

This commit is contained in:
dapan1121 2025-03-20 14:36:48 +08:00
parent b0d0500047
commit 2c8bc8b097
1 changed files with 92 additions and 2 deletions

View File

@ -13467,6 +13467,93 @@ static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOp
return code;
}
static int32_t buildQueryTableColIdList(SSelectStmt *pSelect, SArray** ppRes) {
STableNode* pTable = (STableNode*)pSelect->pFromTable;
SNodeList* pColList = NULL;
SNode* pCol = NULL;
int32_t code = 0;
PAR_ERR_RET(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pTable->tableAlias, COLLECT_COL_TYPE_COL, &pColList));
*ppRes = taosArrayInit(pColList->length, sizeof(int16_t));
if (NULL == *ppRes) {
code = terrno;
parserError("taosArrayInit %d colId failed, errno:0x%x", *ppRes, code);
goto _return;
}
FOREACH(pCol, pColList) {
if (NULL == taosArrayPush(*ppRes, &((SColumnNode*)pCol)->colId)) {
code = terrno;
parserError("taosArrayPush colId failed, errno:0x%x", *ppRes, code);
goto _return;
}
}
_return:
nodesDestroyList(pColList);
if (code) {
taosArrayDestroy(*ppRes);
*ppRes = NULL;
}
return code;
}
static int32_t modifyVtableSrcNumBasedOnCols(SVCTableRefCols* pTb, SArray* pColIdList, SSHashObj* pTbHash) {
tSimpleHashClear(pTbHash);
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t colNum = taosArrayGetSize(pColIdList);
for (int32_t i = 0; i < colNum; ++i) {
int16_t *colId = taosArrayGet(pColIdList, i);
for (int32_t m = 0; m < pTb->numOfColRefs; ++m) {
if (*colId == pTb->refCols[m].colId) {
snprintf(tbFName, sizeof(tbFName), "%s.%s", pTb->refCols[m].refDbName, pTb->refCols[m].refTableName);
PAR_ERR_RET(tSimpleHashPut(pTbHash, tbFName, strlen(tbFName) + 1, &colNum, sizeof(colNum)));
}
}
}
pTb->numOfSrcTbls = tSimpleHashGetSize(pTbHash);
return TSDB_CODE_SUCCESS;
}
static int32_t modifyVtableSrcNumBasedOnQuery(SArray* pVSubTables, SNode* pStmt) {
SSelectStmt *pSelect = (SSelectStmt*)pStmt;
SArray* pColIdList = NULL;
SSHashObj* pTbHash = NULL;
int32_t code = 0;
int32_t colNum = 0;
int32_t vgNum = taosArrayGetSize(pVSubTables);
if (vgNum > 0) {
PAR_ERR_JRET(buildQueryTableColIdList(pSelect, &pColIdList));
colNum = taosArrayGetSize(pColIdList);
pTbHash = tSimpleHashInit(colNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pTbHash) {
code = terrno;
parserError("tSimpleHashInit %d failed, errno:0x%x", code);
PAR_ERR_JRET(code);
}
}
for (int32_t i = 0; i < vgNum; ++i) {
SVSubTablesRsp* pVg = (SVSubTablesRsp*)taosArrayGet(pVSubTables, i);
int32_t vtbNum = taosArrayGetSize(pVg->pTables);
for (int32_t m = 0; m < vtbNum; ++m) {
SVCTableRefCols* pTb = (SVCTableRefCols*)taosArrayGetP(pVg->pTables, m);
PAR_ERR_JRET(modifyVtableSrcNumBasedOnCols(pTb, pColIdList, pTbHash));
}
}
_return:
taosArrayDestroy(pColIdList);
tSimpleHashCleanup(pTbHash);
return code;
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
@ -13522,8 +13609,11 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
if (TSDB_CODE_SUCCESS == code) {
code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq);
}
if (TSDB_CODE_SUCCESS == code && pCxt->pMetaCache != NULL) {
TSWAP(pReq->pVSubTables, pCxt->pMetaCache->pVSubTables);
if (TSDB_CODE_SUCCESS == code && pCxt->pMetaCache != NULL && pCxt->pMetaCache->pVSubTables != NULL) {
code = modifyVtableSrcNumBasedOnQuery(pCxt->pMetaCache->pVSubTables, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
TSWAP(pReq->pVSubTables, pCxt->pMetaCache->pVSubTables);
}
}
return code;
}