feat(stream): tag and child table name improvement

This commit is contained in:
Xiaoyu Wang 2022-09-26 18:39:47 +08:00
parent 1cbd1f4c5f
commit 1d62834e5c
27 changed files with 3819 additions and 3335 deletions

View File

@ -867,6 +867,7 @@ int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int32_t maxSpeed;
} STrimDbReq;
int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
@ -1446,7 +1447,7 @@ typedef struct STableScanAnalyzeInfo {
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
void tFreeSExplainRsp(SExplainRsp *pRsp);
void tFreeSExplainRsp(SExplainRsp* pRsp);
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
@ -1729,6 +1730,8 @@ typedef struct {
int64_t maxDelay;
int64_t watermark;
int8_t igExpired;
int32_t numOfTags;
SArray* pTags; // array of SField
} SCMCreateStreamReq;
typedef struct {

View File

@ -107,227 +107,229 @@
#define TK_TABLE_PREFIX 89
#define TK_TABLE_SUFFIX 90
#define TK_NK_COLON 91
#define TK_TABLE 92
#define TK_NK_LP 93
#define TK_NK_RP 94
#define TK_STABLE 95
#define TK_ADD 96
#define TK_COLUMN 97
#define TK_MODIFY 98
#define TK_RENAME 99
#define TK_TAG 100
#define TK_SET 101
#define TK_NK_EQ 102
#define TK_USING 103
#define TK_TAGS 104
#define TK_COMMENT 105
#define TK_BOOL 106
#define TK_TINYINT 107
#define TK_SMALLINT 108
#define TK_INT 109
#define TK_INTEGER 110
#define TK_BIGINT 111
#define TK_FLOAT 112
#define TK_DOUBLE 113
#define TK_BINARY 114
#define TK_TIMESTAMP 115
#define TK_NCHAR 116
#define TK_UNSIGNED 117
#define TK_JSON 118
#define TK_VARCHAR 119
#define TK_MEDIUMBLOB 120
#define TK_BLOB 121
#define TK_VARBINARY 122
#define TK_DECIMAL 123
#define TK_MAX_DELAY 124
#define TK_WATERMARK 125
#define TK_ROLLUP 126
#define TK_TTL 127
#define TK_SMA 128
#define TK_FIRST 129
#define TK_LAST 130
#define TK_SHOW 131
#define TK_DATABASES 132
#define TK_TABLES 133
#define TK_STABLES 134
#define TK_MNODES 135
#define TK_MODULES 136
#define TK_QNODES 137
#define TK_FUNCTIONS 138
#define TK_INDEXES 139
#define TK_ACCOUNTS 140
#define TK_APPS 141
#define TK_CONNECTIONS 142
#define TK_LICENCES 143
#define TK_GRANTS 144
#define TK_QUERIES 145
#define TK_SCORES 146
#define TK_TOPICS 147
#define TK_VARIABLES 148
#define TK_BNODES 149
#define TK_SNODES 150
#define TK_CLUSTER 151
#define TK_TRANSACTIONS 152
#define TK_DISTRIBUTED 153
#define TK_CONSUMERS 154
#define TK_SUBSCRIPTIONS 155
#define TK_VNODES 156
#define TK_LIKE 157
#define TK_INDEX 158
#define TK_FUNCTION 159
#define TK_INTERVAL 160
#define TK_TOPIC 161
#define TK_AS 162
#define TK_WITH 163
#define TK_META 164
#define TK_CONSUMER 165
#define TK_GROUP 166
#define TK_DESC 167
#define TK_DESCRIBE 168
#define TK_RESET 169
#define TK_QUERY 170
#define TK_CACHE 171
#define TK_EXPLAIN 172
#define TK_ANALYZE 173
#define TK_VERBOSE 174
#define TK_NK_BOOL 175
#define TK_RATIO 176
#define TK_NK_FLOAT 177
#define TK_OUTPUTTYPE 178
#define TK_AGGREGATE 179
#define TK_BUFSIZE 180
#define TK_STREAM 181
#define TK_INTO 182
#define TK_TRIGGER 183
#define TK_AT_ONCE 184
#define TK_WINDOW_CLOSE 185
#define TK_IGNORE 186
#define TK_EXPIRED 187
#define TK_KILL 188
#define TK_CONNECTION 189
#define TK_TRANSACTION 190
#define TK_BALANCE 191
#define TK_VGROUP 192
#define TK_MERGE 193
#define TK_REDISTRIBUTE 194
#define TK_SPLIT 195
#define TK_DELETE 196
#define TK_INSERT 197
#define TK_NULL 198
#define TK_NK_QUESTION 199
#define TK_NK_ARROW 200
#define TK_ROWTS 201
#define TK_TBNAME 202
#define TK_QSTART 203
#define TK_QEND 204
#define TK_QDURATION 205
#define TK_WSTART 206
#define TK_WEND 207
#define TK_WDURATION 208
#define TK_CAST 209
#define TK_NOW 210
#define TK_TODAY 211
#define TK_TIMEZONE 212
#define TK_CLIENT_VERSION 213
#define TK_SERVER_VERSION 214
#define TK_SERVER_STATUS 215
#define TK_CURRENT_USER 216
#define TK_COUNT 217
#define TK_LAST_ROW 218
#define TK_CASE 219
#define TK_END 220
#define TK_WHEN 221
#define TK_THEN 222
#define TK_ELSE 223
#define TK_BETWEEN 224
#define TK_IS 225
#define TK_NK_LT 226
#define TK_NK_GT 227
#define TK_NK_LE 228
#define TK_NK_GE 229
#define TK_NK_NE 230
#define TK_MATCH 231
#define TK_NMATCH 232
#define TK_CONTAINS 233
#define TK_IN 234
#define TK_JOIN 235
#define TK_INNER 236
#define TK_SELECT 237
#define TK_DISTINCT 238
#define TK_WHERE 239
#define TK_PARTITION 240
#define TK_BY 241
#define TK_SESSION 242
#define TK_STATE_WINDOW 243
#define TK_SLIDING 244
#define TK_FILL 245
#define TK_VALUE 246
#define TK_NONE 247
#define TK_PREV 248
#define TK_LINEAR 249
#define TK_NEXT 250
#define TK_HAVING 251
#define TK_RANGE 252
#define TK_EVERY 253
#define TK_ORDER 254
#define TK_SLIMIT 255
#define TK_SOFFSET 256
#define TK_LIMIT 257
#define TK_OFFSET 258
#define TK_ASC 259
#define TK_NULLS 260
#define TK_ABORT 261
#define TK_AFTER 262
#define TK_ATTACH 263
#define TK_BEFORE 264
#define TK_BEGIN 265
#define TK_BITAND 266
#define TK_BITNOT 267
#define TK_BITOR 268
#define TK_BLOCKS 269
#define TK_CHANGE 270
#define TK_COMMA 271
#define TK_COMPACT 272
#define TK_CONCAT 273
#define TK_CONFLICT 274
#define TK_COPY 275
#define TK_DEFERRED 276
#define TK_DELIMITERS 277
#define TK_DETACH 278
#define TK_DIVIDE 279
#define TK_DOT 280
#define TK_EACH 281
#define TK_FAIL 282
#define TK_FILE 283
#define TK_FOR 284
#define TK_GLOB 285
#define TK_ID 286
#define TK_IMMEDIATE 287
#define TK_IMPORT 288
#define TK_INITIALLY 289
#define TK_INSTEAD 290
#define TK_ISNULL 291
#define TK_KEY 292
#define TK_NK_BITNOT 293
#define TK_NK_SEMI 294
#define TK_NOTNULL 295
#define TK_OF 296
#define TK_PLUS 297
#define TK_PRIVILEGE 298
#define TK_RAISE 299
#define TK_REPLACE 300
#define TK_RESTRICT 301
#define TK_ROW 302
#define TK_SEMI 303
#define TK_STAR 304
#define TK_STATEMENT 305
#define TK_STRING 306
#define TK_TIMES 307
#define TK_UPDATE 308
#define TK_VALUES 309
#define TK_VARIABLE 310
#define TK_VIEW 311
#define TK_WAL 312
#define TK_MAX_SPEED 92
#define TK_TABLE 93
#define TK_NK_LP 94
#define TK_NK_RP 95
#define TK_STABLE 96
#define TK_ADD 97
#define TK_COLUMN 98
#define TK_MODIFY 99
#define TK_RENAME 100
#define TK_TAG 101
#define TK_SET 102
#define TK_NK_EQ 103
#define TK_USING 104
#define TK_TAGS 105
#define TK_COMMENT 106
#define TK_BOOL 107
#define TK_TINYINT 108
#define TK_SMALLINT 109
#define TK_INT 110
#define TK_INTEGER 111
#define TK_BIGINT 112
#define TK_FLOAT 113
#define TK_DOUBLE 114
#define TK_BINARY 115
#define TK_TIMESTAMP 116
#define TK_NCHAR 117
#define TK_UNSIGNED 118
#define TK_JSON 119
#define TK_VARCHAR 120
#define TK_MEDIUMBLOB 121
#define TK_BLOB 122
#define TK_VARBINARY 123
#define TK_DECIMAL 124
#define TK_MAX_DELAY 125
#define TK_WATERMARK 126
#define TK_ROLLUP 127
#define TK_TTL 128
#define TK_SMA 129
#define TK_FIRST 130
#define TK_LAST 131
#define TK_SHOW 132
#define TK_DATABASES 133
#define TK_TABLES 134
#define TK_STABLES 135
#define TK_MNODES 136
#define TK_MODULES 137
#define TK_QNODES 138
#define TK_FUNCTIONS 139
#define TK_INDEXES 140
#define TK_ACCOUNTS 141
#define TK_APPS 142
#define TK_CONNECTIONS 143
#define TK_LICENCES 144
#define TK_GRANTS 145
#define TK_QUERIES 146
#define TK_SCORES 147
#define TK_TOPICS 148
#define TK_VARIABLES 149
#define TK_BNODES 150
#define TK_SNODES 151
#define TK_CLUSTER 152
#define TK_TRANSACTIONS 153
#define TK_DISTRIBUTED 154
#define TK_CONSUMERS 155
#define TK_SUBSCRIPTIONS 156
#define TK_VNODES 157
#define TK_LIKE 158
#define TK_INDEX 159
#define TK_FUNCTION 160
#define TK_INTERVAL 161
#define TK_TOPIC 162
#define TK_AS 163
#define TK_WITH 164
#define TK_META 165
#define TK_CONSUMER 166
#define TK_GROUP 167
#define TK_DESC 168
#define TK_DESCRIBE 169
#define TK_RESET 170
#define TK_QUERY 171
#define TK_CACHE 172
#define TK_EXPLAIN 173
#define TK_ANALYZE 174
#define TK_VERBOSE 175
#define TK_NK_BOOL 176
#define TK_RATIO 177
#define TK_NK_FLOAT 178
#define TK_OUTPUTTYPE 179
#define TK_AGGREGATE 180
#define TK_BUFSIZE 181
#define TK_STREAM 182
#define TK_INTO 183
#define TK_TRIGGER 184
#define TK_AT_ONCE 185
#define TK_WINDOW_CLOSE 186
#define TK_IGNORE 187
#define TK_EXPIRED 188
#define TK_SUBTABLE 189
#define TK_KILL 190
#define TK_CONNECTION 191
#define TK_TRANSACTION 192
#define TK_BALANCE 193
#define TK_VGROUP 194
#define TK_MERGE 195
#define TK_REDISTRIBUTE 196
#define TK_SPLIT 197
#define TK_DELETE 198
#define TK_INSERT 199
#define TK_NULL 200
#define TK_NK_QUESTION 201
#define TK_NK_ARROW 202
#define TK_ROWTS 203
#define TK_TBNAME 204
#define TK_QSTART 205
#define TK_QEND 206
#define TK_QDURATION 207
#define TK_WSTART 208
#define TK_WEND 209
#define TK_WDURATION 210
#define TK_CAST 211
#define TK_NOW 212
#define TK_TODAY 213
#define TK_TIMEZONE 214
#define TK_CLIENT_VERSION 215
#define TK_SERVER_VERSION 216
#define TK_SERVER_STATUS 217
#define TK_CURRENT_USER 218
#define TK_COUNT 219
#define TK_LAST_ROW 220
#define TK_CASE 221
#define TK_END 222
#define TK_WHEN 223
#define TK_THEN 224
#define TK_ELSE 225
#define TK_BETWEEN 226
#define TK_IS 227
#define TK_NK_LT 228
#define TK_NK_GT 229
#define TK_NK_LE 230
#define TK_NK_GE 231
#define TK_NK_NE 232
#define TK_MATCH 233
#define TK_NMATCH 234
#define TK_CONTAINS 235
#define TK_IN 236
#define TK_JOIN 237
#define TK_INNER 238
#define TK_SELECT 239
#define TK_DISTINCT 240
#define TK_WHERE 241
#define TK_PARTITION 242
#define TK_BY 243
#define TK_SESSION 244
#define TK_STATE_WINDOW 245
#define TK_SLIDING 246
#define TK_FILL 247
#define TK_VALUE 248
#define TK_NONE 249
#define TK_PREV 250
#define TK_LINEAR 251
#define TK_NEXT 252
#define TK_HAVING 253
#define TK_RANGE 254
#define TK_EVERY 255
#define TK_ORDER 256
#define TK_SLIMIT 257
#define TK_SOFFSET 258
#define TK_LIMIT 259
#define TK_OFFSET 260
#define TK_ASC 261
#define TK_NULLS 262
#define TK_ABORT 263
#define TK_AFTER 264
#define TK_ATTACH 265
#define TK_BEFORE 266
#define TK_BEGIN 267
#define TK_BITAND 268
#define TK_BITNOT 269
#define TK_BITOR 270
#define TK_BLOCKS 271
#define TK_CHANGE 272
#define TK_COMMA 273
#define TK_COMPACT 274
#define TK_CONCAT 275
#define TK_CONFLICT 276
#define TK_COPY 277
#define TK_DEFERRED 278
#define TK_DELIMITERS 279
#define TK_DETACH 280
#define TK_DIVIDE 281
#define TK_DOT 282
#define TK_EACH 283
#define TK_FAIL 284
#define TK_FILE 285
#define TK_FOR 286
#define TK_GLOB 287
#define TK_ID 288
#define TK_IMMEDIATE 289
#define TK_IMPORT 290
#define TK_INITIALLY 291
#define TK_INSTEAD 292
#define TK_ISNULL 293
#define TK_KEY 294
#define TK_NK_BITNOT 295
#define TK_NK_SEMI 296
#define TK_NOTNULL 297
#define TK_OF 298
#define TK_PLUS 299
#define TK_PRIVILEGE 300
#define TK_RAISE 301
#define TK_REPLACE 302
#define TK_RESTRICT 303
#define TK_ROW 304
#define TK_SEMI 305
#define TK_STAR 306
#define TK_STATEMENT 307
#define TK_STRING 308
#define TK_TIMES 309
#define TK_UPDATE 310
#define TK_VALUES 311
#define TK_VARIABLE 312
#define TK_VIEW 313
#define TK_WAL 314
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301

View File

@ -119,6 +119,7 @@ typedef struct SFlushDatabaseStmt {
typedef struct STrimDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
int32_t maxSpeed;
} STrimDatabaseStmt;
typedef struct STableOptions {
@ -383,6 +384,8 @@ typedef struct SCreateStreamStmt {
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNodeList* pTags;
SNode* pSubtable;
} SCreateStreamStmt;
typedef struct SDropStreamStmt {

View File

@ -94,6 +94,8 @@ typedef struct SScanLogicNode {
SArray* pSmaIndexes;
SNodeList* pGroupTags;
bool groupSort;
SNodeList* pTags; // for create stream
SNode* pSubtable; // for create stream
int8_t cacheLastMode;
bool hasNormalCols; // neither tag column nor primary key tag column
bool sortPrimaryKey;
@ -233,6 +235,8 @@ typedef struct SSortLogicNode {
typedef struct SPartitionLogicNode {
SLogicNode node;
SNodeList* pPartitionKeys;
SNodeList* pTags;
SNode* pSubtable;
} SPartitionLogicNode;
typedef enum ESubplanType {
@ -332,6 +336,8 @@ typedef struct STableScanPhysiNode {
SNodeList* pDynamicScanFuncs;
SNodeList* pGroupTags;
bool groupSort;
SNodeList* pTags;
SNode* pSubtable;
int64_t interval;
int64_t offset;
int64_t sliding;
@ -495,7 +501,11 @@ typedef struct SPartitionPhysiNode {
SNodeList* pTargets;
} SPartitionPhysiNode;
typedef SPartitionPhysiNode SStreamPartitionPhysiNode;
typedef struct SStreamPartitionPhysiNode {
SPartitionPhysiNode part;
SNodeList* pTags;
SNode* pSubtable;
} SStreamPartitionPhysiNode;
typedef struct SDataSinkNode {
ENodeType type;

View File

@ -261,6 +261,8 @@ typedef struct SSelectStmt {
SNode* pFromTable;
SNode* pWhere;
SNodeList* pPartitionByList;
SNodeList* pTags; // for create stream
SNode* pSubtable; // for create stream
SNode* pWindow;
SNodeList* pGroupByList; // SGroupingSetNode
SNode* pHaving;

View File

@ -2682,6 +2682,7 @@ int32_t tSerializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI32(&encoder, pReq->maxSpeed) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -2695,6 +2696,7 @@ int32_t tDeserializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) {
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->maxSpeed) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -4373,8 +4375,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0)
return -1;
if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0) return -1;
}
tEndDecode(&decoder);
@ -4826,6 +4827,14 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI8(&encoder, pReq->igExpired) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
tEndEncode(&encoder);
@ -4864,6 +4873,28 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
}
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
if (pReq->numOfTags > 0) {
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
if (pReq->pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -990,7 +990,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);

View File

@ -1110,7 +1110,8 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
}
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
if (pFilterNode == NULL || pBlock->info.rows == 0) {
@ -1201,7 +1202,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
}
}
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
@ -1787,7 +1788,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
if (pSource->localExec) {
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
int32_t code =
(*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemoryFree(pWrapper);
} else {
@ -1798,8 +1801,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
}
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
pSource->execId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
@ -1824,7 +1827,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
int32_t code =
asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
}
return TSDB_CODE_SUCCESS;
@ -3356,9 +3360,7 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSSchemaWrapper(pStreamInfo->schema);
}
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList);
@ -3539,7 +3541,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
if (pHandle->vnode) {
@ -3729,7 +3732,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
pOptr = createStreamPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);

View File

@ -989,11 +989,11 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
goto _error;
}
int32_t code = TSDB_CODE_SUCCESS;
pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys);
if (pPartNode->pExprs != NULL) {
if (pPartNode->part.pExprs != NULL) {
int32_t num = 0;
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->pExprs, NULL, &num);
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -1008,7 +1008,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
pInfo->partitionSup.needCalc = true;
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc);
if (!pResBlock) {
goto _error;
}
@ -1022,7 +1022,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
pOperator->name = "StreamPartitionOperator";
pOperator->blocking = false;

View File

@ -381,6 +381,8 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(igExpired);
CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(groupSort);
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
return TSDB_CODE_SUCCESS;
}
@ -488,6 +490,8 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pPartitionKeys);
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
return TSDB_CODE_SUCCESS;
}

View File

@ -1538,6 +1538,8 @@ static const char* jkTableScanPhysiPlanWatermark = "Watermark";
static const char* jkTableScanPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static const char* jkTableScanPhysiPlanTags = "Tags";
static const char* jkTableScanPhysiPlanSubtable = "Subtable";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
@ -1595,6 +1597,12 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTableScanPhysiPlanTags, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTableScanPhysiPlanSubtable, nodeToJson, pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
}
@ -1657,6 +1665,12 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanTags, &pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkTableScanPhysiPlanSubtable, &pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
}
@ -2270,6 +2284,37 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkStreamPartitionPhysiPlanTags = "Tags";
static const char* jkStreamPartitionPhysiPlanSubtable = "Subtable";
static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) {
const SStreamPartitionPhysiNode* pNode = (const SStreamPartitionPhysiNode*)pObj;
int32_t code = physiPartitionNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkStreamPartitionPhysiPlanTags, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStreamPartitionPhysiPlanSubtable, nodeToJson, pNode->pSubtable);
}
return code;
}
static int32_t jsonToPhysiStreamPartitionNode(const SJson* pJson, void* pObj) {
SStreamPartitionPhysiNode* pNode = (SStreamPartitionPhysiNode*)pObj;
int32_t code = jsonToPhysiPartitionNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkStreamPartitionPhysiPlanTags, &pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStreamPartitionPhysiPlanSubtable, &pNode->pSubtable);
}
return code;
}
static const char* jkIndefRowsFuncPhysiPlanExprs = "Exprs";
static const char* jkIndefRowsFuncPhysiPlanFuncs = "Funcs";
@ -4109,6 +4154,8 @@ static const char* jkSelectStmtProjections = "Projections";
static const char* jkSelectStmtFrom = "From";
static const char* jkSelectStmtWhere = "Where";
static const char* jkSelectStmtPartitionBy = "PartitionBy";
static const char* jkSelectStmtTags = "Tags";
static const char* jkSelectStmtSubtable = "Subtable";
static const char* jkSelectStmtWindow = "Window";
static const char* jkSelectStmtGroupBy = "GroupBy";
static const char* jkSelectStmtHaving = "Having";
@ -4134,6 +4181,12 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSelectStmtPartitionBy, pNode->pPartitionByList);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSelectStmtTags, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSelectStmtSubtable, nodeToJson, pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSelectStmtWindow, nodeToJson, pNode->pWindow);
}
@ -4178,6 +4231,12 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSelectStmtPartitionBy, &pNode->pPartitionByList);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSelectStmtTags, &pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSelectStmtSubtable, &pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSelectStmtWindow, &pNode->pWindow);
}
@ -4586,8 +4645,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return physiStreamPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return physiIndefRowsFuncNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
@ -4738,8 +4798,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return jsonToPhysiStreamPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return jsonToPhysiIndefRowsFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:

View File

@ -1993,7 +1993,9 @@ enum {
PHY_TABLE_SCAN_CODE_SCAN = 1,
PHY_TABLE_SCAN_CODE_INLINE_ATTRS,
PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS,
PHY_TABLE_SCAN_CODE_GROUP_TAGS
PHY_TABLE_SCAN_CODE_GROUP_TAGS,
PHY_TABLE_SCAN_CODE_TAGS,
PHY_TABLE_SCAN_CODE_SUBTABLE
};
static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2062,6 +2064,12 @@ static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_TAGS, nodeListToMsg, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_TAGS, nodeListToMsg, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SUBTABLE, nodeToMsg, pNode->pSubtable);
}
return code;
}
@ -2138,6 +2146,12 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_TABLE_SCAN_CODE_GROUP_TAGS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupTags);
break;
case PHY_TABLE_SCAN_CODE_TAGS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTags);
break;
case PHY_TABLE_SCAN_CODE_SUBTABLE:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSubtable);
break;
default:
break;
}
@ -2914,6 +2928,46 @@ static int32_t msgToPhysiPartitionNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_STREAM_PARTITION_CODE_BASE_NODE = 1, PHY_STREAM_PARTITION_CODE_TAGS, PHY_STREAM_PARTITION_CODE_SUBTABLE };
static int32_t physiStreamPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SStreamPartitionPhysiNode* pNode = (const SStreamPartitionPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_STREAM_PARTITION_CODE_BASE_NODE, physiPartitionNodeToMsg, &pNode->part);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_STREAM_PARTITION_CODE_TAGS, nodeListToMsg, pNode->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_STREAM_PARTITION_CODE_SUBTABLE, nodeToMsg, pNode->pSubtable);
}
return code;
}
static int32_t msgToPhysiStreamPartitionNode(STlvDecoder* pDecoder, void* pObj) {
SStreamPartitionPhysiNode* pNode = (SStreamPartitionPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_STREAM_PARTITION_CODE_BASE_NODE:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiPartitionNode, &pNode->part);
break;
case PHY_STREAM_PARTITION_CODE_TAGS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTags);
break;
case PHY_STREAM_PARTITION_CODE_SUBTABLE:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSubtable);
break;
default:
break;
}
}
return code;
}
enum { PHY_INDEF_ROWS_FUNC_CODE_BASE_NODE = 1, PHY_INDEF_ROWS_FUNC_CODE_EXPRS, PHY_INDEF_ROWS_FUNC_CODE_FUNCS };
static int32_t physiIndefRowsFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -3592,9 +3646,11 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = physiStateWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = physiPartitionNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = physiStreamPartitionNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
code = physiIndefRowsFuncNodeToMsg(pObj, pEncoder);
break;
@ -3727,9 +3783,11 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToPhysiStateWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = msgToPhysiPartitionNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = msgToPhysiStreamPartitionNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
code = msgToPhysiIndefRowsFuncNode(pDecoder, pObj);
break;

View File

@ -378,6 +378,8 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa
nodesWalkExpr(pSelect->pWhere, walker, pContext);
case SQL_CLAUSE_WHERE:
nodesWalkExprs(pSelect->pPartitionByList, walker, pContext);
nodesWalkExprs(pSelect->pTags, walker, pContext);
nodesWalkExpr(pSelect->pSubtable, walker, pContext);
case SQL_CLAUSE_PARTITION_BY:
nodesWalkExpr(pSelect->pWindow, walker, pContext);
case SQL_CLAUSE_WINDOW:
@ -412,6 +414,8 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
nodesRewriteExpr(&(pSelect->pWhere), rewriter, pContext);
case SQL_CLAUSE_WHERE:
nodesRewriteExprs(pSelect->pPartitionByList, rewriter, pContext);
nodesRewriteExprs(pSelect->pTags, rewriter, pContext);
nodesRewriteExpr(&(pSelect->pSubtable), rewriter, pContext);
case SQL_CLAUSE_PARTITION_BY:
nodesRewriteExpr(&(pSelect->pWindow), rewriter, pContext);
case SQL_CLAUSE_WINDOW:

View File

@ -772,6 +772,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pFromTable);
nodesDestroyNode(pStmt->pWhere);
nodesDestroyList(pStmt->pPartitionByList);
nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pSubtable);
nodesDestroyNode(pStmt->pWindow);
nodesDestroyList(pStmt->pGroupByList);
nodesDestroyNode(pStmt->pHaving);

View File

@ -147,7 +147,7 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, STok
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pDbName);
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pOptions);
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t maxSpeed);
SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
SNode* createAlterTableOptions(SAstCreateContext* pCxt);
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal);
@ -212,7 +212,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNode* pQuery);
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName);
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);

View File

@ -159,7 +159,7 @@ cmd ::= DROP DATABASE exists_opt(A) db_name(B).
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); }
cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); }
cmd ::= TRIM DATABASE db_name(A). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A); }
cmd ::= TRIM DATABASE db_name(A) speed_opt(B). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A, B); }
%type not_exists_opt { bool }
%destructor not_exists_opt { }
@ -246,6 +246,11 @@ retention_list(A) ::= retention_list(B) NK_COMMA retention(C).
retention(A) ::= NK_VARIABLE(B) NK_COLON NK_VARIABLE(C). { A = createNodeListNodeEx(pCxt, createDurationValueNode(pCxt, &B), createDurationValueNode(pCxt, &C)); }
%type speed_opt { int32_t }
%destructor speed_opt { }
speed_opt(A) ::= . { A = 0; }
speed_opt(A) ::= MAX_SPEED NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
/************************************************ create/drop table/stable ********************************************/
cmd ::= CREATE TABLE not_exists_opt(A) full_table_name(B)
NK_LP column_def_list(C) NK_RP tags_def_opt(D) table_options(E). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, B, C, D, E); }
@ -501,8 +506,8 @@ bufsize_opt(A) ::= .
bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A)
stream_options(B) INTO full_table_name(C) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); }
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) tags_def_opt(F) subtable_opt(G) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
@ -512,6 +517,9 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); }
@ -909,7 +917,16 @@ where_clause_opt(A) ::= WHERE search_condition(B).
%type partition_by_clause_opt { SNodeList* }
%destructor partition_by_clause_opt { nodesDestroyList($$); }
partition_by_clause_opt(A) ::= . { A = NULL; }
partition_by_clause_opt(A) ::= PARTITION BY expression_list(B). { A = B; }
partition_by_clause_opt(A) ::= PARTITION BY partition_list(B). { A = B; }
%type partition_list { SNodeList* }
%destructor partition_list { nodesDestroyList($$); }
partition_list(A) ::= partition_item(B). { A = createNodeList(pCxt, B); }
partition_list(A) ::= partition_list(B) NK_COMMA partition_item(C). { A = addNodeToList(pCxt, B, C); }
partition_item(A) ::= expr_or_subquery(B). { A = releaseRawExprNode(pCxt, B); }
partition_item(A) ::= expr_or_subquery(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
partition_item(A) ::= expr_or_subquery(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
twindow_clause_opt(A) ::= . { A = NULL; }
twindow_clause_opt(A) ::=

View File

@ -1055,7 +1055,7 @@ SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {
return (SNode*)pStmt;
}
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t maxSpeed) {
CHECK_PARSER_STATUS(pCxt);
if (!checkDbName(pCxt, pDbName, false)) {
return NULL;
@ -1063,6 +1063,7 @@ SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {
STrimDatabaseStmt* pStmt = (STrimDatabaseStmt*)nodesMakeNode(QUERY_NODE_TRIM_DATABASE_STMT);
CHECK_OUT_OF_MEM(pStmt);
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
pStmt->maxSpeed = maxSpeed;
return (SNode*)pStmt;
}
@ -1700,7 +1701,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
}
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNode* pQuery) {
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
CHECK_OUT_OF_MEM(pStmt);
@ -1713,6 +1714,8 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const
pStmt->ignoreExists = ignoreExists;
pStmt->pOptions = (SStreamOptions*)pOptions;
pStmt->pQuery = pQuery;
pStmt->pTags = pTags;
pStmt->pSubtable = pSubtable;
return (SNode*)pStmt;
}

View File

@ -275,6 +275,12 @@ static int32_t calcConstSelectFrom(SCalcConstContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
code = calcConstList(pSelect->pPartitionByList);
}
if (TSDB_CODE_SUCCESS == code) {
code = calcConstList(pSelect->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = calcConstNode(&pSelect->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = calcConstNode(&pSelect->pWindow);
}

View File

@ -129,6 +129,7 @@ static SKeyword keywordTable[] = {
{"MATCH", TK_MATCH},
{"MAXROWS", TK_MAXROWS},
{"MAX_DELAY", TK_MAX_DELAY},
{"MAX_SPEED", TK_MAX_SPEED},
{"MERGE", TK_MERGE},
{"META", TK_META},
{"MINROWS", TK_MINROWS},
@ -200,6 +201,7 @@ static SKeyword keywordTable[] = {
{"STREAMS", TK_STREAMS},
{"STRICT", TK_STRICT},
{"SUBSCRIPTIONS", TK_SUBSCRIPTIONS},
{"SUBTABLE", TK_SUBTABLE},
{"SYSINFO", TK_SYSINFO},
{"TABLE", TK_TABLE},
{"TABLES", TK_TABLES},

View File

@ -3076,7 +3076,14 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec
return TSDB_CODE_SUCCESS;
}
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
return translateExprList(pCxt, pSelect->pPartitionByList);
int32_t code = translateExprList(pCxt, pSelect->pPartitionByList);
if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pSelect->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pSelect->pSubtable);
}
return code;
}
static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
@ -3968,7 +3975,7 @@ static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStm
}
static int32_t translateTrimDatabase(STranslateContext* pCxt, STrimDatabaseStmt* pStmt) {
STrimDbReq req = {0};
STrimDbReq req = {.maxSpeed = pStmt->maxSpeed};
SName name = {0};
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, req.db);
@ -5211,6 +5218,93 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
return code;
}
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
if (NULL == pStmt->pTags) {
return TSDB_CODE_SUCCESS;
}
SNode* pTag = NULL;
FOREACH(pTag, pStmt->pTags) {
bool found = false;
SNode* pPart = NULL;
FOREACH(pPart, pSelect->pPartitionByList) {
if (0 == strcmp(((SColumnDefNode*)pTag)->colName, ((SExprNode*)pPart)->userAlias)) {
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSelect->pTags, nodesCloneNode(pPart))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
found = true;
break;
}
}
if (!found) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnDefNode*)pTag)->colName);
}
}
return TSDB_CODE_SUCCESS;
}
typedef struct SRewriteSubtableCxt {
STranslateContext* pCxt;
SNodeList* pPartitionList;
} SRewriteSubtableCxt;
static EDealRes rewriteSubtable(SNode** pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
SRewriteSubtableCxt* pCxt = pContext;
bool found = false;
SNode* pPart = NULL;
FOREACH(pPart, pCxt->pPartitionList) {
if (0 == strcmp(((SColumnNode*)*pNode)->colName, ((SExprNode*)pPart)->userAlias)) {
SNode* pNew = nodesCloneNode(pPart);
if (NULL == pNew) {
pCxt->pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pNew;
found = true;
break;
}
if (!found) {
return generateDealNodeErrMsg(pCxt->pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)*pNode)->colName);
}
}
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
SSelectStmt* pSelect) {
if (NULL == pStmt->pSubtable) {
return TSDB_CODE_SUCCESS;
}
pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable);
if (NULL == pSelect->pSubtable) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRewriteSubtableCxt cxt = {.pCxt = pCxt, .pPartitionList = pSelect->pPartitionByList};
nodesRewriteExpr(&pSelect->pSubtable, rewriteSubtable, &cxt);
return pCxt->errCode;
}
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pSelect->pPartitionByList) {
if (NULL != pStmt->pTags || NULL != pStmt->pSubtable) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
}
int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
if (TSDB_CODE_SUCCESS == code) {
code = addSubtableNameToCreateStreamQuery(pCxt, pStmt, pSelect);
}
return code;
}
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
@ -5220,18 +5314,21 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true;
int32_t code = addWstartTsToCreateStreamQuery(pStmt);
int32_t code = addWstartTsToCreateStreamQuery(pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt);
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt);
code = translateQuery(pCxt, pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
return code;
}
@ -5249,7 +5346,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
tNameExtractFullName(&name, pReq->targetStbFullName);
}
int32_t code = buildCreateStreamQuery(pCxt, pStmt->pQuery, pReq);
int32_t code = buildCreateStreamQuery(pCxt, pStmt, pReq);
if (TSDB_CODE_SUCCESS == code) {
pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) {
@ -5262,6 +5359,8 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->igExpired = pStmt->pOptions->ignoreExpired;
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
}
return code;

File diff suppressed because it is too large Load Diff

View File

@ -610,6 +610,20 @@ TEST_F(ParserInitialCTest, createStream) {
expect.igExpired = igExpired;
};
auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) {
SField field = {0};
strcpy(field.name, pFieldName);
field.type = type;
field.bytes = bytes > 0 ? bytes : tDataTypes[type].bytes;
field.flags |= COL_SMA_ON;
if (NULL == expect.pTags) {
expect.pTags = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField));
}
taosArrayPush(expect.pTags, &field);
expect.numOfTags += 1;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_STREAM_STMT);
SCMCreateStreamReq req = {0};
@ -625,6 +639,19 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ(req.maxDelay, expect.maxDelay);
ASSERT_EQ(req.watermark, expect.watermark);
ASSERT_EQ(req.igExpired, expect.igExpired);
ASSERT_EQ(req.numOfTags, expect.numOfTags);
if (expect.numOfTags > 0) {
ASSERT_EQ(taosArrayGetSize(req.pTags), expect.numOfTags);
ASSERT_EQ(taosArrayGetSize(req.pTags), taosArrayGetSize(expect.pTags));
for (int32_t i = 0; i < expect.numOfTags; ++i) {
SField* pField = (SField*)taosArrayGet(req.pTags, i);
SField* pExpectField = (SField*)taosArrayGet(expect.pTags, i);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
ASSERT_EQ(pField->flags, pExpectField->flags);
}
}
tFreeSCMCreateStreamReq(&req);
});
@ -640,6 +667,17 @@ TEST_F(ParserInitialCTest, createStream) {
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 INTO st1 AS SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReqFunc(
"s1", "test",
"create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as "
"select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)",
"st3");
addTag("tname", TSDB_DATA_TYPE_VARCHAR, 10 + VARSTR_HEADER_SIZE);
addTag("id", TSDB_DATA_TYPE_INT);
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
clearCreateStreamReq();
}
TEST_F(ParserInitialCTest, createStreamSemanticCheck) {

View File

@ -250,7 +250,10 @@ TEST_F(ParserShowToUseTest, trimDatabase) {
STrimDbReq expect = {0};
auto setTrimDbReq = [&](const char* pDb) { snprintf(expect.db, sizeof(expect.db), "0.%s", pDb); };
auto setTrimDbReq = [&](const char* pDb, int32_t maxSpeed = 0) {
snprintf(expect.db, sizeof(expect.db), "0.%s", pDb);
expect.maxSpeed = maxSpeed;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_TRIM_DATABASE_STMT);
@ -258,10 +261,14 @@ TEST_F(ParserShowToUseTest, trimDatabase) {
STrimDbReq req = {0};
ASSERT_EQ(tDeserializeSTrimDbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.db), std::string(expect.db));
ASSERT_EQ(req.maxSpeed, expect.maxSpeed);
});
setTrimDbReq("wxy_db");
run("TRIM DATABASE wxy_db");
setTrimDbReq("wxy_db", 100);
run("TRIM DATABASE wxy_db MAX_SPEED 100");
}
TEST_F(ParserShowToUseTest, useDatabase) {

View File

@ -1022,6 +1022,20 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) {
pPartition->pTags = nodesCloneList(pSelect->pTags);
if (NULL == pPartition->pTags) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pSubtable) {
pPartition->pSubtable = nodesCloneNode(pSelect->pSubtable);
if (NULL == pPartition->pSubtable) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pPartition;
} else {

View File

@ -1615,6 +1615,8 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
TSWAP(((SPartitionLogicNode*)pNode)->pTags, pScan->pTags);
TSWAP(((SPartitionLogicNode*)pNode)->pSubtable, pScan->pSubtable);
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
code = adjustLogicNodeDataRequirement((SLogicNode*)pScan, pNode->resultDataOrder);

View File

@ -563,7 +563,16 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->igExpired = pScanLogicNode->igExpired;
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pTags,
&pTableScan->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pSubtable,
&pTableScan->pSubtable);
}
return code;
}
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
@ -1322,11 +1331,10 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return code;
}
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pPartLogicNode,
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, ENodeType type,
SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, type);
if (NULL == pPart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -1371,6 +1379,34 @@ static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
return code;
}
static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SStreamPartitionPhysiNode* pPart = NULL;
int32_t code = createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode,
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, (SPhysiNode**)&pPart);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pTags, &pPart->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pPart;
} else {
nodesDestroyNode((SNode*)pPart);
}
return code;
}
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
if (pCxt->pPlanCxt->streamQuery) {
return createStreamPartitionPhysiNode(pCxt, pChildren, pPartLogicNode, pPhyNode);
}
return createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION, pPhyNode);
}
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
SPhysiNode** pPhyNode) {
SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFillNode, QUERY_NODE_PHYSICAL_PLAN_FILL);

View File

@ -32,6 +32,12 @@ TEST_F(PlanOtherTest, createStream) {
run("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 "
"interval(10s)");
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, c1 id INTERVAL(10S)");
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
}
TEST_F(PlanOtherTest, createStreamUseSTable) {