Merge pull request #10805 from taosdata/feature/3.0_wxy
TD-13675 rollup, alter db, alter table syntax definition, and logic plan code reorganize.
This commit is contained in:
commit
bb913611c0
|
@ -111,15 +111,16 @@ typedef enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_MAX,
|
||||
} EShowType;
|
||||
|
||||
#define TSDB_ALTER_TABLE_ADD_TAG 1
|
||||
#define TSDB_ALTER_TABLE_DROP_TAG 2
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
|
||||
|
||||
#define TSDB_ALTER_TABLE_ADD_TAG 1
|
||||
#define TSDB_ALTER_TABLE_DROP_TAG 2
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
|
||||
#define TSDB_ALTER_TABLE_ADD_COLUMN 5
|
||||
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
|
||||
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
|
||||
#define TSDB_ALTER_TABLE_UPDATE_OPTIONS 9
|
||||
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10
|
||||
|
||||
#define TSDB_FILL_NONE 0
|
||||
#define TSDB_FILL_NULL 1
|
||||
|
|
|
@ -34,128 +34,147 @@
|
|||
#define TK_NK_REM 16
|
||||
#define TK_NK_CONCAT 17
|
||||
#define TK_CREATE 18
|
||||
#define TK_USER 19
|
||||
#define TK_PASS 20
|
||||
#define TK_NK_STRING 21
|
||||
#define TK_ALTER 22
|
||||
#define TK_PRIVILEGE 23
|
||||
#define TK_DROP 24
|
||||
#define TK_SHOW 25
|
||||
#define TK_USERS 26
|
||||
#define TK_DNODE 27
|
||||
#define TK_PORT 28
|
||||
#define TK_NK_INTEGER 29
|
||||
#define TK_DNODES 30
|
||||
#define TK_NK_ID 31
|
||||
#define TK_NK_IPTOKEN 32
|
||||
#define TK_QNODE 33
|
||||
#define TK_ON 34
|
||||
#define TK_QNODES 35
|
||||
#define TK_DATABASE 36
|
||||
#define TK_DATABASES 37
|
||||
#define TK_USE 38
|
||||
#define TK_IF 39
|
||||
#define TK_NOT 40
|
||||
#define TK_EXISTS 41
|
||||
#define TK_BLOCKS 42
|
||||
#define TK_CACHE 43
|
||||
#define TK_CACHELAST 44
|
||||
#define TK_COMP 45
|
||||
#define TK_DAYS 46
|
||||
#define TK_FSYNC 47
|
||||
#define TK_MAXROWS 48
|
||||
#define TK_MINROWS 49
|
||||
#define TK_KEEP 50
|
||||
#define TK_PRECISION 51
|
||||
#define TK_QUORUM 52
|
||||
#define TK_REPLICA 53
|
||||
#define TK_TTL 54
|
||||
#define TK_WAL 55
|
||||
#define TK_VGROUPS 56
|
||||
#define TK_SINGLE_STABLE 57
|
||||
#define TK_STREAM_MODE 58
|
||||
#define TK_TABLE 59
|
||||
#define TK_NK_LP 60
|
||||
#define TK_NK_RP 61
|
||||
#define TK_STABLE 62
|
||||
#define TK_TABLES 63
|
||||
#define TK_STABLES 64
|
||||
#define TK_USING 65
|
||||
#define TK_TAGS 66
|
||||
#define TK_NK_DOT 67
|
||||
#define TK_NK_COMMA 68
|
||||
#define TK_COMMENT 69
|
||||
#define TK_BOOL 70
|
||||
#define TK_TINYINT 71
|
||||
#define TK_SMALLINT 72
|
||||
#define TK_INT 73
|
||||
#define TK_INTEGER 74
|
||||
#define TK_BIGINT 75
|
||||
#define TK_FLOAT 76
|
||||
#define TK_DOUBLE 77
|
||||
#define TK_BINARY 78
|
||||
#define TK_TIMESTAMP 79
|
||||
#define TK_NCHAR 80
|
||||
#define TK_UNSIGNED 81
|
||||
#define TK_JSON 82
|
||||
#define TK_VARCHAR 83
|
||||
#define TK_MEDIUMBLOB 84
|
||||
#define TK_BLOB 85
|
||||
#define TK_VARBINARY 86
|
||||
#define TK_DECIMAL 87
|
||||
#define TK_SMA 88
|
||||
#define TK_INDEX 89
|
||||
#define TK_FULLTEXT 90
|
||||
#define TK_FUNCTION 91
|
||||
#define TK_INTERVAL 92
|
||||
#define TK_TOPIC 93
|
||||
#define TK_AS 94
|
||||
#define TK_MNODES 95
|
||||
#define TK_NK_FLOAT 96
|
||||
#define TK_NK_BOOL 97
|
||||
#define TK_NK_VARIABLE 98
|
||||
#define TK_BETWEEN 99
|
||||
#define TK_IS 100
|
||||
#define TK_NULL 101
|
||||
#define TK_NK_LT 102
|
||||
#define TK_NK_GT 103
|
||||
#define TK_NK_LE 104
|
||||
#define TK_NK_GE 105
|
||||
#define TK_NK_NE 106
|
||||
#define TK_NK_EQ 107
|
||||
#define TK_LIKE 108
|
||||
#define TK_MATCH 109
|
||||
#define TK_NMATCH 110
|
||||
#define TK_IN 111
|
||||
#define TK_FROM 112
|
||||
#define TK_JOIN 113
|
||||
#define TK_INNER 114
|
||||
#define TK_SELECT 115
|
||||
#define TK_DISTINCT 116
|
||||
#define TK_WHERE 117
|
||||
#define TK_PARTITION 118
|
||||
#define TK_BY 119
|
||||
#define TK_SESSION 120
|
||||
#define TK_STATE_WINDOW 121
|
||||
#define TK_SLIDING 122
|
||||
#define TK_FILL 123
|
||||
#define TK_VALUE 124
|
||||
#define TK_NONE 125
|
||||
#define TK_PREV 126
|
||||
#define TK_LINEAR 127
|
||||
#define TK_NEXT 128
|
||||
#define TK_GROUP 129
|
||||
#define TK_HAVING 130
|
||||
#define TK_ORDER 131
|
||||
#define TK_SLIMIT 132
|
||||
#define TK_SOFFSET 133
|
||||
#define TK_LIMIT 134
|
||||
#define TK_OFFSET 135
|
||||
#define TK_ASC 136
|
||||
#define TK_DESC 137
|
||||
#define TK_NULLS 138
|
||||
#define TK_FIRST 139
|
||||
#define TK_LAST 140
|
||||
#define TK_ACCOUNT 19
|
||||
#define TK_NK_ID 20
|
||||
#define TK_PASS 21
|
||||
#define TK_NK_STRING 22
|
||||
#define TK_ALTER 23
|
||||
#define TK_PPS 24
|
||||
#define TK_TSERIES 25
|
||||
#define TK_STORAGE 26
|
||||
#define TK_STREAMS 27
|
||||
#define TK_QTIME 28
|
||||
#define TK_DBS 29
|
||||
#define TK_USERS 30
|
||||
#define TK_CONNS 31
|
||||
#define TK_STATE 32
|
||||
#define TK_USER 33
|
||||
#define TK_PRIVILEGE 34
|
||||
#define TK_DROP 35
|
||||
#define TK_SHOW 36
|
||||
#define TK_DNODE 37
|
||||
#define TK_PORT 38
|
||||
#define TK_NK_INTEGER 39
|
||||
#define TK_DNODES 40
|
||||
#define TK_NK_IPTOKEN 41
|
||||
#define TK_LOCAL 42
|
||||
#define TK_QNODE 43
|
||||
#define TK_ON 44
|
||||
#define TK_QNODES 45
|
||||
#define TK_DATABASE 46
|
||||
#define TK_DATABASES 47
|
||||
#define TK_USE 48
|
||||
#define TK_IF 49
|
||||
#define TK_NOT 50
|
||||
#define TK_EXISTS 51
|
||||
#define TK_BLOCKS 52
|
||||
#define TK_CACHE 53
|
||||
#define TK_CACHELAST 54
|
||||
#define TK_COMP 55
|
||||
#define TK_DAYS 56
|
||||
#define TK_FSYNC 57
|
||||
#define TK_MAXROWS 58
|
||||
#define TK_MINROWS 59
|
||||
#define TK_KEEP 60
|
||||
#define TK_PRECISION 61
|
||||
#define TK_QUORUM 62
|
||||
#define TK_REPLICA 63
|
||||
#define TK_TTL 64
|
||||
#define TK_WAL 65
|
||||
#define TK_VGROUPS 66
|
||||
#define TK_SINGLE_STABLE 67
|
||||
#define TK_STREAM_MODE 68
|
||||
#define TK_RETENTIONS 69
|
||||
#define TK_FILE_FACTOR 70
|
||||
#define TK_NK_FLOAT 71
|
||||
#define TK_TABLE 72
|
||||
#define TK_NK_LP 73
|
||||
#define TK_NK_RP 74
|
||||
#define TK_STABLE 75
|
||||
#define TK_TABLES 76
|
||||
#define TK_STABLES 77
|
||||
#define TK_ADD 78
|
||||
#define TK_COLUMN 79
|
||||
#define TK_MODIFY 80
|
||||
#define TK_RENAME 81
|
||||
#define TK_TAG 82
|
||||
#define TK_SET 83
|
||||
#define TK_NK_EQ 84
|
||||
#define TK_USING 85
|
||||
#define TK_TAGS 86
|
||||
#define TK_NK_DOT 87
|
||||
#define TK_NK_COMMA 88
|
||||
#define TK_COMMENT 89
|
||||
#define TK_BOOL 90
|
||||
#define TK_TINYINT 91
|
||||
#define TK_SMALLINT 92
|
||||
#define TK_INT 93
|
||||
#define TK_INTEGER 94
|
||||
#define TK_BIGINT 95
|
||||
#define TK_FLOAT 96
|
||||
#define TK_DOUBLE 97
|
||||
#define TK_BINARY 98
|
||||
#define TK_TIMESTAMP 99
|
||||
#define TK_NCHAR 100
|
||||
#define TK_UNSIGNED 101
|
||||
#define TK_JSON 102
|
||||
#define TK_VARCHAR 103
|
||||
#define TK_MEDIUMBLOB 104
|
||||
#define TK_BLOB 105
|
||||
#define TK_VARBINARY 106
|
||||
#define TK_DECIMAL 107
|
||||
#define TK_SMA 108
|
||||
#define TK_ROLLUP 109
|
||||
#define TK_INDEX 110
|
||||
#define TK_FULLTEXT 111
|
||||
#define TK_FUNCTION 112
|
||||
#define TK_INTERVAL 113
|
||||
#define TK_TOPIC 114
|
||||
#define TK_AS 115
|
||||
#define TK_MNODES 116
|
||||
#define TK_NK_BOOL 117
|
||||
#define TK_NK_VARIABLE 118
|
||||
#define TK_BETWEEN 119
|
||||
#define TK_IS 120
|
||||
#define TK_NULL 121
|
||||
#define TK_NK_LT 122
|
||||
#define TK_NK_GT 123
|
||||
#define TK_NK_LE 124
|
||||
#define TK_NK_GE 125
|
||||
#define TK_NK_NE 126
|
||||
#define TK_LIKE 127
|
||||
#define TK_MATCH 128
|
||||
#define TK_NMATCH 129
|
||||
#define TK_IN 130
|
||||
#define TK_FROM 131
|
||||
#define TK_JOIN 132
|
||||
#define TK_INNER 133
|
||||
#define TK_SELECT 134
|
||||
#define TK_DISTINCT 135
|
||||
#define TK_WHERE 136
|
||||
#define TK_PARTITION 137
|
||||
#define TK_BY 138
|
||||
#define TK_SESSION 139
|
||||
#define TK_STATE_WINDOW 140
|
||||
#define TK_SLIDING 141
|
||||
#define TK_FILL 142
|
||||
#define TK_VALUE 143
|
||||
#define TK_NONE 144
|
||||
#define TK_PREV 145
|
||||
#define TK_LINEAR 146
|
||||
#define TK_NEXT 147
|
||||
#define TK_GROUP 148
|
||||
#define TK_HAVING 149
|
||||
#define TK_ORDER 150
|
||||
#define TK_SLIMIT 151
|
||||
#define TK_SOFFSET 152
|
||||
#define TK_LIMIT 153
|
||||
#define TK_OFFSET 154
|
||||
#define TK_ASC 155
|
||||
#define TK_DESC 156
|
||||
#define TK_NULLS 157
|
||||
#define TK_FIRST 158
|
||||
#define TK_LAST 159
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -27,7 +27,7 @@ extern "C" {
|
|||
typedef int32_t VarDataOffsetT;
|
||||
typedef uint32_t TDRowLenT;
|
||||
typedef uint8_t TDRowValT;
|
||||
typedef uint16_t col_id_t;
|
||||
typedef int16_t col_id_t;
|
||||
typedef int8_t col_type_t;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
|
|
@ -127,6 +127,18 @@ typedef struct SDropSuperTableStmt {
|
|||
bool ignoreNotExists;
|
||||
} SDropSuperTableStmt;
|
||||
|
||||
typedef struct SAlterTableStmt {
|
||||
ENodeType type;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
int8_t alterType;
|
||||
char colName[TSDB_COL_NAME_LEN];
|
||||
char newColName[TSDB_COL_NAME_LEN];
|
||||
STableOptions* pOptions;
|
||||
SDataType dataType;
|
||||
SValueNode* pVal;
|
||||
} SAlterTableStmt;
|
||||
|
||||
typedef struct SCreateUserStmt {
|
||||
ENodeType type;
|
||||
char useName[TSDB_USER_LEN];
|
||||
|
@ -158,6 +170,13 @@ typedef struct SDropDnodeStmt {
|
|||
int32_t port;
|
||||
} SDropDnodeStmt;
|
||||
|
||||
typedef struct SAlterDnodeStmt {
|
||||
ENodeType type;
|
||||
int32_t dnodeId;
|
||||
char config[TSDB_DNODE_CONFIG_LEN];
|
||||
char value[TSDB_DNODE_VALUE_LEN];
|
||||
} SAlterDnodeStmt;
|
||||
|
||||
typedef struct SShowStmt {
|
||||
ENodeType type;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
|
@ -215,6 +234,12 @@ typedef struct SDropTopicStmt {
|
|||
bool ignoreNotExists;
|
||||
} SDropTopicStmt;
|
||||
|
||||
typedef struct SAlterLocalStmt {
|
||||
ENodeType type;
|
||||
char config[TSDB_DNODE_CONFIG_LEN];
|
||||
char value[TSDB_DNODE_VALUE_LEN];
|
||||
} SAlterLocalStmt;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -85,6 +85,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_DROP_TABLE_CLAUSE,
|
||||
QUERY_NODE_DROP_TABLE_STMT,
|
||||
QUERY_NODE_DROP_SUPER_TABLE_STMT,
|
||||
QUERY_NODE_ALTER_TABLE_STMT,
|
||||
QUERY_NODE_SHOW_TABLES_STMT, // temp
|
||||
QUERY_NODE_SHOW_STABLES_STMT,
|
||||
QUERY_NODE_CREATE_USER_STMT,
|
||||
|
@ -94,6 +95,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_USE_DATABASE_STMT,
|
||||
QUERY_NODE_CREATE_DNODE_STMT,
|
||||
QUERY_NODE_DROP_DNODE_STMT,
|
||||
QUERY_NODE_ALTER_DNODE_STMT,
|
||||
QUERY_NODE_SHOW_DNODES_STMT,
|
||||
QUERY_NODE_SHOW_VGROUPS_STMT,
|
||||
QUERY_NODE_SHOW_MNODES_STMT,
|
||||
|
@ -104,6 +106,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_DROP_QNODE_STMT,
|
||||
QUERY_NODE_CREATE_TOPIC_STMT,
|
||||
QUERY_NODE_DROP_TOPIC_STMT,
|
||||
QUERY_NODE_ALTER_LOCAL_STMT,
|
||||
|
||||
// logic plan node
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN,
|
||||
|
@ -163,6 +166,7 @@ SNodeList* nodesMakeList();
|
|||
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode);
|
||||
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode);
|
||||
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
||||
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index);
|
||||
void nodesDestroyList(SNodeList* pList);
|
||||
|
|
|
@ -26,7 +26,6 @@ extern "C" {
|
|||
|
||||
typedef struct SLogicNode {
|
||||
ENodeType type;
|
||||
int32_t id;
|
||||
SNodeList* pTargets; // SColumnNode
|
||||
SNode* pConditions;
|
||||
SNodeList* pChildren;
|
||||
|
@ -167,6 +166,7 @@ typedef struct SScanPhysiNode {
|
|||
|
||||
typedef SScanPhysiNode SSystemTableScanPhysiNode;
|
||||
typedef SScanPhysiNode STagScanPhysiNode;
|
||||
typedef SScanPhysiNode SStreamScanPhysiNode;
|
||||
|
||||
typedef struct STableScanPhysiNode {
|
||||
SScanPhysiNode scan;
|
||||
|
|
|
@ -23,6 +23,9 @@ extern "C" {
|
|||
#include "nodes.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema)))
|
||||
#define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo)))
|
||||
|
||||
typedef struct SRawExprNode {
|
||||
ENodeType nodeType;
|
||||
char* p;
|
||||
|
|
|
@ -26,6 +26,7 @@ typedef struct SParseContext {
|
|||
uint64_t requestId;
|
||||
int32_t acctId;
|
||||
const char *db;
|
||||
bool streamQuery;
|
||||
void *pTransporter;
|
||||
SEpSet mgmtEpSet;
|
||||
const char *pSql; // sql string
|
||||
|
|
|
@ -482,6 +482,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_PASSWD_EMPTY TAOS_DEF_ERROR_CODE(0, 0x2611)
|
||||
#define TSDB_CODE_PAR_INVALID_PORT TAOS_DEF_ERROR_CODE(0, 0x2612)
|
||||
#define TSDB_CODE_PAR_INVALID_ENDPOINT TAOS_DEF_ERROR_CODE(0, 0x2613)
|
||||
#define TSDB_CODE_PAR_EXPRIE_STATEMENT TAOS_DEF_ERROR_CODE(0, 0x2614)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal);
|
|||
int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pVal);
|
||||
int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal);
|
||||
int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal);
|
||||
int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal);
|
||||
int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal);
|
||||
int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal);
|
||||
int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal);
|
||||
|
@ -60,6 +61,7 @@ int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void*
|
|||
typedef int32_t (*FToObject)(const SJson* pJson, void* pObj);
|
||||
|
||||
int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj);
|
||||
int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize);
|
||||
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize);
|
||||
|
||||
char* tjsonToString(const SJson* pJson);
|
||||
|
|
|
@ -179,6 +179,7 @@ typedef struct SRequestObj {
|
|||
uint64_t requestId;
|
||||
int32_t type; // request type
|
||||
STscObj* pTscObj;
|
||||
char* pDb;
|
||||
char* sqlstr; // sql string
|
||||
int32_t sqlLen;
|
||||
int64_t self;
|
||||
|
@ -229,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
|
|||
|
||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery);
|
||||
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery);
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||
|
||||
// --- heartbeat
|
||||
|
|
|
@ -150,6 +150,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pRequest->pDb = getDbOfConnection(pObj);
|
||||
pRequest->requestId = generateRequestId();
|
||||
pRequest->metric.start = taosGetTimestampMs();
|
||||
|
||||
|
@ -180,6 +181,7 @@ static void doDestroyRequest(void *p) {
|
|||
tfree(pRequest->msgBuf);
|
||||
tfree(pRequest->sqlstr);
|
||||
tfree(pRequest->pInfo);
|
||||
tfree(pRequest->pDb);
|
||||
|
||||
doFreeReqResultInfo(&pRequest->body.resInfo);
|
||||
qDestroyQueryPlan(pRequest->body.pDag);
|
||||
|
|
|
@ -137,13 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
|
||||
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) {
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
SParseContext cxt = {
|
||||
.requestId = pRequest->requestId,
|
||||
.acctId = pTscObj->acctId,
|
||||
.db = getDbOfConnection(pTscObj),
|
||||
.db = pRequest->pDb,
|
||||
.streamQuery = streamQuery,
|
||||
.pSql = pRequest->sqlstr,
|
||||
.sqlLen = pRequest->sqlLen,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
|
@ -154,7 +155,6 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
|
|||
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tfree(cxt.db);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -163,7 +163,6 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
|
|||
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
|
||||
}
|
||||
|
||||
tfree(cxt.db);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -249,7 +248,7 @@ TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
|
|||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return);
|
||||
|
||||
if (pQuery->directRpc) {
|
||||
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
||||
|
|
|
@ -482,38 +482,24 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
}
|
||||
|
||||
tscDebug("start to create topic, %s", topicName);
|
||||
#if 0
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
|
||||
|
||||
pQueryNode->streamQuery = true;
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
|
||||
|
||||
// todo check for invalid sql statement and return with error code
|
||||
|
||||
SSchema* schema = NULL;
|
||||
int32_t numOfCols = 0;
|
||||
CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, NULL), _return);
|
||||
|
||||
pStr = qQueryPlanToString(pRequest->body.pDag);
|
||||
if (pStr == NULL) {
|
||||
goto _return;
|
||||
}
|
||||
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &pStr, NULL), _return);
|
||||
|
||||
/*printf("%s\n", pStr);*/
|
||||
|
||||
// The topic should be related to a database that the queried table is belonged to.
|
||||
SName name = {0};
|
||||
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
||||
// tNameGetFullDbName(&((SQueryStmtInfo*)pQueryNode)->pTableMetaInfo[0]->name, dbName);
|
||||
|
||||
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||
SName name = { .acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T };
|
||||
strcpy(name.dbname, pRequest->pDb);
|
||||
strcpy(name.tname, topicName);
|
||||
|
||||
SCMCreateTopicReq req = {
|
||||
.igExists = 1,
|
||||
.physicalPlan = (char*)pStr,
|
||||
.ast = (char*)pStr,
|
||||
.sql = (char*)sql,
|
||||
.logicalPlan = (char*)"no logic plan",
|
||||
};
|
||||
tNameExtractFullName(&name, req.name);
|
||||
|
||||
|
@ -536,7 +522,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
#endif
|
||||
|
||||
_return:
|
||||
qDestroyQuery(pQueryNode);
|
||||
/*if (sendInfo != NULL) {*/
|
||||
|
|
|
@ -1072,6 +1072,10 @@ void blockDataClearup(SSDataBlock* pDataBlock) {
|
|||
}
|
||||
|
||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
if (0 == numOfRows) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
||||
char* tmp = realloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
|
||||
if (tmp == NULL) {
|
||||
|
@ -1092,7 +1096,7 @@ int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRo
|
|||
|
||||
pColumn->nullbitmap = tmp;
|
||||
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
|
||||
|
||||
assert(pColumn->info.bytes);
|
||||
tmp = realloc(pColumn->pData, numOfRows * pColumn->info.bytes);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1137,7 +1141,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
|
|||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
tfree(pBlock);
|
||||
// tfree(pBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1190,7 +1194,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t len = colDataGetLength(pColData, rows);
|
||||
taosEncodeFixedI32(buf, len);
|
||||
tlen += taosEncodeFixedI32(buf, len);
|
||||
|
||||
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tdatablock.h"
|
||||
#include "vnode.h"
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||
|
@ -128,10 +129,13 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
|
||||
int j = 0;
|
||||
for (int32_t i = 0; i < colNumNeed; i++) {
|
||||
int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
|
||||
int16_t colId = *(int16_t*)taosArrayGet(pHandle->pColIdList, i);
|
||||
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
|
||||
j++;
|
||||
}
|
||||
if (j >= pSchemaWrapper->nCols) {
|
||||
continue;
|
||||
}
|
||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
|
||||
SColumnInfoData colInfo = {0};
|
||||
int sz = numOfRows * pColSchema->bytes;
|
||||
|
@ -145,6 +149,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
taosArrayDestroy(pArray);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureColumnCapacity(&colInfo, numOfRows);
|
||||
taosArrayPush(pArray, &colInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -190,7 +190,6 @@ static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
|
|||
}
|
||||
|
||||
static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
|
||||
COPY_SCALAR_FIELD(id);
|
||||
CLONE_NODE_LIST_FIELD(pTargets);
|
||||
CLONE_NODE_FIELD(pConditions);
|
||||
CLONE_NODE_LIST_FIELD(pChildren);
|
||||
|
@ -198,7 +197,7 @@ static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
|
|||
}
|
||||
|
||||
static STableMeta* tableMetaClone(const STableMeta* pSrc) {
|
||||
int32_t len = sizeof(STableMeta) + (pSrc->tableInfo.numOfTags + pSrc->tableInfo.numOfColumns) * sizeof(SSchema);
|
||||
int32_t len = TABLE_META_SIZE(pSrc);
|
||||
STableMeta* pDst = malloc(len);
|
||||
if (NULL == pDst) {
|
||||
return NULL;
|
||||
|
@ -208,7 +207,7 @@ static STableMeta* tableMetaClone(const STableMeta* pSrc) {
|
|||
}
|
||||
|
||||
static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) {
|
||||
int32_t len = sizeof(SVgroupsInfo) + pSrc->numOfVgroups * sizeof(SVgroupInfo);
|
||||
int32_t len = VGROUPS_INFO_SIZE(pSrc);
|
||||
SVgroupsInfo* pDst = malloc(len);
|
||||
if (NULL == pDst) {
|
||||
return NULL;
|
||||
|
|
|
@ -193,7 +193,17 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkLogicPlanId = "Id";
|
||||
static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) {
|
||||
STableMeta* pNode = (STableMeta*)pObj;
|
||||
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkLogicPlanTargets = "Targets";
|
||||
static const char* jkLogicPlanConditions = "Conditions";
|
||||
static const char* jkLogicPlanChildren = "Children";
|
||||
|
@ -201,10 +211,7 @@ static const char* jkLogicPlanChildren = "Children";
|
|||
static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SLogicNode* pNode = (const SLogicNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkLogicPlanId, pNode->id);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkLogicPlanTargets, pNode->pTargets);
|
||||
}
|
||||
int32_t code = nodeListToJson(pJson, jkLogicPlanTargets, pNode->pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkLogicPlanConditions, nodeToJson, pNode->pConditions);
|
||||
}
|
||||
|
@ -445,6 +452,14 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
return physiScanNodeToJson(pObj, pJson);
|
||||
}
|
||||
|
||||
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) {
|
||||
return jsonToPhysiScanNode(pJson, pObj);
|
||||
}
|
||||
|
||||
static const char* jkProjectPhysiPlanProjections = "Projections";
|
||||
|
||||
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -1273,6 +1288,193 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkTableDbName = "DbName";
|
||||
static const char* jkTableTableName = "tableName";
|
||||
static const char* jkTableTableAlias = "tableAlias";
|
||||
|
||||
static int32_t tableNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableNode* pNode = (const STableNode*)pObj;
|
||||
|
||||
int32_t code = exprNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkTableDbName, pNode->dbName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkTableTableName, pNode->tableName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkTableTableAlias, pNode->tableAlias);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToTableNode(const SJson* pJson, void* pObj) {
|
||||
STableNode* pNode = (STableNode*)pObj;
|
||||
|
||||
int32_t code = jsonToExprNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkTableDbName, pNode->dbName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkTableTableName, pNode->tableName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkTableTableAlias, pNode->tableAlias);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkEpSetInUse = "InUse";
|
||||
static const char* jkEpSetNumOfEps = "NumOfEps";
|
||||
static const char* jkEpSetEps = "Eps";
|
||||
|
||||
static int32_t epSetToJson(const void* pObj, SJson* pJson) {
|
||||
const SEpSet* pNode = (const SEpSet*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkEpSetInUse, pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkEpSetNumOfEps, pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddArray(pJson, jkEpSetEps, epToJson, pNode->eps, sizeof(SEp), pNode->numOfEps);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToEpSet(const SJson* pJson, void* pObj) {
|
||||
SEpSet* pNode = (SEpSet*)pObj;
|
||||
|
||||
int32_t code = tjsonGetTinyIntValue(pJson, jkEpSetInUse, &pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetTinyIntValue(pJson, jkEpSetNumOfEps, &pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToArray(pJson, jkEpSetEps, jsonToEp, pNode->eps, sizeof(SEp));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkVgroupInfoVgId = "VgId";
|
||||
static const char* jkVgroupInfoHashBegin = "HashBegin";
|
||||
static const char* jkVgroupInfoHashEnd = "HashEnd";
|
||||
static const char* jkVgroupInfoEpSet = "EpSet";
|
||||
static const char* jkVgroupInfoNumOfTable = "NumOfTable";
|
||||
|
||||
static int32_t vgroupInfoToJson(const void* pObj, SJson* pJson) {
|
||||
const SVgroupInfo* pNode = (const SVgroupInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkVgroupInfoVgId, pNode->vgId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkVgroupInfoHashBegin, pNode->hashBegin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkVgroupInfoHashEnd, pNode->hashEnd);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkVgroupInfoEpSet, epSetToJson, &pNode->epSet);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkVgroupInfoNumOfTable, pNode->numOfTable);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToVgroupInfo(const SJson* pJson, void* pObj) {
|
||||
SVgroupInfo* pNode = (SVgroupInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonGetIntValue(pJson, jkVgroupInfoVgId, &pNode->vgId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUIntValue(pJson, jkVgroupInfoHashBegin, &pNode->hashBegin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUIntValue(pJson, jkVgroupInfoHashEnd, &pNode->hashEnd);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToObject(pJson, jkVgroupInfoEpSet, jsonToEpSet, &pNode->epSet);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkVgroupInfoNumOfTable, &pNode->numOfTable);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkVgroupsInfoNum = "Num";
|
||||
static const char* jkVgroupsInfoVgroups = "Vgroups";
|
||||
|
||||
static int32_t vgroupsInfoToJson(const void* pObj, SJson* pJson) {
|
||||
const SVgroupsInfo* pNode = (const SVgroupsInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkVgroupsInfoNum, pNode->numOfVgroups);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddArray(pJson, jkVgroupsInfoVgroups, vgroupInfoToJson, pNode->vgroups, sizeof(SVgroupInfo), pNode->numOfVgroups);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToVgroupsInfo(const SJson* pJson, void* pObj) {
|
||||
SVgroupsInfo* pNode = (SVgroupsInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonGetIntValue(pJson, jkVgroupsInfoNum, &pNode->numOfVgroups);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToArray(pJson, jkVgroupsInfoVgroups, jsonToVgroupInfo, pNode->vgroups, sizeof(SVgroupInfo));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkRealTableMetaSize = "MetaSize";
|
||||
static const char* jkRealTableMeta = "Meta";
|
||||
static const char* jkRealTableVgroupsInfoSize = "VgroupsInfoSize";
|
||||
static const char* jkRealTableVgroupsInfo = "VgroupsInfo";
|
||||
|
||||
static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SRealTableNode* pNode = (const SRealTableNode*)pObj;
|
||||
|
||||
int32_t code = tableNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkRealTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkRealTableMeta, tableMetaToJson, pNode->pMeta);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkRealTableVgroupsInfoSize, VGROUPS_INFO_SIZE(pNode->pVgroupList));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkRealTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) {
|
||||
SRealTableNode* pNode = (SRealTableNode*)pObj;
|
||||
|
||||
int32_t objSize = 0;
|
||||
int32_t code = jsonToTableNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkRealTableMetaSize, &objSize);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonMakeObject(pJson, jkRealTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkRealTableVgroupsInfoSize, &objSize);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonMakeObject(pJson, jkRealTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkGroupingSetType = "GroupingSetType";
|
||||
static const char* jkGroupingSetParameter = "Parameters";
|
||||
|
||||
|
@ -1464,7 +1666,7 @@ static const char* jkSelectStmtSlimit = "Slimit";
|
|||
static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
||||
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct);
|
||||
int32_t code = tjsonAddBoolToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkSelectStmtProjections, pNode->pProjectionList);
|
||||
}
|
||||
|
@ -1499,6 +1701,44 @@ static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
|
||||
SSelectStmt* pNode = (SSelectStmt*)pObj;
|
||||
|
||||
int32_t code = tjsonGetBoolValue(pJson, jkSelectStmtDistinct, &pNode->isDistinct);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtProjections, &pNode->pProjectionList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtFrom, &pNode->pFromTable);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtWhere, &pNode->pWhere);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtPartitionBy, &pNode->pPartitionByList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtWindow, &pNode->pWindow);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtGroupBy, &pNode->pGroupByList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtHaving, &pNode->pHaving);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtOrderBy, &pNode->pOrderByList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtLimit, &pNode->pLimit);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtSlimit, &pNode->pSlimit);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||
switch (nodeType(pObj)) {
|
||||
case QUERY_NODE_COLUMN:
|
||||
|
@ -1512,6 +1752,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_FUNCTION:
|
||||
return functionNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return realTableNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
case QUERY_NODE_JOIN_TABLE:
|
||||
break;
|
||||
|
@ -1565,9 +1806,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return physiTagScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return physiTableScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
break;
|
||||
return physiStreamScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return physiProjectNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -1589,6 +1829,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN:
|
||||
return planToJson(pObj, pJson);
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
nodesWarn("specificNodeToJson unknown node = %s", nodesNodeName(nodeType(pObj)));
|
||||
|
@ -1607,7 +1848,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToLogicConditionNode(pJson, pObj);
|
||||
case QUERY_NODE_FUNCTION:
|
||||
return jsonToFunctionNode(pJson, pObj);
|
||||
// case QUERY_NODE_REAL_TABLE:
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return jsonToRealTableNode(pJson, pObj);
|
||||
// case QUERY_NODE_TEMP_TABLE:
|
||||
// case QUERY_NODE_JOIN_TABLE:
|
||||
// break;
|
||||
|
@ -1633,8 +1875,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToDownstreamSourceNode(pJson, pObj);
|
||||
// case QUERY_NODE_SET_OPERATOR:
|
||||
// break;
|
||||
// case QUERY_NODE_SELECT_STMT:
|
||||
// return jsonToSelectStmt(pJson, pObj);
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return jsonToSelectStmt(pJson, pObj);
|
||||
// case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
// return jsonToLogicScanNode(pJson, pObj);
|
||||
// case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
|
@ -1647,6 +1889,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToPhysiTagScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return jsonToPhysiTableScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
return jsonToPhysiStreamScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return jsonToPhysiProjectNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -1768,6 +2012,7 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode) {
|
|||
int32_t code = makeNodeByJson(pJson, pNode);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = NULL;
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -125,6 +125,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SCreateDnodeStmt));
|
||||
case QUERY_NODE_DROP_DNODE_STMT:
|
||||
return makeNode(type, sizeof(SDropDnodeStmt));
|
||||
case QUERY_NODE_ALTER_DNODE_STMT:
|
||||
return makeNode(type, sizeof(SAlterDnodeStmt));
|
||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||
return makeNode(type, sizeof(SShowStmt));
|
||||
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
||||
|
@ -168,7 +170,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
return makeNode(type, sizeof(STableSeqScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
return makeNode(type, sizeof(SNode));
|
||||
return makeNode(type, sizeof(SStreamScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -357,6 +359,17 @@ int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
||||
if (NULL == pSrc) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t code = nodesListAppendList(pTarget, pSrc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(pSrc);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
|
||||
if (NULL == pCell->pPrev) {
|
||||
pList->pHead = pCell->pNext;
|
||||
|
@ -571,7 +584,7 @@ typedef struct SCollectFuncsCxt {
|
|||
static EDealRes collectFuncs(SNode* pNode, void* pContext) {
|
||||
SCollectFuncsCxt* pCxt = (SCollectFuncsCxt*)pContext;
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId)) {
|
||||
pCxt->errCode = nodesListAppend(pCxt->pFuncs, pNode);
|
||||
pCxt->errCode = nodesListStrictAppend(pCxt->pFuncs, nodesCloneNode(pNode));
|
||||
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
|
|
|
@ -50,8 +50,10 @@ typedef enum EDatabaseOptionType {
|
|||
DB_OPTION_TTL,
|
||||
DB_OPTION_WAL,
|
||||
DB_OPTION_VGROUPS,
|
||||
DB_OPTION_SINGLESTABLE,
|
||||
DB_OPTION_STREAMMODE,
|
||||
DB_OPTION_SINGLE_STABLE,
|
||||
DB_OPTION_STREAM_MODE,
|
||||
DB_OPTION_RETENTIONS,
|
||||
DB_OPTION_FILE_FACTOR,
|
||||
|
||||
DB_OPTION_MAX
|
||||
} EDatabaseOptionType;
|
||||
|
@ -65,6 +67,11 @@ typedef enum ETableOptionType {
|
|||
TABLE_OPTION_MAX
|
||||
} ETableOptionType;
|
||||
|
||||
typedef struct SAlterOption {
|
||||
int32_t type;
|
||||
SToken val;
|
||||
} SAlterOption;
|
||||
|
||||
extern SToken nil_token;
|
||||
|
||||
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt);
|
||||
|
@ -110,13 +117,16 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
|
|||
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight);
|
||||
|
||||
SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt);
|
||||
SNode* createDefaultAlterDatabaseOptions(SAstCreateContext* pCxt);
|
||||
SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, const SToken* pVal);
|
||||
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SNode* pOptions);
|
||||
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pDbName);
|
||||
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName, SNode* pOptions);
|
||||
SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
|
||||
SNode* createDefaultAlterTableOptions(SAstCreateContext* pCxt);
|
||||
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, const SToken* pVal);
|
||||
SNode* setTableSmaOption(SAstCreateContext* pCxt, SNode* pOptions, SNodeList* pSma);
|
||||
SNode* setTableRollupOption(SAstCreateContext* pCxt, SNode* pOptions, SNodeList* pFuncs);
|
||||
SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment);
|
||||
SDataType createDataType(uint8_t type);
|
||||
SDataType createVarLenDataType(uint8_t type, const SToken* pLen);
|
||||
|
@ -126,6 +136,11 @@ SNode* createCreateMultiTableStmt(SAstCreateContext* pCxt, SNodeList* pSubTables
|
|||
SNode* createDropTableClause(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
|
||||
SNode* createDropTableStmt(SAstCreateContext* pCxt, SNodeList* pTables);
|
||||
SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
|
||||
SNode* createAlterTableOption(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions);
|
||||
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pColName, SDataType dataType);
|
||||
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pColName);
|
||||
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pOldColName, const SToken* pNewColName);
|
||||
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const SToken* pTagName, SNode* pVal);
|
||||
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName);
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDbName);
|
||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, const SToken* pUserName, const SToken* pPassword);
|
||||
|
@ -133,6 +148,7 @@ SNode* createAlterUserStmt(SAstCreateContext* pCxt, const SToken* pUserName, int
|
|||
SNode* createDropUserStmt(SAstCreateContext* pCxt, const SToken* pUserName);
|
||||
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
|
||||
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
|
||||
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
|
||||
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, const SToken* pIndexName, const SToken* pTableName, SNodeList* pCols, SNode* pOptions);
|
||||
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding);
|
||||
SNode* createDropIndexStmt(SAstCreateContext* pCxt, const SToken* pIndexName, const SToken* pTableName);
|
||||
|
@ -140,6 +156,7 @@ SNode* createCreateQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
|
|||
SNode* createDropQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
|
||||
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, const SToken* pSubscribeDbName);
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
|
||||
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -21,13 +21,15 @@
|
|||
#include "parAst.h"
|
||||
}
|
||||
|
||||
%syntax_error {
|
||||
if(TOKEN.z) {
|
||||
generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, TOKEN.z);
|
||||
} else {
|
||||
generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCOMPLETE_SQL);
|
||||
%syntax_error {
|
||||
if (pCxt->valid) {
|
||||
if(TOKEN.z) {
|
||||
generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, TOKEN.z);
|
||||
} else {
|
||||
generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCOMPLETE_SQL);
|
||||
}
|
||||
pCxt->valid = false;
|
||||
}
|
||||
pCxt->valid = false;
|
||||
}
|
||||
|
||||
%left OR.
|
||||
|
@ -41,6 +43,41 @@
|
|||
%left NK_CONCAT.
|
||||
//%right NK_BITNOT.
|
||||
|
||||
/************************************************ create/alter account *****************************************/
|
||||
cmd ::= CREATE ACCOUNT NK_ID PASS NK_STRING account_options. { pCxt->valid = false; generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
|
||||
cmd ::= ALTER ACCOUNT NK_ID alter_account_options. { pCxt->valid = false; generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
|
||||
|
||||
%type account_options { int32_t }
|
||||
%destructor account_options { }
|
||||
account_options ::= . { }
|
||||
account_options ::= account_options PPS literal. { }
|
||||
account_options ::= account_options TSERIES literal. { }
|
||||
account_options ::= account_options STORAGE literal. { }
|
||||
account_options ::= account_options STREAMS literal. { }
|
||||
account_options ::= account_options QTIME literal. { }
|
||||
account_options ::= account_options DBS literal. { }
|
||||
account_options ::= account_options USERS literal. { }
|
||||
account_options ::= account_options CONNS literal. { }
|
||||
account_options ::= account_options STATE literal. { }
|
||||
|
||||
%type alter_account_options { int32_t }
|
||||
%destructor alter_account_options { }
|
||||
alter_account_options ::= alter_account_option. { }
|
||||
alter_account_options ::= alter_account_options alter_account_option. { }
|
||||
|
||||
%type alter_account_option { int32_t }
|
||||
%destructor alter_account_option { }
|
||||
alter_account_option ::= PASS literal. { }
|
||||
alter_account_option ::= PPS literal. { }
|
||||
alter_account_option ::= TSERIES literal. { }
|
||||
alter_account_option ::= STORAGE literal. { }
|
||||
alter_account_option ::= STREAMS literal. { }
|
||||
alter_account_option ::= QTIME literal. { }
|
||||
alter_account_option ::= DBS literal. { }
|
||||
alter_account_option ::= USERS literal. { }
|
||||
alter_account_option ::= CONNS literal. { }
|
||||
alter_account_option ::= STATE literal. { }
|
||||
|
||||
/************************************************ create/alter/drop/show user *****************************************/
|
||||
cmd ::= CREATE USER user_name(A) PASS NK_STRING(B). { pCxt->pRootNode = createCreateUserStmt(pCxt, &A, &B); }
|
||||
cmd ::= ALTER USER user_name(A) PASS NK_STRING(B). { pCxt->pRootNode = createAlterUserStmt(pCxt, &A, TSDB_ALTER_USER_PASSWD, &B); }
|
||||
|
@ -48,12 +85,16 @@ cmd ::= ALTER USER user_name(A) PRIVILEGE NK_STRING(B).
|
|||
cmd ::= DROP USER user_name(A). { pCxt->pRootNode = createDropUserStmt(pCxt, &A); }
|
||||
cmd ::= SHOW USERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_USERS_STMT, NULL); }
|
||||
|
||||
/************************************************ create/drop/show dnode **********************************************/
|
||||
/************************************************ create/drop/alter/show dnode ****************************************/
|
||||
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL); }
|
||||
cmd ::= CREATE DNODE dnode_host_name(A) PORT NK_INTEGER(B). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, &B); }
|
||||
cmd ::= DROP DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A); }
|
||||
cmd ::= DROP DNODE dnode_endpoint(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A); }
|
||||
cmd ::= SHOW DNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DNODES_STMT, NULL); }
|
||||
cmd ::= ALTER DNODE NK_INTEGER(A) NK_STRING(B). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, &A, &B, NULL); }
|
||||
cmd ::= ALTER DNODE NK_INTEGER(A) NK_STRING(B) NK_STRING(C). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, &A, &B, &C); }
|
||||
cmd ::= ALTER ALL DNODES NK_STRING(A). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, NULL, &A, NULL); }
|
||||
cmd ::= ALTER ALL DNODES NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, NULL, &A, &B); }
|
||||
|
||||
%type dnode_endpoint { SToken }
|
||||
%destructor dnode_endpoint { }
|
||||
|
@ -64,9 +105,13 @@ dnode_endpoint(A) ::= NK_STRING(B).
|
|||
dnode_host_name(A) ::= NK_ID(B). { A = B; }
|
||||
dnode_host_name(A) ::= NK_IPTOKEN(B). { A = B; }
|
||||
|
||||
/************************************************ alter local *********************************************************/
|
||||
cmd ::= ALTER LOCAL NK_STRING(A). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, NULL); }
|
||||
cmd ::= ALTER LOCAL NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, &B); }
|
||||
|
||||
/************************************************ create/drop qnode ***************************************************/
|
||||
cmd ::= CREATE QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateQnodeStmt(pCxt, &A); }
|
||||
cmd ::= DROP QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropQnodeStmt(pCxt, &A); }
|
||||
cmd ::= DROP QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropQnodeStmt(pCxt, &A); }
|
||||
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT, NULL); }
|
||||
|
||||
/************************************************ create/drop/show/use database ***************************************/
|
||||
|
@ -74,7 +119,7 @@ cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C).
|
|||
cmd ::= DROP DATABASE exists_opt(A) db_name(B). { pCxt->pRootNode = createDropDatabaseStmt(pCxt, A, &B); }
|
||||
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT, NULL); }
|
||||
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); }
|
||||
cmd ::= ALTER DATABASE db_name(A) db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
|
||||
cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
|
||||
|
||||
%type not_exists_opt { bool }
|
||||
%destructor not_exists_opt { }
|
||||
|
@ -102,20 +147,55 @@ db_options(A) ::= db_options(B) REPLICA NK_INTEGER(C).
|
|||
db_options(A) ::= db_options(B) TTL NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_TTL, &C); }
|
||||
db_options(A) ::= db_options(B) WAL NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_WAL, &C); }
|
||||
db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_VGROUPS, &C); }
|
||||
db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLESTABLE, &C); }
|
||||
db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAMMODE, &C); }
|
||||
db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLE_STABLE, &C); }
|
||||
db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAM_MODE, &C); }
|
||||
db_options(A) ::= db_options(B) RETENTIONS NK_STRING(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_RETENTIONS, &C); }
|
||||
db_options(A) ::= db_options(B) FILE_FACTOR NK_FLOAT(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_FILE_FACTOR, &C); }
|
||||
|
||||
alter_db_options(A) ::= alter_db_option(B). { A = createDefaultAlterDatabaseOptions(pCxt); A = setDatabaseOption(pCxt, A, B.type, &B.val); }
|
||||
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setDatabaseOption(pCxt, B, C.type, &C.val); }
|
||||
|
||||
%type alter_db_option { SAlterOption }
|
||||
%destructor alter_db_option { }
|
||||
alter_db_option(A) ::= BLOCKS NK_INTEGER(B). { A.type = DB_OPTION_BLOCKS; A.val = B; }
|
||||
alter_db_option(A) ::= FSYNC NK_INTEGER(B). { A.type = DB_OPTION_FSYNC; A.val = B; }
|
||||
alter_db_option(A) ::= KEEP NK_INTEGER(B). { A.type = DB_OPTION_KEEP; A.val = B; }
|
||||
alter_db_option(A) ::= WAL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }
|
||||
alter_db_option(A) ::= QUORUM NK_INTEGER(B). { A.type = DB_OPTION_QUORUM; A.val = B; }
|
||||
alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.val = B; }
|
||||
|
||||
/************************************************ create/drop/show table/stable ***************************************/
|
||||
cmd ::= CREATE TABLE not_exists_opt(A) full_table_name(B)
|
||||
NK_LP column_def_list(C) NK_RP tags_def_opt(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E);}
|
||||
cmd ::= CREATE TABLE multi_create_clause(A). { pCxt->pRootNode = createCreateMultiTableStmt(pCxt, A);}
|
||||
NK_LP column_def_list(C) NK_RP tags_def_opt(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E); }
|
||||
cmd ::= CREATE TABLE multi_create_clause(A). { pCxt->pRootNode = createCreateMultiTableStmt(pCxt, A); }
|
||||
cmd ::= CREATE STABLE not_exists_opt(A) full_table_name(B)
|
||||
NK_LP column_def_list(C) NK_RP tags_def(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E);}
|
||||
NK_LP column_def_list(C) NK_RP tags_def(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E); }
|
||||
cmd ::= DROP TABLE multi_drop_clause(A). { pCxt->pRootNode = createDropTableStmt(pCxt, A); }
|
||||
cmd ::= DROP STABLE exists_opt(A) full_table_name(B). { pCxt->pRootNode = createDropSuperTableStmt(pCxt, A, B); }
|
||||
cmd ::= SHOW TABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TABLES_STMT, NULL); }
|
||||
cmd ::= SHOW STABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STABLES_STMT, NULL); }
|
||||
|
||||
cmd ::= ALTER TABLE alter_table_clause(A). { pCxt->pRootNode = A; }
|
||||
cmd ::= ALTER STABLE alter_table_clause(A). { pCxt->pRootNode = A; }
|
||||
|
||||
alter_table_clause(A) ::= full_table_name(B) alter_table_options(C). { A = createAlterTableOption(pCxt, B, C); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) ADD COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, &C, D); }
|
||||
alter_table_clause(A) ::= full_table_name(B) DROP COLUMN column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_COLUMN, &C); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) MODIFY COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, &C, D); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) RENAME COLUMN column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, &C, &D); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) ADD TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, &C, D); }
|
||||
alter_table_clause(A) ::= full_table_name(B) DROP TAG column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_TAG, &C); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) MODIFY TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, &C, D); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) SET TAG column_name(C) NK_EQ literal(D). { A = createAlterTableSetTag(pCxt, B, &C, D); }
|
||||
|
||||
%type multi_create_clause { SNodeList* }
|
||||
%destructor multi_create_clause { nodesDestroyList($$); }
|
||||
multi_create_clause(A) ::= create_subtable_clause(B). { A = createNodeList(pCxt, B); }
|
||||
|
@ -183,11 +263,21 @@ tags_def_opt(A) ::= tags_def(B).
|
|||
%destructor tags_def { nodesDestroyList($$); }
|
||||
tags_def(A) ::= TAGS NK_LP column_def_list(B) NK_RP. { A = B; }
|
||||
|
||||
table_options(A) ::= . { A = createDefaultTableOptions(pCxt);}
|
||||
table_options(A) ::= . { A = createDefaultTableOptions(pCxt); }
|
||||
table_options(A) ::= table_options(B) COMMENT NK_STRING(C). { A = setTableOption(pCxt, B, TABLE_OPTION_COMMENT, &C); }
|
||||
table_options(A) ::= table_options(B) KEEP NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); }
|
||||
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
|
||||
table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableSmaOption(pCxt, B, C); }
|
||||
table_options(A) ::= table_options(B) ROLLUP NK_LP func_name_list(C) NK_RP. { A = setTableRollupOption(pCxt, B, C); }
|
||||
|
||||
alter_table_options(A) ::= alter_table_option(B). { A = createDefaultAlterTableOptions(pCxt); A = setTableOption(pCxt, A, B.type, &B.val); }
|
||||
alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). { A = setTableOption(pCxt, B, C.type, &C.val); }
|
||||
|
||||
%type alter_table_option { SAlterOption }
|
||||
%destructor alter_table_option { }
|
||||
alter_table_option(A) ::= COMMENT NK_STRING(B). { A.type = TABLE_OPTION_COMMENT; A.val = B; }
|
||||
alter_table_option(A) ::= KEEP NK_INTEGER(B). { A.type = TABLE_OPTION_KEEP; A.val = B; }
|
||||
alter_table_option(A) ::= TTL NK_INTEGER(B). { A.type = TABLE_OPTION_TTL; A.val = B; }
|
||||
|
||||
%type col_name_list { SNodeList* }
|
||||
%destructor col_name_list { nodesDestroyList($$); }
|
||||
|
@ -196,6 +286,13 @@ col_name_list(A) ::= col_name_list(B) NK_COMMA col_name(C).
|
|||
|
||||
col_name(A) ::= column_name(B). { A = createColumnNode(pCxt, NULL, &B); }
|
||||
|
||||
%type func_name_list { SNodeList* }
|
||||
%destructor func_name_list { nodesDestroyList($$); }
|
||||
func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); }
|
||||
func_name_list(A) ::= func_name_list(B) NK_COMMA col_name(C). { A = addNodeToList(pCxt, B, C); }
|
||||
|
||||
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
|
||||
|
||||
/************************************************ create index ********************************************************/
|
||||
cmd ::= CREATE SMA INDEX index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, &A, &B, NULL, C); }
|
||||
cmd ::= CREATE FULLTEXT INDEX
|
||||
|
@ -546,7 +643,7 @@ query_expression_body(A) ::=
|
|||
query_primary(A) ::= query_specification(B). { A = B; }
|
||||
//query_primary(A) ::=
|
||||
// NK_LP query_expression_body(B)
|
||||
// order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B;}
|
||||
// order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B; }
|
||||
|
||||
%type order_by_clause_opt { SNodeList* }
|
||||
%destructor order_by_clause_opt { nodesDestroyList($$); }
|
||||
|
|
|
@ -246,6 +246,16 @@ static SDatabaseOptions* setDbStreamMode(SAstCreateContext* pCxt, SDatabaseOptio
|
|||
return pOptions;
|
||||
}
|
||||
|
||||
static SDatabaseOptions* setDbRetentions(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) {
|
||||
// todo
|
||||
return pOptions;
|
||||
}
|
||||
|
||||
static SDatabaseOptions* setDbFileFactor(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) {
|
||||
// todo
|
||||
return pOptions;
|
||||
}
|
||||
|
||||
static void initSetDatabaseOptionFp() {
|
||||
setDbOptionFuncs[DB_OPTION_BLOCKS] = setDbBlocks;
|
||||
setDbOptionFuncs[DB_OPTION_CACHE] = setDbCache;
|
||||
|
@ -262,8 +272,10 @@ static void initSetDatabaseOptionFp() {
|
|||
setDbOptionFuncs[DB_OPTION_TTL] = setDbTtl;
|
||||
setDbOptionFuncs[DB_OPTION_WAL] = setDbWal;
|
||||
setDbOptionFuncs[DB_OPTION_VGROUPS] = setDbVgroups;
|
||||
setDbOptionFuncs[DB_OPTION_SINGLESTABLE] = setDbSingleStable;
|
||||
setDbOptionFuncs[DB_OPTION_STREAMMODE] = setDbStreamMode;
|
||||
setDbOptionFuncs[DB_OPTION_SINGLE_STABLE] = setDbSingleStable;
|
||||
setDbOptionFuncs[DB_OPTION_STREAM_MODE] = setDbStreamMode;
|
||||
setDbOptionFuncs[DB_OPTION_RETENTIONS] = setDbRetentions;
|
||||
setDbOptionFuncs[DB_OPTION_FILE_FACTOR] = setDbFileFactor;
|
||||
}
|
||||
|
||||
static STableOptions* setTableKeep(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) {
|
||||
|
@ -772,6 +784,29 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
|
|||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
SNode* createDefaultAlterDatabaseOptions(SAstCreateContext* pCxt) {
|
||||
SDatabaseOptions* pOptions = nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS);
|
||||
CHECK_OUT_OF_MEM(pOptions);
|
||||
pOptions->numOfBlocks = -1;
|
||||
pOptions->cacheBlockSize = -1;
|
||||
pOptions->cachelast = -1;
|
||||
pOptions->compressionLevel = -1;
|
||||
pOptions->daysPerFile = -1;
|
||||
pOptions->fsyncPeriod = -1;
|
||||
pOptions->maxRowsPerBlock = -1;
|
||||
pOptions->minRowsPerBlock = -1;
|
||||
pOptions->keep = -1;
|
||||
pOptions->precision = -1;
|
||||
pOptions->quorum = -1;
|
||||
pOptions->replica = -1;
|
||||
pOptions->ttl = -1;
|
||||
pOptions->walLevel = -1;
|
||||
pOptions->numOfVgroups = -1;
|
||||
pOptions->singleStable = -1;
|
||||
pOptions->streamMode = -1;
|
||||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, const SToken* pVal) {
|
||||
return (SNode*)setDbOptionFuncs[type](pCxt, (SDatabaseOptions*)pOptions, pVal);
|
||||
}
|
||||
|
@ -818,6 +853,14 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) {
|
|||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
SNode* createDefaultAlterTableOptions(SAstCreateContext* pCxt) {
|
||||
STableOptions* pOptions = nodesMakeNode(QUERY_NODE_TABLE_OPTIONS);
|
||||
CHECK_OUT_OF_MEM(pOptions);
|
||||
pOptions->keep = -1;
|
||||
pOptions->ttl = -1;
|
||||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, const SToken* pVal) {
|
||||
return (SNode*)setTableOptionFuncs[type](pCxt, (STableOptions*)pOptions, pVal);
|
||||
}
|
||||
|
@ -827,6 +870,11 @@ SNode* setTableSmaOption(SAstCreateContext* pCxt, SNode* pOptions, SNodeList* pS
|
|||
return pOptions;
|
||||
}
|
||||
|
||||
SNode* setTableRollupOption(SAstCreateContext* pCxt, SNode* pOptions, SNodeList* pFuncs) {
|
||||
// todo
|
||||
return pOptions;
|
||||
}
|
||||
|
||||
SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment) {
|
||||
SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
|
||||
CHECK_OUT_OF_MEM(pCol);
|
||||
|
@ -912,6 +960,49 @@ SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, S
|
|||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterTableOption(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions) {
|
||||
SAlterTableStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->alterType = TSDB_ALTER_TABLE_UPDATE_OPTIONS;
|
||||
pStmt->pOptions = (STableOptions*)pOptions;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pColName, SDataType dataType) {
|
||||
SAlterTableStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->alterType = alterType;
|
||||
strncpy(pStmt->colName, pColName->z, pColName->n);
|
||||
pStmt->dataType = dataType;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pColName) {
|
||||
SAlterTableStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->alterType = alterType;
|
||||
strncpy(pStmt->colName, pColName->z, pColName->n);
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pOldColName, const SToken* pNewColName) {
|
||||
SAlterTableStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->alterType = alterType;
|
||||
strncpy(pStmt->colName, pOldColName->z, pOldColName->n);
|
||||
strncpy(pStmt->newColName, pNewColName->z, pNewColName->n);
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const SToken* pTagName, SNode* pVal) {
|
||||
SAlterTableStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->alterType = TSDB_ALTER_TABLE_UPDATE_TAG_VAL;
|
||||
strncpy(pStmt->colName, pTagName->z, pTagName->n);
|
||||
pStmt->pVal = (SValueNode*)pVal;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
|
||||
SUseDatabaseStmt* pStmt = (SUseDatabaseStmt*)nodesMakeNode(QUERY_NODE_USE_DATABASE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
|
@ -1009,6 +1100,17 @@ SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode) {
|
|||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue) {
|
||||
SAlterDnodeStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_DNODE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->dnodeId = strtol(pDnode->z, NULL, 10);
|
||||
trimString(pConfig->z, pConfig->n, pStmt->config, sizeof(pStmt->config));
|
||||
if (NULL != pValue) {
|
||||
trimString(pValue->z, pValue->n, pStmt->value, sizeof(pStmt->value));
|
||||
}
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, const SToken* pIndexName, const SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
|
||||
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
|
||||
return NULL;
|
||||
|
@ -1077,3 +1179,13 @@ SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const
|
|||
pStmt->ignoreNotExists = ignoreNotExists;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue) {
|
||||
SAlterLocalStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_LOCAL_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
trimString(pConfig->z, pConfig->n, pStmt->config, sizeof(pStmt->config));
|
||||
if (NULL != pValue) {
|
||||
trimString(pValue->z, pValue->n, pStmt->value, sizeof(pStmt->value));
|
||||
}
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ typedef struct SKeyword {
|
|||
|
||||
// keywords in sql string
|
||||
static SKeyword keywordTable[] = {
|
||||
{"ACCOUNT", TK_ACCOUNT},
|
||||
{"ALL", TK_ALL},
|
||||
{"ALTER", TK_ALTER},
|
||||
{"AND", TK_AND},
|
||||
|
@ -168,7 +169,6 @@ static SKeyword keywordTable[] = {
|
|||
// {"SCORES", TK_SCORES},
|
||||
// {"GRANTS", TK_GRANTS},
|
||||
// {"DOT", TK_DOT},
|
||||
// {"ACCOUNT", TK_ACCOUNT},
|
||||
// {"DESCRIBE", TK_DESCRIBE},
|
||||
// {"SYNCDB", TK_SYNCDB},
|
||||
// {"LOCAL", TK_LOCAL},
|
||||
|
|
|
@ -126,7 +126,7 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol
|
|||
static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode* pTable, SNodeList* pList) {
|
||||
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
|
||||
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
|
||||
int32_t nums = pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType)? pMeta->tableInfo.numOfTags:0);
|
||||
int32_t nums = pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType) ? pMeta->tableInfo.numOfTags : 0);
|
||||
for (int32_t i = 0; i < nums; ++i) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
|
@ -499,6 +499,10 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
}
|
||||
|
||||
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
|
||||
if (pCxt->streamQuery) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
|
||||
SArray* vgroupList = NULL;
|
||||
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList);
|
||||
|
@ -962,6 +966,73 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS
|
|||
return doTranslateDropSuperTable(pCxt, &tableName, pStmt->ignoreNotExists);
|
||||
}
|
||||
|
||||
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) {
|
||||
pAlterReq->pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
|
||||
if (NULL == pAlterReq->pFields) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
switch (pStmt->alterType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG:
|
||||
case TSDB_ALTER_TABLE_DROP_TAG:
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: {
|
||||
TAOS_FIELD field = { .type = pStmt->dataType.type, .bytes = pStmt->dataType.bytes };
|
||||
strcpy(field.name, pStmt->colName);
|
||||
taosArrayPush(pAlterReq->pFields, &field);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
|
||||
TAOS_FIELD oldField = {0};
|
||||
strcpy(oldField.name, pStmt->colName);
|
||||
taosArrayPush(pAlterReq->pFields, &oldField);
|
||||
TAOS_FIELD newField = {0};
|
||||
strcpy(oldField.name, pStmt->newColName);
|
||||
taosArrayPush(pAlterReq->pFields, &newField);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
|
||||
SMAltertbReq alterReq = {0};
|
||||
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(tableName.dbname, pStmt->dbName);
|
||||
strcpy(tableName.tname, pStmt->tableName);
|
||||
tNameExtractFullName(&tableName, alterReq.name);
|
||||
alterReq.alterType = pStmt->alterType;
|
||||
alterReq.numOfFields = 1;
|
||||
if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) {
|
||||
// todo
|
||||
} else {
|
||||
if (TSDB_CODE_SUCCESS != setAlterTableField(pStmt, &alterReq)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL == pCxt->pCmdMsg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
|
||||
pCxt->pCmdMsg->msgType = TDMT_MND_ALTER_STB;
|
||||
pCxt->pCmdMsg->msgLen = tSerializeSMAlterStbReq(NULL, 0, &alterReq);
|
||||
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
|
||||
if (NULL == pCxt->pCmdMsg->pMsg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
tSerializeSMAlterStbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &alterReq);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* pStmt) {
|
||||
SName name = {0};
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
|
||||
|
@ -1099,6 +1170,28 @@ static int32_t translateDropDnode(STranslateContext* pCxt, SDropDnodeStmt* pStmt
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pStmt) {
|
||||
SMCfgDnodeReq cfgReq = {0};
|
||||
cfgReq.dnodeId = pStmt->dnodeId;
|
||||
strcpy(cfgReq.config, pStmt->config);
|
||||
strcpy(cfgReq.value, pStmt->value);
|
||||
|
||||
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL == pCxt->pCmdMsg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
|
||||
pCxt->pCmdMsg->msgType = TDMT_MND_CONFIG_DNODE;
|
||||
pCxt->pCmdMsg->msgLen = tSerializeSMCfgDnodeReq(NULL, 0, &cfgReq);
|
||||
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
|
||||
if (NULL == pCxt->pCmdMsg->pMsg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
tSerializeSMCfgDnodeReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &cfgReq);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t nodeTypeToShowType(ENodeType nt) {
|
||||
switch (nt) {
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
|
@ -1300,6 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
|
|||
SCMCreateTopicReq createReq = {0};
|
||||
|
||||
if (NULL != pStmt->pQuery) {
|
||||
pCxt->pParseCxt->streamQuery = true;
|
||||
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
|
||||
|
@ -1364,6 +1458,11 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateAlterLocal(STranslateContext* pCxt, SAlterLocalStmt* pStmt) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pNode)) {
|
||||
|
@ -1388,6 +1487,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
|
||||
code = translateDropSuperTable(pCxt, (SDropSuperTableStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_ALTER_TABLE_STMT:
|
||||
code = translateAlterTable(pCxt, (SAlterTableStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_USER_STMT:
|
||||
code = translateCreateUser(pCxt, (SCreateUserStmt*)pNode);
|
||||
break;
|
||||
|
@ -1406,6 +1508,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_DROP_DNODE_STMT:
|
||||
code = translateDropDnode(pCxt, (SDropDnodeStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_ALTER_DNODE_STMT:
|
||||
code = translateAlterDnode(pCxt, (SAlterDnodeStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||
case QUERY_NODE_SHOW_USERS_STMT:
|
||||
|
@ -1436,6 +1541,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_DROP_TOPIC_STMT:
|
||||
code = translateDropTopic(pCxt, (SDropTopicStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_ALTER_LOCAL_STMT:
|
||||
code = translateAlterLocal(pCxt, (SAlterLocalStmt*)pNode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1855,6 +1963,11 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
|
|||
return rewriteToVnodeModifOpStmt(pQuery, pBufArray);
|
||||
}
|
||||
|
||||
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pQuery->pRoot)) {
|
||||
|
@ -1866,6 +1979,11 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
code = rewriteCreateMultiTable(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_ALTER_TABLE_STMT:
|
||||
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == ((SAlterTableStmt*)pQuery->pRoot)->alterType) {
|
||||
code = rewriteAlterTable(pCxt, pQuery);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -46,17 +46,19 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
case TSDB_CODE_PAR_NOT_SINGLE_GROUP:
|
||||
return "Not a single-group group function";
|
||||
case TSDB_CODE_PAR_TAGS_NOT_MATCHED:
|
||||
return "tags number not matched";
|
||||
return "Tags number not matched";
|
||||
case TSDB_CODE_PAR_INVALID_TAG_NAME:
|
||||
return "invalid tag name : %s";
|
||||
return "Invalid tag name : %s";
|
||||
case TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG:
|
||||
return "name or password too long";
|
||||
return "Name or password too long";
|
||||
case TSDB_CODE_PAR_PASSWD_EMPTY:
|
||||
return "password can not be empty";
|
||||
return "Password can not be empty";
|
||||
case TSDB_CODE_PAR_INVALID_PORT:
|
||||
return "port should be an integer that is less than 65535 and greater than 0";
|
||||
return "Port should be an integer that is less than 65535 and greater than 0";
|
||||
case TSDB_CODE_PAR_INVALID_ENDPOINT:
|
||||
return "endpoint should be in the format of 'fqdn:port'";
|
||||
return "Endpoint should be in the format of 'fqdn:port'";
|
||||
case TSDB_CODE_PAR_EXPRIE_STATEMENT:
|
||||
return "This statement is no longer supported";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -302,6 +302,13 @@ TEST_F(ParserTest, createUser) {
|
|||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, alterAccount) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("alter account ac_wxy pass '123456'");
|
||||
ASSERT_TRUE(run(TSDB_CODE_PAR_EXPRIE_STATEMENT));
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, createDnode) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
|
@ -312,6 +319,16 @@ TEST_F(ParserTest, createDnode) {
|
|||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, alterDnode) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("alter dnode 1 'resetLog'");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("alter dnode 1 'debugFlag' '134'");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, createDatabase) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
|
|
|
@ -18,13 +18,13 @@
|
|||
#include "functionMgt.h"
|
||||
|
||||
typedef struct SLogicPlanContext {
|
||||
int32_t errCode;
|
||||
int32_t planNodeId;
|
||||
int32_t acctId;
|
||||
SPlanContext* pPlanCxt;
|
||||
} SLogicPlanContext;
|
||||
|
||||
static SLogicNode* createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt);
|
||||
static SLogicNode* createLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable);
|
||||
typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**);
|
||||
|
||||
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable, SLogicNode** pLogicNode);
|
||||
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode);
|
||||
|
||||
typedef struct SRewriteExprCxt {
|
||||
int32_t errCode;
|
||||
|
@ -66,7 +66,6 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
|||
}
|
||||
|
||||
typedef struct SNameExprCxt {
|
||||
int32_t planNodeId;
|
||||
int32_t rewriteId;
|
||||
} SNameExprCxt;
|
||||
|
||||
|
@ -76,7 +75,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
|
|||
case QUERY_NODE_LOGIC_CONDITION:
|
||||
case QUERY_NODE_FUNCTION: {
|
||||
SNameExprCxt* pCxt = (SNameExprCxt*)pContext;
|
||||
sprintf(((SExprNode*)pNode)->aliasName, "#expr_%d_%d", pCxt->planNodeId, pCxt->rewriteId++);
|
||||
sprintf(((SExprNode*)pNode)->aliasName, "#expr_%d", pCxt->rewriteId++);
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
default:
|
||||
|
@ -86,131 +85,194 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t rewriteExpr(int32_t planNodeId, int32_t rewriteId, SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
|
||||
SNameExprCxt nameCxt = { .planNodeId = planNodeId, .rewriteId = rewriteId };
|
||||
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
|
||||
static int32_t rewriteId = 1;
|
||||
SNameExprCxt nameCxt = { .rewriteId = rewriteId };
|
||||
nodesWalkList(pExprs, doNameExpr, &nameCxt);
|
||||
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
|
||||
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static SLogicNode* pushLogicNode(SLogicPlanContext* pCxt, SLogicNode* pRoot, SLogicNode* pNode) {
|
||||
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (NULL == pRoot) {
|
||||
return pNode;
|
||||
}
|
||||
|
||||
if (NULL == pNode) {
|
||||
return pRoot;
|
||||
}
|
||||
|
||||
if (NULL == pNode->pChildren) {
|
||||
pNode->pChildren = nodesMakeList();
|
||||
if (NULL == pNode->pChildren) {
|
||||
goto error;
|
||||
static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) {
|
||||
if (NULL == pNewRoot->pChildren) {
|
||||
pNewRoot->pChildren = nodesMakeList();
|
||||
if (NULL == pNewRoot->pChildren) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pNode->pChildren, (SNode*)pRoot)) {
|
||||
goto error;
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pNewRoot->pChildren, (SNode*)*pOldRoot)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pRoot->pParent = pNode;
|
||||
return pNode;
|
||||
error:
|
||||
nodesDestroyNode((SNode*)pNode);
|
||||
return pRoot;
|
||||
|
||||
(*pOldRoot)->pParent = pNewRoot;
|
||||
*pOldRoot = pNewRoot;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) {
|
||||
static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, FCreateLogicNode func, SLogicNode** pRoot) {
|
||||
SLogicNode* pNode = NULL;
|
||||
int32_t code = func(pCxt, pSelect, &pNode);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pNode) {
|
||||
code = pushLogicNode(pCxt, pRoot, pNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNode);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable, SLogicNode** pLogicNode) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
|
||||
CHECK_ALLOC(pScan, NULL);
|
||||
pScan->node.id = pCxt->planNodeId++;
|
||||
if (NULL == pScan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
|
||||
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
|
||||
|
||||
// set columns to scan
|
||||
SNodeList* pCols = NULL;
|
||||
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, &pCols), (SLogicNode*)pScan);
|
||||
if (NULL != pCols) {
|
||||
pScan->pScanCols = nodesCloneList(pCols);
|
||||
CHECK_ALLOC(pScan->pScanCols, (SLogicNode*)pScan);
|
||||
}
|
||||
|
||||
// set output
|
||||
if (NULL != pCols) {
|
||||
pScan->node.pTargets = nodesCloneList(pCols);
|
||||
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
|
||||
}
|
||||
|
||||
pScan->scanType = SCAN_TYPE_TABLE;
|
||||
pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE;
|
||||
pScan->scanFlag = MAIN_SCAN;
|
||||
pScan->scanRange = TSWINDOW_INITIALIZER;
|
||||
pScan->tableName.type = TSDB_TABLE_NAME_T;
|
||||
pScan->tableName.acctId = pCxt->acctId;
|
||||
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
|
||||
strcpy(pScan->tableName.dbname, pRealTable->table.dbName);
|
||||
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
|
||||
|
||||
return (SLogicNode*)pScan;
|
||||
}
|
||||
|
||||
static SLogicNode* createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable) {
|
||||
SLogicNode* pRoot = createQueryLogicNode(pCxt, pTable->pSubquery);
|
||||
CHECK_ALLOC(pRoot, NULL);
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pRoot->pTargets) {
|
||||
strcpy(((SColumnNode*)pNode)->tableAlias, pTable->table.tableAlias);
|
||||
// set columns to scan
|
||||
SNodeList* pCols = NULL;
|
||||
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, &pCols);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pCols) {
|
||||
pScan->pScanCols = nodesCloneList(pCols);
|
||||
if (NULL == pScan) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return pRoot;
|
||||
|
||||
// set output
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pCols) {
|
||||
pScan->node.pTargets = nodesCloneList(pCols);
|
||||
if (NULL == pScan) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pScan;
|
||||
} else {
|
||||
nodesDestroyNode(pScan);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable) {
|
||||
static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable, SLogicNode** pLogicNode) {
|
||||
int32_t code = createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, (*pLogicNode)->pTargets) {
|
||||
strcpy(((SColumnNode*)pNode)->tableAlias, pTable->table.tableAlias);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable, SLogicNode** pLogicNode) {
|
||||
SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_JOIN);
|
||||
CHECK_ALLOC(pJoin, NULL);
|
||||
pJoin->node.id = pCxt->planNodeId++;
|
||||
if (NULL == pJoin) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pJoin->joinType = pJoinTable->joinType;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// set left and right node
|
||||
pJoin->node.pChildren = nodesMakeList();
|
||||
CHECK_ALLOC(pJoin->node.pChildren, (SLogicNode*)pJoin);
|
||||
SLogicNode* pLeft = createLogicNodeByTable(pCxt, pSelect, pJoinTable->pLeft);
|
||||
CHECK_ALLOC(pLeft, (SLogicNode*)pJoin);
|
||||
CHECK_CODE(nodesListAppend(pJoin->node.pChildren, (SNode*)pLeft), (SLogicNode*)pJoin);
|
||||
SLogicNode* pRight = createLogicNodeByTable(pCxt, pSelect, pJoinTable->pRight);
|
||||
CHECK_ALLOC(pRight, (SLogicNode*)pJoin);
|
||||
CHECK_CODE(nodesListAppend(pJoin->node.pChildren, (SNode*)pRight), (SLogicNode*)pJoin);
|
||||
if (NULL == pJoin->node.pChildren) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SLogicNode* pLeft = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = doCreateLogicNodeByTable(pCxt, pSelect, pJoinTable->pLeft, &pLeft);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(pJoin->node.pChildren, (SNode*)pLeft);
|
||||
}
|
||||
}
|
||||
|
||||
SLogicNode* pRight = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = doCreateLogicNodeByTable(pCxt, pSelect, pJoinTable->pRight, &pRight);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(pJoin->node.pChildren, (SNode*)pRight);
|
||||
}
|
||||
}
|
||||
|
||||
// set on conditions
|
||||
if (NULL != pJoinTable->pOnCond) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinTable->pOnCond) {
|
||||
pJoin->pOnConditions = nodesCloneNode(pJoinTable->pOnCond);
|
||||
CHECK_ALLOC(pJoin->pOnConditions, (SLogicNode*)pJoin);
|
||||
if (NULL == pJoin->pOnConditions) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
// set the output
|
||||
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
|
||||
CHECK_ALLOC(pJoin->node.pTargets, (SLogicNode*)pJoin);
|
||||
SNodeList* pTargets = nodesCloneList(pRight->pTargets);
|
||||
CHECK_ALLOC(pTargets, (SLogicNode*)pJoin);
|
||||
nodesListAppendList(pJoin->node.pTargets, pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
|
||||
if (NULL == pJoin->pOnConditions) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppendList(pJoin->node.pTargets, nodesCloneList(pRight->pTargets));
|
||||
}
|
||||
}
|
||||
|
||||
return (SLogicNode*)pJoin;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pJoin;
|
||||
} else {
|
||||
nodesDestroyNode(pJoin);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* createLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable) {
|
||||
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable, SLogicNode** pLogicNode) {
|
||||
switch (nodeType(pTable)) {
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable);
|
||||
return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable, pLogicNode);
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable);
|
||||
return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable, pLogicNode);
|
||||
case QUERY_NODE_JOIN_TABLE:
|
||||
return createJoinLogicNode(pCxt, pSelect, (SJoinTableNode*)pTable);
|
||||
return createJoinLogicNode(pCxt, pSelect, (SJoinTableNode*)pTable, pLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return NULL;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t createLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable, SLogicNode** pLogicNode) {
|
||||
SLogicNode* pNode = NULL;
|
||||
int32_t code = doCreateLogicNodeByTable(pCxt, pSelect, pTable, &pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pNode->pConditions = nodesCloneNode(pSelect->pWhere);
|
||||
if (NULL != pSelect->pWhere && NULL == pNode->pConditions) {
|
||||
nodesDestroyNode(pNode);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
*pLogicNode = pNode;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static SColumnNode* createColumnByExpr(SExprNode* pExpr) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return NULL;
|
||||
}
|
||||
pCol->node.resType = pExpr->resType;
|
||||
strcpy(pCol->colName, pExpr->aliasName);
|
||||
return pCol;
|
||||
}
|
||||
|
||||
typedef struct SCreateColumnCxt {
|
||||
|
@ -245,197 +307,241 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static SNodeList* createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs) {
|
||||
SCreateColumnCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pList = nodesMakeList() };
|
||||
CHECK_ALLOC(cxt.pList, NULL);
|
||||
static int32_t createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pList) {
|
||||
SCreateColumnCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList) };
|
||||
if (NULL == cxt.pList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
nodesWalkList(pExprs, doCreateColumn, &cxt);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyList(cxt.pList);
|
||||
return NULL;
|
||||
return cxt.errCode;
|
||||
}
|
||||
return cxt.pList;
|
||||
if (NULL == *pList) {
|
||||
*pList = cxt.pList;
|
||||
}
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
SNodeList* pAggFuncs = NULL;
|
||||
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL);
|
||||
int32_t code = nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
if (NULL == pAggFuncs && NULL == pSelect->pGroupByList) {
|
||||
return NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
|
||||
CHECK_ALLOC(pAgg, NULL);
|
||||
pAgg->node.id = pCxt->planNodeId++;
|
||||
if (NULL == pAgg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// set grouyp keys, agg funcs and having conditions
|
||||
if (NULL != pSelect->pGroupByList) {
|
||||
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
|
||||
CHECK_ALLOC(pAgg->pGroupKeys, (SLogicNode*)pAgg);
|
||||
if (NULL == pAgg->pGroupKeys) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (NULL != pAggFuncs) {
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
|
||||
pAgg->pAggFuncs = nodesCloneList(pAggFuncs);
|
||||
CHECK_ALLOC(pAgg->pAggFuncs, (SLogicNode*)pAgg);
|
||||
if (NULL == pAgg->pAggFuncs) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
// rewrite the expression in subsequent clauses
|
||||
CHECK_CODE(rewriteExpr(pAgg->node.id, 1, pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY), (SLogicNode*)pAgg);
|
||||
CHECK_CODE(rewriteExpr(pAgg->node.id, 1 + LIST_LENGTH(pAgg->pGroupKeys), pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY), (SLogicNode*)pAgg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
}
|
||||
|
||||
if (NULL != pSelect->pHaving) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
|
||||
pAgg->node.pConditions = nodesCloneNode(pSelect->pHaving);
|
||||
CHECK_ALLOC(pAgg->node.pConditions, (SLogicNode*)pAgg);
|
||||
if (NULL == pAgg->node.pConditions) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
// set the output
|
||||
pAgg->node.pTargets = nodesMakeList();
|
||||
CHECK_ALLOC(pAgg->node.pTargets, (SLogicNode*)pAgg);
|
||||
if (NULL != pAgg->pGroupKeys) {
|
||||
SNodeList* pTargets = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys);
|
||||
CHECK_ALLOC(pAgg->node.pTargets, (SLogicNode*)pAgg);
|
||||
nodesListAppendList(pAgg->node.pTargets, pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) {
|
||||
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
|
||||
}
|
||||
if (NULL != pAgg->pAggFuncs) {
|
||||
SNodeList* pTargets = createColumnByRewriteExps(pCxt, pAgg->pAggFuncs);
|
||||
CHECK_ALLOC(pTargets, (SLogicNode*)pAgg);
|
||||
nodesListAppendList(pAgg->node.pTargets, pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
|
||||
code = createColumnByRewriteExps(pCxt, pAgg->pAggFuncs, &pAgg->node.pTargets);
|
||||
}
|
||||
|
||||
return (SLogicNode*)pAgg;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pAgg;
|
||||
} else {
|
||||
nodesDestroyNode(pAgg);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect) {
|
||||
static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
|
||||
CHECK_ALLOC(pWindow, NULL);
|
||||
pWindow->node.id = pCxt->planNodeId++;
|
||||
if (NULL == pWindow) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pWindow->winType = WINDOW_TYPE_INTERVAL;
|
||||
//SValueNode* pIntervalNode = (SValueNode*)((SRawExprNode*)(pInterval->pInterval))->pNode;
|
||||
SValueNode* pIntervalNode = (SValueNode*)(pInterval->pInterval);
|
||||
|
||||
pWindow->interval = pIntervalNode->datum.i;
|
||||
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
|
||||
pWindow->intervalUnit = ((SValueNode*)pInterval->pInterval)->unit;
|
||||
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
|
||||
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
|
||||
pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (NULL != pInterval->pFill) {
|
||||
pWindow->pFill = nodesCloneNode(pInterval->pFill);
|
||||
CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);
|
||||
if (NULL == pWindow->pFill) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SNodeList* pFuncs = NULL;
|
||||
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pFuncs), NULL);
|
||||
if (NULL != pFuncs) {
|
||||
pWindow->pFuncs = nodesCloneList(pFuncs);
|
||||
CHECK_ALLOC(pWindow->pFuncs, (SLogicNode*)pWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesCollectFuncs(pSelect, fmIsAggFunc, &pWindow->pFuncs);
|
||||
}
|
||||
|
||||
CHECK_CODE(rewriteExpr(pWindow->node.id, 1, pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW), (SLogicNode*)pWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
|
||||
}
|
||||
|
||||
pWindow->node.pTargets = createColumnByRewriteExps(pCxt, pWindow->pFuncs);
|
||||
CHECK_ALLOC(pWindow->node.pTargets, (SLogicNode*)pWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExps(pCxt, pWindow->pFuncs, &pWindow->node.pTargets);
|
||||
}
|
||||
|
||||
return (SLogicNode*)pWindow;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pWindow;
|
||||
} else {
|
||||
nodesDestroyNode(pWindow);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
if (NULL == pSelect->pWindow) {
|
||||
return NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
switch (nodeType(pSelect->pWindow)) {
|
||||
case QUERY_NODE_INTERVAL_WINDOW:
|
||||
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect);
|
||||
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static SNodeList* createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs) {
|
||||
static int32_t createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pCols) {
|
||||
SNodeList* pList = nodesMakeList();
|
||||
CHECK_ALLOC(pList, NULL);
|
||||
if (NULL == pList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pExprs) {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
goto error;
|
||||
}
|
||||
pCol->node.resType = pExpr->resType;
|
||||
strcpy(pCol->colName, pExpr->aliasName);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, (SNode*)pCol)) {
|
||||
goto error;
|
||||
FOREACH(pNode, pExprs) {
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, createColumnByExpr((SExprNode*)pNode))) {
|
||||
nodesDestroyList(pList);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return pList;
|
||||
error:
|
||||
nodesDestroyList(pList);
|
||||
return NULL;
|
||||
|
||||
*pCols = pList;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SLogicNode* createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT);
|
||||
CHECK_ALLOC(pProject, NULL);
|
||||
pProject->node.id = pCxt->planNodeId++;
|
||||
if (NULL == pProject) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pProject->pProjections = nodesCloneList(pSelect->pProjectionList);
|
||||
if (NULL == pProject->pProjections) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pProject->node.pTargets = createColumnByProjections(pCxt,pSelect->pProjectionList);
|
||||
CHECK_ALLOC(pProject->node.pTargets, (SLogicNode*)pProject);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByProjections(pCxt,pSelect->pProjectionList, &pProject->node.pTargets);
|
||||
}
|
||||
|
||||
return (SLogicNode*)pProject;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pProject;
|
||||
} else {
|
||||
nodesDestroyNode(pProject);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && NULL != pSelect->pWhere) {
|
||||
pRoot->pConditions = nodesCloneNode(pSelect->pWhere);
|
||||
CHECK_ALLOC(pRoot->pConditions, pRoot);
|
||||
static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
SLogicNode* pRoot = NULL;
|
||||
int32_t code = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable, &pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createChildLogicNode(pCxt, pSelect, createWindowLogicNode, &pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createWindowLogicNode(pCxt, pSelect));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createChildLogicNode(pCxt, pSelect, createAggLogicNode, &pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createChildLogicNode(pCxt, pSelect, createProjectLogicNode, &pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createProjectLogicNode(pCxt, pSelect));
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = pRoot;
|
||||
} else {
|
||||
nodesDestroyNode(pRoot);
|
||||
}
|
||||
return pRoot;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getMsgType(ENodeType sqlType) {
|
||||
return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType) ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT;
|
||||
}
|
||||
|
||||
static SLogicNode* createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIF);
|
||||
CHECK_ALLOC(pModif, NULL);
|
||||
static int32_t createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt, SLogicNode** pLogicNode) {
|
||||
SVnodeModifLogicNode* pModif = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIF);
|
||||
if (NULL == pModif) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pModif->pDataBlocks = pStmt->pDataBlocks;
|
||||
pModif->msgType = getMsgType(pStmt->sqlNodeType);
|
||||
return (SLogicNode*)pModif;
|
||||
*pLogicNode = (SLogicNode*)pModif;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SLogicNode* createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt) {
|
||||
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode) {
|
||||
switch (nodeType(pStmt)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt);
|
||||
return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt, pLogicNode);
|
||||
case QUERY_NODE_VNODE_MODIF_STMT:
|
||||
return createVnodeModifLogicNode(pCxt, (SVnodeModifOpStmt*)pStmt);
|
||||
return createVnodeModifLogicNode(pCxt, (SVnodeModifOpStmt*)pStmt, pLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return NULL; // to avoid compiler error
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) {
|
||||
SLogicPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = 1, .acctId = pCxt->acctId };
|
||||
SLogicNode* pRoot = createQueryLogicNode(&cxt, pCxt->pAstRoot);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyNode((SNode*)pRoot);
|
||||
return cxt.errCode;
|
||||
SLogicPlanContext cxt = { .pPlanCxt = pCxt };
|
||||
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, pLogicNode);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
*pLogicNode = pRoot;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -259,15 +259,21 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p
|
|||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
SStreamScanPhysiNode* pTableScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
CHECK_ALLOC(pTableScan, NULL);
|
||||
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
|
||||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return createTagScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_TABLE:
|
||||
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
|
||||
case SCAN_TYPE_STABLE:
|
||||
case SCAN_TYPE_STREAM:
|
||||
break;
|
||||
return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -202,9 +202,16 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV
|
|||
return (errno == ERANGE||errno == EINVAL) ? TSDB_CODE_FAILED:TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) {
|
||||
uint64_t val = 0;
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
|
||||
*pVal = val;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) {
|
||||
uint64_t val = 0;
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
|
||||
*pVal = val;
|
||||
return code;
|
||||
}
|
||||
|
@ -239,6 +246,22 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi
|
|||
return func(pJsonObj, pObj);
|
||||
}
|
||||
|
||||
int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize) {
|
||||
if (objSize <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SJson* pJsonObj = tjsonGetObjectItem(pJson, pName);
|
||||
if (NULL == pJsonObj) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
*pObj = calloc(1, objSize);
|
||||
if (NULL == *pObj) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return func(pJsonObj, *pObj);
|
||||
}
|
||||
|
||||
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize) {
|
||||
const cJSON* jArray = tjsonGetObjectItem(pJson, pName);
|
||||
int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray));
|
||||
|
|
Loading…
Reference in New Issue