diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e3d26edf30..d3e9840987 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -128,10 +128,12 @@ typedef struct SVnodeModifyLogicNode { SVgDataBlocks* pVgDataBlocks; SNode* pAffectedRows; // SColumnNode uint64_t tableId; + uint64_t stableId; int8_t tableType; // table type char tableFName[TSDB_TABLE_FNAME_LEN]; STimeWindow deleteTimeRange; SVgroupsInfo* pVgroupList; + SNodeList* pInsertCols; } SVnodeModifyLogicNode; typedef struct SExchangeLogicNode { @@ -459,7 +461,9 @@ typedef struct SDataInserterNode { typedef struct SQueryInserterNode { SDataSinkNode sink; + SNodeList* pCols; uint64_t tableId; + uint64_t stableId; int8_t tableType; // table type char tableFName[TSDB_TABLE_FNAME_LEN]; int32_t vgId; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 1a1aca8bdb..a1b0cc5947 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -394,10 +394,12 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi COPY_SCALAR_FIELD(msgType); CLONE_NODE_FIELD(pAffectedRows); COPY_SCALAR_FIELD(tableId); + COPY_SCALAR_FIELD(stableId); COPY_SCALAR_FIELD(tableType); COPY_CHAR_ARRAY_FIELD(tableFName); COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow)); CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); + CLONE_NODE_LIST_FIELD(pInsertCols); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 44bfa39dbd..0bff063ea1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2214,6 +2214,8 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); } +static const char* jkQueryInsertPhysiPlanInsertCols = "InsertCols"; +static const char* jkQueryInsertPhysiPlanStableId = "StableId"; static const char* jkQueryInsertPhysiPlanTableId = "TableId"; static const char* jkQueryInsertPhysiPlanTableType = "TableType"; static const char* jkQueryInsertPhysiPlanTableFName = "TableFName"; @@ -2224,6 +2226,12 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) { const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj; int32_t code = physicDataSinkNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkQueryInsertPhysiPlanInsertCols, pNode->pCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanStableId, pNode->stableId); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId); } @@ -2247,6 +2255,12 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) { SQueryInserterNode* pNode = (SQueryInserterNode*)pObj; int32_t code = jsonToPhysicDataSinkNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkQueryInsertPhysiPlanInsertCols, &pNode->pCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanStableId, &pNode->stableId); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index a1a9532ace..e61c25b980 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1670,5 +1670,10 @@ SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols pStmt->pTable = pTable; pStmt->pCols = pCols; pStmt->pQuery = pQuery; + if (QUERY_NODE_SELECT_STMT == nodeType(pQuery)) { + strcpy(((SSelectStmt*)pQuery)->stmtName, ((STableNode*)pTable)->tableAlias); + } else if (QUERY_NODE_SET_OPERATOR == nodeType(pQuery)) { + strcpy(((SSetOperator*)pQuery)->stmtName, ((STableNode*)pTable)->tableAlias); + } return (SNode*)pStmt; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1f4294279b..4d72772730 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2839,17 +2839,87 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) { return code; } +static int32_t translateInsertCols(STranslateContext* pCxt, SInsertStmt* pInsert) { + if (NULL == pInsert->pCols) { + return createAllColumns(pCxt, false, &pInsert->pCols); + } + return translateExprList(pCxt, pInsert->pCols); +} + +static int32_t translateInsertQuery(STranslateContext* pCxt, SInsertStmt* pInsert) { + int32_t code = resetTranslateNamespace(pCxt); + if (TSDB_CODE_SUCCESS == code) { + code = translateQuery(pCxt, pInsert->pQuery); + } + return code; +} + +static int32_t addOrderByPrimaryKeyToQueryImpl(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, + SNodeList** pOrderByList) { + SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (NULL == pOrderByExpr) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pOrderByExpr->nullOrder = NULL_ORDER_FIRST; + pOrderByExpr->order = ORDER_ASC; + pOrderByExpr->pExpr = nodesCloneNode(pPrimaryKeyExpr); + if (NULL == pOrderByExpr->pExpr) { + nodesDestroyNode((SNode*)pOrderByExpr); + return TSDB_CODE_OUT_OF_MEMORY; + } + ((SExprNode*)pOrderByExpr->pExpr)->orderAlias = true; + NODES_DESTORY_LIST(*pOrderByList); + return nodesListMakeStrictAppend(pOrderByList, (SNode*)pOrderByExpr); +} + +static int32_t addOrderByPrimaryKeyToQuery(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, SNode* pStmt) { + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { + return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSelectStmt*)pStmt)->pOrderByList); + } + return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSetOperator*)pStmt)->pOrderByList); +} + +static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pInsert) { + SNodeList* pProjects = getProjectList(pInsert->pQuery); + if (LIST_LENGTH(pInsert->pCols) != LIST_LENGTH(pProjects)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns"); + } + + SNode* pPrimaryKeyExpr = NULL; + SNode* pBoundCol = NULL; + SNode* pProj = NULL; + FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) { + SColumnNode* pCol = (SColumnNode*)pBoundCol; + SExprNode* pExpr = (SExprNode*)pProj; + if (!dataTypeEqual(&pCol->node.resType, &pExpr->resType)) { + SNode* pFunc = NULL; + int32_t code = createCastFunc(pCxt, pProj, pCol->node.resType, &pFunc); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + REPLACE_LIST2_NODE(pFunc); + pExpr = (SExprNode*)pFunc; + } + snprintf(pExpr->aliasName, sizeof(pExpr->aliasName), "%s", pCol->colName); + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { + pPrimaryKeyExpr = pProj; + } + } + + return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery); +} + static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) { pCxt->pCurrStmt = (SNode*)pInsert; int32_t code = translateFrom(pCxt, pInsert->pTable); if (TSDB_CODE_SUCCESS == code) { - code = translateExprList(pCxt, pInsert->pCols); + code = translateInsertCols(pCxt, pInsert); } if (TSDB_CODE_SUCCESS == code) { - code = resetTranslateNamespace(pCxt); + code = translateInsertQuery(pCxt, pInsert); } if (TSDB_CODE_SUCCESS == code) { - code = translateQuery(pCxt, pInsert->pQuery); + code = translateInsertProject(pCxt, pInsert); } return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 74d780b8c7..d306bb1697 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1279,10 +1279,16 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser pModify->modifyType = MODIFY_TABLE_TYPE_INSERT; pModify->tableId = pRealTable->pMeta->uid; + pModify->stableId = pRealTable->pMeta->suid; pModify->tableType = pRealTable->pMeta->tableType; snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName); TSWAP(pModify->pVgroupList, pRealTable->pVgroupList); + pModify->pInsertCols = nodesCloneList(pInsert->pCols); + if (NULL == pModify->pInsertCols) { + nodesDestroyNode((SNode*)pModify); + return TSDB_CODE_OUT_OF_MEMORY; + } *pLogicNode = (SLogicNode*)pModify; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 7ec3af31b2..2356eb368e 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1510,18 +1510,21 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod } pInserter->tableId = pModify->tableId; + pInserter->stableId = pModify->stableId; pInserter->tableType = pModify->tableType; strcpy(pInserter->tableFName, pModify->tableFName); pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); - int32_t code = TSDB_CODE_SUCCESS; - - pInserter->sink.pInputDataBlockDesc = - (SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc); - if (NULL == pInserter->sink.pInputDataBlockDesc) { - code = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols, + &pInserter->pCols); + if (TSDB_CODE_SUCCESS == code) { + pInserter->sink.pInputDataBlockDesc = + (SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc); + if (NULL == pInserter->sink.pInputDataBlockDesc) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code) { @@ -1530,7 +1533,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod nodesDestroyNode((SNode*)pInserter); } - return TSDB_CODE_SUCCESS; + return code; } static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) { diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 7fd38cc5c8..6add1cf630 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -96,4 +96,8 @@ TEST_F(PlanOtherTest, insert) { useDb("root", "test"); run("INSERT INTO t1 SELECT * FROM t1"); + + run("INSERT INTO t1 (ts, c1, c2) SELECT ts, c1, c2 FROM st1"); + + run("INSERT INTO t1 (ts, c1, c2) SELECT ts, c1, c2 FROM st1s1 UNION ALL SELECT ts, c1, c2 FROM st2"); }