refactor(sync): add vnode snapshot test
This commit is contained in:
commit
48fd4d8c33
|
@ -55,11 +55,11 @@ extern int32_t tMsgDict[];
|
||||||
|
|
||||||
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
|
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
|
||||||
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
|
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
|
||||||
#define TMSG_INFO(TYPE) \
|
#define TMSG_INFO(TYPE) \
|
||||||
((TYPE) >= 0 && \
|
((TYPE) >= 0 && ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || \
|
||||||
((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
|
(TYPE) < TDMT_SCH_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || \
|
||||||
(TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG)) \
|
(TYPE) < TDMT_SYNC_MAX_MSG)) \
|
||||||
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
|
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
|
||||||
: 0
|
: 0
|
||||||
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
|
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
|
||||||
|
|
||||||
|
@ -815,6 +815,13 @@ typedef struct {
|
||||||
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
||||||
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
} STrimDbReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
|
||||||
|
int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
int32_t numOfStables;
|
int32_t numOfStables;
|
||||||
|
|
|
@ -116,6 +116,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB, "trim-db", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL)
|
||||||
|
|
|
@ -74,201 +74,202 @@
|
||||||
#define TK_DATABASE 56
|
#define TK_DATABASE 56
|
||||||
#define TK_USE 57
|
#define TK_USE 57
|
||||||
#define TK_FLUSH 58
|
#define TK_FLUSH 58
|
||||||
#define TK_IF 59
|
#define TK_TRIM 59
|
||||||
#define TK_NOT 60
|
#define TK_IF 60
|
||||||
#define TK_EXISTS 61
|
#define TK_NOT 61
|
||||||
#define TK_BUFFER 62
|
#define TK_EXISTS 62
|
||||||
#define TK_CACHELAST 63
|
#define TK_BUFFER 63
|
||||||
#define TK_CACHELASTSIZE 64
|
#define TK_CACHELAST 64
|
||||||
#define TK_COMP 65
|
#define TK_CACHELASTSIZE 65
|
||||||
#define TK_DURATION 66
|
#define TK_COMP 66
|
||||||
#define TK_NK_VARIABLE 67
|
#define TK_DURATION 67
|
||||||
#define TK_FSYNC 68
|
#define TK_NK_VARIABLE 68
|
||||||
#define TK_MAXROWS 69
|
#define TK_FSYNC 69
|
||||||
#define TK_MINROWS 70
|
#define TK_MAXROWS 70
|
||||||
#define TK_KEEP 71
|
#define TK_MINROWS 71
|
||||||
#define TK_PAGES 72
|
#define TK_KEEP 72
|
||||||
#define TK_PAGESIZE 73
|
#define TK_PAGES 73
|
||||||
#define TK_PRECISION 74
|
#define TK_PAGESIZE 74
|
||||||
#define TK_REPLICA 75
|
#define TK_PRECISION 75
|
||||||
#define TK_STRICT 76
|
#define TK_REPLICA 76
|
||||||
#define TK_WAL 77
|
#define TK_STRICT 77
|
||||||
#define TK_VGROUPS 78
|
#define TK_WAL 78
|
||||||
#define TK_SINGLE_STABLE 79
|
#define TK_VGROUPS 79
|
||||||
#define TK_RETENTIONS 80
|
#define TK_SINGLE_STABLE 80
|
||||||
#define TK_SCHEMALESS 81
|
#define TK_RETENTIONS 81
|
||||||
#define TK_NK_COLON 82
|
#define TK_SCHEMALESS 82
|
||||||
#define TK_TABLE 83
|
#define TK_NK_COLON 83
|
||||||
#define TK_NK_LP 84
|
#define TK_TABLE 84
|
||||||
#define TK_NK_RP 85
|
#define TK_NK_LP 85
|
||||||
#define TK_STABLE 86
|
#define TK_NK_RP 86
|
||||||
#define TK_ADD 87
|
#define TK_STABLE 87
|
||||||
#define TK_COLUMN 88
|
#define TK_ADD 88
|
||||||
#define TK_MODIFY 89
|
#define TK_COLUMN 89
|
||||||
#define TK_RENAME 90
|
#define TK_MODIFY 90
|
||||||
#define TK_TAG 91
|
#define TK_RENAME 91
|
||||||
#define TK_SET 92
|
#define TK_TAG 92
|
||||||
#define TK_NK_EQ 93
|
#define TK_SET 93
|
||||||
#define TK_USING 94
|
#define TK_NK_EQ 94
|
||||||
#define TK_TAGS 95
|
#define TK_USING 95
|
||||||
#define TK_COMMENT 96
|
#define TK_TAGS 96
|
||||||
#define TK_BOOL 97
|
#define TK_COMMENT 97
|
||||||
#define TK_TINYINT 98
|
#define TK_BOOL 98
|
||||||
#define TK_SMALLINT 99
|
#define TK_TINYINT 99
|
||||||
#define TK_INT 100
|
#define TK_SMALLINT 100
|
||||||
#define TK_INTEGER 101
|
#define TK_INT 101
|
||||||
#define TK_BIGINT 102
|
#define TK_INTEGER 102
|
||||||
#define TK_FLOAT 103
|
#define TK_BIGINT 103
|
||||||
#define TK_DOUBLE 104
|
#define TK_FLOAT 104
|
||||||
#define TK_BINARY 105
|
#define TK_DOUBLE 105
|
||||||
#define TK_TIMESTAMP 106
|
#define TK_BINARY 106
|
||||||
#define TK_NCHAR 107
|
#define TK_TIMESTAMP 107
|
||||||
#define TK_UNSIGNED 108
|
#define TK_NCHAR 108
|
||||||
#define TK_JSON 109
|
#define TK_UNSIGNED 109
|
||||||
#define TK_VARCHAR 110
|
#define TK_JSON 110
|
||||||
#define TK_MEDIUMBLOB 111
|
#define TK_VARCHAR 111
|
||||||
#define TK_BLOB 112
|
#define TK_MEDIUMBLOB 112
|
||||||
#define TK_VARBINARY 113
|
#define TK_BLOB 113
|
||||||
#define TK_DECIMAL 114
|
#define TK_VARBINARY 114
|
||||||
#define TK_MAX_DELAY 115
|
#define TK_DECIMAL 115
|
||||||
#define TK_WATERMARK 116
|
#define TK_MAX_DELAY 116
|
||||||
#define TK_ROLLUP 117
|
#define TK_WATERMARK 117
|
||||||
#define TK_TTL 118
|
#define TK_ROLLUP 118
|
||||||
#define TK_SMA 119
|
#define TK_TTL 119
|
||||||
#define TK_FIRST 120
|
#define TK_SMA 120
|
||||||
#define TK_LAST 121
|
#define TK_FIRST 121
|
||||||
#define TK_SHOW 122
|
#define TK_LAST 122
|
||||||
#define TK_DATABASES 123
|
#define TK_SHOW 123
|
||||||
#define TK_TABLES 124
|
#define TK_DATABASES 124
|
||||||
#define TK_STABLES 125
|
#define TK_TABLES 125
|
||||||
#define TK_MNODES 126
|
#define TK_STABLES 126
|
||||||
#define TK_MODULES 127
|
#define TK_MNODES 127
|
||||||
#define TK_QNODES 128
|
#define TK_MODULES 128
|
||||||
#define TK_FUNCTIONS 129
|
#define TK_QNODES 129
|
||||||
#define TK_INDEXES 130
|
#define TK_FUNCTIONS 130
|
||||||
#define TK_ACCOUNTS 131
|
#define TK_INDEXES 131
|
||||||
#define TK_APPS 132
|
#define TK_ACCOUNTS 132
|
||||||
#define TK_CONNECTIONS 133
|
#define TK_APPS 133
|
||||||
#define TK_LICENCE 134
|
#define TK_CONNECTIONS 134
|
||||||
#define TK_GRANTS 135
|
#define TK_LICENCE 135
|
||||||
#define TK_QUERIES 136
|
#define TK_GRANTS 136
|
||||||
#define TK_SCORES 137
|
#define TK_QUERIES 137
|
||||||
#define TK_TOPICS 138
|
#define TK_SCORES 138
|
||||||
#define TK_VARIABLES 139
|
#define TK_TOPICS 139
|
||||||
#define TK_BNODES 140
|
#define TK_VARIABLES 140
|
||||||
#define TK_SNODES 141
|
#define TK_BNODES 141
|
||||||
#define TK_CLUSTER 142
|
#define TK_SNODES 142
|
||||||
#define TK_TRANSACTIONS 143
|
#define TK_CLUSTER 143
|
||||||
#define TK_DISTRIBUTED 144
|
#define TK_TRANSACTIONS 144
|
||||||
#define TK_CONSUMERS 145
|
#define TK_DISTRIBUTED 145
|
||||||
#define TK_SUBSCRIPTIONS 146
|
#define TK_CONSUMERS 146
|
||||||
#define TK_LIKE 147
|
#define TK_SUBSCRIPTIONS 147
|
||||||
#define TK_INDEX 148
|
#define TK_LIKE 148
|
||||||
#define TK_FUNCTION 149
|
#define TK_INDEX 149
|
||||||
#define TK_INTERVAL 150
|
#define TK_FUNCTION 150
|
||||||
#define TK_TOPIC 151
|
#define TK_INTERVAL 151
|
||||||
#define TK_AS 152
|
#define TK_TOPIC 152
|
||||||
#define TK_WITH 153
|
#define TK_AS 153
|
||||||
#define TK_META 154
|
#define TK_WITH 154
|
||||||
#define TK_CONSUMER 155
|
#define TK_META 155
|
||||||
#define TK_GROUP 156
|
#define TK_CONSUMER 156
|
||||||
#define TK_DESC 157
|
#define TK_GROUP 157
|
||||||
#define TK_DESCRIBE 158
|
#define TK_DESC 158
|
||||||
#define TK_RESET 159
|
#define TK_DESCRIBE 159
|
||||||
#define TK_QUERY 160
|
#define TK_RESET 160
|
||||||
#define TK_CACHE 161
|
#define TK_QUERY 161
|
||||||
#define TK_EXPLAIN 162
|
#define TK_CACHE 162
|
||||||
#define TK_ANALYZE 163
|
#define TK_EXPLAIN 163
|
||||||
#define TK_VERBOSE 164
|
#define TK_ANALYZE 164
|
||||||
#define TK_NK_BOOL 165
|
#define TK_VERBOSE 165
|
||||||
#define TK_RATIO 166
|
#define TK_NK_BOOL 166
|
||||||
#define TK_NK_FLOAT 167
|
#define TK_RATIO 167
|
||||||
#define TK_COMPACT 168
|
#define TK_NK_FLOAT 168
|
||||||
#define TK_VNODES 169
|
#define TK_COMPACT 169
|
||||||
#define TK_IN 170
|
#define TK_VNODES 170
|
||||||
#define TK_OUTPUTTYPE 171
|
#define TK_IN 171
|
||||||
#define TK_AGGREGATE 172
|
#define TK_OUTPUTTYPE 172
|
||||||
#define TK_BUFSIZE 173
|
#define TK_AGGREGATE 173
|
||||||
#define TK_STREAM 174
|
#define TK_BUFSIZE 174
|
||||||
#define TK_INTO 175
|
#define TK_STREAM 175
|
||||||
#define TK_TRIGGER 176
|
#define TK_INTO 176
|
||||||
#define TK_AT_ONCE 177
|
#define TK_TRIGGER 177
|
||||||
#define TK_WINDOW_CLOSE 178
|
#define TK_AT_ONCE 178
|
||||||
#define TK_IGNORE 179
|
#define TK_WINDOW_CLOSE 179
|
||||||
#define TK_EXPIRED 180
|
#define TK_IGNORE 180
|
||||||
#define TK_KILL 181
|
#define TK_EXPIRED 181
|
||||||
#define TK_CONNECTION 182
|
#define TK_KILL 182
|
||||||
#define TK_TRANSACTION 183
|
#define TK_CONNECTION 183
|
||||||
#define TK_BALANCE 184
|
#define TK_TRANSACTION 184
|
||||||
#define TK_VGROUP 185
|
#define TK_BALANCE 185
|
||||||
#define TK_MERGE 186
|
#define TK_VGROUP 186
|
||||||
#define TK_REDISTRIBUTE 187
|
#define TK_MERGE 187
|
||||||
#define TK_SPLIT 188
|
#define TK_REDISTRIBUTE 188
|
||||||
#define TK_SYNCDB 189
|
#define TK_SPLIT 189
|
||||||
#define TK_DELETE 190
|
#define TK_SYNCDB 190
|
||||||
#define TK_INSERT 191
|
#define TK_DELETE 191
|
||||||
#define TK_NULL 192
|
#define TK_INSERT 192
|
||||||
#define TK_NK_QUESTION 193
|
#define TK_NULL 193
|
||||||
#define TK_NK_ARROW 194
|
#define TK_NK_QUESTION 194
|
||||||
#define TK_ROWTS 195
|
#define TK_NK_ARROW 195
|
||||||
#define TK_TBNAME 196
|
#define TK_ROWTS 196
|
||||||
#define TK_QSTARTTS 197
|
#define TK_TBNAME 197
|
||||||
#define TK_QENDTS 198
|
#define TK_QSTARTTS 198
|
||||||
#define TK_WSTARTTS 199
|
#define TK_QENDTS 199
|
||||||
#define TK_WENDTS 200
|
#define TK_WSTARTTS 200
|
||||||
#define TK_WDURATION 201
|
#define TK_WENDTS 201
|
||||||
#define TK_CAST 202
|
#define TK_WDURATION 202
|
||||||
#define TK_NOW 203
|
#define TK_CAST 203
|
||||||
#define TK_TODAY 204
|
#define TK_NOW 204
|
||||||
#define TK_TIMEZONE 205
|
#define TK_TODAY 205
|
||||||
#define TK_CLIENT_VERSION 206
|
#define TK_TIMEZONE 206
|
||||||
#define TK_SERVER_VERSION 207
|
#define TK_CLIENT_VERSION 207
|
||||||
#define TK_SERVER_STATUS 208
|
#define TK_SERVER_VERSION 208
|
||||||
#define TK_CURRENT_USER 209
|
#define TK_SERVER_STATUS 209
|
||||||
#define TK_COUNT 210
|
#define TK_CURRENT_USER 210
|
||||||
#define TK_LAST_ROW 211
|
#define TK_COUNT 211
|
||||||
#define TK_BETWEEN 212
|
#define TK_LAST_ROW 212
|
||||||
#define TK_IS 213
|
#define TK_BETWEEN 213
|
||||||
#define TK_NK_LT 214
|
#define TK_IS 214
|
||||||
#define TK_NK_GT 215
|
#define TK_NK_LT 215
|
||||||
#define TK_NK_LE 216
|
#define TK_NK_GT 216
|
||||||
#define TK_NK_GE 217
|
#define TK_NK_LE 217
|
||||||
#define TK_NK_NE 218
|
#define TK_NK_GE 218
|
||||||
#define TK_MATCH 219
|
#define TK_NK_NE 219
|
||||||
#define TK_NMATCH 220
|
#define TK_MATCH 220
|
||||||
#define TK_CONTAINS 221
|
#define TK_NMATCH 221
|
||||||
#define TK_JOIN 222
|
#define TK_CONTAINS 222
|
||||||
#define TK_INNER 223
|
#define TK_JOIN 223
|
||||||
#define TK_SELECT 224
|
#define TK_INNER 224
|
||||||
#define TK_DISTINCT 225
|
#define TK_SELECT 225
|
||||||
#define TK_WHERE 226
|
#define TK_DISTINCT 226
|
||||||
#define TK_PARTITION 227
|
#define TK_WHERE 227
|
||||||
#define TK_BY 228
|
#define TK_PARTITION 228
|
||||||
#define TK_SESSION 229
|
#define TK_BY 229
|
||||||
#define TK_STATE_WINDOW 230
|
#define TK_SESSION 230
|
||||||
#define TK_SLIDING 231
|
#define TK_STATE_WINDOW 231
|
||||||
#define TK_FILL 232
|
#define TK_SLIDING 232
|
||||||
#define TK_VALUE 233
|
#define TK_FILL 233
|
||||||
#define TK_NONE 234
|
#define TK_VALUE 234
|
||||||
#define TK_PREV 235
|
#define TK_NONE 235
|
||||||
#define TK_LINEAR 236
|
#define TK_PREV 236
|
||||||
#define TK_NEXT 237
|
#define TK_LINEAR 237
|
||||||
#define TK_HAVING 238
|
#define TK_NEXT 238
|
||||||
#define TK_RANGE 239
|
#define TK_HAVING 239
|
||||||
#define TK_EVERY 240
|
#define TK_RANGE 240
|
||||||
#define TK_ORDER 241
|
#define TK_EVERY 241
|
||||||
#define TK_SLIMIT 242
|
#define TK_ORDER 242
|
||||||
#define TK_SOFFSET 243
|
#define TK_SLIMIT 243
|
||||||
#define TK_LIMIT 244
|
#define TK_SOFFSET 244
|
||||||
#define TK_OFFSET 245
|
#define TK_LIMIT 245
|
||||||
#define TK_ASC 246
|
#define TK_OFFSET 246
|
||||||
#define TK_NULLS 247
|
#define TK_ASC 247
|
||||||
#define TK_ID 248
|
#define TK_NULLS 248
|
||||||
#define TK_NK_BITNOT 249
|
#define TK_ID 249
|
||||||
#define TK_VALUES 250
|
#define TK_NK_BITNOT 250
|
||||||
#define TK_IMPORT 251
|
#define TK_VALUES 251
|
||||||
#define TK_NK_SEMI 252
|
#define TK_IMPORT 252
|
||||||
#define TK_FILE 253
|
#define TK_NK_SEMI 253
|
||||||
|
#define TK_FILE 254
|
||||||
|
|
||||||
#define TK_NK_SPACE 300
|
#define TK_NK_SPACE 300
|
||||||
#define TK_NK_COMMENT 301
|
#define TK_NK_COMMENT 301
|
||||||
|
|
|
@ -103,6 +103,11 @@ typedef struct SFlushDatabaseStmt {
|
||||||
char dbName[TSDB_DB_NAME_LEN];
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
} SFlushDatabaseStmt;
|
} SFlushDatabaseStmt;
|
||||||
|
|
||||||
|
typedef struct STrimDatabaseStmt {
|
||||||
|
ENodeType type;
|
||||||
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
|
} STrimDatabaseStmt;
|
||||||
|
|
||||||
typedef struct STableOptions {
|
typedef struct STableOptions {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
bool commentNull;
|
bool commentNull;
|
||||||
|
|
|
@ -112,6 +112,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_DROP_DATABASE_STMT,
|
QUERY_NODE_DROP_DATABASE_STMT,
|
||||||
QUERY_NODE_ALTER_DATABASE_STMT,
|
QUERY_NODE_ALTER_DATABASE_STMT,
|
||||||
QUERY_NODE_FLUSH_DATABASE_STMT,
|
QUERY_NODE_FLUSH_DATABASE_STMT,
|
||||||
|
QUERY_NODE_TRIM_DATABASE_STMT,
|
||||||
QUERY_NODE_CREATE_TABLE_STMT,
|
QUERY_NODE_CREATE_TABLE_STMT,
|
||||||
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
|
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
|
||||||
QUERY_NODE_CREATE_MULTI_TABLE_STMT,
|
QUERY_NODE_CREATE_MULTI_TABLE_STMT,
|
||||||
|
|
|
@ -69,13 +69,14 @@ typedef void (*_ref_fn_t)(const void *pObj);
|
||||||
#define T_REF_VAL_GET(x) (x)->_ref.val
|
#define T_REF_VAL_GET(x) (x)->_ref.val
|
||||||
|
|
||||||
// single writer multiple reader lock
|
// single writer multiple reader lock
|
||||||
typedef volatile int32_t SRWLatch;
|
typedef volatile int64_t SRWLatch;
|
||||||
|
|
||||||
void taosInitRWLatch(SRWLatch *pLatch);
|
void taosInitRWLatch(SRWLatch *pLatch);
|
||||||
void taosWLockLatch(SRWLatch *pLatch);
|
void taosInitReentrantRWLatch(SRWLatch *pLatch);
|
||||||
void taosWUnLockLatch(SRWLatch *pLatch);
|
void taosWLockLatch(SRWLatch *pLatch);
|
||||||
void taosRLockLatch(SRWLatch *pLatch);
|
void taosWUnLockLatch(SRWLatch *pLatch);
|
||||||
void taosRUnLockLatch(SRWLatch *pLatch);
|
void taosRLockLatch(SRWLatch *pLatch);
|
||||||
|
void taosRUnLockLatch(SRWLatch *pLatch);
|
||||||
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
||||||
|
|
||||||
// copy on read
|
// copy on read
|
||||||
|
|
|
@ -463,6 +463,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
|
|
||||||
pDst->info = pBlock->info;
|
pDst->info = pBlock->info;
|
||||||
pDst->info.rows = 0;
|
pDst->info.rows = 0;
|
||||||
|
pDst->info.capacity = 0;
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
|
|
|
@ -2647,6 +2647,31 @@ int32_t tDeserializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
|
@ -323,6 +323,9 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
||||||
|
|
||||||
if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
|
if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
|
||||||
pIter->index += step;
|
pIter->index += step;
|
||||||
|
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,30 +15,37 @@
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
const SVnodeCfg vnodeCfgDefault = {
|
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
||||||
.vgId = -1,
|
.dbname = "",
|
||||||
.dbname = "",
|
.dbId = 0,
|
||||||
.dbId = 0,
|
.szPage = 4096,
|
||||||
.szPage = 4096,
|
.szCache = 256,
|
||||||
.szCache = 256,
|
.szBuf = 96 * 1024 * 1024,
|
||||||
.szBuf = 96 * 1024 * 1024,
|
.isHeap = false,
|
||||||
.isHeap = false,
|
.isWeak = 0,
|
||||||
.isWeak = 0,
|
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
|
||||||
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
|
.update = 1,
|
||||||
.update = 1,
|
.compression = 2,
|
||||||
.compression = 2,
|
.slLevel = 5,
|
||||||
.slLevel = 5,
|
.days = 14400,
|
||||||
.days = 14400,
|
.minRows = 100,
|
||||||
.minRows = 100,
|
.maxRows = 4096,
|
||||||
.maxRows = 4096,
|
.keep2 = 5256000,
|
||||||
.keep2 = 5256000,
|
.keep0 = 5256000,
|
||||||
.keep0 = 5256000,
|
.keep1 = 5256000},
|
||||||
.keep1 = 5256000},
|
.walCfg =
|
||||||
.walCfg =
|
{
|
||||||
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
|
.vgId = -1,
|
||||||
.hashBegin = 0,
|
.fsyncPeriod = 0,
|
||||||
.hashEnd = 0,
|
.retentionPeriod = -1,
|
||||||
.hashMethod = 0};
|
.rollPeriod = -1,
|
||||||
|
.segSize = -1,
|
||||||
|
.retentionSize = -1,
|
||||||
|
.level = TAOS_WAL_WRITE,
|
||||||
|
},
|
||||||
|
.hashBegin = 0,
|
||||||
|
.hashEnd = 0,
|
||||||
|
.hashMethod = 0};
|
||||||
|
|
||||||
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
||||||
// TODO
|
// TODO
|
||||||
|
@ -79,7 +86,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
SJson *pNodeRetentions = tjsonCreateArray();
|
SJson *pNodeRetentions = tjsonCreateArray();
|
||||||
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
|
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
|
||||||
for (int32_t i = 0; i < nRetention; ++i) {
|
for (int32_t i = 0; i < nRetention; ++i) {
|
||||||
SJson * pNodeRetention = tjsonCreateObject();
|
SJson *pNodeRetention = tjsonCreateObject();
|
||||||
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
|
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
|
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
|
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
|
||||||
|
@ -156,7 +163,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
|
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
SJson * pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
|
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
|
||||||
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
|
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
|
||||||
if (nRetention > TSDB_RETENTION_MAX) {
|
if (nRetention > TSDB_RETENTION_MAX) {
|
||||||
nRetention = TSDB_RETENTION_MAX;
|
nRetention = TSDB_RETENTION_MAX;
|
||||||
|
|
|
@ -480,37 +480,35 @@ typedef struct SCtgOperation {
|
||||||
|
|
||||||
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||||
|
|
||||||
#define CTG_IS_LOCKED(_lock) atomic_load_32((_lock))
|
|
||||||
|
|
||||||
#define CTG_LOCK(type, _lock) do { \
|
#define CTG_LOCK(type, _lock) do { \
|
||||||
if (CTG_READ == (type)) { \
|
if (CTG_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRLockLatch(_lock); \
|
taosRLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWLockLatch(_lock); \
|
taosWLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CTG_UNLOCK(type, _lock) do { \
|
#define CTG_UNLOCK(type, _lock) do { \
|
||||||
if (CTG_READ == (type)) { \
|
if (CTG_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRUnLockLatch(_lock); \
|
taosRUnLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWUnLockLatch(_lock); \
|
taosWUnLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -789,9 +789,13 @@ _return:
|
||||||
|
|
||||||
int32_t ctgCallUserCb(void* param) {
|
int32_t ctgCallUserCb(void* param) {
|
||||||
SCtgJob* pJob = (SCtgJob*)param;
|
SCtgJob* pJob = (SCtgJob*)param;
|
||||||
|
|
||||||
|
qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
|
||||||
|
|
||||||
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
|
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
|
||||||
|
|
||||||
|
qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
|
||||||
|
|
||||||
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -822,8 +826,6 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code));
|
|
||||||
|
|
||||||
pJob->jobResCode = code;
|
pJob->jobResCode = code;
|
||||||
|
|
||||||
//taosSsleep(2);
|
//taosSsleep(2);
|
||||||
|
|
|
@ -540,6 +540,9 @@ typedef struct SIndefOperatorInfo {
|
||||||
SArray* pPseudoColInfo;
|
SArray* pPseudoColInfo;
|
||||||
SExprSupp scalarSup;
|
SExprSupp scalarSup;
|
||||||
SNode* pCondition;
|
SNode* pCondition;
|
||||||
|
uint64_t groupId;
|
||||||
|
|
||||||
|
SSDataBlock* pNextGroupRes;
|
||||||
} SIndefOperatorInfo;
|
} SIndefOperatorInfo;
|
||||||
|
|
||||||
typedef struct SFillOperatorInfo {
|
typedef struct SFillOperatorInfo {
|
||||||
|
@ -551,6 +554,8 @@ typedef struct SFillOperatorInfo {
|
||||||
bool multigroupResult;
|
bool multigroupResult;
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
SNode* pCondition;
|
SNode* pCondition;
|
||||||
|
SArray* pColMatchColInfo;
|
||||||
|
int32_t primaryTsCol;
|
||||||
} SFillOperatorInfo;
|
} SFillOperatorInfo;
|
||||||
|
|
||||||
typedef struct SGroupbyOperatorInfo {
|
typedef struct SGroupbyOperatorInfo {
|
||||||
|
|
|
@ -571,8 +571,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
||||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||||
|
|
||||||
// if the source equals to the destination, it is to create a new column as the result of scalar function or some
|
// if the source equals to the destination, it is to create a new column as the result of scalar
|
||||||
// operators.
|
// function or some operators.
|
||||||
bool createNewColModel = (pResult == pSrcBlock);
|
bool createNewColModel = (pResult == pSrcBlock);
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
@ -580,17 +580,17 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||||
|
SInputColumnInfoData* pInputData = &pfCtx->input;
|
||||||
|
|
||||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0],
|
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
|
||||||
pfCtx->input.numOfRows);
|
|
||||||
} else {
|
} else {
|
||||||
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows, &pResult->info);
|
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows = pfCtx->input.numOfRows;
|
numOfRows = pInputData->numOfRows;
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
|
@ -623,14 +623,12 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
numOfRows = dest.numOfRows;
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||||
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
|
||||||
|
|
||||||
// _rowts/_c0, not tbname column
|
// _rowts/_c0, not tbname column
|
||||||
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
|
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
|
} else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
|
||||||
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
pfCtx->fpSet.init(pfCtx, pResInfo);
|
||||||
|
|
||||||
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset
|
pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset
|
||||||
|
@ -642,6 +640,23 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows = pfCtx->fpSet.process(pfCtx);
|
numOfRows = pfCtx->fpSet.process(pfCtx);
|
||||||
|
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||||
|
// _group_key function for "partition by tbname" + csum(col_name) query
|
||||||
|
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||||
|
|
||||||
|
// todo handle the json tag
|
||||||
|
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||||
|
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||||
|
bool isNull = colDataIsNull_s(pInput, f);
|
||||||
|
if (isNull) {
|
||||||
|
colDataAppendNULL(pOutput, pResult->info.rows + f);
|
||||||
|
} else {
|
||||||
|
char* data = colDataGetData(pInput, f);
|
||||||
|
colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||||
taosArrayPush(pBlockList, &pSrcBlock);
|
taosArrayPush(pBlockList, &pSrcBlock);
|
||||||
|
@ -675,25 +690,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
// todo disable this
|
|
||||||
|
|
||||||
// if (pResultRow->key == NULL) {
|
|
||||||
// pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
|
|
||||||
// varDataCopy(pResultRow->key, pData);
|
|
||||||
// } else {
|
|
||||||
// ASSERT(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
|
||||||
// }
|
|
||||||
} else {
|
|
||||||
int64_t v = -1;
|
|
||||||
GET_TYPED_DATA(v, int64_t, type, pData);
|
|
||||||
|
|
||||||
pResultRow->win.skey = v;
|
|
||||||
pResultRow->win.ekey = v;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
||||||
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
@ -3825,6 +3821,40 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) {
|
||||||
|
int32_t order = 0;
|
||||||
|
int32_t scanFlag = 0;
|
||||||
|
|
||||||
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||||
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
|
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
|
||||||
|
if (pScalarSup->pExprInfo != NULL) {
|
||||||
|
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
||||||
|
pIndefInfo->pPseudoColInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||||
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
|
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
|
||||||
|
pIndefInfo->pPseudoColInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||||
|
@ -3839,8 +3869,6 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = 0;
|
int64_t st = 0;
|
||||||
int32_t order = 0;
|
|
||||||
int32_t scanFlag = 0;
|
|
||||||
|
|
||||||
if (pOperator->cost.openCost == 0) {
|
if (pOperator->cost.openCost == 0) {
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
|
@ -3848,42 +3876,54 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
while (1) {
|
while(1) {
|
||||||
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
// here we need to handle the existsed group results
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||||
if (pBlock == NULL) {
|
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
||||||
doSetOperatorCompleted(pOperator);
|
SqlFunctionCtx* pCtx = &pSup->pCtx[k];
|
||||||
break;
|
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
pResInfo->initialized = false;
|
||||||
|
pCtx->pOutput = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
|
||||||
|
pIndefInfo->pNextGroupRes = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
|
||||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
while (1) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||||
longjmp(pTaskInfo->env, code);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
}
|
if (pBlock == NULL) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
|
||||||
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
|
pIndefInfo->groupId = pBlock->info.groupId; // this is the initial group result
|
||||||
if (pScalarSup->pExprInfo != NULL) {
|
} else {
|
||||||
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
if (pIndefInfo->groupId != pBlock->info.groupId) { // reset output buffer and computing status
|
||||||
pIndefInfo->pPseudoColInfo);
|
pIndefInfo->groupId = pBlock->info.groupId;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
pIndefInfo->pNextGroupRes = pBlock;
|
||||||
longjmp(pTaskInfo->env, code);
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
|
||||||
|
if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
doFilter(pIndefInfo->pCondition, pInfo->pRes);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
size_t rows = pInfo->pRes->info.rows;
|
||||||
|
if (rows >= 0) {
|
||||||
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx,
|
break;
|
||||||
pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pTaskInfo->env, code);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doFilter(pIndefInfo->pCondition, pInfo->pRes);
|
|
||||||
|
|
||||||
size_t rows = pInfo->pRes->info.rows;
|
size_t rows = pInfo->pRes->info.rows;
|
||||||
pOperator->resultInfo.totalRows += rows;
|
pOperator->resultInfo.totalRows += rows;
|
||||||
|
|
||||||
|
@ -3928,24 +3968,23 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
||||||
numOfRows = TWOMB / pResBlock->info.rowSize;
|
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, numOfRows);
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
|
||||||
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||||
|
|
||||||
pOperator->name = "IndefinitOperator";
|
pOperator->name = "IndefinitOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfExpr;
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||||
|
@ -4001,10 +4040,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||||
|
|
||||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||||
|
|
||||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||||
|
|
||||||
|
int32_t numOfOutputCols = 0;
|
||||||
|
SArray* pColMatchColInfo =
|
||||||
|
extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||||
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
|
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
|
||||||
|
@ -4012,17 +4057,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pRes = pResBlock;
|
pInfo->pRes = pResBlock;
|
||||||
pInfo->multigroupResult = multigroupResult;
|
pInfo->multigroupResult = multigroupResult;
|
||||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||||
pOperator->name = "FillOperator";
|
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||||
pOperator->blocking = false;
|
pOperator->name = "FillOperator";
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->blocking = false;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||||
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->exprSupp.numOfExprs = num;
|
pOperator->exprSupp.numOfExprs = num;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
|
@ -637,6 +637,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||||
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
|
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
|
||||||
void* page = getBufPage(pInfo->pBuf, *pageId);
|
void* page = getBufPage(pInfo->pBuf, *pageId);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
|
||||||
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
|
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
|
||||||
|
|
||||||
pInfo->pageIndex += 1;
|
pInfo->pageIndex += 1;
|
||||||
|
|
|
@ -217,6 +217,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (blockDataGetNumOfRows(pBlock) > 0) {
|
if (blockDataGetNumOfRows(pBlock) > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -601,8 +602,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->groupSort)
|
if (pInfo->groupSort) {
|
||||||
{
|
|
||||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||||
if (!pInfo->hasGroupId) {
|
if (!pInfo->hasGroupId) {
|
||||||
pInfo->groupId = tupleGroupId;
|
pInfo->groupId = tupleGroupId;
|
||||||
|
|
|
@ -50,8 +50,11 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
|
||||||
|
|
||||||
static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
|
static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
|
||||||
// the first are always the timestamp column, so start from the second column.
|
// the first are always the timestamp column, so start from the second column.
|
||||||
for (int32_t i = 1; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP && i == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
colDataAppendNULL(p, rowIndex);
|
colDataAppendNULL(p, rowIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4502,6 +4502,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t rows = pRes->info.rows;
|
size_t rows = pRes->info.rows;
|
||||||
|
blockDataUpdateTsWindow(pRes, iaInfo->primaryTsIndex);
|
||||||
pOperator->resultInfo.totalRows += rows;
|
pOperator->resultInfo.totalRows += rows;
|
||||||
return (rows == 0) ? NULL : pRes;
|
return (rows == 0) ? NULL : pRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MG
|
||||||
|
|
||||||
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
|
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
|
||||||
|
|
||||||
bool fmIsVectorFunc(int32_t funcId) { return !fmIsScalarFunc(funcId) && !fmIsScanPseudoColumnFunc(funcId); }
|
bool fmIsVectorFunc(int32_t funcId) { return !fmIsScalarFunc(funcId) && !fmIsPseudoColumnFunc(funcId); }
|
||||||
|
|
||||||
bool fmIsSelectFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC); }
|
bool fmIsSelectFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC); }
|
||||||
|
|
||||||
|
|
|
@ -105,6 +105,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
||||||
return makeNode(type, sizeof(SAlterDatabaseStmt));
|
return makeNode(type, sizeof(SAlterDatabaseStmt));
|
||||||
case QUERY_NODE_FLUSH_DATABASE_STMT:
|
case QUERY_NODE_FLUSH_DATABASE_STMT:
|
||||||
return makeNode(type, sizeof(SFlushDatabaseStmt));
|
return makeNode(type, sizeof(SFlushDatabaseStmt));
|
||||||
|
case QUERY_NODE_TRIM_DATABASE_STMT:
|
||||||
|
return makeNode(type, sizeof(STrimDatabaseStmt));
|
||||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||||
return makeNode(type, sizeof(SCreateTableStmt));
|
return makeNode(type, sizeof(SCreateTableStmt));
|
||||||
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
|
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
|
||||||
|
@ -548,6 +550,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyNode((SNode*)((SAlterDatabaseStmt*)pNode)->pOptions);
|
nodesDestroyNode((SNode*)((SAlterDatabaseStmt*)pNode)->pOptions);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_FLUSH_DATABASE_STMT: // no pointer field
|
case QUERY_NODE_FLUSH_DATABASE_STMT: // no pointer field
|
||||||
|
case QUERY_NODE_TRIM_DATABASE_STMT: // no pointer field
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_CREATE_TABLE_STMT: {
|
case QUERY_NODE_CREATE_TABLE_STMT: {
|
||||||
SCreateTableStmt* pStmt = (SCreateTableStmt*)pNode;
|
SCreateTableStmt* pStmt = (SCreateTableStmt*)pNode;
|
||||||
|
|
|
@ -137,6 +137,7 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, STok
|
||||||
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pDbName);
|
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pDbName);
|
||||||
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pOptions);
|
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pOptions);
|
||||||
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||||
|
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||||
SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
|
SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
|
||||||
SNode* createAlterTableOptions(SAstCreateContext* pCxt);
|
SNode* createAlterTableOptions(SAstCreateContext* pCxt);
|
||||||
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal);
|
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal);
|
||||||
|
|
|
@ -158,6 +158,7 @@ cmd ::= DROP DATABASE exists_opt(A) db_name(B).
|
||||||
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); }
|
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); }
|
||||||
cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
|
cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
|
||||||
cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); }
|
cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); }
|
||||||
|
cmd ::= TRIM DATABASE db_name(A). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A); }
|
||||||
|
|
||||||
%type not_exists_opt { bool }
|
%type not_exists_opt { bool }
|
||||||
%destructor not_exists_opt { }
|
%destructor not_exists_opt { }
|
||||||
|
@ -934,7 +935,11 @@ query_expression_body(A) ::=
|
||||||
query_primary(A) ::= query_specification(B). { A = B; }
|
query_primary(A) ::= query_specification(B). { A = B; }
|
||||||
query_primary(A) ::=
|
query_primary(A) ::=
|
||||||
NK_LP query_expression_body(B)
|
NK_LP query_expression_body(B)
|
||||||
order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B; }
|
order_by_clause_opt(C) slimit_clause_opt(D) limit_clause_opt(E) NK_RP. {
|
||||||
|
A = addOrderByClause(pCxt, B, C);
|
||||||
|
A = addSlimitClause(pCxt, A, D);
|
||||||
|
A = addLimitClause(pCxt, A, E);
|
||||||
|
}
|
||||||
|
|
||||||
%type order_by_clause_opt { SNodeList* }
|
%type order_by_clause_opt { SNodeList* }
|
||||||
%destructor order_by_clause_opt { nodesDestroyList($$); }
|
%destructor order_by_clause_opt { nodesDestroyList($$); }
|
||||||
|
|
|
@ -665,6 +665,9 @@ SNode* addHavingClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pHaving) {
|
||||||
|
|
||||||
SNode* addOrderByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrderByList) {
|
SNode* addOrderByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrderByList) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
if (NULL == pOrderByList) {
|
||||||
|
return pStmt;
|
||||||
|
}
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||||
((SSelectStmt*)pStmt)->pOrderByList = pOrderByList;
|
((SSelectStmt*)pStmt)->pOrderByList = pOrderByList;
|
||||||
} else {
|
} else {
|
||||||
|
@ -675,6 +678,9 @@ SNode* addOrderByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrder
|
||||||
|
|
||||||
SNode* addSlimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pSlimit) {
|
SNode* addSlimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pSlimit) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
if (NULL == pSlimit) {
|
||||||
|
return pStmt;
|
||||||
|
}
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||||
((SSelectStmt*)pStmt)->pSlimit = (SLimitNode*)pSlimit;
|
((SSelectStmt*)pStmt)->pSlimit = (SLimitNode*)pSlimit;
|
||||||
}
|
}
|
||||||
|
@ -683,6 +689,9 @@ SNode* addSlimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pSlimit) {
|
||||||
|
|
||||||
SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit) {
|
SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
if (NULL == pLimit) {
|
||||||
|
return pStmt;
|
||||||
|
}
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||||
((SSelectStmt*)pStmt)->pLimit = (SLimitNode*)pLimit;
|
((SSelectStmt*)pStmt)->pLimit = (SLimitNode*)pLimit;
|
||||||
} else {
|
} else {
|
||||||
|
@ -922,7 +931,18 @@ SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {
|
||||||
if (!checkDbName(pCxt, pDbName, false)) {
|
if (!checkDbName(pCxt, pDbName, false)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SAlterDatabaseStmt* pStmt = (SAlterDatabaseStmt*)nodesMakeNode(QUERY_NODE_FLUSH_DATABASE_STMT);
|
SFlushDatabaseStmt* pStmt = (SFlushDatabaseStmt*)nodesMakeNode(QUERY_NODE_FLUSH_DATABASE_STMT);
|
||||||
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
|
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
|
||||||
|
return (SNode*)pStmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {
|
||||||
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
if (!checkDbName(pCxt, pDbName, false)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
STrimDatabaseStmt* pStmt = (STrimDatabaseStmt*)nodesMakeNode(QUERY_NODE_TRIM_DATABASE_STMT);
|
||||||
CHECK_OUT_OF_MEM(pStmt);
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
|
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
|
||||||
return (SNode*)pStmt;
|
return (SNode*)pStmt;
|
||||||
|
|
|
@ -216,6 +216,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"TRANSACTION", TK_TRANSACTION},
|
{"TRANSACTION", TK_TRANSACTION},
|
||||||
{"TRANSACTIONS", TK_TRANSACTIONS},
|
{"TRANSACTIONS", TK_TRANSACTIONS},
|
||||||
{"TRIGGER", TK_TRIGGER},
|
{"TRIGGER", TK_TRIGGER},
|
||||||
|
{"TRIM", TK_TRIM},
|
||||||
{"TSERIES", TK_TSERIES},
|
{"TSERIES", TK_TSERIES},
|
||||||
{"TTL", TK_TTL},
|
{"TTL", TK_TTL},
|
||||||
{"UNION", TK_UNION},
|
{"UNION", TK_UNION},
|
||||||
|
|
|
@ -1189,7 +1189,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
||||||
if (fmIsSelectFunc(pFunc->funcId)) {
|
if (fmIsSelectFunc(pFunc->funcId)) {
|
||||||
pSelect->hasSelectFunc = true;
|
pSelect->hasSelectFunc = true;
|
||||||
++(pSelect->selectFuncNum);
|
++(pSelect->selectFuncNum);
|
||||||
} else if (fmIsAggFunc(pFunc->funcId) || fmIsIndefiniteRowsFunc(pFunc->funcId)) {
|
} else if (fmIsVectorFunc(pFunc->funcId)) {
|
||||||
pSelect->hasOtherVectorFunc = true;
|
pSelect->hasOtherVectorFunc = true;
|
||||||
}
|
}
|
||||||
pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType);
|
pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType);
|
||||||
|
@ -1514,18 +1514,11 @@ static int32_t rewriteColsToSelectValFunc(STranslateContext* pCxt, SSelectStmt*
|
||||||
typedef struct CheckAggColCoexistCxt {
|
typedef struct CheckAggColCoexistCxt {
|
||||||
STranslateContext* pTranslateCxt;
|
STranslateContext* pTranslateCxt;
|
||||||
bool existCol;
|
bool existCol;
|
||||||
int32_t selectFuncNum;
|
|
||||||
bool existOtherVectorFunc;
|
|
||||||
} CheckAggColCoexistCxt;
|
} CheckAggColCoexistCxt;
|
||||||
|
|
||||||
static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
|
static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
|
||||||
CheckAggColCoexistCxt* pCxt = (CheckAggColCoexistCxt*)pContext;
|
CheckAggColCoexistCxt* pCxt = (CheckAggColCoexistCxt*)pContext;
|
||||||
if (isVectorFunc(*pNode)) {
|
if (isVectorFunc(*pNode)) {
|
||||||
if (isSelectFunc(*pNode)) {
|
|
||||||
++(pCxt->selectFuncNum);
|
|
||||||
} else {
|
|
||||||
pCxt->existOtherVectorFunc = true;
|
|
||||||
}
|
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
SNode* pPartKey = NULL;
|
SNode* pPartKey = NULL;
|
||||||
|
@ -1542,16 +1535,15 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
|
||||||
|
|
||||||
static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow ||
|
if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow ||
|
||||||
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc)) {
|
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
CheckAggColCoexistCxt cxt = {
|
CheckAggColCoexistCxt cxt = {.pTranslateCxt = pCxt, .existCol = false};
|
||||||
.pTranslateCxt = pCxt, .existCol = false, .selectFuncNum = 0, .existOtherVectorFunc = false};
|
|
||||||
nodesRewriteExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt);
|
nodesRewriteExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt);
|
||||||
if (!pSelect->isDistinct) {
|
if (!pSelect->isDistinct) {
|
||||||
nodesRewriteExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt);
|
nodesRewriteExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt);
|
||||||
}
|
}
|
||||||
if (1 == cxt.selectFuncNum && !cxt.existOtherVectorFunc) {
|
if (1 == pSelect->selectFuncNum && !pSelect->hasOtherVectorFunc) {
|
||||||
return rewriteColsToSelectValFunc(pCxt, pSelect);
|
return rewriteColsToSelectValFunc(pCxt, pSelect);
|
||||||
}
|
}
|
||||||
if (cxt.existCol) {
|
if (cxt.existCol) {
|
||||||
|
@ -3152,6 +3144,14 @@ static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStm
|
||||||
return buildCmdMsg(pCxt, TDMT_MND_ALTER_DB, (FSerializeFunc)tSerializeSAlterDbReq, &alterReq);
|
return buildCmdMsg(pCxt, TDMT_MND_ALTER_DB, (FSerializeFunc)tSerializeSAlterDbReq, &alterReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateTrimDatabase(STranslateContext* pCxt, STrimDatabaseStmt* pStmt) {
|
||||||
|
STrimDbReq req = {0};
|
||||||
|
SName name = {0};
|
||||||
|
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
|
||||||
|
tNameGetFullDbName(&name, req.db);
|
||||||
|
return buildCmdMsg(pCxt, TDMT_MND_TRIM_DB, (FSerializeFunc)tSerializeSTrimDbReq, &req);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
|
static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
|
||||||
*pArray = taosArrayInit(LIST_LENGTH(pList), sizeof(SField));
|
*pArray = taosArrayInit(LIST_LENGTH(pList), sizeof(SField));
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
|
@ -4584,6 +4584,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||||
case QUERY_NODE_ALTER_DATABASE_STMT:
|
case QUERY_NODE_ALTER_DATABASE_STMT:
|
||||||
code = translateAlterDatabase(pCxt, (SAlterDatabaseStmt*)pNode);
|
code = translateAlterDatabase(pCxt, (SAlterDatabaseStmt*)pNode);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_TRIM_DATABASE_STMT:
|
||||||
|
code = translateTrimDatabase(pCxt, (STrimDatabaseStmt*)pNode);
|
||||||
|
break;
|
||||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||||
code = translateCreateSuperTable(pCxt, (SCreateTableStmt*)pNode);
|
code = translateCreateSuperTable(pCxt, (SCreateTableStmt*)pNode);
|
||||||
break;
|
break;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -245,13 +245,21 @@ TEST_F(ParserSelectTest, orderBy) {
|
||||||
TEST_F(ParserSelectTest, distinct) {
|
TEST_F(ParserSelectTest, distinct) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
// run("SELECT distinct c1, c2 FROM t1 WHERE c1 > 0 order by c1");
|
run("SELECT distinct c1, c2 FROM t1 WHERE c1 > 0 order by c1");
|
||||||
|
|
||||||
// run("SELECT distinct c1 + 10, c2 FROM t1 WHERE c1 > 0 order by c1 + 10, c2");
|
run("SELECT distinct c1 + 10, c2 FROM t1 WHERE c1 > 0 order by c1 + 10, c2");
|
||||||
|
|
||||||
// run("SELECT distinct c1 + 10 cc1, c2 cc2 FROM t1 WHERE c1 > 0 order by cc1, c2");
|
run("SELECT distinct c1 + 10 cc1, c2 cc2 FROM t1 WHERE c1 > 0 order by cc1, c2");
|
||||||
|
|
||||||
// run("SELECT distinct COUNT(c2) FROM t1 WHERE c1 > 0 GROUP BY c1 order by COUNT(c2)");
|
run("SELECT distinct COUNT(c2) FROM t1 WHERE c1 > 0 GROUP BY c1 order by COUNT(c2)");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ParserSelectTest, limit) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("SELECT c1, c2 FROM t1 LIMIT 10");
|
||||||
|
|
||||||
|
run("(SELECT c1, c2 FROM t1 LIMIT 10)");
|
||||||
}
|
}
|
||||||
|
|
||||||
// INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
|
// INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
|
||||||
|
|
|
@ -222,6 +222,25 @@ TEST_F(ParserShowToUseTest, splitVgroup) {
|
||||||
run("SPLIT VGROUP 15");
|
run("SPLIT VGROUP 15");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ParserShowToUseTest, trimDatabase) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
STrimDbReq expect = {0};
|
||||||
|
|
||||||
|
auto setTrimDbReq = [&](const char* pDb) { snprintf(expect.db, sizeof(expect.db), "0.%s", pDb); };
|
||||||
|
|
||||||
|
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||||
|
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_TRIM_DATABASE_STMT);
|
||||||
|
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_TRIM_DB);
|
||||||
|
STrimDbReq req = {0};
|
||||||
|
ASSERT_EQ(tDeserializeSTrimDbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
|
||||||
|
ASSERT_EQ(std::string(req.db), std::string(expect.db));
|
||||||
|
});
|
||||||
|
|
||||||
|
setTrimDbReq("wxy_db");
|
||||||
|
run("TRIM DATABASE wxy_db");
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ParserShowToUseTest, useDatabase) {
|
TEST_F(ParserShowToUseTest, useDatabase) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
|
|
|
@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_LOCK(type, _lock) \
|
#define QW_LOCK(type, _lock) \
|
||||||
do { \
|
do { \
|
||||||
if (QW_READ == (type)) { \
|
if (QW_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRLockLatch(_lock); \
|
taosRLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWLockLatch(_lock); \
|
taosWLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define QW_UNLOCK(type, _lock) \
|
#define QW_UNLOCK(type, _lock) \
|
||||||
do { \
|
do { \
|
||||||
if (QW_READ == (type)) { \
|
if (QW_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRUnLockLatch(_lock); \
|
taosRUnLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWUnLockLatch(_lock); \
|
taosWUnLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -3623,7 +3623,8 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP) && node->opType >= OP_TYPE_NOT_EQUAL) {
|
if (FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP) &&
|
||||||
|
(node->opType >= OP_TYPE_NOT_EQUAL) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
|
||||||
stat->scalarMode = true;
|
stat->scalarMode = true;
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,11 @@ typedef enum {
|
||||||
SCH_OP_GET_STATUS,
|
SCH_OP_GET_STATUS,
|
||||||
} SCH_OP_TYPE;
|
} SCH_OP_TYPE;
|
||||||
|
|
||||||
|
typedef struct SSchDebug {
|
||||||
|
bool lockEnable;
|
||||||
|
bool apiEnable;
|
||||||
|
} SSchDebug;
|
||||||
|
|
||||||
typedef struct SSchTrans {
|
typedef struct SSchTrans {
|
||||||
void *pTrans;
|
void *pTrans;
|
||||||
void *pHandle;
|
void *pHandle;
|
||||||
|
@ -186,7 +191,7 @@ typedef struct SSchTaskProfile {
|
||||||
|
|
||||||
typedef struct SSchTask {
|
typedef struct SSchTask {
|
||||||
uint64_t taskId; // task id
|
uint64_t taskId; // task id
|
||||||
SRWLatch lock; // task lock
|
SRWLatch lock; // task reentrant lock
|
||||||
int32_t maxExecTimes; // task may exec times
|
int32_t maxExecTimes; // task may exec times
|
||||||
int32_t execId; // task current execute try index
|
int32_t execId; // task current execute try index
|
||||||
SSchLevel *level; // level
|
SSchLevel *level; // level
|
||||||
|
@ -356,8 +361,41 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
|
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
|
||||||
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
|
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
|
||||||
|
|
||||||
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
|
#define SCH_LOCK_DEBUG(...) do { if (gSCHDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
|
||||||
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
|
|
||||||
|
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||||
|
|
||||||
|
#define SCH_LOCK(type, _lock) do { \
|
||||||
|
if (SCH_READ == (type)) { \
|
||||||
|
assert(atomic_load_64(_lock) >= 0); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosRLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64(_lock) > 0); \
|
||||||
|
} else { \
|
||||||
|
assert(atomic_load_64(_lock) >= 0); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosWLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64(_lock) & TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define SCH_UNLOCK(type, _lock) do { \
|
||||||
|
if (SCH_READ == (type)) { \
|
||||||
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosRUnLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
|
} else { \
|
||||||
|
assert(atomic_load_64((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosWUnLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
@ -435,6 +473,8 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
|
||||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
|
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
|
||||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
|
||||||
|
extern SSchDebug gSCHDebug;
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "schInt.h"
|
#include "schInt.h"
|
||||||
|
|
||||||
tsem_t schdRspSem;
|
tsem_t schdRspSem;
|
||||||
|
SSchDebug gSCHDebug = {0};
|
||||||
|
|
||||||
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
|
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -337,14 +337,14 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
||||||
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
|
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
|
||||||
|
|
||||||
SSchTask task = {0};
|
SSchTask task = {0};
|
||||||
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel, levelNum));
|
|
||||||
|
|
||||||
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
|
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
|
||||||
if (NULL == pTask) {
|
if (NULL == pTask) {
|
||||||
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
|
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum));
|
||||||
|
|
||||||
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
||||||
|
|
||||||
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
|
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
|
||||||
|
@ -543,9 +543,12 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
||||||
if (rsp->tbFName[0]) {
|
if (rsp->tbFName[0]) {
|
||||||
|
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
|
||||||
if (NULL == pJob->execRes.res) {
|
if (NULL == pJob->execRes.res) {
|
||||||
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
||||||
if (NULL == pJob->execRes.res) {
|
if (NULL == pJob->execRes.res) {
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -557,6 +560,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
||||||
|
|
||||||
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
|
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
|
||||||
pJob->execRes.msgType = TDMT_SCH_QUERY;
|
pJob->execRes.msgType = TDMT_SCH_QUERY;
|
||||||
|
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -60,6 +60,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
||||||
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
|
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
taosInitReentrantRWLatch(&pTask->lock);
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
||||||
|
|
||||||
|
@ -263,7 +264,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||||
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
||||||
|
|
||||||
SCH_LOCK(SCH_WRITE, &parent->lock);
|
SCH_LOCK_TASK(parent);
|
||||||
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
||||||
.taskId = pTask->taskId,
|
.taskId = pTask->taskId,
|
||||||
.schedId = schMgmt.sId,
|
.schedId = schMgmt.sId,
|
||||||
|
@ -272,7 +273,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
||||||
};
|
};
|
||||||
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
||||||
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
SCH_UNLOCK_TASK(parent);
|
||||||
|
|
||||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
||||||
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
|
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
|
||||||
|
|
|
@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||||
|
|
||||||
qInfo("task %d receive dispatch rsp", pTask->taskId);
|
qDebug("task %d receive dispatch rsp", pTask->taskId);
|
||||||
|
|
||||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
@ -242,7 +242,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||||
qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
||||||
|
|
||||||
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
|
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
|
||||||
|
|
||||||
|
|
|
@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
}
|
}
|
||||||
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
qInfo("stream continue dispatching: task %d", pTask->taskId);
|
qDebug("stream continue dispatching: task %d", pTask->taskId);
|
||||||
|
|
||||||
SRpcMsg dispatchMsg = {0};
|
SRpcMsg dispatchMsg = {0};
|
||||||
SEpSet* pEpSet = NULL;
|
SEpSet* pEpSet = NULL;
|
||||||
|
|
|
@ -141,34 +141,32 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
regfree(&idxRegPattern);
|
regfree(&idxRegPattern);
|
||||||
|
|
||||||
taosArraySort(pLogInfoArray, compareWalFileInfo);
|
taosArraySort(pLogInfoArray, compareWalFileInfo);
|
||||||
int oldSz = 0;
|
|
||||||
if (pWal->fileInfoSet) {
|
|
||||||
oldSz = taosArrayGetSize(pWal->fileInfoSet);
|
|
||||||
}
|
|
||||||
int newSz = taosArrayGetSize(pLogInfoArray);
|
|
||||||
|
|
||||||
if (oldSz > newSz) {
|
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
|
int actualFileNum = taosArrayGetSize(pLogInfoArray);
|
||||||
} else if (oldSz < newSz) {
|
|
||||||
for (int i = oldSz; i < newSz; i++) {
|
if (metaFileNum > actualFileNum) {
|
||||||
|
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
|
||||||
|
} else if (metaFileNum < actualFileNum) {
|
||||||
|
for (int i = metaFileNum; i < actualFileNum; i++) {
|
||||||
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
|
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
|
||||||
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pLogInfoArray);
|
taosArrayDestroy(pLogInfoArray);
|
||||||
|
|
||||||
pWal->writeCur = newSz - 1;
|
pWal->writeCur = actualFileNum - 1;
|
||||||
if (newSz > 0) {
|
if (actualFileNum > 0) {
|
||||||
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||||
|
|
||||||
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1);
|
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
||||||
int64_t file_size = 0;
|
int64_t fileSize = 0;
|
||||||
taosStatFile(fnameStr, &file_size, NULL);
|
taosStatFile(fnameStr, &fileSize, NULL);
|
||||||
|
|
||||||
if (oldSz != newSz || pLastFileInfo->fileSize != file_size) {
|
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
|
||||||
pLastFileInfo->fileSize = file_size;
|
pLastFileInfo->fileSize = fileSize;
|
||||||
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
|
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
|
||||||
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
|
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
|
||||||
ASSERT(pWal->vers.lastVer != -1);
|
ASSERT(pWal->vers.lastVer != -1);
|
||||||
|
|
|
@ -99,7 +99,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
|
|
||||||
// delete files
|
// delete files
|
||||||
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
for (int i = pWal->writeCur; i < fileSetSize; i++) {
|
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
|
||||||
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||||
taosRemoveFile(fnameStr);
|
taosRemoveFile(fnameStr);
|
||||||
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||||
|
@ -113,18 +113,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||||
|
|
||||||
if (pIdxTFile == NULL) {
|
if (pIdxTFile == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||||
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
|
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
ASSERT(0);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// read idx file and get log file pos
|
// read idx file and get log file pos
|
||||||
SWalIdxEntry entry;
|
SWalIdxEntry entry;
|
||||||
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||||
|
ASSERT(0);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -133,12 +136,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||||
if (pLogTFile == NULL) {
|
if (pLogTFile == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
// TODO
|
// TODO
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
ASSERT(0);
|
||||||
// TODO
|
// TODO
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -148,6 +153,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
ASSERT(taosValidFile(pLogTFile));
|
ASSERT(taosValidFile(pLogTFile));
|
||||||
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
|
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
|
||||||
if (size != sizeof(SWalCkHead)) {
|
if (size != sizeof(SWalCkHead)) {
|
||||||
|
ASSERT(0);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -205,15 +211,22 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
||||||
pWal->vers.verInSnapshotting = ver;
|
pWal->vers.verInSnapshotting = ver;
|
||||||
// check file rolling
|
// check file rolling
|
||||||
if (pWal->cfg.retentionPeriod == 0) {
|
if (pWal->cfg.retentionPeriod == 0) {
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
walRoll(pWal);
|
walRoll(pWal);
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walEndSnapshot(SWal *pWal) {
|
int32_t walEndSnapshot(SWal *pWal) {
|
||||||
|
int32_t code = 0;
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
int64_t ver = pWal->vers.verInSnapshotting;
|
int64_t ver = pWal->vers.verInSnapshotting;
|
||||||
if (ver == -1) return 0;
|
if (ver == -1) {
|
||||||
|
code = -1;
|
||||||
|
goto END;
|
||||||
|
};
|
||||||
|
|
||||||
pWal->vers.snapshotVer = ver;
|
pWal->vers.snapshotVer = ver;
|
||||||
int ts = taosGetTimestampSec();
|
int ts = taosGetTimestampSec();
|
||||||
|
@ -259,12 +272,14 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
pWal->vers.verInSnapshotting = -1;
|
pWal->vers.verInSnapshotting = -1;
|
||||||
|
|
||||||
// save snapshot ver, commit ver
|
// save snapshot ver, commit ver
|
||||||
int code = walSaveMeta(pWal);
|
code = walSaveMeta(pWal);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
return -1;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
END:
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walRoll(SWal *pWal) {
|
int walRoll(SWal *pWal) {
|
||||||
|
@ -273,14 +288,14 @@ int walRoll(SWal *pWal) {
|
||||||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pWal->pWriteLogTFile != NULL) {
|
if (pWal->pWriteLogTFile != NULL) {
|
||||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
code = taosCloseFile(&pWal->pWriteLogTFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TdFilePtr pIdxTFile, pLogTFile;
|
TdFilePtr pIdxTFile, pLogTFile;
|
||||||
|
@ -291,18 +306,20 @@ int walRoll(SWal *pWal) {
|
||||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pIdxTFile == NULL) {
|
if (pIdxTFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto END;
|
||||||
}
|
}
|
||||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
||||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pLogTFile == NULL) {
|
if (pLogTFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto END;
|
||||||
}
|
}
|
||||||
// terrno set inner
|
// error code was set inner
|
||||||
code = walRollFileInfo(pWal);
|
code = walRollFileInfo(pWal);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return -1;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
// switch file
|
// switch file
|
||||||
|
@ -312,7 +329,9 @@ int walRoll(SWal *pWal) {
|
||||||
ASSERT(pWal->writeCur >= 0);
|
ASSERT(pWal->writeCur >= 0);
|
||||||
|
|
||||||
pWal->lastRollSeq = walGetSeq();
|
pWal->lastRollSeq = walGetSeq();
|
||||||
return 0;
|
|
||||||
|
END:
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
|
|
|
@ -17,8 +17,10 @@
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#define TD_RWLATCH_WRITE_FLAG 0x40000000
|
#define TD_RWLATCH_WRITE_FLAG 0x40000000
|
||||||
|
#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000
|
||||||
|
|
||||||
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
|
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
|
||||||
|
void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = TD_RWLATCH_REENTRANT_FLAG; }
|
||||||
|
|
||||||
void taosWLockLatch(SRWLatch *pLatch) {
|
void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
SRWLatch oLatch, nLatch;
|
SRWLatch oLatch, nLatch;
|
||||||
|
@ -26,8 +28,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
|
|
||||||
// Set write flag
|
// Set write flag
|
||||||
while (1) {
|
while (1) {
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
||||||
|
if (oLatch & TD_RWLATCH_REENTRANT_FLAG) {
|
||||||
|
nLatch = (((oLatch >> 32) + 1) << 32) | (oLatch & 0xFFFFFFFF);
|
||||||
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
nLoops++;
|
nLoops++;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
@ -37,14 +45,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
||||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for all reads end
|
// wait for all reads end
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch == TD_RWLATCH_WRITE_FLAG) break;
|
if (0 == (oLatch & 0xFFFFFFF)) break;
|
||||||
nLoops++;
|
nLoops++;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
@ -53,29 +61,50 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// no reentrant
|
||||||
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
|
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
|
||||||
SRWLatch oLatch, nLatch;
|
SRWLatch oLatch, nLatch;
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch) {
|
if (oLatch << 2) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
||||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); }
|
void taosWUnLockLatch(SRWLatch *pLatch) {
|
||||||
|
SRWLatch oLatch, nLatch, wLatch;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
oLatch = atomic_load_64(pLatch);
|
||||||
|
|
||||||
|
if (0 == (oLatch & TD_RWLATCH_REENTRANT_FLAG)) {
|
||||||
|
atomic_store_64(pLatch, 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
wLatch = ((oLatch << 2) >> 34);
|
||||||
|
if (wLatch) {
|
||||||
|
nLatch = ((--wLatch) << 32) | TD_RWLATCH_REENTRANT_FLAG | TD_RWLATCH_WRITE_FLAG;
|
||||||
|
} else {
|
||||||
|
nLatch = TD_RWLATCH_REENTRANT_FLAG;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void taosRLockLatch(SRWLatch *pLatch) {
|
void taosRLockLatch(SRWLatch *pLatch) {
|
||||||
SRWLatch oLatch, nLatch;
|
SRWLatch oLatch, nLatch;
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
||||||
nLoops++;
|
nLoops++;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
|
@ -86,8 +115,8 @@ void taosRLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nLatch = oLatch + 1;
|
nLatch = oLatch + 1;
|
||||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); }
|
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_64(pLatch, 1); }
|
||||||
|
|
|
@ -360,15 +360,15 @@ class TDTestCase:
|
||||||
tdSql.checkRows(229)
|
tdSql.checkRows(229)
|
||||||
tdSql.checkData(0,0,0)
|
tdSql.checkData(0,0,0)
|
||||||
tdSql.query("select diff(c1) from stb1 partition by tbname ")
|
tdSql.query("select diff(c1) from stb1 partition by tbname ")
|
||||||
tdSql.checkRows(199)
|
tdSql.checkRows(190)
|
||||||
# tdSql.query("select diff(st1) from stb1 partition by tbname")
|
# tdSql.query("select diff(st1) from stb1 partition by tbname")
|
||||||
# tdSql.checkRows(229)
|
# tdSql.checkRows(229)
|
||||||
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(199)
|
tdSql.checkRows(190)
|
||||||
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(199)
|
tdSql.checkRows(190)
|
||||||
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(199)
|
tdSql.checkRows(190)
|
||||||
|
|
||||||
# # bug need fix
|
# # bug need fix
|
||||||
# tdSql.query("select diff(st1+c1) from stb1 partition by tbname slimit 1 ")
|
# tdSql.query("select diff(st1+c1) from stb1 partition by tbname slimit 1 ")
|
||||||
|
@ -378,7 +378,7 @@ class TDTestCase:
|
||||||
|
|
||||||
# bug need fix
|
# bug need fix
|
||||||
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(199)
|
tdSql.checkRows(190)
|
||||||
|
|
||||||
# bug need fix
|
# bug need fix
|
||||||
# tdSql.query("select tbname , diff(c1) from stb1 partition by tbname")
|
# tdSql.query("select tbname , diff(c1) from stb1 partition by tbname")
|
||||||
|
|
|
@ -678,15 +678,15 @@ class TDTestCase:
|
||||||
tdSql.checkRows(68)
|
tdSql.checkRows(68)
|
||||||
tdSql.checkData(0,0,1.000000000)
|
tdSql.checkData(0,0,1.000000000)
|
||||||
tdSql.query("select mavg(c1,3) from stb1 partition by tbname ")
|
tdSql.query("select mavg(c1,3) from stb1 partition by tbname ")
|
||||||
tdSql.checkRows(38)
|
tdSql.checkRows(20)
|
||||||
# tdSql.query("select mavg(st1,3) from stb1 partition by tbname")
|
# tdSql.query("select mavg(st1,3) from stb1 partition by tbname")
|
||||||
# tdSql.checkRows(38)
|
# tdSql.checkRows(38)
|
||||||
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(38)
|
tdSql.checkRows(20)
|
||||||
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(38)
|
tdSql.checkRows(20)
|
||||||
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(38)
|
tdSql.checkRows(20)
|
||||||
|
|
||||||
# # bug need fix
|
# # bug need fix
|
||||||
# tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ")
|
# tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ")
|
||||||
|
@ -696,7 +696,7 @@ class TDTestCase:
|
||||||
|
|
||||||
# bug need fix
|
# bug need fix
|
||||||
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
|
||||||
tdSql.checkRows(38)
|
tdSql.checkRows(20)
|
||||||
|
|
||||||
# bug need fix
|
# bug need fix
|
||||||
# tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname")
|
# tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname")
|
||||||
|
|
|
@ -32,9 +32,9 @@ class TDTestCase:
|
||||||
#
|
#
|
||||||
# --------------- main frame -------------------
|
# --------------- main frame -------------------
|
||||||
#
|
#
|
||||||
clientCfgDict = {'queryPolicy': '1','debugFlag': 135}
|
clientCfgDict = {'queryPolicy': '1','debugFlag': 143}
|
||||||
clientCfgDict["queryPolicy"] = '1'
|
clientCfgDict["queryPolicy"] = '1'
|
||||||
clientCfgDict["debugFlag"] = 131
|
clientCfgDict["debugFlag"] = 143
|
||||||
|
|
||||||
updatecfgDict = {'clientCfg': {}}
|
updatecfgDict = {'clientCfg': {}}
|
||||||
updatecfgDict = {'debugFlag': 143}
|
updatecfgDict = {'debugFlag': 143}
|
||||||
|
@ -480,4 +480,4 @@ class TDTestCase:
|
||||||
# add case with filename
|
# add case with filename
|
||||||
#
|
#
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
|
Loading…
Reference in New Issue