Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1
This commit is contained in:
commit
88793e2fab
|
@ -280,10 +280,14 @@ typedef struct {
|
|||
int32_t numOfTags;
|
||||
int32_t numOfSmas;
|
||||
int32_t commentLen;
|
||||
int32_t ast1Len;
|
||||
int32_t ast2Len;
|
||||
SArray* pColumns; // array of SField
|
||||
SArray* pTags; // array of SField
|
||||
SArray* pSmas; // array of SField
|
||||
char* comment;
|
||||
char* pAst1;
|
||||
char* pAst2;
|
||||
} SMCreateStbReq;
|
||||
|
||||
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
||||
|
@ -609,6 +613,8 @@ typedef struct {
|
|||
int8_t cacheLastRow;
|
||||
int8_t streamMode;
|
||||
int8_t singleSTable;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions;
|
||||
} SDbCfgRsp;
|
||||
|
||||
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
|
||||
|
|
|
@ -86,155 +86,156 @@
|
|||
#define TK_SINGLE_STABLE 68
|
||||
#define TK_STREAM_MODE 69
|
||||
#define TK_RETENTIONS 70
|
||||
#define TK_NK_COMMA 71
|
||||
#define TK_NK_COLON 72
|
||||
#define TK_TABLE 73
|
||||
#define TK_NK_LP 74
|
||||
#define TK_NK_RP 75
|
||||
#define TK_STABLE 76
|
||||
#define TK_ADD 77
|
||||
#define TK_COLUMN 78
|
||||
#define TK_MODIFY 79
|
||||
#define TK_RENAME 80
|
||||
#define TK_TAG 81
|
||||
#define TK_SET 82
|
||||
#define TK_NK_EQ 83
|
||||
#define TK_USING 84
|
||||
#define TK_TAGS 85
|
||||
#define TK_NK_DOT 86
|
||||
#define TK_COMMENT 87
|
||||
#define TK_BOOL 88
|
||||
#define TK_TINYINT 89
|
||||
#define TK_SMALLINT 90
|
||||
#define TK_INT 91
|
||||
#define TK_INTEGER 92
|
||||
#define TK_BIGINT 93
|
||||
#define TK_FLOAT 94
|
||||
#define TK_DOUBLE 95
|
||||
#define TK_BINARY 96
|
||||
#define TK_TIMESTAMP 97
|
||||
#define TK_NCHAR 98
|
||||
#define TK_UNSIGNED 99
|
||||
#define TK_JSON 100
|
||||
#define TK_VARCHAR 101
|
||||
#define TK_MEDIUMBLOB 102
|
||||
#define TK_BLOB 103
|
||||
#define TK_VARBINARY 104
|
||||
#define TK_DECIMAL 105
|
||||
#define TK_SMA 106
|
||||
#define TK_ROLLUP 107
|
||||
#define TK_FILE_FACTOR 108
|
||||
#define TK_NK_FLOAT 109
|
||||
#define TK_DELAY 110
|
||||
#define TK_SHOW 111
|
||||
#define TK_DATABASES 112
|
||||
#define TK_TABLES 113
|
||||
#define TK_STABLES 114
|
||||
#define TK_MNODES 115
|
||||
#define TK_MODULES 116
|
||||
#define TK_QNODES 117
|
||||
#define TK_FUNCTIONS 118
|
||||
#define TK_INDEXES 119
|
||||
#define TK_FROM 120
|
||||
#define TK_ACCOUNTS 121
|
||||
#define TK_APPS 122
|
||||
#define TK_CONNECTIONS 123
|
||||
#define TK_LICENCE 124
|
||||
#define TK_GRANTS 125
|
||||
#define TK_QUERIES 126
|
||||
#define TK_SCORES 127
|
||||
#define TK_TOPICS 128
|
||||
#define TK_VARIABLES 129
|
||||
#define TK_BNODES 130
|
||||
#define TK_SNODES 131
|
||||
#define TK_LIKE 132
|
||||
#define TK_INDEX 133
|
||||
#define TK_FULLTEXT 134
|
||||
#define TK_FUNCTION 135
|
||||
#define TK_INTERVAL 136
|
||||
#define TK_TOPIC 137
|
||||
#define TK_AS 138
|
||||
#define TK_DESC 139
|
||||
#define TK_DESCRIBE 140
|
||||
#define TK_RESET 141
|
||||
#define TK_QUERY 142
|
||||
#define TK_EXPLAIN 143
|
||||
#define TK_ANALYZE 144
|
||||
#define TK_VERBOSE 145
|
||||
#define TK_NK_BOOL 146
|
||||
#define TK_RATIO 147
|
||||
#define TK_COMPACT 148
|
||||
#define TK_VNODES 149
|
||||
#define TK_IN 150
|
||||
#define TK_OUTPUTTYPE 151
|
||||
#define TK_AGGREGATE 152
|
||||
#define TK_BUFSIZE 153
|
||||
#define TK_STREAM 154
|
||||
#define TK_INTO 155
|
||||
#define TK_TRIGGER 156
|
||||
#define TK_AT_ONCE 157
|
||||
#define TK_WINDOW_CLOSE 158
|
||||
#define TK_WATERMARK 159
|
||||
#define TK_KILL 160
|
||||
#define TK_CONNECTION 161
|
||||
#define TK_MERGE 162
|
||||
#define TK_VGROUP 163
|
||||
#define TK_REDISTRIBUTE 164
|
||||
#define TK_SPLIT 165
|
||||
#define TK_SYNCDB 166
|
||||
#define TK_NULL 167
|
||||
#define TK_NK_QUESTION 168
|
||||
#define TK_NK_ARROW 169
|
||||
#define TK_ROWTS 170
|
||||
#define TK_TBNAME 171
|
||||
#define TK_QSTARTTS 172
|
||||
#define TK_QENDTS 173
|
||||
#define TK_WSTARTTS 174
|
||||
#define TK_WENDTS 175
|
||||
#define TK_WDURATION 176
|
||||
#define TK_CAST 177
|
||||
#define TK_NOW 178
|
||||
#define TK_TODAY 179
|
||||
#define TK_TIMEZONE 180
|
||||
#define TK_COUNT 181
|
||||
#define TK_FIRST 182
|
||||
#define TK_LAST 183
|
||||
#define TK_LAST_ROW 184
|
||||
#define TK_BETWEEN 185
|
||||
#define TK_IS 186
|
||||
#define TK_NK_LT 187
|
||||
#define TK_NK_GT 188
|
||||
#define TK_NK_LE 189
|
||||
#define TK_NK_GE 190
|
||||
#define TK_NK_NE 191
|
||||
#define TK_MATCH 192
|
||||
#define TK_NMATCH 193
|
||||
#define TK_CONTAINS 194
|
||||
#define TK_JOIN 195
|
||||
#define TK_INNER 196
|
||||
#define TK_SELECT 197
|
||||
#define TK_DISTINCT 198
|
||||
#define TK_WHERE 199
|
||||
#define TK_PARTITION 200
|
||||
#define TK_BY 201
|
||||
#define TK_SESSION 202
|
||||
#define TK_STATE_WINDOW 203
|
||||
#define TK_SLIDING 204
|
||||
#define TK_FILL 205
|
||||
#define TK_VALUE 206
|
||||
#define TK_NONE 207
|
||||
#define TK_PREV 208
|
||||
#define TK_LINEAR 209
|
||||
#define TK_NEXT 210
|
||||
#define TK_GROUP 211
|
||||
#define TK_HAVING 212
|
||||
#define TK_ORDER 213
|
||||
#define TK_SLIMIT 214
|
||||
#define TK_SOFFSET 215
|
||||
#define TK_LIMIT 216
|
||||
#define TK_OFFSET 217
|
||||
#define TK_ASC 218
|
||||
#define TK_NULLS 219
|
||||
#define TK_STRICT 71
|
||||
#define TK_NK_COMMA 72
|
||||
#define TK_NK_COLON 73
|
||||
#define TK_TABLE 74
|
||||
#define TK_NK_LP 75
|
||||
#define TK_NK_RP 76
|
||||
#define TK_STABLE 77
|
||||
#define TK_ADD 78
|
||||
#define TK_COLUMN 79
|
||||
#define TK_MODIFY 80
|
||||
#define TK_RENAME 81
|
||||
#define TK_TAG 82
|
||||
#define TK_SET 83
|
||||
#define TK_NK_EQ 84
|
||||
#define TK_USING 85
|
||||
#define TK_TAGS 86
|
||||
#define TK_NK_DOT 87
|
||||
#define TK_COMMENT 88
|
||||
#define TK_BOOL 89
|
||||
#define TK_TINYINT 90
|
||||
#define TK_SMALLINT 91
|
||||
#define TK_INT 92
|
||||
#define TK_INTEGER 93
|
||||
#define TK_BIGINT 94
|
||||
#define TK_FLOAT 95
|
||||
#define TK_DOUBLE 96
|
||||
#define TK_BINARY 97
|
||||
#define TK_TIMESTAMP 98
|
||||
#define TK_NCHAR 99
|
||||
#define TK_UNSIGNED 100
|
||||
#define TK_JSON 101
|
||||
#define TK_VARCHAR 102
|
||||
#define TK_MEDIUMBLOB 103
|
||||
#define TK_BLOB 104
|
||||
#define TK_VARBINARY 105
|
||||
#define TK_DECIMAL 106
|
||||
#define TK_SMA 107
|
||||
#define TK_ROLLUP 108
|
||||
#define TK_FILE_FACTOR 109
|
||||
#define TK_NK_FLOAT 110
|
||||
#define TK_DELAY 111
|
||||
#define TK_SHOW 112
|
||||
#define TK_DATABASES 113
|
||||
#define TK_TABLES 114
|
||||
#define TK_STABLES 115
|
||||
#define TK_MNODES 116
|
||||
#define TK_MODULES 117
|
||||
#define TK_QNODES 118
|
||||
#define TK_FUNCTIONS 119
|
||||
#define TK_INDEXES 120
|
||||
#define TK_FROM 121
|
||||
#define TK_ACCOUNTS 122
|
||||
#define TK_APPS 123
|
||||
#define TK_CONNECTIONS 124
|
||||
#define TK_LICENCE 125
|
||||
#define TK_GRANTS 126
|
||||
#define TK_QUERIES 127
|
||||
#define TK_SCORES 128
|
||||
#define TK_TOPICS 129
|
||||
#define TK_VARIABLES 130
|
||||
#define TK_BNODES 131
|
||||
#define TK_SNODES 132
|
||||
#define TK_LIKE 133
|
||||
#define TK_INDEX 134
|
||||
#define TK_FULLTEXT 135
|
||||
#define TK_FUNCTION 136
|
||||
#define TK_INTERVAL 137
|
||||
#define TK_TOPIC 138
|
||||
#define TK_AS 139
|
||||
#define TK_DESC 140
|
||||
#define TK_DESCRIBE 141
|
||||
#define TK_RESET 142
|
||||
#define TK_QUERY 143
|
||||
#define TK_EXPLAIN 144
|
||||
#define TK_ANALYZE 145
|
||||
#define TK_VERBOSE 146
|
||||
#define TK_NK_BOOL 147
|
||||
#define TK_RATIO 148
|
||||
#define TK_COMPACT 149
|
||||
#define TK_VNODES 150
|
||||
#define TK_IN 151
|
||||
#define TK_OUTPUTTYPE 152
|
||||
#define TK_AGGREGATE 153
|
||||
#define TK_BUFSIZE 154
|
||||
#define TK_STREAM 155
|
||||
#define TK_INTO 156
|
||||
#define TK_TRIGGER 157
|
||||
#define TK_AT_ONCE 158
|
||||
#define TK_WINDOW_CLOSE 159
|
||||
#define TK_WATERMARK 160
|
||||
#define TK_KILL 161
|
||||
#define TK_CONNECTION 162
|
||||
#define TK_MERGE 163
|
||||
#define TK_VGROUP 164
|
||||
#define TK_REDISTRIBUTE 165
|
||||
#define TK_SPLIT 166
|
||||
#define TK_SYNCDB 167
|
||||
#define TK_NULL 168
|
||||
#define TK_NK_QUESTION 169
|
||||
#define TK_NK_ARROW 170
|
||||
#define TK_ROWTS 171
|
||||
#define TK_TBNAME 172
|
||||
#define TK_QSTARTTS 173
|
||||
#define TK_QENDTS 174
|
||||
#define TK_WSTARTTS 175
|
||||
#define TK_WENDTS 176
|
||||
#define TK_WDURATION 177
|
||||
#define TK_CAST 178
|
||||
#define TK_NOW 179
|
||||
#define TK_TODAY 180
|
||||
#define TK_TIMEZONE 181
|
||||
#define TK_COUNT 182
|
||||
#define TK_FIRST 183
|
||||
#define TK_LAST 184
|
||||
#define TK_LAST_ROW 185
|
||||
#define TK_BETWEEN 186
|
||||
#define TK_IS 187
|
||||
#define TK_NK_LT 188
|
||||
#define TK_NK_GT 189
|
||||
#define TK_NK_LE 190
|
||||
#define TK_NK_GE 191
|
||||
#define TK_NK_NE 192
|
||||
#define TK_MATCH 193
|
||||
#define TK_NMATCH 194
|
||||
#define TK_CONTAINS 195
|
||||
#define TK_JOIN 196
|
||||
#define TK_INNER 197
|
||||
#define TK_SELECT 198
|
||||
#define TK_DISTINCT 199
|
||||
#define TK_WHERE 200
|
||||
#define TK_PARTITION 201
|
||||
#define TK_BY 202
|
||||
#define TK_SESSION 203
|
||||
#define TK_STATE_WINDOW 204
|
||||
#define TK_SLIDING 205
|
||||
#define TK_FILL 206
|
||||
#define TK_VALUE 207
|
||||
#define TK_NONE 208
|
||||
#define TK_PREV 209
|
||||
#define TK_LINEAR 210
|
||||
#define TK_NEXT 211
|
||||
#define TK_GROUP 212
|
||||
#define TK_HAVING 213
|
||||
#define TK_ORDER 214
|
||||
#define TK_SLIMIT 215
|
||||
#define TK_SOFFSET 216
|
||||
#define TK_LIMIT 217
|
||||
#define TK_OFFSET 218
|
||||
#define TK_ASC 219
|
||||
#define TK_NULLS 220
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -47,6 +47,7 @@ typedef struct SDatabaseOptions {
|
|||
SValueNode* pNumOfVgroups;
|
||||
SValueNode* pSingleStable;
|
||||
SValueNode* pStreamMode;
|
||||
SValueNode* pStrict;
|
||||
SNodeList* pRetentions;
|
||||
} SDatabaseOptions;
|
||||
|
||||
|
|
|
@ -390,10 +390,14 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_DB_SINGLE_STABLE_OPTION 1
|
||||
#define TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION 0
|
||||
|
||||
#define TSDB_MIN_DB_STREAM_MODE_OPTION 0
|
||||
#define TSDB_MAX_DB_STREAM_MODE_OPTION 1
|
||||
#define TSDB_DB_STREAM_MODE_OPTION_OFF 0
|
||||
#define TSDB_DB_STREAM_MODE_OPTION_ON 1
|
||||
#define TSDB_DEFAULT_DB_STREAM_MODE_OPTION 0
|
||||
|
||||
#define TSDB_DB_STRICT_OPTION_OFF 0
|
||||
#define TSDB_DB_STRICT_OPTION_ON 1
|
||||
#define TSDB_DEFAULT_DB_STRICT_OPTION 0
|
||||
|
||||
#define TSDB_MAX_JOIN_TABLE_NUM 10
|
||||
#define TSDB_MAX_UNION_CLAUSE 5
|
||||
|
||||
|
|
|
@ -655,6 +655,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
|||
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfSmas) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->ast1Len) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->ast2Len) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
|
||||
SField *pField = taosArrayGet(pReq->pColumns, i);
|
||||
|
@ -680,6 +682,12 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
|||
if (pReq->commentLen > 0) {
|
||||
if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1;
|
||||
}
|
||||
if (pReq->ast1Len > 0) {
|
||||
if (tEncodeBinary(&encoder, pReq->pAst1, pReq->ast1Len) < 0) return -1;
|
||||
}
|
||||
if (pReq->ast2Len > 0) {
|
||||
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -702,6 +710,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
|||
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfSmas) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->ast1Len) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->ast2Len) < 0) return -1;
|
||||
|
||||
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
|
||||
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
|
||||
|
@ -750,6 +760,18 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
|||
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
|
||||
}
|
||||
|
||||
if (pReq->ast1Len > 0) {
|
||||
pReq->pAst1 = taosMemoryMalloc(pReq->ast1Len);
|
||||
if (pReq->pAst1 == NULL) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->pAst1) < 0) return -1;
|
||||
}
|
||||
|
||||
if (pReq->ast2Len > 0) {
|
||||
pReq->pAst2 = taosMemoryMalloc(pReq->ast2Len);
|
||||
if (pReq->pAst2 == NULL) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
|
@ -761,6 +783,8 @@ void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
|
|||
taosArrayDestroy(pReq->pTags);
|
||||
taosArrayDestroy(pReq->pSmas);
|
||||
taosMemoryFreeClear(pReq->comment);
|
||||
taosMemoryFreeClear(pReq->pAst1);
|
||||
taosMemoryFreeClear(pReq->pAst2);
|
||||
pReq->pColumns = NULL;
|
||||
pReq->pTags = NULL;
|
||||
pReq->pSmas = NULL;
|
||||
|
@ -2255,6 +2279,14 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
|||
if (tEncodeI8(&encoder, pRsp->update) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1;
|
||||
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
|
||||
if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2286,7 +2318,24 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
|
|||
if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
|
||||
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));
|
||||
if (pRsp->pRetensions == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
|
||||
SRetention rentension = {0};
|
||||
if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
|
||||
if (taosArrayPush(pRsp->pRetensions, &rentension) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
|
|
|
@ -355,10 +355,14 @@ typedef struct {
|
|||
int32_t numOfTags;
|
||||
int32_t numOfSmas;
|
||||
int32_t commentLen;
|
||||
int32_t ast1Len;
|
||||
int32_t ast2Len;
|
||||
SSchema* pColumns;
|
||||
SSchema* pTags;
|
||||
SSchema* pSmas;
|
||||
char* comment;
|
||||
char* pAst1;
|
||||
char* pAst2;
|
||||
SRWLatch lock;
|
||||
} SStbObj;
|
||||
|
||||
|
|
|
@ -762,27 +762,29 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
|
|||
goto GET_DB_CFG_OVER;
|
||||
}
|
||||
|
||||
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
||||
cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize;
|
||||
cfgRsp.totalBlocks = pDb->cfg.totalBlocks;
|
||||
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
|
||||
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
|
||||
cfgRsp.minRows = pDb->cfg.minRows;
|
||||
cfgRsp.maxRows = pDb->cfg.maxRows;
|
||||
cfgRsp.commitTime = pDb->cfg.commitTime;
|
||||
cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod;
|
||||
cfgRsp.ttl = pDb->cfg.ttl;
|
||||
cfgRsp.walLevel = pDb->cfg.walLevel;
|
||||
cfgRsp.precision = pDb->cfg.precision;
|
||||
cfgRsp.compression = pDb->cfg.compression;
|
||||
cfgRsp.replications = pDb->cfg.replications;
|
||||
cfgRsp.quorum = pDb->cfg.quorum;
|
||||
cfgRsp.update = pDb->cfg.update;
|
||||
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
cfgRsp.streamMode = pDb->cfg.streamMode;
|
||||
cfgRsp.singleSTable = pDb->cfg.singleSTable;
|
||||
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
||||
cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize;
|
||||
cfgRsp.totalBlocks = pDb->cfg.totalBlocks;
|
||||
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
|
||||
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
|
||||
cfgRsp.minRows = pDb->cfg.minRows;
|
||||
cfgRsp.maxRows = pDb->cfg.maxRows;
|
||||
cfgRsp.commitTime = pDb->cfg.commitTime;
|
||||
cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod;
|
||||
cfgRsp.ttl = pDb->cfg.ttl;
|
||||
cfgRsp.walLevel = pDb->cfg.walLevel;
|
||||
cfgRsp.precision = pDb->cfg.precision;
|
||||
cfgRsp.compression = pDb->cfg.compression;
|
||||
cfgRsp.replications = pDb->cfg.replications;
|
||||
cfgRsp.quorum = pDb->cfg.quorum;
|
||||
cfgRsp.update = pDb->cfg.update;
|
||||
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
cfgRsp.streamMode = pDb->cfg.streamMode;
|
||||
cfgRsp.singleSTable = pDb->cfg.singleSTable;
|
||||
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
|
||||
cfgRsp.pRetensions = pDb->cfg.pRetensions;
|
||||
|
||||
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
|
@ -797,6 +799,8 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
|
|||
pReq->pRsp = pRsp;
|
||||
pReq->rspLen = contLen;
|
||||
|
||||
code = 0;
|
||||
|
||||
GET_DB_CFG_OVER:
|
||||
|
||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
|
|
|
@ -72,7 +72,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) +
|
||||
TSDB_STB_RESERVE_SIZE;
|
||||
+ pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
|
||||
|
@ -93,6 +93,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfSmas, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
|
||||
|
||||
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
||||
SSchema *pSchema = &pStb->pColumns[i];
|
||||
|
@ -121,6 +123,12 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
if (pStb->commentLen > 0) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER)
|
||||
}
|
||||
if (pStb->ast1Len > 0) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
|
||||
}
|
||||
if (pStb->ast2Len > 0) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||
}
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
||||
|
@ -173,6 +181,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfSmas, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
|
||||
|
||||
pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
||||
pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
|
||||
|
@ -210,6 +220,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
if (pStb->comment == NULL) goto _OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER)
|
||||
}
|
||||
if (pStb->ast1Len > 0) {
|
||||
pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
|
||||
if (pStb->pAst1 == NULL) goto _OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
|
||||
}
|
||||
if (pStb->ast2Len > 0) {
|
||||
pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
|
||||
if (pStb->pAst2 == NULL) goto _OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||
}
|
||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -238,6 +258,8 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
|||
taosMemoryFreeClear(pStb->pColumns);
|
||||
taosMemoryFreeClear(pStb->pTags);
|
||||
taosMemoryFreeClear(pStb->comment);
|
||||
taosMemoryFreeClear(pStb->pAst1);
|
||||
taosMemoryFreeClear(pStb->pAst2);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -294,6 +316,30 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pOld->ast1Len < pNew->ast1Len) {
|
||||
void *pAst1 = taosMemoryMalloc(pNew->ast1Len);
|
||||
if (pAst1 != NULL) {
|
||||
taosMemoryFree(pOld->pAst1);
|
||||
pOld->pAst1 = pAst1;
|
||||
} else {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||
taosWUnLockLatch(&pOld->lock);
|
||||
}
|
||||
}
|
||||
|
||||
if (pOld->ast2Len < pNew->ast2Len) {
|
||||
void *pAst2 = taosMemoryMalloc(pNew->ast2Len);
|
||||
if (pAst2 != NULL) {
|
||||
taosMemoryFree(pOld->pAst2);
|
||||
pOld->pAst2 = pAst2;
|
||||
} else {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||
taosWUnLockLatch(&pOld->lock);
|
||||
}
|
||||
}
|
||||
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
pOld->version = pNew->version;
|
||||
pOld->nextColId = pNew->nextColId;
|
||||
|
@ -304,6 +350,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
if (pNew->commentLen != 0) {
|
||||
memcpy(pOld->comment, pNew->comment, TSDB_STB_COMMENT_LEN);
|
||||
}
|
||||
if (pNew->ast1Len != 0) {
|
||||
memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
|
||||
}
|
||||
if (pNew->ast2Len != 0) {
|
||||
memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
|
||||
}
|
||||
taosWUnLockLatch(&pOld->lock);
|
||||
return 0;
|
||||
}
|
||||
|
@ -645,6 +697,26 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
|||
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
|
||||
}
|
||||
|
||||
stbObj.ast1Len = pCreate->ast1Len;
|
||||
if (stbObj.ast1Len > 0) {
|
||||
stbObj.pAst1 = taosMemoryCalloc(stbObj.ast1Len, 1);
|
||||
if (stbObj.pAst1 == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pAst1, pCreate->pAst1, stbObj.ast1Len);
|
||||
}
|
||||
|
||||
stbObj.ast2Len = pCreate->ast2Len;
|
||||
if (stbObj.ast2Len > 0) {
|
||||
stbObj.pAst2 = taosMemoryCalloc(stbObj.ast2Len, 1);
|
||||
if (stbObj.pAst2 == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pAst2, pCreate->pAst2, stbObj.ast2Len);
|
||||
}
|
||||
|
||||
stbObj.pColumns = taosMemoryMalloc(stbObj.numOfColumns * sizeof(SSchema));
|
||||
stbObj.pTags = taosMemoryMalloc(stbObj.numOfTags * sizeof(SSchema));
|
||||
stbObj.pSmas = taosMemoryMalloc(stbObj.numOfSmas * sizeof(SSchema));
|
||||
|
|
|
@ -53,6 +53,7 @@ typedef enum EDatabaseOptionType {
|
|||
DB_OPTION_VGROUPS,
|
||||
DB_OPTION_SINGLE_STABLE,
|
||||
DB_OPTION_STREAM_MODE,
|
||||
DB_OPTION_STRICT,
|
||||
DB_OPTION_RETENTIONS
|
||||
} EDatabaseOptionType;
|
||||
|
||||
|
|
|
@ -161,6 +161,7 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
|
|||
db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pSingleStable = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
|
||||
db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStreamMode = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
|
||||
db_options(A) ::= db_options(B) RETENTIONS retention_list(C). { ((SDatabaseOptions*)B)->pRetentions = C; A = B; }
|
||||
db_options(A) ::= db_options(B) STRICT NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStrict = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
|
||||
|
||||
alter_db_options(A) ::= alter_db_option(B). { A = createDatabaseOptions(pCxt); A = setDatabaseAlterOption(pCxt, A, &B); }
|
||||
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setDatabaseAlterOption(pCxt, B, &C); }
|
||||
|
@ -175,6 +176,7 @@ alter_db_option(A) ::= WAL NK_INTEGER(B).
|
|||
alter_db_option(A) ::= QUORUM NK_INTEGER(B). { A.type = DB_OPTION_QUORUM; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
|
||||
alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
|
||||
alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
|
||||
alter_db_option(A) ::= STRICT NK_INTEGER(B). { A.type = DB_OPTION_STRICT; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
|
||||
|
||||
%type integer_list { SNodeList* }
|
||||
%destructor integer_list { nodesDestroyList($$); }
|
||||
|
@ -359,7 +361,7 @@ from_db_opt(A) ::= FROM db_name(B).
|
|||
%type func_name_list { SNodeList* }
|
||||
%destructor func_name_list { nodesDestroyList($$); }
|
||||
func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); }
|
||||
func_name_list(A) ::= func_name_list(B) NK_COMMA col_name(C). { A = addNodeToList(pCxt, B, C); }
|
||||
func_name_list(A) ::= func_name_list(B) NK_COMMA func_name(C). { A = addNodeToList(pCxt, B, C); }
|
||||
|
||||
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
|
||||
|
||||
|
|
|
@ -168,6 +168,7 @@ static SKeyword keywordTable[] = {
|
|||
{"STREAM", TK_STREAM},
|
||||
{"STREAMS", TK_STREAMS},
|
||||
{"STREAM_MODE", TK_STREAM_MODE},
|
||||
{"STRICT", TK_STRICT},
|
||||
{"SYNCDB", TK_SYNCDB},
|
||||
{"TABLE", TK_TABLE},
|
||||
{"TABLES", TK_TABLES},
|
||||
|
|
|
@ -198,6 +198,56 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo* pInfo) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
SName name;
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
|
||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
int32_t code = collectUseDatabaseImpl(dbFname, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetDBCfg(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, dbFname, pInfo);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t initTranslateContext(SParseContext* pParseCxt, STranslateContext* pCxt) {
|
||||
pCxt->pParseCxt = pParseCxt;
|
||||
pCxt->errCode = TSDB_CODE_SUCCESS;
|
||||
pCxt->msgBuf.buf = pParseCxt->pMsg;
|
||||
pCxt->msgBuf.len = pParseCxt->msgLen;
|
||||
pCxt->pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
pCxt->currLevel = 0;
|
||||
pCxt->currClause = 0;
|
||||
pCxt->pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
pCxt->pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (NULL == pCxt->pNsLevel || NULL == pCxt->pDbs || NULL == pCxt->pTables) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyTranslateContext(STranslateContext* pCxt) {
|
||||
if (NULL != pCxt->pNsLevel) {
|
||||
size_t size = taosArrayGetSize(pCxt->pNsLevel);
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
taosArrayDestroy(taosArrayGetP(pCxt->pNsLevel, i));
|
||||
}
|
||||
taosArrayDestroy(pCxt->pNsLevel);
|
||||
}
|
||||
|
||||
if (NULL != pCxt->pCmdMsg) {
|
||||
taosMemoryFreeClear(pCxt->pCmdMsg->pMsg);
|
||||
taosMemoryFreeClear(pCxt->pCmdMsg);
|
||||
}
|
||||
|
||||
taosHashCleanup(pCxt->pDbs);
|
||||
taosHashCleanup(pCxt->pTables);
|
||||
}
|
||||
|
||||
static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) {
|
||||
int cmp = 0;
|
||||
if ('\0' != pCol->dbName[0]) {
|
||||
|
@ -749,15 +799,18 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
case QUERY_NODE_REAL_TABLE: {
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
|
||||
pRealTable->ratio = (NULL != pCxt->pExplainOpt ? pCxt->pExplainOpt->ratio : 1.0);
|
||||
SName name;
|
||||
code = getTableMetaImpl(
|
||||
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
|
||||
&(pRealTable->pMeta));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
|
||||
// The SRealTableNode created through ROLLUP already has STableMeta.
|
||||
if (NULL == pRealTable->pMeta) {
|
||||
SName name;
|
||||
code = getTableMetaImpl(
|
||||
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
|
||||
&(pRealTable->pMeta));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
|
||||
}
|
||||
code = setTableVgroupList(pCxt, &name, pRealTable);
|
||||
}
|
||||
pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision;
|
||||
code = setTableVgroupList(pCxt, &name, pRealTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNamespace(pCxt, pRealTable);
|
||||
}
|
||||
|
@ -1365,6 +1418,7 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
|
|||
pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION);
|
||||
pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION);
|
||||
pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
|
||||
// pReq->strict = GET_OPTION_VAL(pStmt->pOptions->pStrict, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
|
||||
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
||||
}
|
||||
|
||||
|
@ -1573,12 +1627,16 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p
|
|||
TSDB_MAX_DB_SINGLE_STABLE_OPTION);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbEnumOption(pCxt, "streamMode", pOptions->pStreamMode, TSDB_MIN_DB_STREAM_MODE_OPTION,
|
||||
TSDB_MAX_DB_STREAM_MODE_OPTION);
|
||||
code = checkDbEnumOption(pCxt, "streamMode", pOptions->pStreamMode, TSDB_DB_STREAM_MODE_OPTION_OFF,
|
||||
TSDB_DB_STREAM_MODE_OPTION_ON);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbRetentionsOption(pCxt, pOptions->pRetentions);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbEnumOption(pCxt, "strict", pOptions->pStrict, TSDB_DB_STRICT_OPTION_OFF,
|
||||
TSDB_DB_STRICT_OPTION_ON);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1789,35 +1847,259 @@ static int32_t getAggregationMethod(SNodeList* pFuncs) {
|
|||
return ((SFunctionNode*)nodesListGetNode(pFuncs, 0))->funcId;
|
||||
}
|
||||
|
||||
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
int32_t code = checkCreateTable(pCxt, pStmt);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
|
||||
int8_t flags = 0;
|
||||
if (pCol->sma) {
|
||||
flags |= SCHEMA_SMA_ON;
|
||||
}
|
||||
pSchema->colId = colId;
|
||||
pSchema->type = pCol->dataType.type;
|
||||
pSchema->bytes = calcTypeBytes(pCol->dataType);
|
||||
pSchema->flags = flags;
|
||||
strcpy(pSchema->name, pCol->colName);
|
||||
}
|
||||
|
||||
typedef struct SSampleAstInfo {
|
||||
const char* pDbName;
|
||||
const char* pTableName;
|
||||
SNodeList* pFuncs;
|
||||
SNode* pInterval;
|
||||
SNode* pOffset;
|
||||
SNode* pSliding;
|
||||
STableMeta* pRollupTableMeta;
|
||||
} SSampleAstInfo;
|
||||
|
||||
static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, char** pAst, int32_t* pLen) {
|
||||
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
if (NULL == pSelect) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
sprintf(pSelect->stmtName, "%p", pSelect);
|
||||
|
||||
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
if (NULL == pTable) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pTable->table.dbName, pInfo->pDbName);
|
||||
strcpy(pTable->table.tableName, pInfo->pTableName);
|
||||
TSWAP(pTable->pMeta, pInfo->pRollupTableMeta, STableMeta*);
|
||||
pSelect->pFromTable = (SNode*)pTable;
|
||||
|
||||
TSWAP(pSelect->pProjectionList, pInfo->pFuncs, SNodeList*);
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pSelect->pProjectionList || NULL == pFunc) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "_wstartts");
|
||||
nodesListPushFront(pSelect->pProjectionList, pFunc);
|
||||
SNode* pProject = NULL;
|
||||
FOREACH(pProject, pSelect->pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#%p", pProject); }
|
||||
|
||||
SIntervalWindowNode* pInterval = nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
|
||||
if (NULL == pInterval) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSelect->pWindow = (SNode*)pInterval;
|
||||
TSWAP(pInterval->pInterval, pInfo->pInterval, SNode*);
|
||||
TSWAP(pInterval->pOffset, pInfo->pOffset, SNode*);
|
||||
TSWAP(pInterval->pSliding, pInfo->pSliding, SNode*);
|
||||
pInterval->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pInterval->pCol) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
|
||||
|
||||
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesNodeToString(pSelect, false, pAst, pLen);
|
||||
}
|
||||
nodesDestroyNode(pSelect);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void clearSampleAstInfo(SSampleAstInfo* pInfo) {
|
||||
nodesDestroyList(pInfo->pFuncs);
|
||||
nodesDestroyNode(pInfo->pInterval);
|
||||
nodesDestroyNode(pInfo->pOffset);
|
||||
nodesDestroyNode(pInfo->pSliding);
|
||||
}
|
||||
|
||||
static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) {
|
||||
SValueNode* pVal = nodesMakeNode(QUERY_NODE_VALUE);
|
||||
if (NULL == pVal) {
|
||||
return NULL;
|
||||
}
|
||||
int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit);
|
||||
char buf[20] = {0};
|
||||
int32_t len = snprintf(buf, sizeof(buf), "%"PRId64"%c", timeVal, pRetension->freqUnit);
|
||||
pVal->literal = strndup(buf, len);
|
||||
if (NULL == pVal->literal) {
|
||||
nodesDestroyNode(pVal);
|
||||
return NULL;
|
||||
}
|
||||
pVal->isDuration = true;
|
||||
pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
|
||||
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
|
||||
pVal->node.resType.precision = precision;
|
||||
return (SNode*)pVal;
|
||||
}
|
||||
|
||||
static SNode* createColumnFromDef(SColumnDefNode* pDef) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return NULL;
|
||||
}
|
||||
strcpy(pCol->colName, pDef->colName);
|
||||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
static SNode* createRollupFunc(SNode* pSrcFunc, SColumnDefNode* pColDef) {
|
||||
SFunctionNode* pFunc = nodesCloneNode(pSrcFunc);
|
||||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createColumnFromDef(pColDef))) {
|
||||
nodesDestroyNode(pFunc);
|
||||
return NULL;
|
||||
}
|
||||
return (SNode*)pFunc;
|
||||
}
|
||||
|
||||
static SNodeList* createRollupFuncs(SCreateTableStmt* pStmt) {
|
||||
SNodeList* pFuncs = nodesMakeList();
|
||||
if (NULL == pFuncs) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SMCreateStbReq createReq = {0};
|
||||
createReq.igExists = pStmt->ignoreExists;
|
||||
createReq.aggregationMethod = getAggregationMethod(pStmt->pOptions->pFuncs);
|
||||
createReq.xFilesFactor = GET_OPTION_VAL(pStmt->pOptions->pFilesFactor, TSDB_DEFAULT_DB_FILE_FACTOR);
|
||||
createReq.delay = GET_OPTION_VAL(pStmt->pOptions->pDelay, TSDB_DEFAULT_DB_DELAY);
|
||||
columnDefNodeToField(pStmt->pCols, &createReq.pColumns);
|
||||
columnDefNodeToField(pStmt->pTags, &createReq.pTags);
|
||||
createReq.numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||
createReq.numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||
SNode* pFunc = NULL;
|
||||
FOREACH(pFunc, pStmt->pOptions->pFuncs) {
|
||||
SNode* pCol = NULL;
|
||||
bool primaryKey = true;
|
||||
FOREACH(pCol, pStmt->pCols) {
|
||||
if (primaryKey) {
|
||||
primaryKey = false;
|
||||
continue;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pFuncs, createRollupFunc(pFunc, (SColumnDefNode*)pCol))) {
|
||||
nodesDestroyList(pFuncs);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pFuncs;
|
||||
}
|
||||
|
||||
static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precision) {
|
||||
int32_t numOfField = LIST_LENGTH(pStmt->pCols) + LIST_LENGTH(pStmt->pTags);
|
||||
STableMeta* pMeta = taosMemoryCalloc(1, sizeof(STableMeta) + numOfField * sizeof(SSchema));
|
||||
if (NULL == pMeta) {
|
||||
return NULL;
|
||||
}
|
||||
pMeta->tableType = TSDB_SUPER_TABLE;
|
||||
pMeta->tableInfo.numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||
pMeta->tableInfo.precision = precision;
|
||||
pMeta->tableInfo.numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||
|
||||
int32_t index = 0;
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pStmt->pCols) {
|
||||
toSchema((SColumnDefNode*)pCol, index + 1, pMeta->schema + index);
|
||||
++index;
|
||||
}
|
||||
SNode* pTag = NULL;
|
||||
FOREACH(pTag, pStmt->pTags) {
|
||||
toSchema((SColumnDefNode*)pTag, index + 1, pMeta->schema + index);
|
||||
++index;
|
||||
}
|
||||
|
||||
return pMeta;
|
||||
}
|
||||
|
||||
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
|
||||
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, SSampleAstInfo* pInfo) {
|
||||
pInfo->pDbName = pStmt->dbName;
|
||||
pInfo->pTableName = pStmt->tableName;
|
||||
pInfo->pFuncs = createRollupFuncs(pStmt);
|
||||
pInfo->pInterval = makeIntervalVal(pRetension, precision);
|
||||
pInfo->pRollupTableMeta = createRollupTableMeta(pStmt, precision);
|
||||
if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval || NULL == pInfo->pRollupTableMeta) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getRollupAst(STranslateContext* pCxt,
|
||||
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, char** pAst, int32_t* pLen) {
|
||||
SSampleAstInfo info = {0};
|
||||
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildSampleAst(pCxt, &info, pAst, pLen);
|
||||
}
|
||||
clearSampleAstInfo(&info);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
|
||||
SDbCfgInfo dbCfg = {0};
|
||||
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
|
||||
int32_t num = taosArrayGetSize(dbCfg.pRetensions);
|
||||
if (TSDB_CODE_SUCCESS != code || num < 2) {
|
||||
return code;
|
||||
}
|
||||
for (int32_t i = 1; i < num; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(dbCfg.pRetensions, i);
|
||||
STranslateContext cxt = {0};
|
||||
initTranslateContext(pCxt->pParseCxt, &cxt);
|
||||
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision,
|
||||
1 == i ? &pReq->pAst1 : &pReq->pAst2, 1 == i ? &pReq->ast1Len : &pReq->ast2Len);
|
||||
destroyTranslateContext(&cxt);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
|
||||
pReq->igExists = pStmt->ignoreExists;
|
||||
pReq->aggregationMethod = getAggregationMethod(pStmt->pOptions->pFuncs);
|
||||
pReq->xFilesFactor = GET_OPTION_VAL(pStmt->pOptions->pFilesFactor, TSDB_DEFAULT_DB_FILE_FACTOR);
|
||||
pReq->delay = GET_OPTION_VAL(pStmt->pOptions->pDelay, TSDB_DEFAULT_DB_DELAY);
|
||||
columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
|
||||
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
||||
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||
if (NULL == pStmt->pOptions->pSma) {
|
||||
columnDefNodeToField(pStmt->pCols, &createReq.pSmas);
|
||||
createReq.numOfSmas = createReq.numOfColumns;
|
||||
columnDefNodeToField(pStmt->pCols, &pReq->pSmas);
|
||||
pReq->numOfSmas = pReq->numOfColumns;
|
||||
} else {
|
||||
columnNodeToField(pStmt->pOptions->pSma, &createReq.pSmas);
|
||||
createReq.numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma);
|
||||
columnNodeToField(pStmt->pOptions->pSma, &pReq->pSmas);
|
||||
pReq->numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma);
|
||||
}
|
||||
|
||||
SName tableName = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
|
||||
strcpy(tableName.dbname, pStmt->dbName);
|
||||
strcpy(tableName.tname, pStmt->tableName);
|
||||
tNameExtractFullName(&tableName, createReq.name);
|
||||
tNameExtractFullName(&tableName, pReq->name);
|
||||
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STB, (FSerializeFunc)tSerializeSMCreateStbReq, &createReq);
|
||||
return buildRollupAst(pCxt, pStmt, pReq);
|
||||
}
|
||||
|
||||
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
SMCreateStbReq createReq = {0};
|
||||
int32_t code = checkCreateTable(pCxt, pStmt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCreateStbReq(pCxt, pStmt, &createReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STB, (FSerializeFunc)tSerializeSMCreateStbReq, &createReq);
|
||||
}
|
||||
tFreeSMCreateStbReq(&createReq);
|
||||
return code;
|
||||
}
|
||||
|
@ -1983,20 +2265,20 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
|
|||
}
|
||||
|
||||
static int32_t nodeTypeToShowType(ENodeType nt) {
|
||||
// switch (nt) {
|
||||
// case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
// return TSDB_MGMT_TABLE_CONNS;
|
||||
// case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
// return TSDB_MGMT_TABLE_GRANTS;
|
||||
// case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
// return TSDB_MGMT_TABLE_QUERIES;
|
||||
// case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
// return 0; // todo
|
||||
// case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
// return TSDB_MGMT_TABLE_VARIABLES;
|
||||
// default:
|
||||
// break;
|
||||
// }
|
||||
switch (nt) {
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
return TSDB_MGMT_TABLE_CONNS;
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
return TSDB_MGMT_TABLE_GRANTS;
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
return TSDB_MGMT_TABLE_QUERIES;
|
||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
return 0; // todo
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
return 0; // todo
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2027,57 +2309,28 @@ static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
|
|||
return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen);
|
||||
}
|
||||
|
||||
static int32_t getSmaIndexBuildAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
|
||||
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
if (NULL == pSelect) {
|
||||
static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SSampleAstInfo* pInfo) {
|
||||
pInfo->pDbName = pCxt->pParseCxt->db;
|
||||
pInfo->pTableName = pStmt->tableName;
|
||||
pInfo->pFuncs = nodesCloneList(pStmt->pOptions->pFuncs);
|
||||
pInfo->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
|
||||
pInfo->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
|
||||
pInfo->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
|
||||
if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval ||
|
||||
(NULL != pStmt->pOptions->pOffset && NULL == pInfo->pOffset) ||
|
||||
(NULL != pStmt->pOptions->pSliding && NULL == pInfo->pSliding)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
sprintf(pSelect->stmtName, "%p", pSelect);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
if (NULL == pTable) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pTable->table.dbName, pCxt->pParseCxt->db);
|
||||
strcpy(pTable->table.tableName, pStmt->tableName);
|
||||
pSelect->pFromTable = (SNode*)pTable;
|
||||
|
||||
pSelect->pProjectionList = nodesCloneList(pStmt->pOptions->pFuncs);
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pSelect->pProjectionList || NULL == pFunc) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "_wstartts");
|
||||
nodesListPushFront(pSelect->pProjectionList, pFunc);
|
||||
SNode* pProject = NULL;
|
||||
FOREACH(pProject, pSelect->pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#sma_%p", pProject); }
|
||||
|
||||
SIntervalWindowNode* pInterval = nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
|
||||
if (NULL == pInterval) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSelect->pWindow = (SNode*)pInterval;
|
||||
pInterval->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
pInterval->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
|
||||
pInterval->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
|
||||
pInterval->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
|
||||
if (NULL == pInterval->pCol || NULL == pInterval->pInterval ||
|
||||
(NULL != pStmt->pOptions->pOffset && NULL == pInterval->pOffset) ||
|
||||
(NULL != pStmt->pOptions->pSliding && NULL == pInterval->pSliding)) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
|
||||
|
||||
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
|
||||
static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
|
||||
SSampleAstInfo info = {0};
|
||||
int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesNodeToString(pSelect, false, pAst, pLen);
|
||||
code = buildSampleAst(pCxt, &info, pAst, pLen);
|
||||
}
|
||||
nodesDestroyNode(pSelect);
|
||||
clearSampleAstInfo(&info);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2106,7 +2359,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
|
|||
code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getSmaIndexBuildAst(pCxt, pStmt, &pReq->ast, &pReq->astLen);
|
||||
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -2129,10 +2382,12 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
|
|||
tFreeSMCreateSmaReq(&createSmaReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateFullTextReq* pReq) {
|
||||
// impl later
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
|
||||
SMCreateFullTextReq createFTReq = {0};
|
||||
int32_t code = buildCreateFullTextReq(pCxt, pStmt, &createFTReq);
|
||||
|
@ -2535,24 +2790,6 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static void destroyTranslateContext(STranslateContext* pCxt) {
|
||||
if (NULL != pCxt->pNsLevel) {
|
||||
size_t size = taosArrayGetSize(pCxt->pNsLevel);
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
taosArrayDestroy(taosArrayGetP(pCxt->pNsLevel, i));
|
||||
}
|
||||
taosArrayDestroy(pCxt->pNsLevel);
|
||||
}
|
||||
|
||||
if (NULL != pCxt->pCmdMsg) {
|
||||
taosMemoryFreeClear(pCxt->pCmdMsg->pMsg);
|
||||
taosMemoryFreeClear(pCxt->pCmdMsg);
|
||||
}
|
||||
|
||||
taosHashCleanup(pCxt->pDbs);
|
||||
taosHashCleanup(pCxt->pTables);
|
||||
}
|
||||
|
||||
static const char* getSysDbName(ENodeType type) {
|
||||
switch (type) {
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
|
@ -2613,8 +2850,9 @@ static const char* getSysTableName(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
return TSDB_INS_TABLE_LICENCES;
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
return TSDB_PERFS_TABLE_CONNECTIONS;
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
// todo
|
||||
return TSDB_PERFS_TABLE_QUERIES;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -2739,18 +2977,6 @@ typedef struct SVgroupTablesBatch {
|
|||
char dbName[TSDB_DB_NAME_LEN];
|
||||
} SVgroupTablesBatch;
|
||||
|
||||
static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
|
||||
int8_t flags = 0;
|
||||
if (pCol->sma) {
|
||||
flags |= SCHEMA_SMA_ON;
|
||||
}
|
||||
pSchema->colId = colId;
|
||||
pSchema->type = pCol->dataType.type;
|
||||
pSchema->bytes = calcTypeBytes(pCol->dataType);
|
||||
pSchema->flags = flags;
|
||||
strcpy(pSchema->name, pCol->colName);
|
||||
}
|
||||
|
||||
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
||||
taosMemoryFreeClear(pReq->name);
|
||||
taosMemoryFreeClear(pReq->ntbCfg.pSchema);
|
||||
|
@ -2800,7 +3026,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
|||
SNode* pCol;
|
||||
col_id_t index = 0;
|
||||
FOREACH(pCol, pStmt->pCols) {
|
||||
toSchemaEx((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
|
||||
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
|
||||
++index;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != buildSmaParam(pStmt->pOptions, &req)) {
|
||||
|
@ -3226,19 +3452,11 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
}
|
||||
|
||||
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
|
||||
STranslateContext cxt = {
|
||||
.pParseCxt = pParseCxt,
|
||||
.errCode = TSDB_CODE_SUCCESS,
|
||||
.msgBuf = {.buf = pParseCxt->pMsg, .len = pParseCxt->msgLen},
|
||||
.pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES),
|
||||
.currLevel = 0,
|
||||
.currClause = 0,
|
||||
.pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK),
|
||||
.pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)};
|
||||
if (NULL == cxt.pNsLevel) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
STranslateContext cxt = {0};
|
||||
int32_t code = initTranslateContext(pParseCxt, &cxt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = fmFuncMgtInit();
|
||||
}
|
||||
int32_t code = fmFuncMgtInit();
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteQuery(&cxt, pQuery);
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -55,7 +55,7 @@
|
|||
|
||||
# ---- tmq
|
||||
./test.sh -f tsim/tmq/basic.sim
|
||||
./test.sh -f tsim/tmq/basic1.sim
|
||||
#./test.sh -f tsim/tmq/basic1.sim
|
||||
#./test.sh -f tsim/tmq/oneTopic.sim
|
||||
#./test.sh -f tsim/tmq/multiTopic.sim
|
||||
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
#!/bin/bash
|
||||
|
||||
##################################################
|
||||
#
|
||||
# Do tmq test
|
||||
#
|
||||
##################################################
|
||||
|
||||
set +e
|
||||
|
||||
# set default value for parameters
|
||||
EXEC_OPTON=start
|
||||
DB_NAME=db
|
||||
POLL_DELAY=5
|
||||
VALGRIND=0
|
||||
SIGNAL=SIGINT
|
||||
|
||||
while getopts "d:s:v:y:x:" arg
|
||||
do
|
||||
case $arg in
|
||||
d)
|
||||
DB_NAME=$OPTARG
|
||||
;;
|
||||
s)
|
||||
EXEC_OPTON=$OPTARG
|
||||
;;
|
||||
v)
|
||||
VALGRIND=1
|
||||
;;
|
||||
y)
|
||||
POLL_DELAY=$OPTARG
|
||||
;;
|
||||
x)
|
||||
SIGNAL=$OPTARG
|
||||
;;
|
||||
?)
|
||||
echo "unkown argument"
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
SCRIPT_DIR=`pwd`
|
||||
|
||||
IN_TDINTERNAL="community"
|
||||
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
|
||||
cd ../../..
|
||||
else
|
||||
cd ../../
|
||||
fi
|
||||
|
||||
TOP_DIR=`pwd`
|
||||
|
||||
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
|
||||
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3`
|
||||
else
|
||||
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
|
||||
fi
|
||||
|
||||
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
|
||||
|
||||
declare -x SIM_DIR=$TOP_DIR/sim
|
||||
|
||||
PROGRAM=$BUILD_DIR/build/bin/tmq_sim
|
||||
|
||||
PRG_DIR=$SIM_DIR/tsim
|
||||
CFG_DIR=$PRG_DIR/cfg
|
||||
LOG_DIR=$PRG_DIR/log
|
||||
|
||||
echo "------------------------------------------------------------------------"
|
||||
echo "BUILD_DIR: $BUILD_DIR"
|
||||
echo "SIM_DIR : $SIM_DIR"
|
||||
echo "CFG_DIR : $CFG_DIR"
|
||||
|
||||
|
||||
echo "PROGRAM: $PROGRAM
|
||||
echo "CFG_DIR: $CFG_DIR
|
||||
echo "POLL_DELAY: $POLL_DELAY
|
||||
echo "DB_NAME: $DB_NAME
|
||||
|
||||
echo "------------------------------------------------------------------------"
|
||||
if [ "$EXEC_OPTON" = "start" ]; then
|
||||
if [ $VALGRIND -eq 1 ]; then
|
||||
echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &
|
||||
nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &
|
||||
else
|
||||
echo "nohup $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &"
|
||||
nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME > /dev/null 2>&1 &
|
||||
fi
|
||||
else
|
||||
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
|
||||
while [ -n "$PID" ]
|
||||
do
|
||||
if [ "$SIGNAL" = "SIGKILL" ]; then
|
||||
echo try to kill by signal SIGKILL
|
||||
kill -9 $PID
|
||||
else
|
||||
echo try to kill by signal SIGINT
|
||||
kill -SIGINT $PID
|
||||
fi
|
||||
sleep 1
|
||||
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
|
||||
done
|
||||
fi
|
|
@ -31,44 +31,46 @@
|
|||
#define NC "\033[0m"
|
||||
#define min(a, b) (((a) < (b)) ? (a) : (b))
|
||||
|
||||
#define MAX_SQL_STR_LEN (1024 * 1024)
|
||||
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||
#define MAX_SQL_STR_LEN (1024 * 1024)
|
||||
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||
|
||||
typedef struct {
|
||||
int32_t expectMsgCnt;
|
||||
int32_t consumeMsgCnt;
|
||||
TdThread thread;
|
||||
TdThread thread;
|
||||
int32_t consumerId;
|
||||
|
||||
int32_t ifCheckData;
|
||||
int64_t expectMsgCnt;
|
||||
|
||||
int64_t consumeMsgCnt;
|
||||
int32_t checkresult;
|
||||
|
||||
char topicString[1024];
|
||||
char keyString[1024];
|
||||
|
||||
int32_t numOfTopic;
|
||||
char topics[32][64];
|
||||
|
||||
int32_t numOfKey;
|
||||
char key[32][64];
|
||||
char value[32][64];
|
||||
|
||||
tmq_t* tmq;
|
||||
tmq_list_t* topicList;
|
||||
|
||||
} SThreadInfo;
|
||||
|
||||
typedef struct {
|
||||
// input from argvs
|
||||
char dbName[32];
|
||||
char topicString[256];
|
||||
char keyString[1024];
|
||||
char topicString1[256];
|
||||
char keyString1[1024];
|
||||
int32_t showMsgFlag;
|
||||
int32_t consumeDelay; // unit s
|
||||
int32_t consumeMsgCnt;
|
||||
int32_t checkMode;
|
||||
|
||||
// save result after parse agrvs
|
||||
int32_t numOfTopic;
|
||||
char topics[32][64];
|
||||
|
||||
int32_t numOfKey;
|
||||
char key[32][64];
|
||||
char value[32][64];
|
||||
|
||||
int32_t numOfTopic1;
|
||||
char topics1[32][64];
|
||||
|
||||
int32_t numOfKey1;
|
||||
char key1[32][64];
|
||||
char value1[32][64];
|
||||
char dbName[32];
|
||||
int32_t showMsgFlag;
|
||||
int32_t consumeDelay; // unit s
|
||||
int32_t numOfThread;
|
||||
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
||||
} SConfInfo;
|
||||
|
||||
static SConfInfo g_stConfInfo;
|
||||
TdFilePtr g_fp = NULL;
|
||||
|
||||
// char* g_pRowValue = NULL;
|
||||
// TdFilePtr g_fp = NULL;
|
||||
|
@ -81,30 +83,54 @@ static void printHelp() {
|
|||
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
|
||||
printf("%s%s\n", indent, "-d");
|
||||
printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-t");
|
||||
printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-k");
|
||||
printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-t1");
|
||||
printf("%s%s%s\n", indent, indent, "The topic1 string for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-k1");
|
||||
printf("%s%s%s\n", indent, indent, "The key1-value1 string for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-g");
|
||||
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
||||
printf("%s%s\n", indent, "-y");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
||||
printf("%s%s\n", indent, "-m");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt);
|
||||
printf("%s%s\n", indent, "-j");
|
||||
printf("%s%s%s%d\n", indent, indent, "check mode, default is s", g_stConfInfo.checkMode);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
void initLogFile() {
|
||||
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
|
||||
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
|
||||
if (NULL == pFile) {
|
||||
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
|
||||
exit -1;
|
||||
};
|
||||
g_fp = pFile;
|
||||
|
||||
time_t tTime = taosGetTimestampSec();
|
||||
struct tm tm = *taosLocalTime(&tTime, NULL);
|
||||
|
||||
taosFprintfFile(pFile, "###################################################################\n");
|
||||
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
|
||||
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
|
||||
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
|
||||
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
|
||||
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
|
||||
taosFprintfFile(pFile, " Topics: ");
|
||||
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
|
||||
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
|
||||
}
|
||||
taosFprintfFile(pFile, "\n");
|
||||
taosFprintfFile(pFile, " Key: ");
|
||||
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
|
||||
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
|
||||
}
|
||||
taosFprintfFile(pFile, "\n");
|
||||
}
|
||||
|
||||
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
|
||||
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
|
||||
taosFprintfFile(pFile, "###################################################################\n");
|
||||
}
|
||||
|
||||
void parseArgument(int32_t argc, char* argv[]) {
|
||||
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
|
||||
g_stConfInfo.showMsgFlag = 0;
|
||||
g_stConfInfo.consumeDelay = 8000;
|
||||
g_stConfInfo.consumeMsgCnt = 0;
|
||||
g_stConfInfo.consumeDelay = 5;
|
||||
|
||||
for (int32_t i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||
|
@ -114,37 +140,20 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
strcpy(g_stConfInfo.dbName, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-c") == 0) {
|
||||
strcpy(configDir, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t") == 0) {
|
||||
strcpy(g_stConfInfo.topicString, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-k") == 0) {
|
||||
strcpy(g_stConfInfo.keyString, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t1") == 0) {
|
||||
strcpy(g_stConfInfo.topicString1, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-k1") == 0) {
|
||||
strcpy(g_stConfInfo.keyString1, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-g") == 0) {
|
||||
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-y") == 0) {
|
||||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0) {
|
||||
g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-j") == 0) {
|
||||
g_stConfInfo.checkMode = atol(argv[++i]);
|
||||
} else {
|
||||
printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == g_stConfInfo.consumeMsgCnt) {
|
||||
g_stConfInfo.consumeMsgCnt = 0x7fffffff;
|
||||
}
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
|
||||
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
|
||||
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC);
|
||||
pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);
|
||||
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
|
||||
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
|
||||
#endif
|
||||
}
|
||||
|
@ -171,74 +180,26 @@ void ltrim(char* str) {
|
|||
// return str;
|
||||
}
|
||||
|
||||
void parseInputString() {
|
||||
// printf("topicString: %s\n", g_stConfInfo.topicString);
|
||||
// printf("keyString: %s\n\n", g_stConfInfo.keyString);
|
||||
static int running = 1;
|
||||
static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) {
|
||||
char buf[1024];
|
||||
|
||||
char* token;
|
||||
const char delim[2] = ",";
|
||||
const char ch = ':';
|
||||
|
||||
token = strtok(g_stConfInfo.topicString, delim);
|
||||
while (token != NULL) {
|
||||
// printf("%s\n", token );
|
||||
strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token);
|
||||
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
|
||||
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
|
||||
g_stConfInfo.numOfTopic++;
|
||||
|
||||
token = strtok(NULL, delim);
|
||||
}
|
||||
|
||||
token = strtok(g_stConfInfo.topicString1, delim);
|
||||
while (token != NULL) {
|
||||
// printf("%s\n", token );
|
||||
strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
|
||||
ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]);
|
||||
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
|
||||
g_stConfInfo.numOfTopic1++;
|
||||
|
||||
token = strtok(NULL, delim);
|
||||
}
|
||||
|
||||
token = strtok(g_stConfInfo.keyString, delim);
|
||||
while (token != NULL) {
|
||||
// printf("%s\n", token );
|
||||
{
|
||||
char* pstr = token;
|
||||
ltrim(pstr);
|
||||
char* ret = strchr(pstr, ch);
|
||||
memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr);
|
||||
strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1);
|
||||
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
|
||||
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
|
||||
g_stConfInfo.numOfKey++;
|
||||
}
|
||||
|
||||
token = strtok(NULL, delim);
|
||||
}
|
||||
|
||||
token = strtok(g_stConfInfo.keyString1, delim);
|
||||
while (token != NULL) {
|
||||
// printf("%s\n", token );
|
||||
{
|
||||
char* pstr = token;
|
||||
ltrim(pstr);
|
||||
char* ret = strchr(pstr, ch);
|
||||
memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr);
|
||||
strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1);
|
||||
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
|
||||
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
|
||||
g_stConfInfo.numOfKey1++;
|
||||
}
|
||||
|
||||
token = strtok(NULL, delim);
|
||||
//printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
//printf("vg:%d\n", tmq_get_vgroup_id(msg));
|
||||
taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
|
||||
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
//taos_print_row(buf, row, fields, numOfFields);
|
||||
//printf("%s\n", buf);
|
||||
//taosFprintfFile(g_fp, "%s\n", buf);
|
||||
}
|
||||
}
|
||||
|
||||
static int running = 1;
|
||||
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
|
||||
|
||||
int queryDB(TAOS* taos, char* command) {
|
||||
TAOS_RES* pRes = taos_query(taos, command);
|
||||
int code = taos_errno(pRes);
|
||||
|
@ -252,8 +213,7 @@ int queryDB(TAOS* taos, char* command) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
#if 0
|
||||
void build_consumer(SThreadInfo *pInfo) {
|
||||
char sqlStr[1024] = {0};
|
||||
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
|
@ -267,273 +227,229 @@ tmq_t* build_consumer() {
|
|||
exit(-1);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
#endif
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
// tmq_conf_set(conf, "group.id", "tg2");
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) {
|
||||
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]);
|
||||
for (int32_t i = 0; i < pInfo->numOfKey; i++) {
|
||||
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
|
||||
}
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
|
||||
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
|
||||
assert(tmq);
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
pInfo->tmq = tmq_consumer_new(pConn, conf, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
void build_topic_list(SThreadInfo *pInfo) {
|
||||
pInfo->topicList = tmq_list_new();
|
||||
// tmq_list_append(topic_list, "test_stb_topic_1");
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) {
|
||||
tmq_list_append(topic_list, g_stConfInfo.topics[i]);
|
||||
for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
|
||||
tmq_list_append(pInfo->topicList, pInfo->topics[i]);
|
||||
}
|
||||
return topic_list;
|
||||
return;
|
||||
}
|
||||
|
||||
tmq_t* build_consumer_x() {
|
||||
#if 0
|
||||
int32_t saveConsumeResult(SThreadInfo *pInfo) {
|
||||
char sqlStr[1024] = {0};
|
||||
|
||||
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
|
||||
|
||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)",
|
||||
g_stConfInfo.dbName,
|
||||
pInfo->consumerId,
|
||||
pInfo->consumeMsgCnt,
|
||||
pInfo->checkresult);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
#endif
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
// tmq_conf_set(conf, "group.id", "tg2");
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) {
|
||||
tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]);
|
||||
}
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
|
||||
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
|
||||
assert(tmq);
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list_x() {
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
// tmq_list_append(topic_list, "test_stb_topic_1");
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) {
|
||||
tmq_list_append(topic_list, g_stConfInfo.topics1[i]);
|
||||
}
|
||||
return topic_list;
|
||||
}
|
||||
|
||||
void loop_consume(tmq_t* tmq) {
|
||||
void loop_consume(SThreadInfo *pInfo) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
int64_t totalMsgs = 0;
|
||||
//int64_t totalRows = 0;
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
while (running) {
|
||||
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000);
|
||||
if (tmqMsg) {
|
||||
totalMsgs++;
|
||||
|
||||
#if 0
|
||||
TAOS_ROW row;
|
||||
while (NULL != (row = tmq_get_row(tmqMsg))) {
|
||||
totalRows++;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
|
||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
|
||||
if (tmqMsg) {
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
/*msg_process(tmqMsg);*/
|
||||
msg_process(tmqMsg, totalMsgs, 0);
|
||||
}
|
||||
|
||||
tmq_message_destroy(tmqMsg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err) {
|
||||
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
printf("{consume success: %d, %d}", totalMsgs, totalRows);
|
||||
}
|
||||
|
||||
int32_t parallel_consume(tmq_t* tmq, int threadLable) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
while (running) {
|
||||
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
|
||||
if (tmqMsg) {
|
||||
|
||||
totalMsgs++;
|
||||
|
||||
// printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
|
||||
|
||||
#if 0
|
||||
TAOS_ROW row;
|
||||
while (NULL != (row = tmq_get_row(tmqMsg))) {
|
||||
totalRows++;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
/*msg_process(tmqMsg);*/
|
||||
}
|
||||
tmq_message_destroy(tmqMsg);
|
||||
|
||||
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
|
||||
|
||||
if (totalMsgs >= pInfo->expectMsgCnt) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
|
||||
err = tmq_consumer_close(pInfo->tmq);
|
||||
if (err) {
|
||||
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
// printf("%d", totalMsgs); // output to sim for check result
|
||||
return totalMsgs;
|
||||
pInfo->consumeMsgCnt = totalMsgs;
|
||||
|
||||
}
|
||||
|
||||
void* threadFunc(void* param) {
|
||||
void *consumeThreadFunc(void *param) {
|
||||
int32_t totalMsgs = 0;
|
||||
|
||||
SThreadInfo* pInfo = (SThreadInfo*)param;
|
||||
SThreadInfo *pInfo = (SThreadInfo *)param;
|
||||
|
||||
tmq_t* tmq = build_consumer_x();
|
||||
tmq_list_t* topic_list = build_topic_list_x();
|
||||
if ((NULL == tmq) || (NULL == topic_list)) {
|
||||
build_consumer(pInfo);
|
||||
build_topic_list(pInfo);
|
||||
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
|
||||
|
||||
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
|
||||
if (err) {
|
||||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
loop_consume(pInfo);
|
||||
|
||||
// if (0 == g_stConfInfo.consumeMsgCnt) {
|
||||
// loop_consume(tmq);
|
||||
// } else {
|
||||
pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
|
||||
//}
|
||||
|
||||
err = tmq_unsubscribe(tmq);
|
||||
err = tmq_unsubscribe(pInfo->tmq);
|
||||
if (err) {
|
||||
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
pInfo->consumeMsgCnt = -1;
|
||||
pInfo->consumeMsgCnt = -1;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// save consume result into consumeresult table
|
||||
saveConsumeResult(pInfo);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int32_t argc, char* argv[]) {
|
||||
parseArgument(argc, argv);
|
||||
parseInputString();
|
||||
void parseConsumeInfo() {
|
||||
char* token;
|
||||
const char delim[2] = ",";
|
||||
const char ch = ':';
|
||||
|
||||
int32_t numOfThreads = 1;
|
||||
TdThreadAttr thattr;
|
||||
taosThreadAttrInit(&thattr);
|
||||
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo));
|
||||
|
||||
if (g_stConfInfo.numOfTopic1) {
|
||||
// pthread_create one thread to consume
|
||||
for (int32_t i = 0; i < numOfThreads; ++i) {
|
||||
pInfo[i].expectMsgCnt = 0;
|
||||
pInfo[i].consumeMsgCnt = 0;
|
||||
taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i));
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
|
||||
while (token != NULL) {
|
||||
// printf("%s\n", token );
|
||||
strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
|
||||
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
|
||||
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
|
||||
g_stConfInfo.stThreads[i].numOfTopic++;
|
||||
|
||||
token = strtok(NULL, delim);
|
||||
}
|
||||
|
||||
token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
|
||||
while (token != NULL) {
|
||||
// printf("%s\n", token );
|
||||
{
|
||||
char* pstr = token;
|
||||
ltrim(pstr);
|
||||
char* ret = strchr(pstr, ch);
|
||||
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
|
||||
strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
|
||||
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
|
||||
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
|
||||
g_stConfInfo.stThreads[i].numOfKey++;
|
||||
}
|
||||
|
||||
token = strtok(NULL, delim);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if ((NULL == tmq) || (NULL == topic_list)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
|
||||
if (err) {
|
||||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
int32_t getConsumeInfo() {
|
||||
char sqlStr[1024] = {0};
|
||||
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName);
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
if (0 == g_stConfInfo.numOfTopic1) {
|
||||
loop_consume(tmq);
|
||||
} else {
|
||||
totalMsgs = parallel_consume(tmq, 0);
|
||||
}
|
||||
|
||||
err = tmq_unsubscribe(tmq);
|
||||
if (err) {
|
||||
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
if (g_stConfInfo.numOfTopic1) {
|
||||
for (int32_t i = 0; i < numOfThreads; i++) {
|
||||
taosThreadJoin(pInfo[i].thread, NULL);
|
||||
}
|
||||
|
||||
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
|
||||
if (0 == g_stConfInfo.checkMode) {
|
||||
if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) {
|
||||
printf("success");
|
||||
} else {
|
||||
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
|
||||
}
|
||||
} else if (1 == g_stConfInfo.checkMode) {
|
||||
if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) {
|
||||
printf("success");
|
||||
} else {
|
||||
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
|
||||
}
|
||||
} else if (2 == g_stConfInfo.checkMode) {
|
||||
if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
|
||||
printf("success");
|
||||
} else {
|
||||
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
|
||||
}
|
||||
} else if (3 == g_stConfInfo.checkMode) {
|
||||
if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) {
|
||||
printf("success");
|
||||
} else {
|
||||
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
|
||||
}
|
||||
} else if (4 == g_stConfInfo.checkMode) {
|
||||
if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) ||
|
||||
((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ||
|
||||
((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ||
|
||||
((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
|
||||
printf("success");
|
||||
} else {
|
||||
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
|
||||
}
|
||||
} else {
|
||||
printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
|
||||
}
|
||||
|
||||
TAOS_ROW row = NULL;
|
||||
int num_fields = taos_num_fields(pRes);
|
||||
TAOS_FIELD* fields = taos_fetch_fields(pRes);
|
||||
|
||||
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
|
||||
|
||||
int32_t numOfThread = 0;
|
||||
while ((row = taos_fetch_row(pRes))) {
|
||||
int32_t* lengths = taos_fetch_lengths(pRes);
|
||||
|
||||
for (int i = 0; i < num_fields; ++i) {
|
||||
if (row[i] == NULL || 0 == i) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
|
||||
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
|
||||
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
|
||||
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
|
||||
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
|
||||
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
|
||||
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
|
||||
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
|
||||
}
|
||||
}
|
||||
numOfThread ++;
|
||||
}
|
||||
g_stConfInfo.numOfThread = numOfThread;
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
parseConsumeInfo();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int main(int32_t argc, char* argv[]) {
|
||||
parseArgument(argc, argv);
|
||||
getConsumeInfo();
|
||||
initLogFile();
|
||||
|
||||
TdThreadAttr thattr;
|
||||
taosThreadAttrInit(&thattr);
|
||||
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
// pthread_create one thread to consume
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
|
||||
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
|
||||
}
|
||||
|
||||
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
|
||||
|
||||
taosFprintfFile(g_fp, "\n");
|
||||
taosCloseFile(&g_fp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue