Merge branch '3.0' into fix/TD-19189
This commit is contained in:
commit
11d8dc7b30
|
@ -2,7 +2,7 @@
|
|||
# taos-tools
|
||||
ExternalProject_Add(taos-tools
|
||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||
GIT_TAG 8207c74
|
||||
GIT_TAG cf1df1c
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -116,7 +116,7 @@ The parameters are described as follows:
|
|||
- **protocol**: Specify which connection method to use. For example, `taos+ws://localhost:6041` uses Websocket to establish connections.
|
||||
- **username/password**: Username and password used to create connections.
|
||||
- **host/port**: Specifies the server and port to establish a connection. If you do not specify a hostname or port, native connections default to `localhost:6030` and Websocket connections default to `localhost:6041`.
|
||||
- **database**: Specify the default database to connect to.
|
||||
- **database**: Specify the default database to connect to. It's optional.
|
||||
- **params**:Optional parameters.
|
||||
|
||||
A sample DSN description string is as follows:
|
||||
|
|
|
@ -151,7 +151,7 @@ The parameters are described as follows:
|
|||
* **protocol**: Specify which connection method to use (support http/ws). For example, `ws://localhost:6041` uses Websocket to establish connections.
|
||||
* **username/password**: Username and password used to create connections.
|
||||
* **host/port**: Specifies the server and port to establish a connection. Websocket connections default to `localhost:6041`.
|
||||
* **database**: Specify the default database to connect to.
|
||||
* **database**: Specify the default database to connect to. It's optional.
|
||||
* **params**:Optional parameters.
|
||||
|
||||
A sample DSN description string is as follows:
|
||||
|
|
|
@ -116,7 +116,7 @@ DSN 描述字符串基本结构如下:
|
|||
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
|
||||
- **username/password**: 用于创建连接的用户名及密码。
|
||||
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。
|
||||
- **database**: 指定默认连接的数据库名。
|
||||
- **database**: 指定默认连接的数据库名,可选参数。
|
||||
- **params**:其他可选参数。
|
||||
|
||||
一个完整的 DSN 描述字符串示例如下:
|
||||
|
|
|
@ -154,7 +154,7 @@ namespace TDengineExample
|
|||
|
||||
* **host/port**: 指定创建连接的服务器及端口,WebSocket 连接默认为 `localhost:6041` 。
|
||||
|
||||
* **database**: 指定默认连接的数据库名。
|
||||
* **database**: 指定默认连接的数据库名,可选参数。
|
||||
|
||||
* **params**:其他可选参数。
|
||||
|
||||
|
|
|
@ -234,96 +234,100 @@
|
|||
#define TK_CURRENT_USER 216
|
||||
#define TK_COUNT 217
|
||||
#define TK_LAST_ROW 218
|
||||
#define TK_BETWEEN 219
|
||||
#define TK_IS 220
|
||||
#define TK_NK_LT 221
|
||||
#define TK_NK_GT 222
|
||||
#define TK_NK_LE 223
|
||||
#define TK_NK_GE 224
|
||||
#define TK_NK_NE 225
|
||||
#define TK_MATCH 226
|
||||
#define TK_NMATCH 227
|
||||
#define TK_CONTAINS 228
|
||||
#define TK_IN 229
|
||||
#define TK_JOIN 230
|
||||
#define TK_INNER 231
|
||||
#define TK_SELECT 232
|
||||
#define TK_DISTINCT 233
|
||||
#define TK_WHERE 234
|
||||
#define TK_PARTITION 235
|
||||
#define TK_BY 236
|
||||
#define TK_SESSION 237
|
||||
#define TK_STATE_WINDOW 238
|
||||
#define TK_SLIDING 239
|
||||
#define TK_FILL 240
|
||||
#define TK_VALUE 241
|
||||
#define TK_NONE 242
|
||||
#define TK_PREV 243
|
||||
#define TK_LINEAR 244
|
||||
#define TK_NEXT 245
|
||||
#define TK_HAVING 246
|
||||
#define TK_RANGE 247
|
||||
#define TK_EVERY 248
|
||||
#define TK_ORDER 249
|
||||
#define TK_SLIMIT 250
|
||||
#define TK_SOFFSET 251
|
||||
#define TK_LIMIT 252
|
||||
#define TK_OFFSET 253
|
||||
#define TK_ASC 254
|
||||
#define TK_NULLS 255
|
||||
#define TK_ABORT 256
|
||||
#define TK_AFTER 257
|
||||
#define TK_ATTACH 258
|
||||
#define TK_BEFORE 259
|
||||
#define TK_BEGIN 260
|
||||
#define TK_BITAND 261
|
||||
#define TK_BITNOT 262
|
||||
#define TK_BITOR 263
|
||||
#define TK_BLOCKS 264
|
||||
#define TK_CHANGE 265
|
||||
#define TK_COMMA 266
|
||||
#define TK_COMPACT 267
|
||||
#define TK_CONCAT 268
|
||||
#define TK_CONFLICT 269
|
||||
#define TK_COPY 270
|
||||
#define TK_DEFERRED 271
|
||||
#define TK_DELIMITERS 272
|
||||
#define TK_DETACH 273
|
||||
#define TK_DIVIDE 274
|
||||
#define TK_DOT 275
|
||||
#define TK_EACH 276
|
||||
#define TK_END 277
|
||||
#define TK_FAIL 278
|
||||
#define TK_FILE 279
|
||||
#define TK_FOR 280
|
||||
#define TK_GLOB 281
|
||||
#define TK_ID 282
|
||||
#define TK_IMMEDIATE 283
|
||||
#define TK_IMPORT 284
|
||||
#define TK_INITIALLY 285
|
||||
#define TK_INSTEAD 286
|
||||
#define TK_ISNULL 287
|
||||
#define TK_KEY 288
|
||||
#define TK_NK_BITNOT 289
|
||||
#define TK_NK_SEMI 290
|
||||
#define TK_NOTNULL 291
|
||||
#define TK_OF 292
|
||||
#define TK_PLUS 293
|
||||
#define TK_PRIVILEGE 294
|
||||
#define TK_RAISE 295
|
||||
#define TK_REPLACE 296
|
||||
#define TK_RESTRICT 297
|
||||
#define TK_ROW 298
|
||||
#define TK_SEMI 299
|
||||
#define TK_STAR 300
|
||||
#define TK_STATEMENT 301
|
||||
#define TK_STRING 302
|
||||
#define TK_TIMES 303
|
||||
#define TK_UPDATE 304
|
||||
#define TK_VALUES 305
|
||||
#define TK_VARIABLE 306
|
||||
#define TK_VIEW 307
|
||||
#define TK_WAL 308
|
||||
#define TK_CASE 219
|
||||
#define TK_END 220
|
||||
#define TK_WHEN 221
|
||||
#define TK_THEN 222
|
||||
#define TK_ELSE 223
|
||||
#define TK_BETWEEN 224
|
||||
#define TK_IS 225
|
||||
#define TK_NK_LT 226
|
||||
#define TK_NK_GT 227
|
||||
#define TK_NK_LE 228
|
||||
#define TK_NK_GE 229
|
||||
#define TK_NK_NE 230
|
||||
#define TK_MATCH 231
|
||||
#define TK_NMATCH 232
|
||||
#define TK_CONTAINS 233
|
||||
#define TK_IN 234
|
||||
#define TK_JOIN 235
|
||||
#define TK_INNER 236
|
||||
#define TK_SELECT 237
|
||||
#define TK_DISTINCT 238
|
||||
#define TK_WHERE 239
|
||||
#define TK_PARTITION 240
|
||||
#define TK_BY 241
|
||||
#define TK_SESSION 242
|
||||
#define TK_STATE_WINDOW 243
|
||||
#define TK_SLIDING 244
|
||||
#define TK_FILL 245
|
||||
#define TK_VALUE 246
|
||||
#define TK_NONE 247
|
||||
#define TK_PREV 248
|
||||
#define TK_LINEAR 249
|
||||
#define TK_NEXT 250
|
||||
#define TK_HAVING 251
|
||||
#define TK_RANGE 252
|
||||
#define TK_EVERY 253
|
||||
#define TK_ORDER 254
|
||||
#define TK_SLIMIT 255
|
||||
#define TK_SOFFSET 256
|
||||
#define TK_LIMIT 257
|
||||
#define TK_OFFSET 258
|
||||
#define TK_ASC 259
|
||||
#define TK_NULLS 260
|
||||
#define TK_ABORT 261
|
||||
#define TK_AFTER 262
|
||||
#define TK_ATTACH 263
|
||||
#define TK_BEFORE 264
|
||||
#define TK_BEGIN 265
|
||||
#define TK_BITAND 266
|
||||
#define TK_BITNOT 267
|
||||
#define TK_BITOR 268
|
||||
#define TK_BLOCKS 269
|
||||
#define TK_CHANGE 270
|
||||
#define TK_COMMA 271
|
||||
#define TK_COMPACT 272
|
||||
#define TK_CONCAT 273
|
||||
#define TK_CONFLICT 274
|
||||
#define TK_COPY 275
|
||||
#define TK_DEFERRED 276
|
||||
#define TK_DELIMITERS 277
|
||||
#define TK_DETACH 278
|
||||
#define TK_DIVIDE 279
|
||||
#define TK_DOT 280
|
||||
#define TK_EACH 281
|
||||
#define TK_FAIL 282
|
||||
#define TK_FILE 283
|
||||
#define TK_FOR 284
|
||||
#define TK_GLOB 285
|
||||
#define TK_ID 286
|
||||
#define TK_IMMEDIATE 287
|
||||
#define TK_IMPORT 288
|
||||
#define TK_INITIALLY 289
|
||||
#define TK_INSTEAD 290
|
||||
#define TK_ISNULL 291
|
||||
#define TK_KEY 292
|
||||
#define TK_NK_BITNOT 293
|
||||
#define TK_NK_SEMI 294
|
||||
#define TK_NOTNULL 295
|
||||
#define TK_OF 296
|
||||
#define TK_PLUS 297
|
||||
#define TK_PRIVILEGE 298
|
||||
#define TK_RAISE 299
|
||||
#define TK_REPLACE 300
|
||||
#define TK_RESTRICT 301
|
||||
#define TK_ROW 302
|
||||
#define TK_SEMI 303
|
||||
#define TK_STAR 304
|
||||
#define TK_STATEMENT 305
|
||||
#define TK_STRING 306
|
||||
#define TK_TIMES 307
|
||||
#define TK_UPDATE 308
|
||||
#define TK_VALUES 309
|
||||
#define TK_VARIABLE 310
|
||||
#define TK_VIEW 311
|
||||
#define TK_WAL 312
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -103,6 +103,8 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_STREAM_OPTIONS,
|
||||
QUERY_NODE_LEFT_VALUE,
|
||||
QUERY_NODE_COLUMN_REF,
|
||||
QUERY_NODE_WHEN_THEN,
|
||||
QUERY_NODE_CASE_WHEN,
|
||||
|
||||
// Statement nodes are used in parser and planner module.
|
||||
QUERY_NODE_SET_OPERATOR = 100,
|
||||
|
|
|
@ -165,7 +165,8 @@ typedef struct SVnodeModifyLogicNode {
|
|||
|
||||
typedef struct SExchangeLogicNode {
|
||||
SLogicNode node;
|
||||
int32_t srcGroupId;
|
||||
int32_t srcStartGroupId;
|
||||
int32_t srcEndGroupId;
|
||||
} SExchangeLogicNode;
|
||||
|
||||
typedef struct SMergeLogicNode {
|
||||
|
@ -399,7 +400,10 @@ typedef struct SDownstreamSourceNode {
|
|||
|
||||
typedef struct SExchangePhysiNode {
|
||||
SPhysiNode node;
|
||||
int32_t srcGroupId; // group id of datasource suplans
|
||||
// for set operators, there will be multiple execution groups under one exchange, and the ids of these execution
|
||||
// groups are consecutive
|
||||
int32_t srcStartGroupId;
|
||||
int32_t srcEndGroupId;
|
||||
bool singleChannel;
|
||||
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
|
||||
} SExchangePhysiNode;
|
||||
|
|
|
@ -241,6 +241,19 @@ typedef struct SFillNode {
|
|||
STimeWindow timeRange;
|
||||
} SFillNode;
|
||||
|
||||
typedef struct SWhenThenNode {
|
||||
SExprNode node; // QUERY_NODE_WHEN_THEN
|
||||
SNode* pWhen;
|
||||
SNode* pThen;
|
||||
} SWhenThenNode;
|
||||
|
||||
typedef struct SCaseWhenNode {
|
||||
SExprNode node; // QUERY_NODE_CASE_WHEN
|
||||
SNode* pCase;
|
||||
SNode* pElse;
|
||||
SNodeList* pWhenThenList;
|
||||
} SCaseWhenNode;
|
||||
|
||||
typedef struct SSelectStmt {
|
||||
ENodeType type; // QUERY_NODE_SELECT_STMT
|
||||
bool isDistinct;
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1;
|
|||
int32_t tsNumOfMnodeReadThreads = 1;
|
||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||
int32_t tsNumOfVnodeStreamThreads = 2;
|
||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||
int32_t tsNumOfVnodeFetchThreads = 1;
|
||||
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||
|
@ -165,8 +165,8 @@ int32_t tsMqRebalanceInterval = 2;
|
|||
int32_t tsTtlUnit = 86400;
|
||||
int32_t tsTtlPushInterval = 86400;
|
||||
int32_t tsGrantHBInterval = 60;
|
||||
int32_t tsUptimeInterval = 300; // seconds
|
||||
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
|
||||
int32_t tsUptimeInterval = 300; // seconds
|
||||
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
|
||||
|
||||
#ifndef _STORAGE
|
||||
int32_t taosSetTfsCfg(SConfig *pCfg) {
|
||||
|
@ -371,9 +371,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
||||
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
|
||||
tsNumOfVnodeFetchThreads = 1;
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
|
||||
|
|
|
@ -479,6 +479,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
||||
|
||||
#if 1
|
||||
|
||||
#endif
|
||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
|
|
|
@ -25,17 +25,17 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
|||
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
|
||||
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
||||
void *pIter = NULL;
|
||||
void* pIter = NULL;
|
||||
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
||||
while(pIter){
|
||||
int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL);
|
||||
while (pIter) {
|
||||
int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
|
||||
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
||||
}
|
||||
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||
}
|
||||
tEndEncode(pEncoder);
|
||||
|
@ -52,17 +52,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
|||
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||
}else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
int32_t size = 0;
|
||||
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
||||
for(int32_t i = 0; i < size; i++){
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
int64_t tbUid = 0;
|
||||
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
|
||||
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||
}
|
||||
tEndDecode(pDecoder);
|
||||
|
@ -117,7 +117,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
|
||||
if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -284,7 +284,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
};
|
||||
|
||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
|
||||
handle.execHandle.task = qCreateQueueExecTaskInfo(
|
||||
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
||||
ASSERT(handle.execHandle.task);
|
||||
|
@ -297,9 +296,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||
|
||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
||||
handle.execHandle.task =
|
||||
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
||||
(SSnapContext**)(&reader.sContext));
|
||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
|
||||
|
@ -314,9 +313,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
|
||||
taosArrayDestroy(tbUidList);
|
||||
|
||||
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
||||
handle.execHandle.task =
|
||||
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
||||
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||
|
|
|
@ -1287,14 +1287,14 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons
|
|||
if (p->version >= pBlock->minVer) {
|
||||
if (i < num - 1) {
|
||||
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
|
||||
if (i + 1 == num - 1) { // pnext is the last point
|
||||
// if (i + 1 == num - 1) { // pnext is the last point
|
||||
if (pnext->ts >= pBlock->minKey.ts) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
|
||||
return true;
|
||||
}
|
||||
// }
|
||||
// } else {
|
||||
// if (pnext->ts >= pBlock->minKey.ts) {
|
||||
// return true;
|
||||
// }
|
||||
}
|
||||
} else { // it must be the last point
|
||||
ASSERT(p->version == 0);
|
||||
|
|
|
@ -764,9 +764,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
||||
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
|
||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId));
|
||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId));
|
||||
if (NULL == group) {
|
||||
qError("exchange src group %d not in groupHash", pExchNode->srcGroupId);
|
||||
qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -801,7 +801,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
}
|
||||
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1));
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1));
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
|
||||
|
|
|
@ -49,6 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
|
||||
#if 0
|
||||
// TODO: if a block was set but not consumed,
|
||||
// prevent setting a different type of block
|
||||
pInfo->validBlockIndex = 0;
|
||||
|
@ -57,6 +58,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
} else {
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
}
|
||||
#endif
|
||||
|
||||
ASSERT(pInfo->validBlockIndex == 0);
|
||||
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
||||
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
// ASSERT(numOfBlocks > 1);
|
||||
|
@ -79,7 +84,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
|
||||
|
||||
#if 0
|
||||
// TODO optimize
|
||||
SSDataBlock* p = createOneDataBlock(pDataBlock, false);
|
||||
p->info = pDataBlock->info;
|
||||
|
@ -87,6 +94,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
taosArrayClear(p->pDataBlock);
|
||||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||
taosArrayPush(pInfo->pBlockLists, &p);
|
||||
#endif
|
||||
}
|
||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||
} else {
|
||||
|
@ -100,6 +108,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }
|
||||
|
||||
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
||||
#if 0
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
|
||||
return;
|
||||
|
@ -116,6 +125,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
|||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
||||
|
|
|
@ -920,6 +920,19 @@ _error:
|
|||
}
|
||||
|
||||
static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||
#if 0
|
||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
for (int32_t i = 0; i < total; i++) {
|
||||
SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i);
|
||||
taosArrayDestroy(p->pDataBlock);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
pInfo->validBlockIndex = 0;
|
||||
#if 0
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
pInfo->validBlockIndex = 0;
|
||||
|
@ -928,6 +941,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
|||
blockDataDestroy(p);
|
||||
}
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
#endif
|
||||
}
|
||||
|
||||
static bool isSessionWindow(SStreamScanInfo* pInfo) {
|
||||
|
@ -1580,9 +1594,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
// TODO: refactor
|
||||
FETCH_NEXT_BLOCK:
|
||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||
if (pInfo->validBlockIndex >= total) {
|
||||
/*doClearBufferedBlocks(pInfo);*/
|
||||
doClearBufferedBlocks(pInfo);
|
||||
/*pOperator->status = OP_EXEC_DONE;*/
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1606,27 +1621,40 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
} break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
printDataBlock(pBlock, "stream scan delete recv");
|
||||
SSDataBlock* pDelBlock = NULL;
|
||||
if (pInfo->tqReader) {
|
||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
filterDelBlockByUid(pDelBlock, pBlock, pInfo);
|
||||
pBlock = pDelBlock;
|
||||
} else {
|
||||
pDelBlock = pBlock;
|
||||
}
|
||||
printDataBlock(pBlock, "stream scan delete recv filtered");
|
||||
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
|
||||
generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes);
|
||||
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
||||
printDataBlock(pBlock, "stream scan delete result");
|
||||
return pInfo->pDeleteDataRes;
|
||||
printDataBlock(pDelBlock, "stream scan delete result");
|
||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||
return pInfo->pDeleteDataRes;
|
||||
} else {
|
||||
goto FETCH_NEXT_BLOCK;
|
||||
}
|
||||
} else {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->updateResIndex = 0;
|
||||
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
|
||||
generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
printDataBlock(pBlock, "stream scan delete data");
|
||||
return pInfo->pDeleteDataRes;
|
||||
printDataBlock(pDelBlock, "stream scan delete data");
|
||||
if (pInfo->tqReader) {
|
||||
blockDataDestroy(pDelBlock);
|
||||
}
|
||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||
return pInfo->pDeleteDataRes;
|
||||
} else {
|
||||
goto FETCH_NEXT_BLOCK;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
|
@ -1688,10 +1716,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
NEXT_SUBMIT_BLK:
|
||||
while (1) {
|
||||
if (pInfo->tqReader->pMsg == NULL) {
|
||||
if (pInfo->validBlockIndex >= totBlockNum) {
|
||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||
doClearBufferedBlocks(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1763,7 +1793,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
qDebug("scan rows: %d", pBlockInfo->rows);
|
||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||
if (pBlockInfo->rows > 0) {
|
||||
return pInfo->pRes;
|
||||
} else {
|
||||
goto NEXT_SUBMIT_BLK;
|
||||
}
|
||||
/*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
|
|
|
@ -5658,7 +5658,6 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
|
|||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardRows = 0;
|
||||
int32_t aa = 4;
|
||||
|
||||
ASSERT(pSDataBlock->pDataBlock != NULL);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
|
|
|
@ -324,6 +324,21 @@ static int32_t fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t whenThenNodeCopy(const SWhenThenNode* pSrc, SWhenThenNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
|
||||
CLONE_NODE_FIELD(pWhen);
|
||||
CLONE_NODE_FIELD(pThen);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t caseWhenNodeCopy(const SCaseWhenNode* pSrc, SCaseWhenNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
|
||||
CLONE_NODE_FIELD(pCase);
|
||||
CLONE_NODE_FIELD(pElse);
|
||||
CLONE_NODE_LIST_FIELD(pWhenThenList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
|
||||
CLONE_NODE_LIST_FIELD(pTargets);
|
||||
CLONE_NODE_FIELD(pConditions);
|
||||
|
@ -414,7 +429,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
|
|||
|
||||
static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(srcStartGroupId);
|
||||
COPY_SCALAR_FIELD(srcEndGroupId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -711,6 +727,12 @@ SNode* nodesCloneNode(const SNode* pNode) {
|
|||
case QUERY_NODE_LEFT_VALUE:
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
break;
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
code = whenThenNodeCopy((const SWhenThenNode*)pNode, (SWhenThenNode*)pDst);
|
||||
break;
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
code = caseWhenNodeCopy((const SCaseWhenNode*)pNode, (SCaseWhenNode*)pDst);
|
||||
break;
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
code = selectStmtCopy((const SSelectStmt*)pNode, (SSelectStmt*)pDst);
|
||||
break;
|
||||
|
|
|
@ -81,6 +81,10 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "IndexOptions";
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
return "LeftValue";
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
return "WhenThen";
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
return "CaseWhen";
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return "SetOperator";
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -722,14 +726,18 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkExchangeLogicPlanSrcStartGroupId = "SrcStartGroupId";
|
||||
static const char* jkExchangeLogicPlanSrcEndGroupId = "SrcEndGroupId";
|
||||
|
||||
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
|
||||
|
||||
int32_t code = logicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcStartGroupId, pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcEndGroupId, pNode->srcEndGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -740,7 +748,10 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
|
|||
|
||||
int32_t code = jsonToLogicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
|
||||
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcStartGroupId, &pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcEndGroupId, &pNode->srcEndGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1833,7 +1844,8 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
|
||||
|
||||
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -1841,7 +1853,10 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
|
||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId);
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcStartGroupId, pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcEndGroupId, pNode->srcEndGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
|
||||
|
@ -1855,7 +1870,10 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
|
|||
|
||||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId);
|
||||
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcStartGroupId, &pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcEndGroupId, &pNode->srcEndGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
|
||||
|
@ -3917,6 +3935,75 @@ static int32_t jsonToDatabaseOptions(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkWhenThenWhen = "When";
|
||||
static const char* jkWhenThenThen = "Then";
|
||||
|
||||
static int32_t whenThenNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SWhenThenNode* pNode = (const SWhenThenNode*)pObj;
|
||||
|
||||
int32_t code = exprNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkWhenThenWhen, nodeToJson, pNode->pWhen);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkWhenThenThen, nodeToJson, pNode->pThen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToWhenThenNode(const SJson* pJson, void* pObj) {
|
||||
SWhenThenNode* pNode = (SWhenThenNode*)pObj;
|
||||
|
||||
int32_t code = jsonToExprNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkWhenThenWhen, &pNode->pWhen);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkWhenThenThen, &pNode->pThen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkCaseWhenCase = "Case";
|
||||
static const char* jkCaseWhenWhenThenList = "WhenThenList";
|
||||
static const char* jkCaseWhenElse = "Else";
|
||||
|
||||
static int32_t caseWhenNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SCaseWhenNode* pNode = (const SCaseWhenNode*)pObj;
|
||||
|
||||
int32_t code = exprNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkCaseWhenCase, nodeToJson, pNode->pCase);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkCaseWhenWhenThenList, pNode->pWhenThenList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkCaseWhenElse, nodeToJson, pNode->pElse);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToCaseWhenNode(const SJson* pJson, void* pObj) {
|
||||
SCaseWhenNode* pNode = (SCaseWhenNode*)pObj;
|
||||
|
||||
int32_t code = jsonToExprNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkCaseWhenCase, &pNode->pCase);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkCaseWhenWhenThenList, &pNode->pWhenThenList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkCaseWhenElse, &pNode->pElse);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkDataBlockDescDataBlockId = "DataBlockId";
|
||||
static const char* jkDataBlockDescSlots = "Slots";
|
||||
static const char* jkDataBlockTotalRowSize = "TotalRowSize";
|
||||
|
@ -4399,6 +4486,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return databaseOptionsToJson(pObj, pJson);
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
return TSDB_CODE_SUCCESS; // SLeftValueNode has no fields to serialize.
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
return whenThenNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
return caseWhenNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return setOperatorToJson(pObj, pJson);
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -4562,6 +4653,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToDatabaseOptions(pJson, pObj);
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
return TSDB_CODE_SUCCESS; // SLeftValueNode has no fields to deserialize.
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
return jsonToWhenThenNode(pJson, pObj);
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
return jsonToCaseWhenNode(pJson, pObj);
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return jsonToSetOperator(pJson, pObj);
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
|
|
@ -140,6 +140,19 @@ static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool whenThenNodeEqual(const SWhenThenNode* a, const SWhenThenNode* b) {
|
||||
COMPARE_NODE_FIELD(pWhen);
|
||||
COMPARE_NODE_FIELD(pThen);
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool caseWhenNodeEqual(const SCaseWhenNode* a, const SCaseWhenNode* b) {
|
||||
COMPARE_NODE_FIELD(pCase);
|
||||
COMPARE_NODE_FIELD(pElse);
|
||||
COMPARE_NODE_LIST_FIELD(pWhenThenList);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nodesEqualNode(const SNode* a, const SNode* b) {
|
||||
if (a == b) {
|
||||
return true;
|
||||
|
@ -164,13 +177,17 @@ bool nodesEqualNode(const SNode* a, const SNode* b) {
|
|||
return logicConditionNodeEqual((const SLogicConditionNode*)a, (const SLogicConditionNode*)b);
|
||||
case QUERY_NODE_FUNCTION:
|
||||
return functionNodeEqual((const SFunctionNode*)a, (const SFunctionNode*)b);
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
return whenThenNodeEqual((const SWhenThenNode*)a, (const SWhenThenNode*)b);
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
return caseWhenNodeEqual((const SCaseWhenNode*)a, (const SCaseWhenNode*)b);
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
case QUERY_NODE_JOIN_TABLE:
|
||||
case QUERY_NODE_GROUPING_SET:
|
||||
case QUERY_NODE_ORDER_BY_EXPR:
|
||||
case QUERY_NODE_LIMIT:
|
||||
return false; // todo
|
||||
return false;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1733,6 +1733,92 @@ static int32_t msgToDownstreamSourceNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { WHEN_THEN_CODE_EXPR_BASE = 1, WHEN_THEN_CODE_WHEN, WHEN_THEN_CODE_THEN };
|
||||
|
||||
static int32_t whenThenNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SWhenThenNode* pNode = (const SWhenThenNode*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeObj(pEncoder, WHEN_THEN_CODE_EXPR_BASE, exprNodeToMsg, pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, WHEN_THEN_CODE_WHEN, nodeToMsg, pNode->pWhen);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, WHEN_THEN_CODE_THEN, nodeToMsg, pNode->pThen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToWhenThenNode(STlvDecoder* pDecoder, void* pObj) {
|
||||
SWhenThenNode* pNode = (SWhenThenNode*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case WHEN_THEN_CODE_EXPR_BASE:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node);
|
||||
break;
|
||||
case WHEN_THEN_CODE_WHEN:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pWhen);
|
||||
break;
|
||||
case WHEN_THEN_CODE_THEN:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pThen);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum { CASE_WHEN_CODE_EXPR_BASE = 1, CASE_WHEN_CODE_CASE, CASE_WHEN_CODE_ELSE, CASE_WHEN_CODE_WHEN_THEN_LIST };
|
||||
|
||||
static int32_t caseWhenNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SCaseWhenNode* pNode = (const SCaseWhenNode*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeObj(pEncoder, CASE_WHEN_CODE_EXPR_BASE, exprNodeToMsg, pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, CASE_WHEN_CODE_CASE, nodeToMsg, pNode->pCase);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, CASE_WHEN_CODE_ELSE, nodeToMsg, pNode->pElse);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, CASE_WHEN_CODE_WHEN_THEN_LIST, nodeListToMsg, pNode->pWhenThenList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToCaseWhenNode(STlvDecoder* pDecoder, void* pObj) {
|
||||
SCaseWhenNode* pNode = (SCaseWhenNode*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case CASE_WHEN_CODE_EXPR_BASE:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node);
|
||||
break;
|
||||
case CASE_WHEN_CODE_CASE:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pCase);
|
||||
break;
|
||||
case CASE_WHEN_CODE_ELSE:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pElse);
|
||||
break;
|
||||
case CASE_WHEN_CODE_WHEN_THEN_LIST:
|
||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pWhenThenList);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum {
|
||||
PHY_NODE_CODE_OUTPUT_DESC = 1,
|
||||
PHY_NODE_CODE_CONDITIONS,
|
||||
|
@ -2294,7 +2380,8 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
|
||||
enum {
|
||||
PHY_EXCHANGE_CODE_BASE_NODE = 1,
|
||||
PHY_EXCHANGE_CODE_SRC_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
|
||||
PHY_EXCHANGE_CODE_SRC_ENDPOINTS
|
||||
};
|
||||
|
@ -2304,7 +2391,10 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
|
||||
int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_GROUP_ID, pNode->srcGroupId);
|
||||
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, pNode->srcEndGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel);
|
||||
|
@ -2326,8 +2416,11 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_EXCHANGE_CODE_BASE_NODE:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SRC_GROUP_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->srcGroupId);
|
||||
case PHY_EXCHANGE_CODE_SRC_START_GROUP_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->srcStartGroupId);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SRC_END_GROUP_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->srcEndGroupId);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SINGLE_CHANNEL:
|
||||
code = tlvDecodeBool(pTlv, &pNode->singleChannel);
|
||||
|
@ -3434,9 +3527,16 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
code = slotDescNodeToMsg(pObj, pEncoder);
|
||||
break;
|
||||
case QUERY_NODE_DOWNSTREAM_SOURCE:
|
||||
return downstreamSourceNodeToMsg(pObj, pEncoder);
|
||||
code = downstreamSourceNodeToMsg(pObj, pEncoder);
|
||||
break;
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
break;
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
code = whenThenNodeToMsg(pObj, pEncoder);
|
||||
break;
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
code = caseWhenNodeToMsg(pObj, pEncoder);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
code = physiScanNodeToMsg(pObj, pEncoder);
|
||||
|
@ -3563,9 +3663,15 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
code = msgToSlotDescNode(pDecoder, pObj);
|
||||
break;
|
||||
case QUERY_NODE_DOWNSTREAM_SOURCE:
|
||||
return msgToDownstreamSourceNode(pDecoder, pObj);
|
||||
code = msgToDownstreamSourceNode(pDecoder, pObj);
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
break;
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
code = msgToWhenThenNode(pDecoder, pObj);
|
||||
break;
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
code = msgToCaseWhenNode(pDecoder, pObj);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
code = msgToPhysiScanNode(pDecoder, pObj);
|
||||
|
|
|
@ -146,6 +146,25 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa
|
|||
case QUERY_NODE_TARGET:
|
||||
res = walkExpr(((STargetNode*)pNode)->pExpr, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_WHEN_THEN: {
|
||||
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
|
||||
res = walkExpr(pWhenThen->pWhen, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkExpr(pWhenThen->pThen, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_CASE_WHEN: {
|
||||
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode;
|
||||
res = walkExpr(pCaseWhen->pCase, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkExpr(pCaseWhen->pElse, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkExprs(pCaseWhen->pWhenThenList, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -291,6 +310,25 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
case QUERY_NODE_TARGET:
|
||||
res = rewriteExpr(&(((STargetNode*)pNode)->pExpr), order, rewriter, pContext);
|
||||
break;
|
||||
case QUERY_NODE_WHEN_THEN: {
|
||||
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
|
||||
res = rewriteExpr(&pWhenThen->pWhen, order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteExpr(&pWhenThen->pThen, order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_CASE_WHEN: {
|
||||
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode;
|
||||
res = rewriteExpr(&pCaseWhen->pCase, order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteExpr(&pCaseWhen->pElse, order, rewriter, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteExprs(pCaseWhen->pWhenThenList, order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -291,6 +291,10 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SLeftValueNode));
|
||||
case QUERY_NODE_COLUMN_REF:
|
||||
return makeNode(type, sizeof(SColumnDefNode));
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
return makeNode(type, sizeof(SWhenThenNode));
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
return makeNode(type, sizeof(SCaseWhenNode));
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return makeNode(type, sizeof(SSetOperator));
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -738,7 +742,21 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_LEFT_VALUE: // no pointer field
|
||||
case QUERY_NODE_COLUMN_REF: // no pointer field
|
||||
break;
|
||||
case QUERY_NODE_WHEN_THEN: {
|
||||
SWhenThenNode* pStmt = (SWhenThenNode*)pNode;
|
||||
nodesDestroyNode(pStmt->pWhen);
|
||||
nodesDestroyNode(pStmt->pThen);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_CASE_WHEN: {
|
||||
SCaseWhenNode* pStmt = (SCaseWhenNode*)pNode;
|
||||
nodesDestroyNode(pStmt->pCase);
|
||||
nodesDestroyNode(pStmt->pElse);
|
||||
nodesDestroyList(pStmt->pWhenThenList);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_SET_OPERATOR: {
|
||||
SSetOperator* pStmt = (SSetOperator*)pNode;
|
||||
nodesDestroyList(pStmt->pProjectionList);
|
||||
|
|
|
@ -119,6 +119,8 @@ SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode
|
|||
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);
|
||||
SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode);
|
||||
SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd);
|
||||
SNode* createWhenThenNode(SAstCreateContext* pCxt, SNode* pWhen, SNode* pThen);
|
||||
SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhenThenList, SNode* pElse);
|
||||
|
||||
SNode* addWhereClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pWhere);
|
||||
SNode* addPartitionByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pPartitionByList);
|
||||
|
|
|
@ -625,42 +625,44 @@ stream_name(A) ::= NK_ID(B).
|
|||
cgroup_name(A) ::= NK_ID(B). { A = B; }
|
||||
|
||||
/************************************************ expression **********************************************************/
|
||||
expr_or_subquery(A) ::= expression(B). { A = B; }
|
||||
expr_or_subquery(A) ::= subquery(B). { A = B; }
|
||||
|
||||
expression(A) ::= literal(B). { A = B; }
|
||||
expression(A) ::= pseudo_column(B). { A = B; }
|
||||
expression(A) ::= column_reference(B). { A = B; }
|
||||
expression(A) ::= function_expression(B). { A = B; }
|
||||
//expression(A) ::= case_expression(B). { A = B; }
|
||||
expression(A) ::= subquery(B). { A = B; }
|
||||
expression(A) ::= case_when_expression(B). { A = B; }
|
||||
expression(A) ::= NK_LP(B) expression(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, releaseRawExprNode(pCxt, C)); }
|
||||
expression(A) ::= NK_PLUS(B) expression(C). {
|
||||
expression(A) ::= NK_PLUS(B) expr_or_subquery(C). {
|
||||
SToken t = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &B, &t, releaseRawExprNode(pCxt, C));
|
||||
}
|
||||
expression(A) ::= NK_MINUS(B) expression(C). {
|
||||
expression(A) ::= NK_MINUS(B) expr_or_subquery(C). {
|
||||
SToken t = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &B, &t, createOperatorNode(pCxt, OP_TYPE_MINUS, releaseRawExprNode(pCxt, C), NULL));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_PLUS expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_PLUS expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_ADD, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_MINUS expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_MINUS expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_SUB, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_STAR expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_STAR expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_MULTI, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_SLASH expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_SLASH expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_DIV, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_REM expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_REM expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_REM, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
|
@ -669,12 +671,12 @@ expression(A) ::= column_reference(B) NK_ARROW NK_STRING(C).
|
|||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
A = createRawExprNodeExt(pCxt, &s, &C, createOperatorNode(pCxt, OP_TYPE_JSON_GET_VALUE, releaseRawExprNode(pCxt, B), createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_BITAND expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_BITAND expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_BIT_AND, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
}
|
||||
expression(A) ::= expression(B) NK_BITOR expression(C). {
|
||||
expression(A) ::= expr_or_subquery(B) NK_BITOR expr_or_subquery(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, C);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, OP_TYPE_BIT_OR, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)));
|
||||
|
@ -682,8 +684,8 @@ expression(A) ::= expression(B) NK_BITOR expression(C).
|
|||
|
||||
%type expression_list { SNodeList* }
|
||||
%destructor expression_list { nodesDestroyList($$); }
|
||||
expression_list(A) ::= expression(B). { A = createNodeList(pCxt, releaseRawExprNode(pCxt, B)); }
|
||||
expression_list(A) ::= expression_list(B) NK_COMMA expression(C). { A = addNodeToList(pCxt, B, releaseRawExprNode(pCxt, C)); }
|
||||
expression_list(A) ::= expr_or_subquery(B). { A = createNodeList(pCxt, releaseRawExprNode(pCxt, B)); }
|
||||
expression_list(A) ::= expression_list(B) NK_COMMA expr_or_subquery(C). { A = addNodeToList(pCxt, B, releaseRawExprNode(pCxt, C)); }
|
||||
|
||||
column_reference(A) ::= column_name(B). { A = createRawExprNode(pCxt, &B, createColumnNode(pCxt, NULL, &B)); }
|
||||
column_reference(A) ::= table_name(B) NK_DOT column_name(C). { A = createRawExprNodeExt(pCxt, &B, &C, createColumnNode(pCxt, &B, &C)); }
|
||||
|
@ -700,7 +702,8 @@ pseudo_column(A) ::= WDURATION(B).
|
|||
|
||||
function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
||||
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
||||
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
|
||||
function_expression(A) ::=
|
||||
CAST(B) NK_LP expr_or_subquery(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
|
||||
function_expression(A) ::= literal_func(B). { A = B; }
|
||||
|
||||
literal_func(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNode(pCxt, &B, NULL)); }
|
||||
|
@ -735,35 +738,52 @@ star_func_para_list(A) ::= other_para_list(B).
|
|||
other_para_list(A) ::= star_func_para(B). { A = createNodeList(pCxt, B); }
|
||||
other_para_list(A) ::= other_para_list(B) NK_COMMA star_func_para(C). { A = addNodeToList(pCxt, B, C); }
|
||||
|
||||
star_func_para(A) ::= expression(B). { A = releaseRawExprNode(pCxt, B); }
|
||||
star_func_para(A) ::= expr_or_subquery(B). { A = releaseRawExprNode(pCxt, B); }
|
||||
star_func_para(A) ::= table_name(B) NK_DOT NK_STAR(C). { A = createColumnNode(pCxt, &B, &C); }
|
||||
|
||||
case_when_expression(A) ::=
|
||||
CASE(E) when_then_list(C) case_when_else_opt(D) END(F). { A = createRawExprNodeExt(pCxt, &E, &F, createCaseWhenNode(pCxt, NULL, C, D)); }
|
||||
case_when_expression(A) ::=
|
||||
CASE(E) common_expression(B) when_then_list(C) case_when_else_opt(D) END(F). { A = createRawExprNodeExt(pCxt, &E, &F, createCaseWhenNode(pCxt, releaseRawExprNode(pCxt, B), C, D)); }
|
||||
|
||||
%type when_then_list { SNodeList* }
|
||||
%destructor when_then_list { nodesDestroyList($$); }
|
||||
when_then_list(A) ::= when_then_expr(B). { A = createNodeList(pCxt, B); }
|
||||
when_then_list(A) ::= when_then_list(B) when_then_expr(C). { A = addNodeToList(pCxt, B, C); }
|
||||
|
||||
when_then_expr(A) ::= WHEN common_expression(B) THEN common_expression(C). { A = createWhenThenNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
|
||||
|
||||
case_when_else_opt(A) ::= . { A = NULL; }
|
||||
case_when_else_opt(A) ::= ELSE common_expression(B). { A = releaseRawExprNode(pCxt, B); }
|
||||
|
||||
/************************************************ predicate ***********************************************************/
|
||||
predicate(A) ::= expression(B) compare_op(C) expression(D). {
|
||||
predicate(A) ::= expr_or_subquery(B) compare_op(C) expr_or_subquery(D). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, D);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, C, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, D)));
|
||||
}
|
||||
//predicate(A) ::= expression(B) compare_op sub_type expression(B).
|
||||
predicate(A) ::= expression(B) BETWEEN expression(C) AND expression(D). {
|
||||
predicate(A) ::=
|
||||
expr_or_subquery(B) BETWEEN expr_or_subquery(C) AND expr_or_subquery(D). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, D);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createBetweenAnd(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, D)));
|
||||
}
|
||||
predicate(A) ::= expression(B) NOT BETWEEN expression(C) AND expression(D). {
|
||||
predicate(A) ::=
|
||||
expr_or_subquery(B) NOT BETWEEN expr_or_subquery(C) AND expr_or_subquery(D). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, D);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, D)));
|
||||
}
|
||||
predicate(A) ::= expression(B) IS NULL(C). {
|
||||
predicate(A) ::= expr_or_subquery(B) IS NULL(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
A = createRawExprNodeExt(pCxt, &s, &C, createOperatorNode(pCxt, OP_TYPE_IS_NULL, releaseRawExprNode(pCxt, B), NULL));
|
||||
}
|
||||
predicate(A) ::= expression(B) IS NOT NULL(C). {
|
||||
predicate(A) ::= expr_or_subquery(B) IS NOT NULL(C). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
A = createRawExprNodeExt(pCxt, &s, &C, createOperatorNode(pCxt, OP_TYPE_IS_NOT_NULL, releaseRawExprNode(pCxt, B), NULL));
|
||||
}
|
||||
predicate(A) ::= expression(B) in_op(C) in_predicate_value(D). {
|
||||
predicate(A) ::= expr_or_subquery(B) in_op(C) in_predicate_value(D). {
|
||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||
SToken e = getTokenFromRawExprNode(pCxt, D);
|
||||
A = createRawExprNodeExt(pCxt, &s, &e, createOperatorNode(pCxt, C, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, D)));
|
||||
|
@ -813,7 +833,7 @@ boolean_primary(A) ::= predicate(B).
|
|||
boolean_primary(A) ::= NK_LP(C) boolean_value_expression(B) NK_RP(D). { A = createRawExprNodeExt(pCxt, &C, &D, releaseRawExprNode(pCxt, B)); }
|
||||
|
||||
/************************************************ common_expression ********************************************/
|
||||
common_expression(A) ::= expression(B). { A = B; }
|
||||
common_expression(A) ::= expr_or_subquery(B). { A = B; }
|
||||
common_expression(A) ::= boolean_value_expression(B). { A = B; }
|
||||
|
||||
/************************************************ from_clause_opt *********************************************************/
|
||||
|
@ -894,7 +914,7 @@ partition_by_clause_opt(A) ::= PARTITION BY expression_list(B).
|
|||
twindow_clause_opt(A) ::= . { A = NULL; }
|
||||
twindow_clause_opt(A) ::=
|
||||
SESSION NK_LP column_reference(B) NK_COMMA duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
|
||||
twindow_clause_opt(A) ::= STATE_WINDOW NK_LP expression(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); }
|
||||
twindow_clause_opt(A) ::= STATE_WINDOW NK_LP expr_or_subquery(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); }
|
||||
twindow_clause_opt(A) ::=
|
||||
INTERVAL NK_LP duration_literal(B) NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); }
|
||||
twindow_clause_opt(A) ::=
|
||||
|
@ -923,14 +943,15 @@ group_by_clause_opt(A) ::= GROUP BY group_by_list(B).
|
|||
|
||||
%type group_by_list { SNodeList* }
|
||||
%destructor group_by_list { nodesDestroyList($$); }
|
||||
group_by_list(A) ::= expression(B). { A = createNodeList(pCxt, createGroupingSetNode(pCxt, releaseRawExprNode(pCxt, B))); }
|
||||
group_by_list(A) ::= group_by_list(B) NK_COMMA expression(C). { A = addNodeToList(pCxt, B, createGroupingSetNode(pCxt, releaseRawExprNode(pCxt, C))); }
|
||||
group_by_list(A) ::= expr_or_subquery(B). { A = createNodeList(pCxt, createGroupingSetNode(pCxt, releaseRawExprNode(pCxt, B))); }
|
||||
group_by_list(A) ::= group_by_list(B) NK_COMMA expr_or_subquery(C). { A = addNodeToList(pCxt, B, createGroupingSetNode(pCxt, releaseRawExprNode(pCxt, C))); }
|
||||
|
||||
having_clause_opt(A) ::= . { A = NULL; }
|
||||
having_clause_opt(A) ::= HAVING search_condition(B). { A = B; }
|
||||
|
||||
range_opt(A) ::= . { A = NULL; }
|
||||
range_opt(A) ::= RANGE NK_LP expression(B) NK_COMMA expression(C) NK_RP. { A = createInterpTimeRange(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
|
||||
range_opt(A) ::=
|
||||
RANGE NK_LP expr_or_subquery(B) NK_COMMA expr_or_subquery(C) NK_RP. { A = createInterpTimeRange(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
|
||||
|
||||
every_opt(A) ::= . { A = NULL; }
|
||||
every_opt(A) ::= EVERY NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
|
||||
|
@ -974,6 +995,7 @@ limit_clause_opt(A) ::= LIMIT NK_INTEGER(C) NK_COMMA NK_INTEGER(B).
|
|||
|
||||
/************************************************ subquery ************************************************************/
|
||||
subquery(A) ::= NK_LP(B) query_expression(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, C); }
|
||||
subquery(A) ::= NK_LP(B) subquery(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, releaseRawExprNode(pCxt, C)); }
|
||||
|
||||
/************************************************ search_condition ****************************************************/
|
||||
search_condition(A) ::= common_expression(B). { A = releaseRawExprNode(pCxt, B); }
|
||||
|
@ -986,7 +1008,7 @@ sort_specification_list(A) ::=
|
|||
sort_specification_list(B) NK_COMMA sort_specification(C). { A = addNodeToList(pCxt, B, C); }
|
||||
|
||||
sort_specification(A) ::=
|
||||
expression(B) ordering_specification_opt(C) null_ordering_opt(D). { A = createOrderByExprNode(pCxt, releaseRawExprNode(pCxt, B), C, D); }
|
||||
expr_or_subquery(B) ordering_specification_opt(C) null_ordering_opt(D). { A = createOrderByExprNode(pCxt, releaseRawExprNode(pCxt, B), C, D); }
|
||||
|
||||
%type ordering_specification_opt EOrder
|
||||
%destructor ordering_specification_opt { }
|
||||
|
|
|
@ -647,6 +647,25 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd
|
|||
return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt, NULL), pStart, pEnd);
|
||||
}
|
||||
|
||||
SNode* createWhenThenNode(SAstCreateContext* pCxt, SNode* pWhen, SNode* pThen) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SWhenThenNode* pWhenThen = (SWhenThenNode*)nodesMakeNode(QUERY_NODE_WHEN_THEN);
|
||||
CHECK_OUT_OF_MEM(pWhenThen);
|
||||
pWhenThen->pWhen = pWhen;
|
||||
pWhenThen->pThen = pThen;
|
||||
return (SNode*)pWhenThen;
|
||||
}
|
||||
|
||||
SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhenThenList, SNode* pElse) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)nodesMakeNode(QUERY_NODE_CASE_WHEN);
|
||||
CHECK_OUT_OF_MEM(pCaseWhen);
|
||||
pCaseWhen->pCase = pCase;
|
||||
pCaseWhen->pWhenThenList = pWhenThenList;
|
||||
pCaseWhen->pElse = pElse;
|
||||
return (SNode*)pCaseWhen;
|
||||
}
|
||||
|
||||
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
trimEscape(pAlias);
|
||||
|
|
|
@ -54,6 +54,7 @@ static SKeyword keywordTable[] = {
|
|||
{"CACHE", TK_CACHE},
|
||||
{"CACHEMODEL", TK_CACHEMODEL},
|
||||
{"CACHESIZE", TK_CACHESIZE},
|
||||
{"CASE", TK_CASE},
|
||||
{"CAST", TK_CAST},
|
||||
{"CLIENT_VERSION", TK_CLIENT_VERSION},
|
||||
{"CLUSTER", TK_CLUSTER},
|
||||
|
@ -82,7 +83,9 @@ static SKeyword keywordTable[] = {
|
|||
{"DOUBLE", TK_DOUBLE},
|
||||
{"DROP", TK_DROP},
|
||||
{"DURATION", TK_DURATION},
|
||||
{"ELSE", TK_ELSE},
|
||||
{"ENABLE", TK_ENABLE},
|
||||
{"END", TK_END},
|
||||
{"EXISTS", TK_EXISTS},
|
||||
{"EXPIRED", TK_EXPIRED},
|
||||
{"EXPLAIN", TK_EXPLAIN},
|
||||
|
@ -205,6 +208,7 @@ static SKeyword keywordTable[] = {
|
|||
{"TAG", TK_TAG},
|
||||
{"TAGS", TK_TAGS},
|
||||
{"TBNAME", TK_TBNAME},
|
||||
{"THEN", TK_THEN},
|
||||
{"TIMESTAMP", TK_TIMESTAMP},
|
||||
{"TIMEZONE", TK_TIMEZONE},
|
||||
{"TINYINT", TK_TINYINT},
|
||||
|
@ -240,6 +244,7 @@ static SKeyword keywordTable[] = {
|
|||
{"WAL_ROLL_PERIOD", TK_WAL_ROLL_PERIOD},
|
||||
{"WAL_SEGMENT_SIZE", TK_WAL_SEGMENT_SIZE},
|
||||
{"WATERMARK", TK_WATERMARK},
|
||||
{"WHEN", TK_WHEN},
|
||||
{"WHERE", TK_WHERE},
|
||||
{"WINDOW_CLOSE", TK_WINDOW_CLOSE},
|
||||
{"WITH", TK_WITH},
|
||||
|
|
|
@ -1813,6 +1813,59 @@ static EDealRes translateLogicCond(STranslateContext* pCxt, SLogicConditionNode*
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType dt, SNode** pCast) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "cast");
|
||||
pFunc->node.resType = dt;
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeAppend(&pFunc->pParameterList, pExpr)) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != getFuncInfo(pCxt, pFunc)) {
|
||||
nodesClearList(pFunc->pParameterList);
|
||||
pFunc->pParameterList = NULL;
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pExpr)->aliasName);
|
||||
}
|
||||
*pCast = (SNode*)pFunc;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EDealRes translateWhenThen(STranslateContext* pCxt, SWhenThenNode* pWhenThen) {
|
||||
pWhenThen->node.resType = ((SExprNode*)pWhenThen->pThen)->resType;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseWhen) {
|
||||
bool first = true;
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pCaseWhen->pWhenThenList) {
|
||||
if (first) {
|
||||
pCaseWhen->node.resType = ((SExprNode*)pNode)->resType;
|
||||
} else if (!dataTypeEqual(&pCaseWhen->node.resType, &((SExprNode*)pNode)->resType)) {
|
||||
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
|
||||
SNode* pCastFunc = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createCastFunc(pCxt, pWhenThen->pThen, pCaseWhen->node.resType, &pCastFunc)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "CASE WHEN data type mismatch");
|
||||
}
|
||||
pWhenThen->pThen = pCastFunc;
|
||||
pWhenThen->node.resType = pCaseWhen->node.resType;
|
||||
}
|
||||
}
|
||||
if (NULL != pCaseWhen->pElse && !dataTypeEqual(&pCaseWhen->node.resType, &((SExprNode*)pCaseWhen->pElse)->resType)) {
|
||||
SNode* pCastFunc = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createCastFunc(pCxt, pCaseWhen->pElse, pCaseWhen->node.resType, &pCastFunc)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "CASE WHEN data type mismatch");
|
||||
}
|
||||
pCaseWhen->pElse = pCastFunc;
|
||||
((SExprNode*)pCaseWhen->pElse)->resType = pCaseWhen->node.resType;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes doTranslateExpr(SNode** pNode, void* pContext) {
|
||||
STranslateContext* pCxt = (STranslateContext*)pContext;
|
||||
switch (nodeType(*pNode)) {
|
||||
|
@ -1828,6 +1881,10 @@ static EDealRes doTranslateExpr(SNode** pNode, void* pContext) {
|
|||
return translateLogicCond(pCxt, (SLogicConditionNode*)*pNode);
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
return translateExprSubquery(pCxt, ((STempTableNode*)*pNode)->pSubquery);
|
||||
case QUERY_NODE_WHEN_THEN:
|
||||
return translateWhenThen(pCxt, (SWhenThenNode*)*pNode);
|
||||
case QUERY_NODE_CASE_WHEN:
|
||||
return translateCaseWhen(pCxt, (SCaseWhenNode*)*pNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -3228,27 +3285,6 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
|
|||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType dt, SNode** pCast) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "cast");
|
||||
pFunc->node.resType = dt;
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeAppend(&pFunc->pParameterList, pExpr)) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != getFuncInfo(pCxt, pFunc)) {
|
||||
nodesClearList(pFunc->pParameterList);
|
||||
pFunc->pParameterList = NULL;
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pExpr)->aliasName);
|
||||
}
|
||||
*pCast = (SNode*)pFunc;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
|
||||
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -60,6 +60,9 @@ TEST_F(ParserSelectTest, expression) {
|
|||
run("SELECT ts > 0, c1 between 10 and 20 and c2 = 'qaz' FROM t1");
|
||||
|
||||
run("SELECT c1 | 10, c2 & 20, c4 | c5 FROM t1");
|
||||
|
||||
run("SELECT CASE WHEN ts > '2020-1-1 10:10:10' THEN c1 + 10 ELSE c1 - 10 END FROM t1 "
|
||||
"WHERE CASE c1 WHEN c3 + 20 THEN c3 - 1 WHEN c3 + 10 THEN c3 - 2 ELSE 10 END > 0");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, condition) {
|
||||
|
@ -312,6 +315,8 @@ TEST_F(ParserSelectTest, subquery) {
|
|||
run("SELECT _C0 FROM (SELECT _ROWTS, ts FROM st1s1)");
|
||||
|
||||
run("SELECT ts FROM (SELECT t1.ts FROM st1s1 t1)");
|
||||
|
||||
run("(((SELECT t1.ts FROM st1s1 t1)))");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, subquerySemanticCheck) {
|
||||
|
|
|
@ -1046,7 +1046,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
|
||||
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
|
||||
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
|
||||
*pPhyNode = (SPhysiNode*)pExchange;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1425,7 +1426,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
|
|||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pMerge->srcGroupId;
|
||||
pExchange->srcStartGroupId = pMerge->srcGroupId;
|
||||
pExchange->srcEndGroupId = pMerge->srcGroupId;
|
||||
pExchange->singleChannel = true;
|
||||
pExchange->node.pParent = (SPhysiNode*)pMerge;
|
||||
pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
|
||||
|
|
|
@ -84,7 +84,8 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
|
|||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->srcStartGroupId = pCxt->groupId;
|
||||
pExchange->srcEndGroupId = pCxt->groupId;
|
||||
pExchange->node.precision = pChild->precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
|
@ -112,7 +113,8 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
|
|||
|
||||
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
|
||||
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
|
||||
return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId &&
|
||||
groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId;
|
||||
}
|
||||
|
||||
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
|
||||
|
@ -1184,6 +1186,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesDestroyList(pSubplanChildren);
|
||||
|
@ -1207,12 +1210,14 @@ static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan,
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
|
||||
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
|
||||
SProjectLogicNode* pProject) {
|
||||
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->srcStartGroupId = startGroupId;
|
||||
pExchange->srcEndGroupId = pCxt->groupId - 1;
|
||||
pExchange->node.precision = pProject->node.precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
|
@ -1246,11 +1251,11 @@ static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t startGroupId = pCxt->groupId;
|
||||
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
|
||||
code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
@ -1260,12 +1265,14 @@ typedef struct SUnionDistinctSplitInfo {
|
|||
SLogicSubplan* pSubplan;
|
||||
} SUnionDistinctSplitInfo;
|
||||
|
||||
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
|
||||
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
|
||||
SAggLogicNode* pAgg) {
|
||||
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->srcStartGroupId = startGroupId;
|
||||
pExchange->srcEndGroupId = pCxt->groupId - 1;
|
||||
pExchange->node.precision = pAgg->node.precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
|
@ -1293,11 +1300,11 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t startGroupId = pCxt->groupId;
|
||||
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
|
||||
code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
@ -1430,7 +1437,7 @@ static const SSplitRule splitRuleSet[] = {
|
|||
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
|
||||
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
|
||||
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
|
||||
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
|
||||
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, // not used yet
|
||||
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -63,7 +63,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
|
|||
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
|
||||
if (pExchange->srcGroupId == groupId) {
|
||||
if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) {
|
||||
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource));
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
|
||||
|
|
|
@ -40,6 +40,13 @@ TEST_F(PlanBasicTest, whereClause) {
|
|||
run("SELECT ts, c1 FROM t1 WHERE ts > NOW AND ts IS NULL AND (c1 > 0 OR c3 < 20)");
|
||||
}
|
||||
|
||||
TEST_F(PlanBasicTest, caseWhen) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT CASE WHEN ts > '2020-1-1 10:10:10' THEN c1 + 10 ELSE c1 - 10 END FROM t1 "
|
||||
"WHERE CASE c1 WHEN c2 + 20 THEN c4 - 1 WHEN c2 + 10 THEN c4 - 2 ELSE 10 END > 0");
|
||||
}
|
||||
|
||||
TEST_F(PlanBasicTest, func) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
|
@ -205,6 +205,7 @@ typedef struct SQWorker {
|
|||
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
|
||||
SMsgCb msgCb;
|
||||
SQWStat stat;
|
||||
int32_t *destroyed;
|
||||
} SQWorker;
|
||||
|
||||
typedef struct SQWorkerMgmt {
|
||||
|
|
|
@ -485,6 +485,8 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
}
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
|
||||
*mgmt->destroyed = 1;
|
||||
|
||||
taosMemoryFree(mgmt);
|
||||
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
|
|
|
@ -1114,10 +1114,17 @@ void qWorkerDestroy(void **qWorkerMgmt) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t destroyed = 0;
|
||||
SQWorker *mgmt = *qWorkerMgmt;
|
||||
|
||||
mgmt->destroyed = &destroyed;
|
||||
|
||||
if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
|
||||
qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
|
||||
return;
|
||||
}
|
||||
|
||||
while (0 == destroyed) {
|
||||
taosMsleep(2);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ typedef struct SFilterRange {
|
|||
|
||||
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
|
||||
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
|
||||
typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||
typedef bool(*filter_exec_func)(void*, int32_t, SColumnInfoData*, SColumnDataAgg*, int16_t, int32_t*);
|
||||
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
|
||||
|
||||
typedef struct SFilterRangeCompare {
|
||||
|
|
|
@ -3067,15 +3067,16 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) {
|
||||
return true;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) {
|
||||
return false;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes,
|
||||
SColumnDataAgg *statis, int16_t numOfCols, int32_t *numOfQualified) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
|
@ -3097,7 +3098,9 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
|
|||
|
||||
return all;
|
||||
}
|
||||
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes,
|
||||
SColumnDataAgg *statis, int16_t numOfCols, int32_t *numOfQualified) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
|
@ -3120,7 +3123,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
|
|||
return all;
|
||||
}
|
||||
|
||||
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
uint16_t dataSize = info->cunits[0].dataSize;
|
||||
|
@ -3136,8 +3139,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe
|
|||
int8_t* p = (int8_t*) pRes->pData;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
|
||||
SColumnInfoData* pData = info->cunits[0].colData;
|
||||
|
||||
void *colData = colDataGetData(pData, i);
|
||||
if (colData == NULL || colDataIsNull_s(pData, i)) {
|
||||
all = false;
|
||||
continue;
|
||||
|
@ -3147,13 +3151,16 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe
|
|||
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
} else {
|
||||
(*numOfQualified)++;
|
||||
}
|
||||
}
|
||||
|
||||
return all;
|
||||
}
|
||||
|
||||
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SColumnDataAgg *statis,
|
||||
int16_t numOfCols, int32_t *numOfQualified) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
|
@ -3195,8 +3202,8 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes
|
|||
return all;
|
||||
}
|
||||
|
||||
|
||||
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SColumnDataAgg *statis, int16_t numOfCols,
|
||||
int32_t *numOfQualified) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
|
@ -4048,7 +4055,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SC
|
|||
*p = output.columnData;
|
||||
output.numOfRows = pSrc->info.rows;
|
||||
|
||||
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols);
|
||||
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
|
||||
|
||||
// todo this should be return during filter procedure
|
||||
int32_t num = 0;
|
||||
|
|
|
@ -107,7 +107,7 @@ class TDTestCase:
|
|||
|
||||
|
||||
# create stream
|
||||
tdSql.execute('''create stream current_stream into stream_max_stable_1 as select _wstart as start, _wend as end, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s);''')
|
||||
tdSql.execute('''create stream current_stream into stream_max_stable_1 as select _wstart as start, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s);''')
|
||||
|
||||
# insert data
|
||||
for i in range(num_random*n):
|
||||
|
@ -185,20 +185,20 @@ class TDTestCase:
|
|||
tdSql.checkData(0,0,num_random*n)
|
||||
|
||||
# stream data check
|
||||
tdSql.query("select start,end,max_int from stream_max_stable_1 ;")
|
||||
tdSql.query("select start,wend,max_int from stream_max_stable_1 ;")
|
||||
tdSql.checkRows(20)
|
||||
tdSql.query("select sum(max_int) from stream_max_stable_1 ;")
|
||||
stream_data_1 = tdSql.queryResult[0][0]
|
||||
tdSql.query("select sum(min_int) from stream_max_stable_1 ;")
|
||||
stream_data_2 = tdSql.queryResult[0][0]
|
||||
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as start, _wend as end, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s));")
|
||||
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as start, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s));")
|
||||
sql_data_1 = tdSql.queryResult[0][0]
|
||||
sql_data_2 = tdSql.queryResult[0][1]
|
||||
|
||||
self.stream_value_check(stream_data_1,sql_data_1)
|
||||
self.stream_value_check(stream_data_2,sql_data_2)
|
||||
|
||||
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as start, _wend as end, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 interval (5s));")
|
||||
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as start, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 interval (5s));")
|
||||
sql_data_1 = tdSql.queryResult[0][0]
|
||||
sql_data_2 = tdSql.queryResult[0][1]
|
||||
|
||||
|
|
Loading…
Reference in New Issue