feat: sql command 'insert ... select'
This commit is contained in:
parent
2a52beb56b
commit
c1eef9911b
|
@ -128,10 +128,12 @@ typedef struct SVnodeModifyLogicNode {
|
|||
SVgDataBlocks* pVgDataBlocks;
|
||||
SNode* pAffectedRows; // SColumnNode
|
||||
uint64_t tableId;
|
||||
uint64_t stableId;
|
||||
int8_t tableType; // table type
|
||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||
STimeWindow deleteTimeRange;
|
||||
SVgroupsInfo* pVgroupList;
|
||||
SNodeList* pInsertCols;
|
||||
} SVnodeModifyLogicNode;
|
||||
|
||||
typedef struct SExchangeLogicNode {
|
||||
|
@ -459,7 +461,9 @@ typedef struct SDataInserterNode {
|
|||
|
||||
typedef struct SQueryInserterNode {
|
||||
SDataSinkNode sink;
|
||||
SNodeList* pCols;
|
||||
uint64_t tableId;
|
||||
uint64_t stableId;
|
||||
int8_t tableType; // table type
|
||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
|
|
|
@ -394,10 +394,12 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
|
|||
COPY_SCALAR_FIELD(msgType);
|
||||
CLONE_NODE_FIELD(pAffectedRows);
|
||||
COPY_SCALAR_FIELD(tableId);
|
||||
COPY_SCALAR_FIELD(stableId);
|
||||
COPY_SCALAR_FIELD(tableType);
|
||||
COPY_CHAR_ARRAY_FIELD(tableFName);
|
||||
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
|
||||
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
|
||||
CLONE_NODE_LIST_FIELD(pInsertCols);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2214,6 +2214,8 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
|
|||
|
||||
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); }
|
||||
|
||||
static const char* jkQueryInsertPhysiPlanInsertCols = "InsertCols";
|
||||
static const char* jkQueryInsertPhysiPlanStableId = "StableId";
|
||||
static const char* jkQueryInsertPhysiPlanTableId = "TableId";
|
||||
static const char* jkQueryInsertPhysiPlanTableType = "TableType";
|
||||
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
|
||||
|
@ -2224,6 +2226,12 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
|
|||
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
|
||||
|
||||
int32_t code = physicDataSinkNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkQueryInsertPhysiPlanInsertCols, pNode->pCols);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanStableId, pNode->stableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId);
|
||||
}
|
||||
|
@ -2247,6 +2255,12 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
|
|||
SQueryInserterNode* pNode = (SQueryInserterNode*)pObj;
|
||||
|
||||
int32_t code = jsonToPhysicDataSinkNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkQueryInsertPhysiPlanInsertCols, &pNode->pCols);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanStableId, &pNode->stableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId);
|
||||
}
|
||||
|
|
|
@ -1670,5 +1670,10 @@ SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols
|
|||
pStmt->pTable = pTable;
|
||||
pStmt->pCols = pCols;
|
||||
pStmt->pQuery = pQuery;
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pQuery)) {
|
||||
strcpy(((SSelectStmt*)pQuery)->stmtName, ((STableNode*)pTable)->tableAlias);
|
||||
} else if (QUERY_NODE_SET_OPERATOR == nodeType(pQuery)) {
|
||||
strcpy(((SSetOperator*)pQuery)->stmtName, ((STableNode*)pTable)->tableAlias);
|
||||
}
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
|
|
@ -2839,17 +2839,87 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateInsertCols(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
if (NULL == pInsert->pCols) {
|
||||
return createAllColumns(pCxt, false, &pInsert->pCols);
|
||||
}
|
||||
return translateExprList(pCxt, pInsert->pCols);
|
||||
}
|
||||
|
||||
static int32_t translateInsertQuery(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
int32_t code = resetTranslateNamespace(pCxt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateQuery(pCxt, pInsert->pQuery);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addOrderByPrimaryKeyToQueryImpl(STranslateContext* pCxt, SNode* pPrimaryKeyExpr,
|
||||
SNodeList** pOrderByList) {
|
||||
SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||
if (NULL == pOrderByExpr) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pOrderByExpr->nullOrder = NULL_ORDER_FIRST;
|
||||
pOrderByExpr->order = ORDER_ASC;
|
||||
pOrderByExpr->pExpr = nodesCloneNode(pPrimaryKeyExpr);
|
||||
if (NULL == pOrderByExpr->pExpr) {
|
||||
nodesDestroyNode((SNode*)pOrderByExpr);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SExprNode*)pOrderByExpr->pExpr)->orderAlias = true;
|
||||
NODES_DESTORY_LIST(*pOrderByList);
|
||||
return nodesListMakeStrictAppend(pOrderByList, (SNode*)pOrderByExpr);
|
||||
}
|
||||
|
||||
static int32_t addOrderByPrimaryKeyToQuery(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, SNode* pStmt) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSelectStmt*)pStmt)->pOrderByList);
|
||||
}
|
||||
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSetOperator*)pStmt)->pOrderByList);
|
||||
}
|
||||
|
||||
static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
SNodeList* pProjects = getProjectList(pInsert->pQuery);
|
||||
if (LIST_LENGTH(pInsert->pCols) != LIST_LENGTH(pProjects)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
||||
}
|
||||
|
||||
SNode* pPrimaryKeyExpr = NULL;
|
||||
SNode* pBoundCol = NULL;
|
||||
SNode* pProj = NULL;
|
||||
FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) {
|
||||
SColumnNode* pCol = (SColumnNode*)pBoundCol;
|
||||
SExprNode* pExpr = (SExprNode*)pProj;
|
||||
if (!dataTypeEqual(&pCol->node.resType, &pExpr->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pProj, pCol->node.resType, &pFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
REPLACE_LIST2_NODE(pFunc);
|
||||
pExpr = (SExprNode*)pFunc;
|
||||
}
|
||||
snprintf(pExpr->aliasName, sizeof(pExpr->aliasName), "%s", pCol->colName);
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
||||
pPrimaryKeyExpr = pProj;
|
||||
}
|
||||
}
|
||||
|
||||
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
|
||||
}
|
||||
|
||||
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
pCxt->pCurrStmt = (SNode*)pInsert;
|
||||
int32_t code = translateFrom(pCxt, pInsert->pTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExprList(pCxt, pInsert->pCols);
|
||||
code = translateInsertCols(pCxt, pInsert);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = resetTranslateNamespace(pCxt);
|
||||
code = translateInsertQuery(pCxt, pInsert);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateQuery(pCxt, pInsert->pQuery);
|
||||
code = translateInsertProject(pCxt, pInsert);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1279,10 +1279,16 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser
|
|||
|
||||
pModify->modifyType = MODIFY_TABLE_TYPE_INSERT;
|
||||
pModify->tableId = pRealTable->pMeta->uid;
|
||||
pModify->stableId = pRealTable->pMeta->suid;
|
||||
pModify->tableType = pRealTable->pMeta->tableType;
|
||||
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
|
||||
pRealTable->table.dbName, pRealTable->table.tableName);
|
||||
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
|
||||
pModify->pInsertCols = nodesCloneList(pInsert->pCols);
|
||||
if (NULL == pModify->pInsertCols) {
|
||||
nodesDestroyNode((SNode*)pModify);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pLogicNode = (SLogicNode*)pModify;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -1510,18 +1510,21 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
|
|||
}
|
||||
|
||||
pInserter->tableId = pModify->tableId;
|
||||
pInserter->stableId = pModify->stableId;
|
||||
pInserter->tableType = pModify->tableType;
|
||||
strcpy(pInserter->tableFName, pModify->tableFName);
|
||||
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
|
||||
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
|
||||
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pInserter->sink.pInputDataBlockDesc =
|
||||
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
|
||||
if (NULL == pInserter->sink.pInputDataBlockDesc) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
|
||||
&pInserter->pCols);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pInserter->sink.pInputDataBlockDesc =
|
||||
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
|
||||
if (NULL == pInserter->sink.pInputDataBlockDesc) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1530,7 +1533,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
|
|||
nodesDestroyNode((SNode*)pInserter);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||
|
|
|
@ -96,4 +96,8 @@ TEST_F(PlanOtherTest, insert) {
|
|||
useDb("root", "test");
|
||||
|
||||
run("INSERT INTO t1 SELECT * FROM t1");
|
||||
|
||||
run("INSERT INTO t1 (ts, c1, c2) SELECT ts, c1, c2 FROM st1");
|
||||
|
||||
run("INSERT INTO t1 (ts, c1, c2) SELECT ts, c1, c2 FROM st1s1 UNION ALL SELECT ts, c1, c2 FROM st2");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue