Merge pull request #14572 from taosdata/feature/3.0_debug_wxy
feat: sql command 'insert ... select'
This commit is contained in:
commit
fd0202ed99
|
@ -204,65 +204,65 @@
|
|||
#define TK_SPLIT 186
|
||||
#define TK_SYNCDB 187
|
||||
#define TK_DELETE 188
|
||||
#define TK_NULL 189
|
||||
#define TK_NK_QUESTION 190
|
||||
#define TK_NK_ARROW 191
|
||||
#define TK_ROWTS 192
|
||||
#define TK_TBNAME 193
|
||||
#define TK_QSTARTTS 194
|
||||
#define TK_QENDTS 195
|
||||
#define TK_WSTARTTS 196
|
||||
#define TK_WENDTS 197
|
||||
#define TK_WDURATION 198
|
||||
#define TK_CAST 199
|
||||
#define TK_NOW 200
|
||||
#define TK_TODAY 201
|
||||
#define TK_TIMEZONE 202
|
||||
#define TK_CLIENT_VERSION 203
|
||||
#define TK_SERVER_VERSION 204
|
||||
#define TK_SERVER_STATUS 205
|
||||
#define TK_CURRENT_USER 206
|
||||
#define TK_COUNT 207
|
||||
#define TK_LAST_ROW 208
|
||||
#define TK_BETWEEN 209
|
||||
#define TK_IS 210
|
||||
#define TK_NK_LT 211
|
||||
#define TK_NK_GT 212
|
||||
#define TK_NK_LE 213
|
||||
#define TK_NK_GE 214
|
||||
#define TK_NK_NE 215
|
||||
#define TK_MATCH 216
|
||||
#define TK_NMATCH 217
|
||||
#define TK_CONTAINS 218
|
||||
#define TK_JOIN 219
|
||||
#define TK_INNER 220
|
||||
#define TK_SELECT 221
|
||||
#define TK_DISTINCT 222
|
||||
#define TK_WHERE 223
|
||||
#define TK_PARTITION 224
|
||||
#define TK_BY 225
|
||||
#define TK_SESSION 226
|
||||
#define TK_STATE_WINDOW 227
|
||||
#define TK_SLIDING 228
|
||||
#define TK_FILL 229
|
||||
#define TK_VALUE 230
|
||||
#define TK_NONE 231
|
||||
#define TK_PREV 232
|
||||
#define TK_LINEAR 233
|
||||
#define TK_NEXT 234
|
||||
#define TK_HAVING 235
|
||||
#define TK_RANGE 236
|
||||
#define TK_EVERY 237
|
||||
#define TK_ORDER 238
|
||||
#define TK_SLIMIT 239
|
||||
#define TK_SOFFSET 240
|
||||
#define TK_LIMIT 241
|
||||
#define TK_OFFSET 242
|
||||
#define TK_ASC 243
|
||||
#define TK_NULLS 244
|
||||
#define TK_ID 245
|
||||
#define TK_NK_BITNOT 246
|
||||
#define TK_INSERT 247
|
||||
#define TK_INSERT 189
|
||||
#define TK_NULL 190
|
||||
#define TK_NK_QUESTION 191
|
||||
#define TK_NK_ARROW 192
|
||||
#define TK_ROWTS 193
|
||||
#define TK_TBNAME 194
|
||||
#define TK_QSTARTTS 195
|
||||
#define TK_QENDTS 196
|
||||
#define TK_WSTARTTS 197
|
||||
#define TK_WENDTS 198
|
||||
#define TK_WDURATION 199
|
||||
#define TK_CAST 200
|
||||
#define TK_NOW 201
|
||||
#define TK_TODAY 202
|
||||
#define TK_TIMEZONE 203
|
||||
#define TK_CLIENT_VERSION 204
|
||||
#define TK_SERVER_VERSION 205
|
||||
#define TK_SERVER_STATUS 206
|
||||
#define TK_CURRENT_USER 207
|
||||
#define TK_COUNT 208
|
||||
#define TK_LAST_ROW 209
|
||||
#define TK_BETWEEN 210
|
||||
#define TK_IS 211
|
||||
#define TK_NK_LT 212
|
||||
#define TK_NK_GT 213
|
||||
#define TK_NK_LE 214
|
||||
#define TK_NK_GE 215
|
||||
#define TK_NK_NE 216
|
||||
#define TK_MATCH 217
|
||||
#define TK_NMATCH 218
|
||||
#define TK_CONTAINS 219
|
||||
#define TK_JOIN 220
|
||||
#define TK_INNER 221
|
||||
#define TK_SELECT 222
|
||||
#define TK_DISTINCT 223
|
||||
#define TK_WHERE 224
|
||||
#define TK_PARTITION 225
|
||||
#define TK_BY 226
|
||||
#define TK_SESSION 227
|
||||
#define TK_STATE_WINDOW 228
|
||||
#define TK_SLIDING 229
|
||||
#define TK_FILL 230
|
||||
#define TK_VALUE 231
|
||||
#define TK_NONE 232
|
||||
#define TK_PREV 233
|
||||
#define TK_LINEAR 234
|
||||
#define TK_NEXT 235
|
||||
#define TK_HAVING 236
|
||||
#define TK_RANGE 237
|
||||
#define TK_EVERY 238
|
||||
#define TK_ORDER 239
|
||||
#define TK_SLIMIT 240
|
||||
#define TK_SOFFSET 241
|
||||
#define TK_LIMIT 242
|
||||
#define TK_OFFSET 243
|
||||
#define TK_ASC 244
|
||||
#define TK_NULLS 245
|
||||
#define TK_ID 246
|
||||
#define TK_NK_BITNOT 247
|
||||
#define TK_VALUES 248
|
||||
#define TK_IMPORT 249
|
||||
#define TK_NK_SEMI 250
|
||||
|
|
|
@ -194,6 +194,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_KILL_QUERY_STMT,
|
||||
QUERY_NODE_KILL_TRANSACTION_STMT,
|
||||
QUERY_NODE_DELETE_STMT,
|
||||
QUERY_NODE_INSERT_STMT,
|
||||
QUERY_NODE_QUERY,
|
||||
|
||||
// logic plan node
|
||||
|
@ -247,6 +248,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
|
||||
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
|
||||
QUERY_NODE_PHYSICAL_PLAN_INSERT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_DELETE,
|
||||
QUERY_NODE_PHYSICAL_SUBPLAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN
|
||||
|
|
|
@ -131,6 +131,7 @@ typedef struct SVnodeModifyLogicNode {
|
|||
int8_t tableType; // table type
|
||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||
STimeWindow deleteTimeRange;
|
||||
SVgroupsInfo* pVgroupList;
|
||||
} SVnodeModifyLogicNode;
|
||||
|
||||
typedef struct SExchangeLogicNode {
|
||||
|
@ -456,6 +457,15 @@ typedef struct SDataInserterNode {
|
|||
char* pData;
|
||||
} SDataInserterNode;
|
||||
|
||||
typedef struct SQueryInserterNode {
|
||||
SDataSinkNode sink;
|
||||
uint64_t tableId;
|
||||
int8_t tableType; // table type
|
||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
SEpSet epSet;
|
||||
} SQueryInserterNode;
|
||||
|
||||
typedef struct SDataDeleterNode {
|
||||
SDataSinkNode sink;
|
||||
uint64_t tableId;
|
||||
|
|
|
@ -302,6 +302,14 @@ typedef struct SDeleteStmt {
|
|||
bool deleteZeroRows;
|
||||
} SDeleteStmt;
|
||||
|
||||
typedef struct SInsertStmt {
|
||||
ENodeType type; // QUERY_NODE_INSERT_STMT
|
||||
SNode* pTable;
|
||||
SNodeList* pCols;
|
||||
SNode* pQuery;
|
||||
uint8_t precision;
|
||||
} SInsertStmt;
|
||||
|
||||
typedef enum {
|
||||
PAYLOAD_TYPE_KV = 0,
|
||||
PAYLOAD_TYPE_RAW = 1,
|
||||
|
|
|
@ -56,7 +56,7 @@ typedef struct SParseContext {
|
|||
} SParseContext;
|
||||
|
||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
||||
bool qIsInsertSql(const char* pStr, size_t length);
|
||||
bool qIsInsertValuesSql(const char* pStr, size_t length);
|
||||
|
||||
// for async mode
|
||||
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
|
||||
|
|
|
@ -324,9 +324,9 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
|||
}
|
||||
|
||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||
.requestId = pStmt->exec.pRequest->requestId,
|
||||
.requestObjRefId = pStmt->exec.pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
||||
|
@ -391,13 +391,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
STMT_RET(stmtCleanBindInfo(pStmt));
|
||||
}
|
||||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||
.requestId = pStmt->exec.pRequest->requestId,
|
||||
.requestObjRefId = pStmt->exec.pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
||||
int32_t code =
|
||||
catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
|
||||
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
|
||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
|
||||
|
||||
|
@ -849,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
|
|||
if (pStmt->sql.type) {
|
||||
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
|
||||
} else {
|
||||
*insert = qIsInsertSql(pStmt->sql.sqlStr, 0);
|
||||
*insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -861,7 +860,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
|
|||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
|
||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
|
||||
|
||||
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
|
||||
|
@ -893,7 +892,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
|
|||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
|
||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
|
||||
|
||||
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
|
||||
|
@ -919,7 +918,6 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
|
@ -952,13 +950,13 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
|
||||
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
|
||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
|
||||
|
||||
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
|
||||
|
@ -979,8 +977,8 @@ int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
|
|||
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||
}
|
||||
|
||||
int32_t nums = 0;
|
||||
TAOS_FIELD_E *pField = NULL;
|
||||
int32_t nums = 0;
|
||||
TAOS_FIELD_E* pField = NULL;
|
||||
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));
|
||||
if (idx >= nums) {
|
||||
tscError("idx %d is too big", idx);
|
||||
|
|
|
@ -165,7 +165,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
|
|||
memcpy(pDst->datum.p, pSrc->datum.p, len);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_JSON:{
|
||||
case TSDB_DATA_TYPE_JSON: {
|
||||
int32_t len = getJsonValueLen(pSrc->datum.p);
|
||||
pDst->datum.p = taosMemoryCalloc(1, len);
|
||||
if (NULL == pDst->datum.p) {
|
||||
|
@ -397,6 +397,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
|
|||
COPY_SCALAR_FIELD(tableType);
|
||||
COPY_CHAR_ARRAY_FIELD(tableFName);
|
||||
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
|
||||
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
#include "query.h"
|
||||
#include "querynodes.h"
|
||||
#include "taoserror.h"
|
||||
#include "tjson.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tjson.h"
|
||||
|
||||
static int32_t nodeToJson(const void* pObj, SJson* pJson);
|
||||
static int32_t jsonToNode(const SJson* pJson, void* pObj);
|
||||
|
@ -179,6 +179,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "ShowVnodeStmt";
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
return "DeleteStmt";
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
return "InsertStmt";
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return "LogicScan";
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
|
@ -271,6 +273,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiDispatch";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
return "PhysiInsert";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
|
||||
return "PhysiQueryInsert";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
|
||||
return "PhysiDelete";
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN:
|
||||
|
@ -2210,6 +2214,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
|
|||
|
||||
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); }
|
||||
|
||||
static const char* jkQueryInsertPhysiPlanTableId = "TableId";
|
||||
static const char* jkQueryInsertPhysiPlanTableType = "TableType";
|
||||
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
|
||||
static const char* jkQueryInsertPhysiPlanVgId = "VgId";
|
||||
static const char* jkQueryInsertPhysiPlanEpSet = "EpSet";
|
||||
|
||||
static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
|
||||
|
||||
int32_t code = physicDataSinkNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
|
||||
SQueryInserterNode* pNode = (SQueryInserterNode*)pObj;
|
||||
|
||||
int32_t code = jsonToPhysicDataSinkNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkDeletePhysiPlanTableId = "TableId";
|
||||
static const char* jkDeletePhysiPlanTableType = "TableType";
|
||||
static const char* jkDeletePhysiPlanTableFName = "TableFName";
|
||||
|
@ -2641,9 +2697,9 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
|
|||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:{
|
||||
case TSDB_DATA_TYPE_JSON: {
|
||||
int32_t len = getJsonValueLen(pNode->datum.p);
|
||||
char* buf = taosMemoryCalloc( len * 2 + 1, sizeof(char));
|
||||
char* buf = taosMemoryCalloc(len * 2 + 1, sizeof(char));
|
||||
code = taosHexEncode(pNode->datum.p, buf, len);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(buf);
|
||||
|
@ -2775,7 +2831,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_JSON:{
|
||||
case TSDB_DATA_TYPE_JSON: {
|
||||
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes);
|
||||
if (NULL == pNode->datum.p) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -4232,6 +4288,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return physiDispatchNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
|
||||
return physiQueryInsertNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
|
||||
return physiDeleteNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN:
|
||||
|
@ -4374,6 +4432,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToPhysiInterpFuncNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return jsonToPhysiDispatchNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
|
||||
return jsonToPhysiQueryInsertNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
|
||||
return jsonToPhysiDeleteNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN:
|
||||
|
|
|
@ -229,6 +229,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SKillStmt));
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
return makeNode(type, sizeof(SDeleteStmt));
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
return makeNode(type, sizeof(SInsertStmt));
|
||||
case QUERY_NODE_QUERY:
|
||||
return makeNode(type, sizeof(SQuery));
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
|
@ -325,6 +327,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SDataDispatcherNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
return makeNode(type, sizeof(SDataInserterNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
|
||||
return makeNode(type, sizeof(SQueryInserterNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
|
||||
return makeNode(type, sizeof(SDataDeleterNode));
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN:
|
||||
|
@ -690,6 +694,13 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pStmt->pTagCond);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_INSERT_STMT: {
|
||||
SInsertStmt* pStmt = (SInsertStmt*)pNode;
|
||||
nodesDestroyNode(pStmt->pTable);
|
||||
nodesDestroyList(pStmt->pCols);
|
||||
nodesDestroyNode(pStmt->pQuery);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_QUERY: {
|
||||
SQuery* pQuery = (SQuery*)pNode;
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
|
@ -925,6 +936,11 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
taosMemoryFreeClear(pSink->pData);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
|
||||
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
|
||||
destroyDataSinkNode((SDataSinkNode*)pSink);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
|
||||
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
|
||||
destroyDataSinkNode((SDataSinkNode*)pSink);
|
||||
|
@ -1524,7 +1540,6 @@ int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, EColle
|
|||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
typedef struct SCollectFuncsCxt {
|
||||
|
|
|
@ -210,6 +210,7 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName);
|
|||
SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
|
||||
SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
|
||||
SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere);
|
||||
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -259,7 +259,7 @@ multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C).
|
|||
|
||||
create_subtable_clause(A) ::=
|
||||
not_exists_opt(B) full_table_name(C) USING full_table_name(D)
|
||||
specific_tags_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
|
||||
specific_cols_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
|
||||
|
||||
%type multi_drop_clause { SNodeList* }
|
||||
%destructor multi_drop_clause { nodesDestroyList($$); }
|
||||
|
@ -268,10 +268,10 @@ multi_drop_clause(A) ::= multi_drop_clause(B) drop_table_clause(C).
|
|||
|
||||
drop_table_clause(A) ::= exists_opt(B) full_table_name(C). { A = createDropTableClause(pCxt, B, C); }
|
||||
|
||||
%type specific_tags_opt { SNodeList* }
|
||||
%destructor specific_tags_opt { nodesDestroyList($$); }
|
||||
specific_tags_opt(A) ::= . { A = NULL; }
|
||||
specific_tags_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||
%type specific_cols_opt { SNodeList* }
|
||||
%destructor specific_cols_opt { nodesDestroyList($$); }
|
||||
specific_cols_opt(A) ::= . { A = NULL; }
|
||||
specific_cols_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||
|
||||
full_table_name(A) ::= table_name(B). { A = createRealTableNode(pCxt, NULL, &B, NULL); }
|
||||
full_table_name(A) ::= db_name(B) NK_DOT table_name(C). { A = createRealTableNode(pCxt, &B, &C, NULL); }
|
||||
|
@ -515,6 +515,9 @@ cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B).
|
|||
/************************************************ select **************************************************************/
|
||||
cmd ::= query_expression(A). { pCxt->pRootNode = A; }
|
||||
|
||||
/************************************************ insert **************************************************************/
|
||||
cmd ::= INSERT INTO full_table_name(A) specific_cols_opt(B) query_expression(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); }
|
||||
|
||||
/************************************************ literal *************************************************************/
|
||||
literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); }
|
||||
literal(A) ::= NK_FLOAT(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B)); }
|
||||
|
@ -973,4 +976,4 @@ null_ordering_opt(A) ::= .
|
|||
null_ordering_opt(A) ::= NULLS FIRST. { A = NULL_ORDER_FIRST; }
|
||||
null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; }
|
||||
|
||||
%fallback ID NK_BITNOT INSERT VALUES IMPORT NK_SEMI FILE.
|
||||
%fallback ID NK_BITNOT VALUES IMPORT NK_SEMI FILE.
|
||||
|
|
|
@ -1662,3 +1662,13 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
|
|||
}
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SInsertStmt* pStmt = (SInsertStmt*)nodesMakeNode(QUERY_NODE_INSERT_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->pTable = pTable;
|
||||
pStmt->pCols = pCols;
|
||||
pStmt->pQuery = pQuery;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
|
|
@ -447,6 +447,14 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
|
|||
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromInsert(SCollectMetaKeyCxt* pCxt, SInsertStmt* pStmt) {
|
||||
int32_t code = collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pTable, AUTH_TYPE_WRITE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
|
||||
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
|
||||
strcpy(name.dbname, pStmt->dbName);
|
||||
|
@ -554,6 +562,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
|||
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
return collectMetaKeyFromInsert(pCxt, (SInsertStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
||||
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
|
||||
|
|
|
@ -39,7 +39,7 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, AUTH_TYPE type) {
|
|||
if (NULL != pCxt->pMetaCache) {
|
||||
code = getUserAuthFromCache(pCxt->pMetaCache, pParseCxt->pUser, dbFname, type, &pass);
|
||||
} else {
|
||||
SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter,
|
||||
SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter,
|
||||
.requestId = pParseCxt->requestId,
|
||||
.requestObjRefId = pParseCxt->requestRid,
|
||||
.mgmtEps = pParseCxt->mgmtEpSet};
|
||||
|
@ -88,6 +88,14 @@ static int32_t authDelete(SAuthCxt* pCxt, SDeleteStmt* pDelete) {
|
|||
return checkAuth(pCxt, ((SRealTableNode*)pDelete->pFromTable)->table.dbName, AUTH_TYPE_WRITE);
|
||||
}
|
||||
|
||||
static int32_t authInsert(SAuthCxt* pCxt, SInsertStmt* pInsert) {
|
||||
int32_t code = checkAuth(pCxt, ((SRealTableNode*)pInsert->pTable)->table.dbName, AUTH_TYPE_WRITE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = authQuery(pCxt, pInsert->pQuery);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
|
||||
switch (nodeType(pStmt)) {
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
|
@ -98,6 +106,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
|
|||
return authDropUser(pCxt, (SDropUserStmt*)pStmt);
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
return authDelete(pCxt, (SDeleteStmt*)pStmt);
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
return authInsert(pCxt, (SInsertStmt*)pStmt);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -300,6 +300,14 @@ static int32_t calcConstDelete(SCalcConstContext* pCxt, SDeleteStmt* pDelete) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t calcConstInsert(SCalcConstContext* pCxt, SInsertStmt* pInsert) {
|
||||
int32_t code = calcConstFromTable(pCxt, pInsert->pTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = calcConstQuery(pCxt, pInsert->pQuery, false);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subquery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pStmt)) {
|
||||
|
@ -320,6 +328,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque
|
|||
case QUERY_NODE_DELETE_STMT:
|
||||
code = calcConstDelete(pCxt, (SDeleteStmt*)pStmt);
|
||||
break;
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
code = calcConstInsert(pCxt, (SInsertStmt*)pStmt);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -110,7 +110,7 @@ typedef struct SMemParam {
|
|||
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
|
||||
SToken sToken;
|
||||
NEXT_TOKEN(*pSql, sToken);
|
||||
if (TK_INSERT != sToken.type) {
|
||||
if (TK_INSERT != sToken.type && TK_IMPORT != sToken.type) {
|
||||
return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
|
||||
}
|
||||
NEXT_TOKEN(*pSql, sToken);
|
||||
|
|
|
@ -2839,6 +2839,21 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
pCxt->pCurrStmt = (SNode*)pInsert;
|
||||
int32_t code = translateFrom(pCxt, pInsert->pTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExprList(pCxt, pInsert->pCols);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = resetTranslateNamespace(pCxt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateQuery(pCxt, pInsert->pQuery);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int64_t getUnitPerMinute(uint8_t precision) {
|
||||
switch (precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI:
|
||||
|
@ -4608,6 +4623,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_DELETE_STMT:
|
||||
code = translateDelete(pCxt, (SDeleteStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
code = translateInsert(pCxt, (SInsertStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_DATABASE_STMT:
|
||||
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
|
||||
break;
|
||||
|
@ -6224,6 +6242,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->msgType = TDMT_VND_DELETE;
|
||||
break;
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||
break;
|
||||
case QUERY_NODE_VNODE_MODIF_STMT:
|
||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
|
||||
|
|
|
@ -19,19 +19,28 @@
|
|||
#include "parInt.h"
|
||||
#include "parToken.h"
|
||||
|
||||
bool qIsInsertSql(const char* pStr, size_t length) {
|
||||
bool qIsInsertValuesSql(const char* pStr, size_t length) {
|
||||
if (NULL == pStr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const char* pSql = pStr;
|
||||
|
||||
int32_t index = 0;
|
||||
SToken t = tStrGetToken((char*)pStr, &index, false);
|
||||
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
|
||||
return false;
|
||||
}
|
||||
|
||||
do {
|
||||
SToken t0 = tStrGetToken((char*)pStr, &index, false);
|
||||
if (t0.type != TK_NK_LP) {
|
||||
return t0.type == TK_INSERT || t0.type == TK_IMPORT;
|
||||
pStr += index;
|
||||
index = 0;
|
||||
t = tStrGetToken((char*)pStr, &index, false);
|
||||
if (TK_USING == t.type || TK_VALUES == t.type) {
|
||||
return true;
|
||||
}
|
||||
} while (1);
|
||||
} while (pStr - pSql < length);
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
|
||||
|
@ -148,7 +157,7 @@ static void rewriteExprAlias(SNode* pRoot) {
|
|||
|
||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||
code = parseInsertSql(pCxt, pQuery, NULL);
|
||||
} else {
|
||||
code = parseSqlIntoAst(pCxt, pQuery);
|
||||
|
@ -160,7 +169,7 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
|
|||
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
|
||||
SParseMetaCache metaCache = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||
code = parseInsertSyntax(pCxt, pQuery, &metaCache);
|
||||
} else {
|
||||
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -40,6 +40,12 @@ TEST_F(ParserExplainToSyncdbTest, grant) {
|
|||
run("GRANT READ, WRITE ON test.* TO wxy");
|
||||
}
|
||||
|
||||
TEST_F(ParserExplainToSyncdbTest, insert) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("INSERT INTO t1 SELECT * FROM t1");
|
||||
}
|
||||
|
||||
// todo kill connection
|
||||
// todo kill query
|
||||
// todo kill stream
|
||||
|
|
|
@ -26,6 +26,7 @@ typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
|
|||
typedef int32_t (*FCreateSelectLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**);
|
||||
typedef int32_t (*FCreateSetOpLogicNode)(SLogicPlanContext*, SSetOperator*, SLogicNode**);
|
||||
typedef int32_t (*FCreateDeleteLogicNode)(SLogicPlanContext*, SDeleteStmt*, SLogicNode**);
|
||||
typedef int32_t (*FCreateInsertLogicNode)(SLogicPlanContext*, SInsertStmt*, SLogicNode**);
|
||||
|
||||
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable,
|
||||
SLogicNode** pLogicNode);
|
||||
|
@ -1262,6 +1263,47 @@ static int32_t createDeleteLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pDele
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t creatInsertRootLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, FCreateInsertLogicNode func,
|
||||
SLogicNode** pRoot) {
|
||||
return createRootLogicNode(pCxt, pInsert, pInsert->precision, (FCreateLogicNode)func, pRoot);
|
||||
}
|
||||
|
||||
static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInsertStmt* pInsert,
|
||||
SLogicNode** pLogicNode) {
|
||||
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY);
|
||||
if (NULL == pModify) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)pInsert->pTable;
|
||||
|
||||
pModify->modifyType = MODIFY_TABLE_TYPE_INSERT;
|
||||
pModify->tableId = pRealTable->pMeta->uid;
|
||||
pModify->tableType = pRealTable->pMeta->tableType;
|
||||
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
|
||||
pRealTable->table.dbName, pRealTable->table.tableName);
|
||||
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
|
||||
|
||||
*pLogicNode = (SLogicNode*)pModify;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createInsertLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, SLogicNode** pLogicNode) {
|
||||
SLogicNode* pRoot = NULL;
|
||||
int32_t code = createQueryLogicNode(pCxt, pInsert->pQuery, &pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = creatInsertRootLogicNode(pCxt, pInsert, createVnodeModifLogicNodeByInsert, &pRoot);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = pRoot;
|
||||
} else {
|
||||
nodesDestroyNode((SNode*)pRoot);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode) {
|
||||
switch (nodeType(pStmt)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -1274,6 +1316,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
|
|||
return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode);
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
return createDeleteLogicNode(pCxt, (SDeleteStmt*)pStmt, pLogicNode);
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
return createInsertLogicNode(pCxt, (SInsertStmt*)pStmt, pLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -485,7 +485,7 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject
|
|||
return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode * pJoin, SNode** pCond) {
|
||||
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
|
||||
return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond);
|
||||
}
|
||||
|
||||
|
@ -557,9 +557,9 @@ static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogic
|
|||
static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNodeList* pOnConds = NULL;
|
||||
SNode* pCond = NULL;
|
||||
SNode* pCond = NULL;
|
||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) {
|
||||
*ppMergeCond = nodesCloneNode(pCond);
|
||||
|
@ -604,8 +604,8 @@ static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMe
|
|||
|
||||
static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
|
||||
SNode* pJoinMergeCond = NULL;
|
||||
SNode* pJoinOnCond = NULL;
|
||||
SNode* pJoinMergeCond = NULL;
|
||||
SNode* pJoinOnCond = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond);
|
||||
}
|
||||
|
@ -820,12 +820,12 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
|
|||
|
||||
typedef struct SRewriteProjCondContext {
|
||||
SProjectLogicNode* pProj;
|
||||
int32_t errCode;
|
||||
}SRewriteProjCondContext;
|
||||
int32_t errCode;
|
||||
} SRewriteProjCondContext;
|
||||
|
||||
static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) {
|
||||
SRewriteProjCondContext* pCxt = pContext;
|
||||
SProjectLogicNode* pProj = pCxt->pProj;
|
||||
SProjectLogicNode* pProj = pCxt->pProj;
|
||||
if (QUERY_NODE_COLUMN == nodeType(*ppNode)) {
|
||||
SNode* pTarget = NULL;
|
||||
FOREACH(pTarget, pProj->node.pTargets) {
|
||||
|
@ -840,18 +840,19 @@ static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext
|
|||
}
|
||||
nodesDestroyNode(*ppNode);
|
||||
*ppNode = pExpr;
|
||||
} // end if expr alias name equal column name
|
||||
} // end for each project
|
||||
} // end if target node equals cond column node
|
||||
} // end for each targets
|
||||
} // end if expr alias name equal column name
|
||||
} // end for each project
|
||||
} // end if target node equals cond column node
|
||||
} // end for each targets
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) {
|
||||
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject,
|
||||
SNode** ppProjectCond) {
|
||||
SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS};
|
||||
SNode* pProjectCond = pProject->node.pConditions;
|
||||
SNode* pProjectCond = pProject->node.pConditions;
|
||||
nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt);
|
||||
*ppProjectCond = pProjectCond;
|
||||
pProject->node.pConditions = NULL;
|
||||
|
@ -873,7 +874,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
|
|||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pProjCond = NULL;
|
||||
SNode* pProjCond = NULL;
|
||||
code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
|
||||
|
@ -2082,13 +2083,18 @@ static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimi
|
|||
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
|
||||
char* pStr = NULL;
|
||||
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
|
||||
qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
|
||||
if (NULL == pRuleName) {
|
||||
qDebugL("before optimize: %s", pStr);
|
||||
} else {
|
||||
qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
|
||||
}
|
||||
taosMemoryFree(pStr);
|
||||
}
|
||||
|
||||
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false};
|
||||
bool optimized = false;
|
||||
dumpLogicSubplan(NULL, pLogicSubplan);
|
||||
do {
|
||||
optimized = false;
|
||||
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
|
||||
|
|
|
@ -632,8 +632,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
|
||||
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions,
|
||||
&pJoin->pOnConditions);
|
||||
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
|
||||
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1496,12 +1496,60 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
|
|||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||
pSubplan->msgType = pModify->msgType;
|
||||
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
|
||||
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
|
||||
}
|
||||
|
||||
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
|
||||
SDataSinkNode** pSink) {
|
||||
SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT);
|
||||
if (NULL == pInserter) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pInserter->tableId = pModify->tableId;
|
||||
pInserter->tableType = pModify->tableType;
|
||||
strcpy(pInserter->tableFName, pModify->tableFName);
|
||||
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
|
||||
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
|
||||
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pInserter->sink.pInputDataBlockDesc =
|
||||
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
|
||||
if (NULL == pInserter->sink.pInputDataBlockDesc) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pSink = (SDataSinkNode*)pInserter;
|
||||
} else {
|
||||
nodesDestroyNode((SNode*)pInserter);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||
int32_t code =
|
||||
createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
|
||||
}
|
||||
pSubplan->msgType = TDMT_VND_SUBMIT;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||
if (NULL == pModify->node.pChildren) {
|
||||
return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
|
||||
}
|
||||
return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
|
||||
}
|
||||
|
||||
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
|
||||
SDataSinkNode** pSink) {
|
||||
SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
|
||||
|
|
|
@ -82,29 +82,41 @@ static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
|
||||
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
|
||||
}
|
||||
|
||||
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
|
||||
SNodeList* pGroup) {
|
||||
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
|
||||
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
|
||||
for (int32_t i = 0; i < numOfVgroups; ++i) {
|
||||
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
|
||||
if (NULL == pNewSubplan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
|
||||
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
|
||||
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
|
||||
if (NULL == pNode->node.pChildren) {
|
||||
return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
|
||||
}
|
||||
return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
|
||||
}
|
||||
|
||||
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
|
||||
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
|
||||
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
|
||||
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
|
||||
} else {
|
||||
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
|
||||
for (int32_t i = 0; i < numOfVgroups; ++i) {
|
||||
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
|
||||
if (NULL == pNewSubplan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks =
|
||||
(SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
|
||||
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
|
||||
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
|
||||
return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
|
||||
}
|
||||
|
||||
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#define SPLIT_FLAG_MASK(n) (1 << n)
|
||||
|
||||
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
|
||||
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)
|
||||
|
||||
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
||||
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
@ -1196,6 +1197,41 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct SInsertSelectSplitInfo {
|
||||
SLogicNode* pQueryRoot;
|
||||
SLogicSubplan* pSubplan;
|
||||
} SInsertSelectSplitInfo;
|
||||
|
||||
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
|
||||
SInsertSelectSplitInfo* pInfo) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
|
||||
MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
|
||||
pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
pInfo->pSubplan = pSubplan;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SInsertSelectSplitInfo info = {0};
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, info.pSubplan->subplanType);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pQueryRoot, 0));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
info.pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
|
||||
SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef struct SQnodeSplitInfo {
|
||||
SLogicNode* pSplitNode;
|
||||
SLogicSubplan* pSubplan;
|
||||
|
@ -1249,7 +1285,8 @@ static const SSplitRule splitRuleSet[] = {
|
|||
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
|
||||
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
|
||||
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
|
||||
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
|
||||
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
|
||||
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
@ -1258,7 +1295,11 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
|||
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
|
||||
char* pStr = NULL;
|
||||
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
|
||||
qDebugL("apply split %s rule: %s", pRuleName, pStr);
|
||||
if (NULL == pRuleName) {
|
||||
qDebugL("before split: %s", pStr);
|
||||
} else {
|
||||
qDebugL("apply split %s rule: %s", pRuleName, pStr);
|
||||
}
|
||||
taosMemoryFree(pStr);
|
||||
}
|
||||
|
||||
|
@ -1266,6 +1307,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
SSplitContext cxt = {
|
||||
.pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
|
||||
bool split = false;
|
||||
dumpLogicSubplan(NULL, pSubplan);
|
||||
do {
|
||||
split = false;
|
||||
for (int32_t i = 0; i < splitRuleNum; ++i) {
|
||||
|
@ -1293,8 +1335,16 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
|
|||
FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
|
||||
}
|
||||
|
||||
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
|
||||
return true;
|
||||
}
|
||||
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
|
||||
return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
|
||||
}
|
||||
|
||||
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
|
||||
if (!needSplitSubplan(pLogicSubplan)) {
|
||||
setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -91,3 +91,9 @@ TEST_F(PlanOtherTest, delete) {
|
|||
|
||||
run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10");
|
||||
}
|
||||
|
||||
TEST_F(PlanOtherTest, insert) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("INSERT INTO t1 SELECT * FROM t1");
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ class TDTestCase:
|
|||
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
|
||||
)
|
||||
# conn.load_table_info("log")
|
||||
|
||||
tdLog.debug("statement start")
|
||||
start = datetime.now()
|
||||
stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
|
||||
|
||||
|
@ -118,8 +118,11 @@ class TDTestCase:
|
|||
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
|
||||
params[15].timestamp([None, None, 1626861392591])
|
||||
# print(type(stmt))
|
||||
tdLog.debug("bind_param_batch start")
|
||||
stmt.bind_param_batch(params)
|
||||
tdLog.debug("bind_param_batch end")
|
||||
stmt.execute()
|
||||
tdLog.debug("execute end")
|
||||
end = datetime.now()
|
||||
print("elapsed time: ", end - start)
|
||||
assert stmt.affected_rows == 3
|
||||
|
@ -155,7 +158,7 @@ class TDTestCase:
|
|||
print(rows1)
|
||||
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000"
|
||||
assert rows1[0][10] == 3
|
||||
|
||||
tdLog.debug("close start")
|
||||
|
||||
stmt.close()
|
||||
|
||||
|
|
Loading…
Reference in New Issue