feat: CREATE STREAM statement implement, and syntax of JSON data type implement.
This commit is contained in:
parent
5143c8dbdb
commit
ba1f3f7815
|
@ -171,60 +171,69 @@
|
|||
#define TK_BUFSIZE 153
|
||||
#define TK_STREAM 154
|
||||
#define TK_INTO 155
|
||||
#define TK_KILL 156
|
||||
#define TK_CONNECTION 157
|
||||
#define TK_MERGE 158
|
||||
#define TK_VGROUP 159
|
||||
#define TK_REDISTRIBUTE 160
|
||||
#define TK_SPLIT 161
|
||||
#define TK_SYNCDB 162
|
||||
#define TK_NULL 163
|
||||
#define TK_FIRST 164
|
||||
#define TK_LAST 165
|
||||
#define TK_CAST 166
|
||||
#define TK_NOW 167
|
||||
#define TK_TODAY 168
|
||||
#define TK_ROWTS 169
|
||||
#define TK_TBNAME 170
|
||||
#define TK_QSTARTTS 171
|
||||
#define TK_QENDTS 172
|
||||
#define TK_WSTARTTS 173
|
||||
#define TK_WENDTS 174
|
||||
#define TK_WDURATION 175
|
||||
#define TK_BETWEEN 176
|
||||
#define TK_IS 177
|
||||
#define TK_NK_LT 178
|
||||
#define TK_NK_GT 179
|
||||
#define TK_NK_LE 180
|
||||
#define TK_NK_GE 181
|
||||
#define TK_NK_NE 182
|
||||
#define TK_MATCH 183
|
||||
#define TK_NMATCH 184
|
||||
#define TK_JOIN 185
|
||||
#define TK_INNER 186
|
||||
#define TK_SELECT 187
|
||||
#define TK_DISTINCT 188
|
||||
#define TK_WHERE 189
|
||||
#define TK_PARTITION 190
|
||||
#define TK_BY 191
|
||||
#define TK_SESSION 192
|
||||
#define TK_STATE_WINDOW 193
|
||||
#define TK_SLIDING 194
|
||||
#define TK_FILL 195
|
||||
#define TK_VALUE 196
|
||||
#define TK_NONE 197
|
||||
#define TK_PREV 198
|
||||
#define TK_LINEAR 199
|
||||
#define TK_NEXT 200
|
||||
#define TK_GROUP 201
|
||||
#define TK_HAVING 202
|
||||
#define TK_ORDER 203
|
||||
#define TK_SLIMIT 204
|
||||
#define TK_SOFFSET 205
|
||||
#define TK_LIMIT 206
|
||||
#define TK_OFFSET 207
|
||||
#define TK_ASC 208
|
||||
#define TK_NULLS 209
|
||||
#define TK_TRIGGER 156
|
||||
#define TK_AT_ONCE 157
|
||||
#define TK_WINDOW_CLOSE 158
|
||||
#define TK_WATERMARK 159
|
||||
#define TK_KILL 160
|
||||
#define TK_CONNECTION 161
|
||||
#define TK_MERGE 162
|
||||
#define TK_VGROUP 163
|
||||
#define TK_REDISTRIBUTE 164
|
||||
#define TK_SPLIT 165
|
||||
#define TK_SYNCDB 166
|
||||
#define TK_NULL 167
|
||||
#define TK_NK_QUESTION 168
|
||||
#define TK_NK_ARROW 169
|
||||
#define TK_NOW 170
|
||||
#define TK_TODAY 171
|
||||
#define TK_ROWTS 172
|
||||
#define TK_TBNAME 173
|
||||
#define TK_QSTARTTS 174
|
||||
#define TK_QENDTS 175
|
||||
#define TK_WSTARTTS 176
|
||||
#define TK_WENDTS 177
|
||||
#define TK_WDURATION 178
|
||||
#define TK_CAST 179
|
||||
#define TK_COUNT 180
|
||||
#define TK_FIRST 181
|
||||
#define TK_LAST 182
|
||||
#define TK_LAST_ROW 183
|
||||
#define TK_BETWEEN 184
|
||||
#define TK_IS 185
|
||||
#define TK_NK_LT 186
|
||||
#define TK_NK_GT 187
|
||||
#define TK_NK_LE 188
|
||||
#define TK_NK_GE 189
|
||||
#define TK_NK_NE 190
|
||||
#define TK_MATCH 191
|
||||
#define TK_NMATCH 192
|
||||
#define TK_CONTAINS 193
|
||||
#define TK_JOIN 194
|
||||
#define TK_INNER 195
|
||||
#define TK_SELECT 196
|
||||
#define TK_DISTINCT 197
|
||||
#define TK_WHERE 198
|
||||
#define TK_PARTITION 199
|
||||
#define TK_BY 200
|
||||
#define TK_SESSION 201
|
||||
#define TK_STATE_WINDOW 202
|
||||
#define TK_SLIDING 203
|
||||
#define TK_FILL 204
|
||||
#define TK_VALUE 205
|
||||
#define TK_NONE 206
|
||||
#define TK_PREV 207
|
||||
#define TK_LINEAR 208
|
||||
#define TK_NEXT 209
|
||||
#define TK_GROUP 210
|
||||
#define TK_HAVING 211
|
||||
#define TK_ORDER 212
|
||||
#define TK_SLIMIT 213
|
||||
#define TK_SOFFSET 214
|
||||
#define TK_LIMIT 215
|
||||
#define TK_OFFSET 216
|
||||
#define TK_ASC 217
|
||||
#define TK_NULLS 218
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
@ -233,7 +242,6 @@
|
|||
#define TK_NK_OCT 304 // oct number
|
||||
#define TK_NK_BIN 305 // bin format data 0b111
|
||||
#define TK_NK_FILE 306
|
||||
#define TK_NK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
|
||||
|
||||
#define TK_NK_BITNOT 501
|
||||
#define TK_INSERT 502
|
||||
|
|
|
@ -137,6 +137,7 @@ bool fmIsWindowPseudoColumnFunc(int32_t funcId);
|
|||
bool fmIsWindowClauseFunc(int32_t funcId);
|
||||
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
||||
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
||||
bool fmIsMultiResFunc(int32_t funcId);
|
||||
|
||||
typedef enum EFuncDataRequired {
|
||||
FUNC_DATA_REQUIRED_ALL_NEEDED = 1,
|
||||
|
|
|
@ -272,6 +272,33 @@ typedef struct SKillStmt {
|
|||
int32_t targetId;
|
||||
} SKillStmt;
|
||||
|
||||
typedef enum EStreamTriggerType {
|
||||
STREAM_TRIGGER_AT_ONCE = 1,
|
||||
STREAM_TRIGGER_WINDOW_CLOSE
|
||||
} EStreamTriggerType;
|
||||
|
||||
typedef struct SStreamOptions {
|
||||
ENodeType type;
|
||||
EStreamTriggerType triggerType;
|
||||
SNode* pWatermark;
|
||||
} SStreamOptions;
|
||||
|
||||
typedef struct SCreateStreamStmt {
|
||||
ENodeType type;
|
||||
char streamName[TSDB_TABLE_NAME_LEN];
|
||||
char targetDbName[TSDB_DB_NAME_LEN];
|
||||
char targetTabName[TSDB_TABLE_NAME_LEN];
|
||||
bool ignoreExists;
|
||||
SStreamOptions* pOptions;
|
||||
SNode* pQuery;
|
||||
} SCreateStreamStmt;
|
||||
|
||||
typedef struct SDropStreamStmt {
|
||||
ENodeType type;
|
||||
char streamName[TSDB_TABLE_NAME_LEN];
|
||||
bool ignoreNotExists;
|
||||
} SDropStreamStmt;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -80,6 +80,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_TABLE_OPTIONS,
|
||||
QUERY_NODE_INDEX_OPTIONS,
|
||||
QUERY_NODE_EXPLAIN_OPTIONS,
|
||||
QUERY_NODE_STREAM_OPTIONS,
|
||||
|
||||
// Statement nodes are used in parser and planner module.
|
||||
QUERY_NODE_SET_OPERATOR,
|
||||
|
@ -151,6 +152,12 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_SHOW_CONNECTIONS_STMT,
|
||||
QUERY_NODE_SHOW_QUERIES_STMT,
|
||||
QUERY_NODE_SHOW_VNODES_STMT,
|
||||
QUERY_NODE_SHOW_APPS_STMT,
|
||||
QUERY_NODE_SHOW_SCORES_STMT,
|
||||
QUERY_NODE_SHOW_VARIABLE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
|
||||
QUERY_NODE_KILL_CONNECTION_STMT,
|
||||
QUERY_NODE_KILL_QUERY_STMT,
|
||||
|
||||
|
|
|
@ -377,7 +377,7 @@ tDataTypeDescriptor tDataTypes[15] = {
|
|||
getStatics_i64},
|
||||
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
|
||||
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
|
||||
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
|
||||
{TSDB_DATA_TYPE_VARCHAR, 6, 0, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp,
|
||||
tsDecompressTimestamp, getStatics_i64},
|
||||
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
|
||||
|
@ -402,7 +402,7 @@ char tTokenTypeSwitcher[13] = {
|
|||
TSDB_DATA_TYPE_DOUBLE, // TK_DOUBLE
|
||||
TSDB_DATA_TYPE_BINARY, // TK_STRING
|
||||
TSDB_DATA_TYPE_BIGINT, // TK_TIMESTAMP
|
||||
TSDB_DATA_TYPE_BINARY, // TK_BINARY
|
||||
TSDB_DATA_TYPE_VARCHAR, // TK_BINARY
|
||||
TSDB_DATA_TYPE_NCHAR, // TK_NCHAR
|
||||
};
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ static const SInfosTableSchema userDBSchema[] = {
|
|||
{.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "wallevel", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "wal", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
|
|
|
@ -37,6 +37,7 @@ extern "C" {
|
|||
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
|
||||
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
||||
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
||||
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
|
|
|
@ -203,9 +203,9 @@ static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len
|
|||
}
|
||||
|
||||
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// first(*)/first(col_list) has been rewritten as first(col)
|
||||
// first(col_list) will be rewritten as first(col)
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
|
@ -370,6 +370,20 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SExprNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_VALUE != nodeType(pPara) || (!IS_VAR_DATA_TYPE(pPara->resType.type))) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_JSON].bytes, .type = TSDB_DATA_TYPE_JSON};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||
{
|
||||
.name = "count",
|
||||
|
@ -475,7 +489,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "last_row",
|
||||
.type = FUNCTION_TYPE_LAST_ROW,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC,
|
||||
.translateFunc = translateLastRow,
|
||||
.getEnvFunc = getMinmaxFuncEnv,
|
||||
.initFunc = maxFunctionSetup,
|
||||
|
@ -485,7 +499,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "first",
|
||||
.type = FUNCTION_TYPE_FIRST,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -495,7 +509,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "last",
|
||||
.type = FUNCTION_TYPE_LAST,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -861,6 +875,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = NULL,
|
||||
.sprocessFunc = winDurFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_json",
|
||||
.type = FUNCTION_TYPE_TO_JSON,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToJson,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
.finalizeFunc = NULL
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -138,6 +138,10 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId) {
|
|||
return isSpecificClassifyFunc(funcId, FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED);
|
||||
}
|
||||
|
||||
bool fmIsMultiResFunc(int32_t funcId) {
|
||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
|
||||
}
|
||||
|
||||
void fmFuncMgtDestroy() {
|
||||
void* m = gFunMgtService.pFuncNameHashTable;
|
||||
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
|
||||
|
|
|
@ -84,6 +84,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SIndexOptions));
|
||||
case QUERY_NODE_EXPLAIN_OPTIONS:
|
||||
return makeNode(type, sizeof(SExplainOptions));
|
||||
case QUERY_NODE_STREAM_OPTIONS:
|
||||
return makeNode(type, sizeof(SStreamOptions));
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return makeNode(type, sizeof(SSetOperator));
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -146,6 +148,19 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SDescribeStmt));
|
||||
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
|
||||
return makeNode(type, sizeof(SNode));
|
||||
case QUERY_NODE_COMPACT_STMT:
|
||||
case QUERY_NODE_CREATE_FUNCTION_STMT:
|
||||
case QUERY_NODE_DROP_FUNCTION_STMT:
|
||||
break;
|
||||
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||
return makeNode(type, sizeof(SCreateStreamStmt));
|
||||
case QUERY_NODE_DROP_STREAM_STMT:
|
||||
return makeNode(type, sizeof(SDropStreamStmt));
|
||||
case QUERY_NODE_MERGE_VGROUP_STMT:
|
||||
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
||||
case QUERY_NODE_SPLIT_VGROUP_STMT:
|
||||
case QUERY_NODE_SYNCDB_STMT:
|
||||
break;
|
||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||
case QUERY_NODE_SHOW_MNODES_STMT:
|
||||
case QUERY_NODE_SHOW_MODULES_STMT:
|
||||
|
@ -169,7 +184,16 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_CONFIGS_STMT:
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
case QUERY_NODE_SHOW_VNODES_STMT:
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
case QUERY_NODE_SHOW_SCORES_STMT:
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
||||
return makeNode(type, sizeof(SShowStmt));
|
||||
case QUERY_NODE_KILL_CONNECTION_STMT:
|
||||
case QUERY_NODE_KILL_QUERY_STMT:
|
||||
return makeNode(type, sizeof(SKillStmt));
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return makeNode(type, sizeof(SScanLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
|
@ -675,6 +699,7 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
|
|||
if (NULL != pList->pTail) {
|
||||
pList->pTail->pNext = p;
|
||||
}
|
||||
p->pPrev = pList->pTail;
|
||||
pList->pTail = p;
|
||||
++(pList->length);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -86,6 +86,7 @@ SNode* createColumnNode(SAstCreateContext* pCxt, SToken* pTableAlias, SToken* pC
|
|||
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
|
||||
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
|
||||
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt);
|
||||
SNode* createPlaceholderValueNode(SAstCreateContext* pCxt);
|
||||
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias);
|
||||
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2);
|
||||
SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight);
|
||||
|
@ -165,8 +166,9 @@ SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
|
|||
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups);
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName, const SToken* pTableName, SNode* pQuery);
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName);
|
||||
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery);
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName);
|
||||
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
|
||||
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
|
||||
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
|
||||
|
|
|
@ -293,6 +293,7 @@ tags_def(A) ::= TAGS NK_LP column_def_list(B) NK_RP.
|
|||
table_options(A) ::= . { A = createTableOptions(pCxt); }
|
||||
table_options(A) ::= table_options(B) COMMENT NK_STRING(C). { ((STableOptions*)B)->pComments = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C); A = B; }
|
||||
table_options(A) ::= table_options(B) KEEP integer_list(C). { ((STableOptions*)B)->pKeep = C; A = B; }
|
||||
table_options(A) ::= table_options(B) KEEP variable_list(C). { ((STableOptions*)B)->pKeep = C; A = B; }
|
||||
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { ((STableOptions*)B)->pTtl = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
|
||||
table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { ((STableOptions*)B)->pSma = C; A = B; }
|
||||
table_options(A) ::= table_options(B) ROLLUP NK_LP func_name_list(C) NK_RP. { ((STableOptions*)B)->pFuncs = C; A = B; }
|
||||
|
@ -306,6 +307,7 @@ alter_table_options(A) ::= alter_table_options(B) alter_table_option(C).
|
|||
%destructor alter_table_option { }
|
||||
alter_table_option(A) ::= COMMENT NK_STRING(B). { A.type = TABLE_OPTION_COMMENT; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
|
||||
alter_table_option(A) ::= KEEP integer_list(B). { A.type = TABLE_OPTION_KEEP; A.pList = B; }
|
||||
alter_table_option(A) ::= KEEP variable_list(B). { A.type = TABLE_OPTION_KEEP; A.pList = B; }
|
||||
alter_table_option(A) ::= TTL NK_INTEGER(B). { A.type = TABLE_OPTION_TTL; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
|
||||
|
||||
%type col_name_list { SNodeList* }
|
||||
|
@ -424,8 +426,17 @@ bufsize_opt(A) ::= .
|
|||
bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). { A = strtol(B.z, NULL, 10); }
|
||||
|
||||
/************************************************ create/drop stream **************************************************/
|
||||
cmd ::= CREATE STREAM stream_name(A) INTO table_name(B) AS query_expression(C). { pCxt->pRootNode = createCreateStreamStmt(pCxt, &A, &B, C); }
|
||||
cmd ::= DROP STREAM stream_name(A). { pCxt->pRootNode = createDropStreamStmt(pCxt, &A); }
|
||||
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A)
|
||||
stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, B, C, D); }
|
||||
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
|
||||
|
||||
into_opt(A) ::= . { A = NULL; }
|
||||
into_opt(A) ::= INTO full_table_name(B). { A = B; }
|
||||
|
||||
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
|
||||
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
|
||||
|
||||
/************************************************ kill connection/query ***********************************************/
|
||||
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
|
||||
|
@ -455,6 +466,7 @@ literal(A) ::= NK_BOOL(B).
|
|||
literal(A) ::= TIMESTAMP(B) NK_STRING(C). { A = createRawExprNodeExt(pCxt, &B, &C, createValueNode(pCxt, TSDB_DATA_TYPE_TIMESTAMP, &C)); }
|
||||
literal(A) ::= duration_literal(B). { A = B; }
|
||||
literal(A) ::= NULL(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_NULL, NULL)); }
|
||||
literal(A) ::= NK_QUESTION(B). { A = createRawExprNode(pCxt, &B, createPlaceholderValueNode(pCxt)); }
|
||||
|
||||
duration_literal(A) ::= NK_VARIABLE(B). { A = createRawExprNode(pCxt, &B, createDurationValueNode(pCxt, &B)); }
|
||||
|
||||
|
@ -501,8 +513,6 @@ column_name(A) ::= NK_ID(B).
|
|||
%type function_name { SToken }
|
||||
%destructor function_name { }
|
||||
function_name(A) ::= NK_ID(B). { A = B; }
|
||||
function_name(A) ::= FIRST(B). { A = B; }
|
||||
function_name(A) ::= LAST(B). { A = B; }
|
||||
|
||||
%type table_alias { SToken }
|
||||
%destructor table_alias { }
|
||||
|
@ -530,12 +540,9 @@ stream_name(A) ::= NK_ID(B).
|
|||
|
||||
/************************************************ expression **********************************************************/
|
||||
expression(A) ::= literal(B). { A = B; }
|
||||
//expression(A) ::= NK_QUESTION(B). { A = B; }
|
||||
expression(A) ::= pseudo_column(B). { A = B; }
|
||||
expression(A) ::= column_reference(B). { A = B; }
|
||||
expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
||||
expression(A) ::= function_name(B) NK_LP NK_STAR(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, createNodeList(pCxt, createColumnNode(pCxt, NULL, &C)))); }
|
||||
expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
|
||||
expression(A) ::= function_expression(B). { A = B; }
|
||||
//expression(A) ::= case_expression(B). { A = B; }
|
||||
expression(A) ::= subquery(B). { A = B; }
|
||||
expression(A) ::= NK_LP(B) expression(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, releaseRawExprNode(pCxt, C)); }
|
||||
|
@ -572,6 +579,10 @@ expression(A) ::= expression(B) NK_REM expression(C).
|
|||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_MOD, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
}
|
||||
expression(A) ::= column_reference(B) NK_ARROW NK_STRING(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
A = createRawExprNodeExt(pCxt, &s, &C, createOperatorNode(pCxt, OP_TYPE_JSON_GET_VALUE, releaseRawExprNode(pCxt, B), createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)));
|
||||
}
|
||||
|
||||
%type expression_list { SNodeList* }
|
||||
%destructor expression_list { nodesDestroyList($$); }
|
||||
|
@ -591,6 +602,30 @@ pseudo_column(A) ::= WSTARTTS(B).
|
|||
pseudo_column(A) ::= WENDTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
pseudo_column(A) ::= WDURATION(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
|
||||
function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
||||
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
||||
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
|
||||
|
||||
%type star_func { SToken }
|
||||
%destructor star_func { }
|
||||
star_func(A) ::= COUNT(B). { A = B; }
|
||||
star_func(A) ::= FIRST(B). { A = B; }
|
||||
star_func(A) ::= LAST(B). { A = B; }
|
||||
star_func(A) ::= LAST_ROW(B). { A = B; }
|
||||
|
||||
%type star_func_para_list { SNodeList* }
|
||||
%destructor star_func_para_list { nodesDestroyList($$); }
|
||||
star_func_para_list(A) ::= NK_STAR(B). { A = createNodeList(pCxt, createColumnNode(pCxt, NULL, &B)); }
|
||||
star_func_para_list(A) ::= other_para_list(B). { A = B; }
|
||||
|
||||
%type other_para_list { SNodeList* }
|
||||
%destructor other_para_list { nodesDestroyList($$); }
|
||||
other_para_list(A) ::= star_func_para(B). { A = createNodeList(pCxt, B); }
|
||||
other_para_list(A) ::= other_para_list(B) NK_COMMA star_func_para(C). { A = addNodeToList(pCxt, B, C); }
|
||||
|
||||
star_func_para(A) ::= expression(B). { A = releaseRawExprNode(pCxt, B); }
|
||||
star_func_para(A) ::= table_name(B) NK_DOT NK_STAR(C). { A = createColumnNode(pCxt, &B, &C); }
|
||||
|
||||
/************************************************ predicate ***********************************************************/
|
||||
predicate(A) ::= expression(B) compare_op(C) expression(D). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
|
@ -634,6 +669,7 @@ compare_op(A) ::= LIKE.
|
|||
compare_op(A) ::= NOT LIKE. { A = OP_TYPE_NOT_LIKE; }
|
||||
compare_op(A) ::= MATCH. { A = OP_TYPE_MATCH; }
|
||||
compare_op(A) ::= NMATCH. { A = OP_TYPE_NMATCH; }
|
||||
compare_op(A) ::= CONTAINS. { A = OP_TYPE_JSON_CONTAINS; }
|
||||
|
||||
%type in_op { EOperatorType }
|
||||
%destructor in_op { }
|
||||
|
|
|
@ -305,6 +305,13 @@ SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt) {
|
|||
return (SNode*)val;
|
||||
}
|
||||
|
||||
SNode* createPlaceholderValueNode(SAstCreateContext* pCxt) {
|
||||
SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
CHECK_OUT_OF_MEM(val);
|
||||
// todo
|
||||
return (SNode*)val;
|
||||
}
|
||||
|
||||
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2) {
|
||||
SLogicConditionNode* cond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
||||
CHECK_OUT_OF_MEM(cond);
|
||||
|
@ -911,6 +918,18 @@ SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, S
|
|||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_SHOW_CREATE_DATABASE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable) {
|
||||
SNode* pStmt = nodesMakeNode(type);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword) {
|
||||
char password[TSDB_USET_PASSWORD_LEN] = {0};
|
||||
if (!checkUserName(pCxt, pUserName) || !checkPassword(pCxt, pPassword, password)) {
|
||||
|
@ -1142,16 +1161,34 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName)
|
|||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName, const SToken* pTableName, SNode* pQuery) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
SNode* createStreamOptions(SAstCreateContext* pCxt) {
|
||||
SStreamOptions* pOptions = nodesMakeNode(QUERY_NODE_STREAM_OPTIONS);
|
||||
CHECK_OUT_OF_MEM(pOptions);
|
||||
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
|
||||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_DROP_STREAM_STMT);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery) {
|
||||
SCreateStreamStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
strncpy(pStmt->streamName, pStreamName->z, pStreamName->n);
|
||||
if (NULL != pRealTable) {
|
||||
strcpy(pStmt->targetDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->targetTabName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
}
|
||||
pStmt->ignoreExists = ignoreExists;
|
||||
pStmt->pOptions = (SStreamOptions*)pOptions;
|
||||
pStmt->pQuery = pQuery;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName) {
|
||||
SDropStreamStmt* pStmt = nodesMakeNode(QUERY_NODE_DROP_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
strncpy(pStmt->streamName, pStreamName->z, pStreamName->n);
|
||||
pStmt->ignoreNotExists = ignoreNotExists;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
|
||||
|
|
|
@ -39,6 +39,7 @@ static SKeyword keywordTable[] = {
|
|||
{"APPS", TK_APPS},
|
||||
{"AS", TK_AS},
|
||||
{"ASC", TK_ASC},
|
||||
{"AT_ONCE", TK_AT_ONCE},
|
||||
{"BETWEEN", TK_BETWEEN},
|
||||
{"BINARY", TK_BINARY},
|
||||
{"BIGINT", TK_BIGINT},
|
||||
|
@ -57,6 +58,7 @@ static SKeyword keywordTable[] = {
|
|||
{"CONNS", TK_CONNS},
|
||||
{"CONNECTION", TK_CONNECTION},
|
||||
{"CONNECTIONS", TK_CONNECTIONS},
|
||||
{"COUNT", TK_COUNT},
|
||||
{"CREATE", TK_CREATE},
|
||||
{"DATABASE", TK_DATABASE},
|
||||
{"DATABASES", TK_DATABASES},
|
||||
|
@ -100,6 +102,7 @@ static SKeyword keywordTable[] = {
|
|||
{"KEEP", TK_KEEP},
|
||||
{"KILL", TK_KILL},
|
||||
{"LAST", TK_LAST},
|
||||
{"LAST_ROW", TK_LAST_ROW},
|
||||
{"LICENCE", TK_LICENCE},
|
||||
{"LIKE", TK_LIKE},
|
||||
{"LIMIT", TK_LIMIT},
|
||||
|
@ -132,10 +135,8 @@ static SKeyword keywordTable[] = {
|
|||
{"PRECISION", TK_PRECISION},
|
||||
{"PRIVILEGE", TK_PRIVILEGE},
|
||||
{"PREV", TK_PREV},
|
||||
{"_QENDTS", TK_QENDTS},
|
||||
{"QNODE", TK_QNODE},
|
||||
{"QNODES", TK_QNODES},
|
||||
{"_QSTARTTS", TK_QSTARTTS},
|
||||
{"QTIME", TK_QTIME},
|
||||
{"QUERIES", TK_QUERIES},
|
||||
{"QUERY", TK_QUERY},
|
||||
|
@ -145,7 +146,6 @@ static SKeyword keywordTable[] = {
|
|||
{"RESET", TK_RESET},
|
||||
{"RETENTIONS", TK_RETENTIONS},
|
||||
{"ROLLUP", TK_ROLLUP},
|
||||
{"_ROWTS", TK_ROWTS},
|
||||
{"SCORES", TK_SCORES},
|
||||
{"SELECT", TK_SELECT},
|
||||
{"SESSION", TK_SESSION},
|
||||
|
@ -178,6 +178,7 @@ static SKeyword keywordTable[] = {
|
|||
{"TODAY", TK_TODAY},
|
||||
{"TOPIC", TK_TOPIC},
|
||||
{"TOPICS", TK_TOPICS},
|
||||
{"TRIGGER", TK_TRIGGER},
|
||||
{"TSERIES", TK_TSERIES},
|
||||
{"TTL", TK_TTL},
|
||||
{"UNION", TK_UNION},
|
||||
|
@ -194,9 +195,14 @@ static SKeyword keywordTable[] = {
|
|||
{"VGROUPS", TK_VGROUPS},
|
||||
{"VNODES", TK_VNODES},
|
||||
{"WAL", TK_WAL},
|
||||
{"WATERMARK", TK_WATERMARK},
|
||||
{"WHERE", TK_WHERE},
|
||||
{"WINDOW_CLOSE", TK_WINDOW_CLOSE},
|
||||
{"_QENDTS", TK_QENDTS},
|
||||
{"_QSTARTTS", TK_QSTARTTS},
|
||||
{"_ROWTS", TK_ROWTS},
|
||||
{"_WDURATION", TK_WDURATION},
|
||||
{"_WENDTS", TK_WENDTS},
|
||||
{"WHERE", TK_WHERE},
|
||||
{"_WSTARTTS", TK_WSTARTTS},
|
||||
// {"ID", TK_ID},
|
||||
// {"STRING", TK_STRING},
|
||||
|
@ -260,7 +266,6 @@ static SKeyword keywordTable[] = {
|
|||
// {"RESTRICT", TK_RESTRICT},
|
||||
// {"ROW", TK_ROW},
|
||||
// {"STATEMENT", TK_STATEMENT},
|
||||
// {"TRIGGER", TK_TRIGGER},
|
||||
// {"VIEW", TK_VIEW},
|
||||
// {"SEMI", TK_SEMI},
|
||||
// {"PARTITIONS", TK_PARTITIONS},
|
||||
|
@ -345,6 +350,9 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
|
|||
}
|
||||
*tokenId = TK_NK_COMMENT;
|
||||
return i;
|
||||
} else if (z[1] == '>') {
|
||||
*tokenId = TK_NK_ARROW;
|
||||
return 2;
|
||||
}
|
||||
*tokenId = TK_NK_MINUS;
|
||||
return 1;
|
||||
|
|
|
@ -770,71 +770,47 @@ static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isFirstLastFunc(SFunctionNode* pFunc) {
|
||||
return (FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType);
|
||||
}
|
||||
|
||||
static bool isFirstLastStar(SNode* pNode) {
|
||||
if (QUERY_NODE_FUNCTION != nodeType(pNode) || !isFirstLastFunc((SFunctionNode*)pNode)) {
|
||||
static bool isMultiResFunc(SNode* pNode) {
|
||||
if (QUERY_NODE_FUNCTION != nodeType(pNode) || !fmIsMultiResFunc(((SFunctionNode*)pNode)->funcId)) {
|
||||
return false;
|
||||
}
|
||||
SNodeList* pParameterList = ((SFunctionNode*)pNode)->pParameterList;
|
||||
if (LIST_LENGTH(pParameterList) != 1) {
|
||||
return false;
|
||||
if (LIST_LENGTH(pParameterList) > 1) {
|
||||
return true;
|
||||
}
|
||||
SNode* pParam = nodesListGetNode(pParameterList, 0);
|
||||
return (QUERY_NODE_COLUMN == nodeType(pParam) ? 0 == strcmp(((SColumnNode*)pParam)->colName, "*") : false);
|
||||
}
|
||||
|
||||
static SNode* createFirstLastFunc(SFunctionNode* pSrcFunc, SColumnNode* pCol) {
|
||||
static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
}
|
||||
pFunc->pParameterList = nodesMakeList();
|
||||
if (NULL == pFunc->pParameterList || TSDB_CODE_SUCCESS != nodesListAppend(pFunc->pParameterList, pCol)) {
|
||||
if (NULL == pFunc->pParameterList || TSDB_CODE_SUCCESS != nodesListStrictAppend(pFunc->pParameterList, nodesCloneNode(pExpr))) {
|
||||
nodesDestroyNode(pFunc);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pFunc->node.resType = pCol->node.resType;
|
||||
pFunc->node.resType = pExpr->resType;
|
||||
pFunc->funcId = pSrcFunc->funcId;
|
||||
pFunc->funcType = pSrcFunc->funcType;
|
||||
strcpy(pFunc->functionName, pSrcFunc->functionName);
|
||||
snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName),
|
||||
(FUNCTION_TYPE_FIRST == pSrcFunc->funcType ? "first(%s)" : "last(%s)"), pCol->colName);
|
||||
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pExpr;
|
||||
snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName) - 1, "%s(%s.%s)", pSrcFunc->functionName, pCol->tableAlias, pCol->colName);
|
||||
} else {
|
||||
snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->aliasName);
|
||||
}
|
||||
|
||||
return (SNode*)pFunc;
|
||||
}
|
||||
|
||||
static int32_t createFirstLastAllCols(STranslateContext* pCxt, SFunctionNode* pSrcFunc, SNodeList** pOutput) {
|
||||
SNodeList* pCols = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createAllColumns(pCxt, &pCols)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SNodeList* pFuncs = nodesMakeList();
|
||||
if (NULL == pFuncs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pCols) {
|
||||
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pFuncs, createFirstLastFunc(pSrcFunc, (SColumnNode*)pCol))) {
|
||||
nodesDestroyNode(pFuncs);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
*pOutput = pFuncs;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isTableStar(SNode* pNode) {
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
}
|
||||
|
||||
static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SNodeList** pOutput) {
|
||||
*pOutput = nodesMakeList();
|
||||
if (NULL == *pOutput) {
|
||||
*pOutput = nodesMakeList();
|
||||
}
|
||||
if (NULL == *pOutput) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -858,15 +834,90 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SN
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isStar(SNode* pNode) {
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
}
|
||||
|
||||
static bool isTableStar(SNode* pNode) {
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
}
|
||||
|
||||
static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrcParas, SNodeList** pOutput) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNodeList* pExprs = NULL;
|
||||
SNode* pPara = NULL;
|
||||
FOREACH(pPara, pSrcParas) {
|
||||
if (isStar(pPara)) {
|
||||
code = createAllColumns(pCxt, &pExprs);
|
||||
// The syntax definition ensures that * and other parameters do not appear at the same time
|
||||
break;
|
||||
} else if (isTableStar(pPara)) {
|
||||
code = createTableAllCols(pCxt, (SColumnNode*)pPara, &pExprs);
|
||||
} else {
|
||||
code = nodesListMakeStrictAppend(&pExprs, nodesCloneNode(pPara));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = pExprs;
|
||||
} else {
|
||||
nodesDestroyList(pExprs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createMultiResFuncs(SFunctionNode* pSrcFunc, SNodeList* pExprs, SNodeList** pOutput) {
|
||||
SNodeList* pFuncs = nodesMakeList();
|
||||
if (NULL == pFuncs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pExpr = NULL;
|
||||
FOREACH(pExpr, pExprs) {
|
||||
code = nodesListStrictAppend(pFuncs, createMultiResFunc(pSrcFunc, (SExprNode*)pExpr));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = pFuncs;
|
||||
} else {
|
||||
nodesDestroyList(pFuncs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createMultiResFuncsFromStar(STranslateContext* pCxt, SFunctionNode* pSrcFunc, SNodeList** pOutput) {
|
||||
SNodeList* pExprs = NULL;
|
||||
int32_t code = createMultiResFuncsParas(pCxt, pSrcFunc->pParameterList, &pExprs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createMultiResFuncs(pSrcFunc, pExprs, pOutput);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(pExprs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (NULL == pSelect->pProjectionList) { // select * ...
|
||||
return createAllColumns(pCxt, &pSelect->pProjectionList);
|
||||
} else {
|
||||
SNode* pNode = NULL;
|
||||
WHERE_EACH(pNode, pSelect->pProjectionList) {
|
||||
if (isFirstLastStar(pNode)) {
|
||||
if (isMultiResFunc(pNode)) {
|
||||
SNodeList* pFuncs = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createFirstLastAllCols(pCxt, (SFunctionNode*)pNode, &pFuncs)) {
|
||||
if (TSDB_CODE_SUCCESS != createMultiResFuncsFromStar(pCxt, (SFunctionNode*)pNode, &pFuncs)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
INSERT_LIST(pSelect->pProjectionList, pFuncs);
|
||||
|
@ -2067,6 +2118,46 @@ static int32_t translateKillQuery(STranslateContext* pCxt, SKillStmt* pStmt) {
|
|||
return buildCmdMsg(pCxt, TDMT_MND_KILL_QUERY, (FSerializeFunc)tSerializeSKillQueryReq, &killReq);
|
||||
}
|
||||
|
||||
static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||
SCMCreateStreamReq createReq = {0};
|
||||
|
||||
createReq.igExists = pStmt->ignoreExists;
|
||||
|
||||
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
|
||||
strcpy(name.dbname, pCxt->pParseCxt->db);
|
||||
strcpy(name.tname, pStmt->streamName);
|
||||
tNameExtractFullName(&name, createReq.name);
|
||||
|
||||
if ('\0' != pStmt->targetTabName[0]) {
|
||||
strcpy(name.dbname, pStmt->targetDbName);
|
||||
strcpy(name.tname, pStmt->targetTabName);
|
||||
tNameExtractFullName(&name, createReq.outputSTbName);
|
||||
}
|
||||
|
||||
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
createReq.sql = strdup(pCxt->pParseCxt->pSql);
|
||||
if (NULL == createReq.sql) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
|
||||
}
|
||||
|
||||
tFreeSCMCreateStreamReq(&createReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -2160,6 +2251,12 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_KILL_QUERY_STMT:
|
||||
code = translateKillQuery(pCxt, (SKillStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||
code = translateCreateStream(pCxt, (SCreateStreamStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_DROP_STREAM_STMT:
|
||||
code = translateDropStream(pCxt, (SDropStreamStmt*)pNode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -18,6 +18,7 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "parserTestUtil.h"
|
||||
#include "parInt.h"
|
||||
|
||||
using namespace std;
|
||||
|
@ -44,7 +45,7 @@ protected:
|
|||
query_ = nullptr;
|
||||
bool res = runImpl(parseCode, translateCode);
|
||||
qDestroyQuery(query_);
|
||||
if (1/*!res*/) {
|
||||
if (!res || g_isDump) {
|
||||
dump();
|
||||
}
|
||||
return res;
|
||||
|
@ -57,7 +58,7 @@ private:
|
|||
int32_t code = parse(&cxt_, &query_);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
parseErrStr_ = string("code:") + tstrerror(code) + string(", msg:") + errMagBuf_;
|
||||
return (terrno == parseCode);
|
||||
return (code == parseCode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != parseCode) {
|
||||
return false;
|
||||
|
@ -66,7 +67,7 @@ private:
|
|||
code = translate(&cxt_, query_);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
translateErrStr_ = string("code:") + tstrerror(code) + string(", msg:") + errMagBuf_;
|
||||
return (terrno == translateCode);
|
||||
return (code == translateCode);
|
||||
}
|
||||
translatedAstStr_ = toString(query_->pRoot);
|
||||
code = calculateConstant(&cxt_, query_);
|
||||
|
@ -243,6 +244,19 @@ TEST_F(ParserTest, selectPseudoColumn) {
|
|||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, selectMultiResFunc) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
// bind("SELECT last(*), first(*), last_row(*) FROM t1");
|
||||
// ASSERT_TRUE(run());
|
||||
|
||||
bind("SELECT last(c1, c2), first(t1.*), last_row(c3) FROM t1");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("SELECT last(t2.*), first(t1.c1, t2.*), last_row(t1.*, t2.*) FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, selectClause) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
|
@ -726,6 +740,22 @@ TEST_F(ParserTest, dropTopic) {
|
|||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, createStream) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("create stream s1 as select * from t1");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("create stream if not exists s1 as select * from t1");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("create stream s1 into st1 as select * from t1");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("create stream if not exists s1 trigger window_close watermark 10s into st1 as select * from t1");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, explain) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
|
|
|
@ -15,12 +15,16 @@
|
|||
|
||||
#include <string>
|
||||
|
||||
#include <getopt.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "mockCatalog.h"
|
||||
#include "parserTestUtil.h"
|
||||
#include "parToken.h"
|
||||
#include "functionMgt.h"
|
||||
|
||||
bool g_isDump = false;
|
||||
|
||||
class ParserEnv : public testing::Environment {
|
||||
public:
|
||||
virtual void SetUp() {
|
||||
|
@ -38,8 +42,27 @@ public:
|
|||
virtual ~ParserEnv() {}
|
||||
};
|
||||
|
||||
static void parseArg(int argc, char* argv[]) {
|
||||
int opt = 0;
|
||||
const char *optstring = "";
|
||||
static struct option long_options[] = {
|
||||
{"dump", no_argument, NULL, 'd'},
|
||||
{0, 0, 0, 0}
|
||||
};
|
||||
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
|
||||
switch (opt) {
|
||||
case 'd':
|
||||
g_isDump = true;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
testing::AddGlobalTestEnvironment(new ParserEnv());
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
parseArg(argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
extern bool g_isDump;
|
Loading…
Reference in New Issue