Merge branch '3.0' into test/jcy

This commit is contained in:
jiacy-jcy 2022-04-20 15:55:20 +08:00
commit 87228eca84
19 changed files with 3602 additions and 3225 deletions

View File

@ -280,10 +280,14 @@ typedef struct {
int32_t numOfTags; int32_t numOfTags;
int32_t numOfSmas; int32_t numOfSmas;
int32_t commentLen; int32_t commentLen;
int32_t ast1Len;
int32_t ast2Len;
SArray* pColumns; // array of SField SArray* pColumns; // array of SField
SArray* pTags; // array of SField SArray* pTags; // array of SField
SArray* pSmas; // array of SField SArray* pSmas; // array of SField
char* comment; char* comment;
char* pAst1;
char* pAst2;
} SMCreateStbReq; } SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
@ -609,6 +613,8 @@ typedef struct {
int8_t cacheLastRow; int8_t cacheLastRow;
int8_t streamMode; int8_t streamMode;
int8_t singleSTable; int8_t singleSTable;
int32_t numOfRetensions;
SArray* pRetensions;
} SDbCfgRsp; } SDbCfgRsp;
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp); int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);

View File

@ -308,8 +308,8 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx,
// use literal value directly and not use formula to simplify the codes // use literal value directly and not use formula to simplify the codes
switch (nOffset) { switch (nOffset) {
case 0: case 0:
*pDestByte = ((*pDestByte) & 0x3F) | (valType << 6);
// set the value and clear other partitions for offset 0 // set the value and clear other partitions for offset 0
*pDestByte = (valType << 6);
// *pDestByte |= (valType << 6); // *pDestByte |= (valType << 6);
break; break;
case 1: case 1:
@ -417,8 +417,8 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, T
// use literal value directly and not use formula to simplify the codes // use literal value directly and not use formula to simplify the codes
switch (nOffset) { switch (nOffset) {
case 0: case 0:
*pDestByte = ((*pDestByte) & 0x7F) | (valType << 7);
// set the value and clear other partitions for offset 0 // set the value and clear other partitions for offset 0
*pDestByte = (valType << 7);
// *pDestByte |= (valType << 7); // *pDestByte |= (valType << 7);
break; break;
case 1: case 1:

View File

@ -86,155 +86,156 @@
#define TK_SINGLE_STABLE 68 #define TK_SINGLE_STABLE 68
#define TK_STREAM_MODE 69 #define TK_STREAM_MODE 69
#define TK_RETENTIONS 70 #define TK_RETENTIONS 70
#define TK_NK_COMMA 71 #define TK_STRICT 71
#define TK_NK_COLON 72 #define TK_NK_COMMA 72
#define TK_TABLE 73 #define TK_NK_COLON 73
#define TK_NK_LP 74 #define TK_TABLE 74
#define TK_NK_RP 75 #define TK_NK_LP 75
#define TK_STABLE 76 #define TK_NK_RP 76
#define TK_ADD 77 #define TK_STABLE 77
#define TK_COLUMN 78 #define TK_ADD 78
#define TK_MODIFY 79 #define TK_COLUMN 79
#define TK_RENAME 80 #define TK_MODIFY 80
#define TK_TAG 81 #define TK_RENAME 81
#define TK_SET 82 #define TK_TAG 82
#define TK_NK_EQ 83 #define TK_SET 83
#define TK_USING 84 #define TK_NK_EQ 84
#define TK_TAGS 85 #define TK_USING 85
#define TK_NK_DOT 86 #define TK_TAGS 86
#define TK_COMMENT 87 #define TK_NK_DOT 87
#define TK_BOOL 88 #define TK_COMMENT 88
#define TK_TINYINT 89 #define TK_BOOL 89
#define TK_SMALLINT 90 #define TK_TINYINT 90
#define TK_INT 91 #define TK_SMALLINT 91
#define TK_INTEGER 92 #define TK_INT 92
#define TK_BIGINT 93 #define TK_INTEGER 93
#define TK_FLOAT 94 #define TK_BIGINT 94
#define TK_DOUBLE 95 #define TK_FLOAT 95
#define TK_BINARY 96 #define TK_DOUBLE 96
#define TK_TIMESTAMP 97 #define TK_BINARY 97
#define TK_NCHAR 98 #define TK_TIMESTAMP 98
#define TK_UNSIGNED 99 #define TK_NCHAR 99
#define TK_JSON 100 #define TK_UNSIGNED 100
#define TK_VARCHAR 101 #define TK_JSON 101
#define TK_MEDIUMBLOB 102 #define TK_VARCHAR 102
#define TK_BLOB 103 #define TK_MEDIUMBLOB 103
#define TK_VARBINARY 104 #define TK_BLOB 104
#define TK_DECIMAL 105 #define TK_VARBINARY 105
#define TK_SMA 106 #define TK_DECIMAL 106
#define TK_ROLLUP 107 #define TK_SMA 107
#define TK_FILE_FACTOR 108 #define TK_ROLLUP 108
#define TK_NK_FLOAT 109 #define TK_FILE_FACTOR 109
#define TK_DELAY 110 #define TK_NK_FLOAT 110
#define TK_SHOW 111 #define TK_DELAY 111
#define TK_DATABASES 112 #define TK_SHOW 112
#define TK_TABLES 113 #define TK_DATABASES 113
#define TK_STABLES 114 #define TK_TABLES 114
#define TK_MNODES 115 #define TK_STABLES 115
#define TK_MODULES 116 #define TK_MNODES 116
#define TK_QNODES 117 #define TK_MODULES 117
#define TK_FUNCTIONS 118 #define TK_QNODES 118
#define TK_INDEXES 119 #define TK_FUNCTIONS 119
#define TK_FROM 120 #define TK_INDEXES 120
#define TK_ACCOUNTS 121 #define TK_FROM 121
#define TK_APPS 122 #define TK_ACCOUNTS 122
#define TK_CONNECTIONS 123 #define TK_APPS 123
#define TK_LICENCE 124 #define TK_CONNECTIONS 124
#define TK_GRANTS 125 #define TK_LICENCE 125
#define TK_QUERIES 126 #define TK_GRANTS 126
#define TK_SCORES 127 #define TK_QUERIES 127
#define TK_TOPICS 128 #define TK_SCORES 128
#define TK_VARIABLES 129 #define TK_TOPICS 129
#define TK_BNODES 130 #define TK_VARIABLES 130
#define TK_SNODES 131 #define TK_BNODES 131
#define TK_LIKE 132 #define TK_SNODES 132
#define TK_INDEX 133 #define TK_LIKE 133
#define TK_FULLTEXT 134 #define TK_INDEX 134
#define TK_FUNCTION 135 #define TK_FULLTEXT 135
#define TK_INTERVAL 136 #define TK_FUNCTION 136
#define TK_TOPIC 137 #define TK_INTERVAL 137
#define TK_AS 138 #define TK_TOPIC 138
#define TK_DESC 139 #define TK_AS 139
#define TK_DESCRIBE 140 #define TK_DESC 140
#define TK_RESET 141 #define TK_DESCRIBE 141
#define TK_QUERY 142 #define TK_RESET 142
#define TK_EXPLAIN 143 #define TK_QUERY 143
#define TK_ANALYZE 144 #define TK_EXPLAIN 144
#define TK_VERBOSE 145 #define TK_ANALYZE 145
#define TK_NK_BOOL 146 #define TK_VERBOSE 146
#define TK_RATIO 147 #define TK_NK_BOOL 147
#define TK_COMPACT 148 #define TK_RATIO 148
#define TK_VNODES 149 #define TK_COMPACT 149
#define TK_IN 150 #define TK_VNODES 150
#define TK_OUTPUTTYPE 151 #define TK_IN 151
#define TK_AGGREGATE 152 #define TK_OUTPUTTYPE 152
#define TK_BUFSIZE 153 #define TK_AGGREGATE 153
#define TK_STREAM 154 #define TK_BUFSIZE 154
#define TK_INTO 155 #define TK_STREAM 155
#define TK_TRIGGER 156 #define TK_INTO 156
#define TK_AT_ONCE 157 #define TK_TRIGGER 157
#define TK_WINDOW_CLOSE 158 #define TK_AT_ONCE 158
#define TK_WATERMARK 159 #define TK_WINDOW_CLOSE 159
#define TK_KILL 160 #define TK_WATERMARK 160
#define TK_CONNECTION 161 #define TK_KILL 161
#define TK_MERGE 162 #define TK_CONNECTION 162
#define TK_VGROUP 163 #define TK_MERGE 163
#define TK_REDISTRIBUTE 164 #define TK_VGROUP 164
#define TK_SPLIT 165 #define TK_REDISTRIBUTE 165
#define TK_SYNCDB 166 #define TK_SPLIT 166
#define TK_NULL 167 #define TK_SYNCDB 167
#define TK_NK_QUESTION 168 #define TK_NULL 168
#define TK_NK_ARROW 169 #define TK_NK_QUESTION 169
#define TK_ROWTS 170 #define TK_NK_ARROW 170
#define TK_TBNAME 171 #define TK_ROWTS 171
#define TK_QSTARTTS 172 #define TK_TBNAME 172
#define TK_QENDTS 173 #define TK_QSTARTTS 173
#define TK_WSTARTTS 174 #define TK_QENDTS 174
#define TK_WENDTS 175 #define TK_WSTARTTS 175
#define TK_WDURATION 176 #define TK_WENDTS 176
#define TK_CAST 177 #define TK_WDURATION 177
#define TK_NOW 178 #define TK_CAST 178
#define TK_TODAY 179 #define TK_NOW 179
#define TK_TIMEZONE 180 #define TK_TODAY 180
#define TK_COUNT 181 #define TK_TIMEZONE 181
#define TK_FIRST 182 #define TK_COUNT 182
#define TK_LAST 183 #define TK_FIRST 183
#define TK_LAST_ROW 184 #define TK_LAST 184
#define TK_BETWEEN 185 #define TK_LAST_ROW 185
#define TK_IS 186 #define TK_BETWEEN 186
#define TK_NK_LT 187 #define TK_IS 187
#define TK_NK_GT 188 #define TK_NK_LT 188
#define TK_NK_LE 189 #define TK_NK_GT 189
#define TK_NK_GE 190 #define TK_NK_LE 190
#define TK_NK_NE 191 #define TK_NK_GE 191
#define TK_MATCH 192 #define TK_NK_NE 192
#define TK_NMATCH 193 #define TK_MATCH 193
#define TK_CONTAINS 194 #define TK_NMATCH 194
#define TK_JOIN 195 #define TK_CONTAINS 195
#define TK_INNER 196 #define TK_JOIN 196
#define TK_SELECT 197 #define TK_INNER 197
#define TK_DISTINCT 198 #define TK_SELECT 198
#define TK_WHERE 199 #define TK_DISTINCT 199
#define TK_PARTITION 200 #define TK_WHERE 200
#define TK_BY 201 #define TK_PARTITION 201
#define TK_SESSION 202 #define TK_BY 202
#define TK_STATE_WINDOW 203 #define TK_SESSION 203
#define TK_SLIDING 204 #define TK_STATE_WINDOW 204
#define TK_FILL 205 #define TK_SLIDING 205
#define TK_VALUE 206 #define TK_FILL 206
#define TK_NONE 207 #define TK_VALUE 207
#define TK_PREV 208 #define TK_NONE 208
#define TK_LINEAR 209 #define TK_PREV 209
#define TK_NEXT 210 #define TK_LINEAR 210
#define TK_GROUP 211 #define TK_NEXT 211
#define TK_HAVING 212 #define TK_GROUP 212
#define TK_ORDER 213 #define TK_HAVING 213
#define TK_SLIMIT 214 #define TK_ORDER 214
#define TK_SOFFSET 215 #define TK_SLIMIT 215
#define TK_LIMIT 216 #define TK_SOFFSET 216
#define TK_OFFSET 217 #define TK_LIMIT 217
#define TK_ASC 218 #define TK_OFFSET 218
#define TK_NULLS 219 #define TK_ASC 219
#define TK_NULLS 220
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301

View File

@ -47,6 +47,7 @@ typedef struct SDatabaseOptions {
SValueNode* pNumOfVgroups; SValueNode* pNumOfVgroups;
SValueNode* pSingleStable; SValueNode* pSingleStable;
SValueNode* pStreamMode; SValueNode* pStreamMode;
SValueNode* pStrict;
SNodeList* pRetentions; SNodeList* pRetentions;
} SDatabaseOptions; } SDatabaseOptions;

View File

@ -390,10 +390,14 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_DB_SINGLE_STABLE_OPTION 1 #define TSDB_MAX_DB_SINGLE_STABLE_OPTION 1
#define TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION 0 #define TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION 0
#define TSDB_MIN_DB_STREAM_MODE_OPTION 0 #define TSDB_DB_STREAM_MODE_OPTION_OFF 0
#define TSDB_MAX_DB_STREAM_MODE_OPTION 1 #define TSDB_DB_STREAM_MODE_OPTION_ON 1
#define TSDB_DEFAULT_DB_STREAM_MODE_OPTION 0 #define TSDB_DEFAULT_DB_STREAM_MODE_OPTION 0
#define TSDB_DB_STRICT_OPTION_OFF 0
#define TSDB_DB_STRICT_OPTION_ON 1
#define TSDB_DEFAULT_DB_STRICT_OPTION 0
#define TSDB_MAX_JOIN_TABLE_NUM 10 #define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5 #define TSDB_MAX_UNION_CLAUSE 5

View File

@ -611,6 +611,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfSmas) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfSmas) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ast1Len) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ast2Len) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfColumns; ++i) { for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField *pField = taosArrayGet(pReq->pColumns, i); SField *pField = taosArrayGet(pReq->pColumns, i);
@ -636,6 +638,12 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (pReq->commentLen > 0) { if (pReq->commentLen > 0) {
if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1;
} }
if (pReq->ast1Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst1, pReq->ast1Len) < 0) return -1;
}
if (pReq->ast2Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -658,6 +666,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfSmas) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfSmas) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ast1Len) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ast2Len) < 0) return -1;
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
@ -706,6 +716,18 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
} }
if (pReq->ast1Len > 0) {
pReq->pAst1 = taosMemoryMalloc(pReq->ast1Len);
if (pReq->pAst1 == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->pAst1) < 0) return -1;
}
if (pReq->ast2Len > 0) {
pReq->pAst2 = taosMemoryMalloc(pReq->ast2Len);
if (pReq->pAst2 == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);
@ -717,6 +739,8 @@ void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosArrayDestroy(pReq->pTags); taosArrayDestroy(pReq->pTags);
taosArrayDestroy(pReq->pSmas); taosArrayDestroy(pReq->pSmas);
taosMemoryFreeClear(pReq->comment); taosMemoryFreeClear(pReq->comment);
taosMemoryFreeClear(pReq->pAst1);
taosMemoryFreeClear(pReq->pAst2);
pReq->pColumns = NULL; pReq->pColumns = NULL;
pReq->pTags = NULL; pReq->pTags = NULL;
pReq->pSmas = NULL; pReq->pSmas = NULL;
@ -2211,6 +2235,14 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI8(&encoder, pRsp->update) < 0) return -1; if (tEncodeI8(&encoder, pRsp->update) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1; if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1;
if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1;
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -2242,7 +2274,24 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));
if (pRsp->pRetensions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention rentension = {0};
if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1;
if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1;
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
if (taosArrayPush(pRsp->pRetensions, &rentension) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);

View File

@ -355,10 +355,14 @@ typedef struct {
int32_t numOfTags; int32_t numOfTags;
int32_t numOfSmas; int32_t numOfSmas;
int32_t commentLen; int32_t commentLen;
int32_t ast1Len;
int32_t ast2Len;
SSchema* pColumns; SSchema* pColumns;
SSchema* pTags; SSchema* pTags;
SSchema* pSmas; SSchema* pSmas;
char* comment; char* comment;
char* pAst1;
char* pAst2;
SRWLatch lock; SRWLatch lock;
} SStbObj; } SStbObj;

View File

@ -762,27 +762,29 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
goto GET_DB_CFG_OVER; goto GET_DB_CFG_OVER;
} }
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize; cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize;
cfgRsp.totalBlocks = pDb->cfg.totalBlocks; cfgRsp.totalBlocks = pDb->cfg.totalBlocks;
cfgRsp.daysPerFile = pDb->cfg.daysPerFile; cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0; cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1; cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2; cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
cfgRsp.minRows = pDb->cfg.minRows; cfgRsp.minRows = pDb->cfg.minRows;
cfgRsp.maxRows = pDb->cfg.maxRows; cfgRsp.maxRows = pDb->cfg.maxRows;
cfgRsp.commitTime = pDb->cfg.commitTime; cfgRsp.commitTime = pDb->cfg.commitTime;
cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod; cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod;
cfgRsp.ttl = pDb->cfg.ttl; cfgRsp.ttl = pDb->cfg.ttl;
cfgRsp.walLevel = pDb->cfg.walLevel; cfgRsp.walLevel = pDb->cfg.walLevel;
cfgRsp.precision = pDb->cfg.precision; cfgRsp.precision = pDb->cfg.precision;
cfgRsp.compression = pDb->cfg.compression; cfgRsp.compression = pDb->cfg.compression;
cfgRsp.replications = pDb->cfg.replications; cfgRsp.replications = pDb->cfg.replications;
cfgRsp.quorum = pDb->cfg.quorum; cfgRsp.quorum = pDb->cfg.quorum;
cfgRsp.update = pDb->cfg.update; cfgRsp.update = pDb->cfg.update;
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow; cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
cfgRsp.streamMode = pDb->cfg.streamMode; cfgRsp.streamMode = pDb->cfg.streamMode;
cfgRsp.singleSTable = pDb->cfg.singleSTable; cfgRsp.singleSTable = pDb->cfg.singleSTable;
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions;
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp); int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
@ -797,6 +799,8 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
pReq->pRsp = pRsp; pReq->pRsp = pRsp;
pReq->rspLen = contLen; pReq->rspLen = contLen;
code = 0;
GET_DB_CFG_OVER: GET_DB_CFG_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {

View File

@ -72,7 +72,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) + int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) +
TSDB_STB_RESERVE_SIZE; + pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
@ -93,6 +93,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfSmas, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfSmas, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i]; SSchema *pSchema = &pStb->pColumns[i];
@ -121,6 +123,12 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
if (pStb->commentLen > 0) { if (pStb->commentLen > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER)
} }
if (pStb->ast1Len > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
}
if (pStb->ast2Len > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
@ -173,6 +181,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfSmas, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfSmas, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema)); pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema)); pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
@ -210,6 +220,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
if (pStb->comment == NULL) goto _OVER; if (pStb->comment == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER) SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER)
} }
if (pStb->ast1Len > 0) {
pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
if (pStb->pAst1 == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
}
if (pStb->ast2Len > 0) {
pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
if (pStb->pAst2 == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
}
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
@ -238,6 +258,8 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
taosMemoryFreeClear(pStb->pColumns); taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags); taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment); taosMemoryFreeClear(pStb->comment);
taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2);
return 0; return 0;
} }
@ -294,6 +316,30 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
} }
} }
if (pOld->ast1Len < pNew->ast1Len) {
void *pAst1 = taosMemoryMalloc(pNew->ast1Len);
if (pAst1 != NULL) {
taosMemoryFree(pOld->pAst1);
pOld->pAst1 = pAst1;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
taosWUnLockLatch(&pOld->lock);
}
}
if (pOld->ast2Len < pNew->ast2Len) {
void *pAst2 = taosMemoryMalloc(pNew->ast2Len);
if (pAst2 != NULL) {
taosMemoryFree(pOld->pAst2);
pOld->pAst2 = pAst2;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
taosWUnLockLatch(&pOld->lock);
}
}
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version; pOld->version = pNew->version;
pOld->nextColId = pNew->nextColId; pOld->nextColId = pNew->nextColId;
@ -304,6 +350,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
if (pNew->commentLen != 0) { if (pNew->commentLen != 0) {
memcpy(pOld->comment, pNew->comment, TSDB_STB_COMMENT_LEN); memcpy(pOld->comment, pNew->comment, TSDB_STB_COMMENT_LEN);
} }
if (pNew->ast1Len != 0) {
memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
}
if (pNew->ast2Len != 0) {
memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
}
taosWUnLockLatch(&pOld->lock); taosWUnLockLatch(&pOld->lock);
return 0; return 0;
} }
@ -646,6 +698,26 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen); memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
} }
stbObj.ast1Len = pCreate->ast1Len;
if (stbObj.ast1Len > 0) {
stbObj.pAst1 = taosMemoryCalloc(stbObj.ast1Len, 1);
if (stbObj.pAst1 == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pAst1, pCreate->pAst1, stbObj.ast1Len);
}
stbObj.ast2Len = pCreate->ast2Len;
if (stbObj.ast2Len > 0) {
stbObj.pAst2 = taosMemoryCalloc(stbObj.ast2Len, 1);
if (stbObj.pAst2 == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pAst2, pCreate->pAst2, stbObj.ast2Len);
}
stbObj.pColumns = taosMemoryMalloc(stbObj.numOfColumns * sizeof(SSchema)); stbObj.pColumns = taosMemoryMalloc(stbObj.numOfColumns * sizeof(SSchema));
stbObj.pTags = taosMemoryMalloc(stbObj.numOfTags * sizeof(SSchema)); stbObj.pTags = taosMemoryMalloc(stbObj.numOfTags * sizeof(SSchema));
stbObj.pSmas = taosMemoryMalloc(stbObj.numOfSmas * sizeof(SSchema)); stbObj.pSmas = taosMemoryMalloc(stbObj.numOfSmas * sizeof(SSchema));

View File

@ -1410,30 +1410,33 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) { if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
// memmove(pData, (char*)src->pData + bytes * start, bytes * num); // memmove(pData, (char*)src->pData + bytes * start, bytes * num);
for (int32_t k = start; k < num + start; ++k) { int32_t rowIndex = numOfRows;
for (int32_t k = start; k <= end; ++k, ++rowIndex) {
SCellVal sVal = {0}; SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) { if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
TASSERT(0); TASSERT(0);
} }
if (sVal.valType == TD_VTYPE_NULL) { if (sVal.valType == TD_VTYPE_NULL) {
colDataAppendNULL(pColInfo, k); colDataAppendNULL(pColInfo, rowIndex);
} else { } else {
colDataAppend(pColInfo, k, sVal.val, false); colDataAppend(pColInfo, rowIndex, sVal.val, false);
} }
} }
} else { // handle the var-string } else { // handle the var-string
int32_t rowIndex = numOfRows;
// todo refactor, only copy one-by-one // todo refactor, only copy one-by-one
for (int32_t k = start; k < num + start; ++k) { for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
SCellVal sVal = {0}; SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) { if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
TASSERT(0); TASSERT(0);
} }
if (sVal.valType == TD_VTYPE_NULL) { if (sVal.valType == TD_VTYPE_NULL) {
colDataAppendNULL(pColInfo, k); colDataAppendNULL(pColInfo, rowIndex);
} else { } else {
colDataAppend(pColInfo, k, sVal.val, false); colDataAppend(pColInfo, rowIndex, sVal.val, false);
} }
} }
} }
@ -1441,8 +1444,9 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
j++; j++;
i++; i++;
} else { // pColInfo->info.colId < src->colId, it is a NULL data } else { // pColInfo->info.colId < src->colId, it is a NULL data
for (int32_t k = start; k < num + start; ++k) { // TODO opt performance int32_t rowIndex = numOfRows;
colDataAppend(pColInfo, k, NULL, true); for (int32_t k = start; k < num + start; ++k, ++rowIndex) { // TODO opt performance
colDataAppend(pColInfo, rowIndex, NULL, true);
} }
i++; i++;
} }
@ -1450,8 +1454,10 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
while (i < requiredNumOfCols) { // the remain columns are all null data while (i < requiredNumOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
for (int32_t k = start; k < num + start; ++k) { int32_t rowIndex = numOfRows;
colDataAppend(pColInfo, k, NULL, true); // TODO add a fast version to set a number of consecutive NULL value.
for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
colDataAppend(pColInfo, rowIndex, NULL, true); // TODO add a fast version to set a number of consecutive NULL value.
} }
i++; i++;
} }
@ -1749,7 +1755,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p
// be included in the query time window will be discarded // be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) { static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataBlockInfo blockInfo = {0}; // GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
initTableMemIterator(pTsdbReadHandle, pCheckInfo); initTableMemIterator(pTsdbReadHandle, pCheckInfo);
@ -1771,9 +1777,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
STable* pTable = NULL; STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 " rows:%d, start:%d, end:%d, %s",
" rows:%d, start:%d,"
"end:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
cur->pos, endPos, pTsdbReadHandle->idStr); cur->pos, endPos, pTsdbReadHandle->idStr);

View File

@ -53,6 +53,7 @@ typedef enum EDatabaseOptionType {
DB_OPTION_VGROUPS, DB_OPTION_VGROUPS,
DB_OPTION_SINGLE_STABLE, DB_OPTION_SINGLE_STABLE,
DB_OPTION_STREAM_MODE, DB_OPTION_STREAM_MODE,
DB_OPTION_STRICT,
DB_OPTION_RETENTIONS DB_OPTION_RETENTIONS
} EDatabaseOptionType; } EDatabaseOptionType;

View File

@ -161,6 +161,7 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pSingleStable = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; } db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pSingleStable = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStreamMode = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; } db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStreamMode = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
db_options(A) ::= db_options(B) RETENTIONS retention_list(C). { ((SDatabaseOptions*)B)->pRetentions = C; A = B; } db_options(A) ::= db_options(B) RETENTIONS retention_list(C). { ((SDatabaseOptions*)B)->pRetentions = C; A = B; }
db_options(A) ::= db_options(B) STRICT NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStrict = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
alter_db_options(A) ::= alter_db_option(B). { A = createDatabaseOptions(pCxt); A = setDatabaseAlterOption(pCxt, A, &B); } alter_db_options(A) ::= alter_db_option(B). { A = createDatabaseOptions(pCxt); A = setDatabaseAlterOption(pCxt, A, &B); }
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setDatabaseAlterOption(pCxt, B, &C); } alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setDatabaseAlterOption(pCxt, B, &C); }
@ -175,6 +176,7 @@ alter_db_option(A) ::= WAL NK_INTEGER(B).
alter_db_option(A) ::= QUORUM NK_INTEGER(B). { A.type = DB_OPTION_QUORUM; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); } alter_db_option(A) ::= QUORUM NK_INTEGER(B). { A.type = DB_OPTION_QUORUM; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); } alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); } alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= STRICT NK_INTEGER(B). { A.type = DB_OPTION_STRICT; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
%type integer_list { SNodeList* } %type integer_list { SNodeList* }
%destructor integer_list { nodesDestroyList($$); } %destructor integer_list { nodesDestroyList($$); }
@ -359,7 +361,7 @@ from_db_opt(A) ::= FROM db_name(B).
%type func_name_list { SNodeList* } %type func_name_list { SNodeList* }
%destructor func_name_list { nodesDestroyList($$); } %destructor func_name_list { nodesDestroyList($$); }
func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); } func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); }
func_name_list(A) ::= func_name_list(B) NK_COMMA col_name(C). { A = addNodeToList(pCxt, B, C); } func_name_list(A) ::= func_name_list(B) NK_COMMA func_name(C). { A = addNodeToList(pCxt, B, C); }
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); } func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }

View File

@ -168,6 +168,7 @@ static SKeyword keywordTable[] = {
{"STREAM", TK_STREAM}, {"STREAM", TK_STREAM},
{"STREAMS", TK_STREAMS}, {"STREAMS", TK_STREAMS},
{"STREAM_MODE", TK_STREAM_MODE}, {"STREAM_MODE", TK_STREAM_MODE},
{"STRICT", TK_STRICT},
{"SYNCDB", TK_SYNCDB}, {"SYNCDB", TK_SYNCDB},
{"TABLE", TK_TABLE}, {"TABLE", TK_TABLE},
{"TABLES", TK_TABLES}, {"TABLES", TK_TABLES},

View File

@ -198,6 +198,56 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int
return code; return code;
} }
static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo* pInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
int32_t code = collectUseDatabaseImpl(dbFname, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBCfg(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, dbFname, pInfo);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname);
}
return code;
}
static int32_t initTranslateContext(SParseContext* pParseCxt, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS;
pCxt->msgBuf.buf = pParseCxt->pMsg;
pCxt->msgBuf.len = pParseCxt->msgLen;
pCxt->pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
pCxt->currLevel = 0;
pCxt->currClause = 0;
pCxt->pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pCxt->pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pCxt->pNsLevel || NULL == pCxt->pDbs || NULL == pCxt->pTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static void destroyTranslateContext(STranslateContext* pCxt) {
if (NULL != pCxt->pNsLevel) {
size_t size = taosArrayGetSize(pCxt->pNsLevel);
for (size_t i = 0; i < size; ++i) {
taosArrayDestroy(taosArrayGetP(pCxt->pNsLevel, i));
}
taosArrayDestroy(pCxt->pNsLevel);
}
if (NULL != pCxt->pCmdMsg) {
taosMemoryFreeClear(pCxt->pCmdMsg->pMsg);
taosMemoryFreeClear(pCxt->pCmdMsg);
}
taosHashCleanup(pCxt->pDbs);
taosHashCleanup(pCxt->pTables);
}
static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) { static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) {
int cmp = 0; int cmp = 0;
if ('\0' != pCol->dbName[0]) { if ('\0' != pCol->dbName[0]) {
@ -749,15 +799,18 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
case QUERY_NODE_REAL_TABLE: { case QUERY_NODE_REAL_TABLE: {
SRealTableNode* pRealTable = (SRealTableNode*)pTable; SRealTableNode* pRealTable = (SRealTableNode*)pTable;
pRealTable->ratio = (NULL != pCxt->pExplainOpt ? pCxt->pExplainOpt->ratio : 1.0); pRealTable->ratio = (NULL != pCxt->pExplainOpt ? pCxt->pExplainOpt->ratio : 1.0);
SName name; // The SRealTableNode created through ROLLUP already has STableMeta.
code = getTableMetaImpl( if (NULL == pRealTable->pMeta) {
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), SName name;
&(pRealTable->pMeta)); code = getTableMetaImpl(
if (TSDB_CODE_SUCCESS != code) { pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName); &(pRealTable->pMeta));
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
}
code = setTableVgroupList(pCxt, &name, pRealTable);
} }
pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision; pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision;
code = setTableVgroupList(pCxt, &name, pRealTable);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = addNamespace(pCxt, pRealTable); code = addNamespace(pCxt, pRealTable);
} }
@ -1365,6 +1418,7 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION); pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION);
pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION); pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION);
pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION); pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
// pReq->strict = GET_OPTION_VAL(pStmt->pOptions->pStrict, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq); return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
} }
@ -1573,12 +1627,16 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p
TSDB_MAX_DB_SINGLE_STABLE_OPTION); TSDB_MAX_DB_SINGLE_STABLE_OPTION);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "streamMode", pOptions->pStreamMode, TSDB_MIN_DB_STREAM_MODE_OPTION, code = checkDbEnumOption(pCxt, "streamMode", pOptions->pStreamMode, TSDB_DB_STREAM_MODE_OPTION_OFF,
TSDB_MAX_DB_STREAM_MODE_OPTION); TSDB_DB_STREAM_MODE_OPTION_ON);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbRetentionsOption(pCxt, pOptions->pRetentions); code = checkDbRetentionsOption(pCxt, pOptions->pRetentions);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "strict", pOptions->pStrict, TSDB_DB_STRICT_OPTION_OFF,
TSDB_DB_STRICT_OPTION_ON);
}
return code; return code;
} }
@ -1789,35 +1847,259 @@ static int32_t getAggregationMethod(SNodeList* pFuncs) {
return ((SFunctionNode*)nodesListGetNode(pFuncs, 0))->funcId; return ((SFunctionNode*)nodesListGetNode(pFuncs, 0))->funcId;
} }
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
int32_t code = checkCreateTable(pCxt, pStmt); int8_t flags = 0;
if (TSDB_CODE_SUCCESS != code) { if (pCol->sma) {
return code; flags |= SCHEMA_SMA_ON;
}
pSchema->colId = colId;
pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->flags = flags;
strcpy(pSchema->name, pCol->colName);
}
typedef struct SSampleAstInfo {
const char* pDbName;
const char* pTableName;
SNodeList* pFuncs;
SNode* pInterval;
SNode* pOffset;
SNode* pSliding;
STableMeta* pRollupTableMeta;
} SSampleAstInfo;
static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, char** pAst, int32_t* pLen) {
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pSelect) {
return TSDB_CODE_OUT_OF_MEMORY;
}
sprintf(pSelect->stmtName, "%p", pSelect);
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
if (NULL == pTable) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pTable->table.dbName, pInfo->pDbName);
strcpy(pTable->table.tableName, pInfo->pTableName);
TSWAP(pTable->pMeta, pInfo->pRollupTableMeta, STableMeta*);
pSelect->pFromTable = (SNode*)pTable;
TSWAP(pSelect->pProjectionList, pInfo->pFuncs, SNodeList*);
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pSelect->pProjectionList || NULL == pFunc) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "_wstartts");
nodesListPushFront(pSelect->pProjectionList, pFunc);
SNode* pProject = NULL;
FOREACH(pProject, pSelect->pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#%p", pProject); }
SIntervalWindowNode* pInterval = nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
if (NULL == pInterval) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSelect->pWindow = (SNode*)pInterval;
TSWAP(pInterval->pInterval, pInfo->pInterval, SNode*);
TSWAP(pInterval->pOffset, pInfo->pOffset, SNode*);
TSWAP(pInterval->pSliding, pInfo->pSliding, SNode*);
pInterval->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pInterval->pCol) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pSelect, false, pAst, pLen);
}
nodesDestroyNode(pSelect);
return code;
}
static void clearSampleAstInfo(SSampleAstInfo* pInfo) {
nodesDestroyList(pInfo->pFuncs);
nodesDestroyNode(pInfo->pInterval);
nodesDestroyNode(pInfo->pOffset);
nodesDestroyNode(pInfo->pSliding);
}
static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) {
SValueNode* pVal = nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == pVal) {
return NULL;
}
int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit);
char buf[20] = {0};
int32_t len = snprintf(buf, sizeof(buf), "%"PRId64"%c", timeVal, pRetension->freqUnit);
pVal->literal = strndup(buf, len);
if (NULL == pVal->literal) {
nodesDestroyNode(pVal);
return NULL;
}
pVal->isDuration = true;
pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pVal->node.resType.precision = precision;
return (SNode*)pVal;
}
static SNode* createColumnFromDef(SColumnDefNode* pDef) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
strcpy(pCol->colName, pDef->colName);
return (SNode*)pCol;
}
static SNode* createRollupFunc(SNode* pSrcFunc, SColumnDefNode* pColDef) {
SFunctionNode* pFunc = nodesCloneNode(pSrcFunc);
if (NULL == pFunc) {
return NULL;
}
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createColumnFromDef(pColDef))) {
nodesDestroyNode(pFunc);
return NULL;
}
return (SNode*)pFunc;
}
static SNodeList* createRollupFuncs(SCreateTableStmt* pStmt) {
SNodeList* pFuncs = nodesMakeList();
if (NULL == pFuncs) {
return NULL;
} }
SMCreateStbReq createReq = {0}; SNode* pFunc = NULL;
createReq.igExists = pStmt->ignoreExists; FOREACH(pFunc, pStmt->pOptions->pFuncs) {
createReq.aggregationMethod = getAggregationMethod(pStmt->pOptions->pFuncs); SNode* pCol = NULL;
createReq.xFilesFactor = GET_OPTION_VAL(pStmt->pOptions->pFilesFactor, TSDB_DEFAULT_DB_FILE_FACTOR); bool primaryKey = true;
createReq.delay = GET_OPTION_VAL(pStmt->pOptions->pDelay, TSDB_DEFAULT_DB_DELAY); FOREACH(pCol, pStmt->pCols) {
columnDefNodeToField(pStmt->pCols, &createReq.pColumns); if (primaryKey) {
columnDefNodeToField(pStmt->pTags, &createReq.pTags); primaryKey = false;
createReq.numOfColumns = LIST_LENGTH(pStmt->pCols); continue;
createReq.numOfTags = LIST_LENGTH(pStmt->pTags); }
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pFuncs, createRollupFunc(pFunc, (SColumnDefNode*)pCol))) {
nodesDestroyList(pFuncs);
return NULL;
}
}
}
return pFuncs;
}
static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precision) {
int32_t numOfField = LIST_LENGTH(pStmt->pCols) + LIST_LENGTH(pStmt->pTags);
STableMeta* pMeta = taosMemoryCalloc(1, sizeof(STableMeta) + numOfField * sizeof(SSchema));
if (NULL == pMeta) {
return NULL;
}
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->tableInfo.numOfTags = LIST_LENGTH(pStmt->pTags);
pMeta->tableInfo.precision = precision;
pMeta->tableInfo.numOfColumns = LIST_LENGTH(pStmt->pCols);
int32_t index = 0;
SNode* pCol = NULL;
FOREACH(pCol, pStmt->pCols) {
toSchema((SColumnDefNode*)pCol, index + 1, pMeta->schema + index);
++index;
}
SNode* pTag = NULL;
FOREACH(pTag, pStmt->pTags) {
toSchema((SColumnDefNode*)pTag, index + 1, pMeta->schema + index);
++index;
}
return pMeta;
}
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, SSampleAstInfo* pInfo) {
pInfo->pDbName = pStmt->dbName;
pInfo->pTableName = pStmt->tableName;
pInfo->pFuncs = createRollupFuncs(pStmt);
pInfo->pInterval = makeIntervalVal(pRetension, precision);
pInfo->pRollupTableMeta = createRollupTableMeta(pStmt, precision);
if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval || NULL == pInfo->pRollupTableMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static int32_t getRollupAst(STranslateContext* pCxt,
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, char** pAst, int32_t* pLen) {
SSampleAstInfo info = {0};
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
if (TSDB_CODE_SUCCESS == code) {
code = buildSampleAst(pCxt, &info, pAst, pLen);
}
clearSampleAstInfo(&info);
return code;
}
static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
int32_t num = taosArrayGetSize(dbCfg.pRetensions);
if (TSDB_CODE_SUCCESS != code || num < 2) {
return code;
}
for (int32_t i = 1; i < num; ++i) {
SRetention *pRetension = taosArrayGet(dbCfg.pRetensions, i);
STranslateContext cxt = {0};
initTranslateContext(pCxt->pParseCxt, &cxt);
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision,
1 == i ? &pReq->pAst1 : &pReq->pAst2, 1 == i ? &pReq->ast1Len : &pReq->ast2Len);
destroyTranslateContext(&cxt);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
pReq->aggregationMethod = getAggregationMethod(pStmt->pOptions->pFuncs);
pReq->xFilesFactor = GET_OPTION_VAL(pStmt->pOptions->pFilesFactor, TSDB_DEFAULT_DB_FILE_FACTOR);
pReq->delay = GET_OPTION_VAL(pStmt->pOptions->pDelay, TSDB_DEFAULT_DB_DELAY);
columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
if (NULL == pStmt->pOptions->pSma) { if (NULL == pStmt->pOptions->pSma) {
columnDefNodeToField(pStmt->pCols, &createReq.pSmas); columnDefNodeToField(pStmt->pCols, &pReq->pSmas);
createReq.numOfSmas = createReq.numOfColumns; pReq->numOfSmas = pReq->numOfColumns;
} else { } else {
columnNodeToField(pStmt->pOptions->pSma, &createReq.pSmas); columnNodeToField(pStmt->pOptions->pSma, &pReq->pSmas);
createReq.numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma); pReq->numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma);
} }
SName tableName = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; SName tableName = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(tableName.dbname, pStmt->dbName); strcpy(tableName.dbname, pStmt->dbName);
strcpy(tableName.tname, pStmt->tableName); strcpy(tableName.tname, pStmt->tableName);
tNameExtractFullName(&tableName, createReq.name); tNameExtractFullName(&tableName, pReq->name);
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STB, (FSerializeFunc)tSerializeSMCreateStbReq, &createReq); return buildRollupAst(pCxt, pStmt, pReq);
}
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
SMCreateStbReq createReq = {0};
int32_t code = checkCreateTable(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateStbReq(pCxt, pStmt, &createReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STB, (FSerializeFunc)tSerializeSMCreateStbReq, &createReq);
}
tFreeSMCreateStbReq(&createReq); tFreeSMCreateStbReq(&createReq);
return code; return code;
} }
@ -1983,20 +2265,20 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
} }
static int32_t nodeTypeToShowType(ENodeType nt) { static int32_t nodeTypeToShowType(ENodeType nt) {
// switch (nt) { switch (nt) {
// case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
// return TSDB_MGMT_TABLE_CONNS; return TSDB_MGMT_TABLE_CONNS;
// case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_LICENCE_STMT:
// return TSDB_MGMT_TABLE_GRANTS; return TSDB_MGMT_TABLE_GRANTS;
// case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
// return TSDB_MGMT_TABLE_QUERIES; return TSDB_MGMT_TABLE_QUERIES;
// case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
// return 0; // todo return 0; // todo
// case QUERY_NODE_SHOW_VARIABLE_STMT: case QUERY_NODE_SHOW_VARIABLE_STMT:
// return TSDB_MGMT_TABLE_VARIABLES; return 0; // todo
// default: default:
// break; break;
// } }
return 0; return 0;
} }
@ -2027,57 +2309,28 @@ static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen); return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen);
} }
static int32_t getSmaIndexBuildAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) { static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SSampleAstInfo* pInfo) {
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT); pInfo->pDbName = pCxt->pParseCxt->db;
if (NULL == pSelect) { pInfo->pTableName = pStmt->tableName;
pInfo->pFuncs = nodesCloneList(pStmt->pOptions->pFuncs);
pInfo->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
pInfo->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
pInfo->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
if (NULL == pInfo->pFuncs || NULL == pInfo->pInterval ||
(NULL != pStmt->pOptions->pOffset && NULL == pInfo->pOffset) ||
(NULL != pStmt->pOptions->pSliding && NULL == pInfo->pSliding)) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
sprintf(pSelect->stmtName, "%p", pSelect); return TSDB_CODE_SUCCESS;
}
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE); static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
if (NULL == pTable) { SSampleAstInfo info = {0};
nodesDestroyNode(pSelect); int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pTable->table.dbName, pCxt->pParseCxt->db);
strcpy(pTable->table.tableName, pStmt->tableName);
pSelect->pFromTable = (SNode*)pTable;
pSelect->pProjectionList = nodesCloneList(pStmt->pOptions->pFuncs);
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pSelect->pProjectionList || NULL == pFunc) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "_wstartts");
nodesListPushFront(pSelect->pProjectionList, pFunc);
SNode* pProject = NULL;
FOREACH(pProject, pSelect->pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#sma_%p", pProject); }
SIntervalWindowNode* pInterval = nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
if (NULL == pInterval) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSelect->pWindow = (SNode*)pInterval;
pInterval->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
pInterval->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
pInterval->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
pInterval->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
if (NULL == pInterval->pCol || NULL == pInterval->pInterval ||
(NULL != pStmt->pOptions->pOffset && NULL == pInterval->pOffset) ||
(NULL != pStmt->pOptions->pSliding && NULL == pInterval->pSliding)) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pSelect, false, pAst, pLen); code = buildSampleAst(pCxt, &info, pAst, pLen);
} }
nodesDestroyNode(pSelect); clearSampleAstInfo(&info);
return code; return code;
} }
@ -2106,7 +2359,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen); code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexBuildAst(pCxt, pStmt, &pReq->ast, &pReq->astLen); code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen);
} }
return code; return code;
@ -2129,10 +2382,12 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
tFreeSMCreateSmaReq(&createSmaReq); tFreeSMCreateSmaReq(&createSmaReq);
return code; return code;
} }
static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateFullTextReq* pReq) { static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateFullTextReq* pReq) {
// impl later // impl later
return 0; return 0;
} }
static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) { static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SMCreateFullTextReq createFTReq = {0}; SMCreateFullTextReq createFTReq = {0};
int32_t code = buildCreateFullTextReq(pCxt, pStmt, &createFTReq); int32_t code = buildCreateFullTextReq(pCxt, pStmt, &createFTReq);
@ -2535,24 +2790,6 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static void destroyTranslateContext(STranslateContext* pCxt) {
if (NULL != pCxt->pNsLevel) {
size_t size = taosArrayGetSize(pCxt->pNsLevel);
for (size_t i = 0; i < size; ++i) {
taosArrayDestroy(taosArrayGetP(pCxt->pNsLevel, i));
}
taosArrayDestroy(pCxt->pNsLevel);
}
if (NULL != pCxt->pCmdMsg) {
taosMemoryFreeClear(pCxt->pCmdMsg->pMsg);
taosMemoryFreeClear(pCxt->pCmdMsg);
}
taosHashCleanup(pCxt->pDbs);
taosHashCleanup(pCxt->pTables);
}
static const char* getSysDbName(ENodeType type) { static const char* getSysDbName(ENodeType type) {
switch (type) { switch (type) {
case QUERY_NODE_SHOW_DATABASES_STMT: case QUERY_NODE_SHOW_DATABASES_STMT:
@ -2613,8 +2850,9 @@ static const char* getSysTableName(ENodeType type) {
case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_LICENCE_STMT:
return TSDB_INS_TABLE_LICENCES; return TSDB_INS_TABLE_LICENCES;
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
return TSDB_PERFS_TABLE_CONNECTIONS;
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
// todo return TSDB_PERFS_TABLE_QUERIES;
default: default:
break; break;
} }
@ -2739,18 +2977,6 @@ typedef struct SVgroupTablesBatch {
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch; } SVgroupTablesBatch;
static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
int8_t flags = 0;
if (pCol->sma) {
flags |= SCHEMA_SMA_ON;
}
pSchema->colId = colId;
pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->flags = flags;
strcpy(pSchema->name, pCol->colName);
}
static void destroyCreateTbReq(SVCreateTbReq* pReq) { static void destroyCreateTbReq(SVCreateTbReq* pReq) {
taosMemoryFreeClear(pReq->name); taosMemoryFreeClear(pReq->name);
taosMemoryFreeClear(pReq->ntbCfg.pSchema); taosMemoryFreeClear(pReq->ntbCfg.pSchema);
@ -2798,7 +3024,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
SNode* pCol; SNode* pCol;
col_id_t index = 0; col_id_t index = 0;
FOREACH(pCol, pStmt->pCols) { FOREACH(pCol, pStmt->pCols) {
toSchemaEx((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index); toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
++index; ++index;
} }
if (TSDB_CODE_SUCCESS != buildSmaParam(pStmt->pOptions, &req)) { if (TSDB_CODE_SUCCESS != buildSmaParam(pStmt->pOptions, &req)) {
@ -3224,19 +3450,11 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
} }
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
STranslateContext cxt = { STranslateContext cxt = {0};
.pParseCxt = pParseCxt, int32_t code = initTranslateContext(pParseCxt, &cxt);
.errCode = TSDB_CODE_SUCCESS, if (TSDB_CODE_SUCCESS == code) {
.msgBuf = {.buf = pParseCxt->pMsg, .len = pParseCxt->msgLen}, code = fmFuncMgtInit();
.pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES),
.currLevel = 0,
.currClause = 0,
.pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK),
.pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)};
if (NULL == cxt.pNsLevel) {
return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t code = fmFuncMgtInit();
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteQuery(&cxt, pQuery); code = rewriteQuery(&cxt, pQuery);
} }

File diff suppressed because it is too large Load Diff

View File

@ -304,7 +304,7 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
continue; continue;
} }
char *in = pInputData->pData + pInputData->varmeta.offset[i]; char *in = colDataGetData(pInputData, i);
out[i] = lenFn(in, type); out[i] = lenFn(in, type);
} }
@ -395,11 +395,8 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int16_t dataLen = 0; int16_t dataLen = 0;
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
if (pInput[i].numOfRows == 1) { int32_t rowIdx = (pInput[i].numOfRows == 1) ? 0 : k;
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; input[i] = colDataGetData(pInputData[i], rowIdx);
} else {
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[k];
}
ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -473,11 +470,8 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
break; break;
} }
if (pInput[i].numOfRows == 1) { int32_t rowIdx = (pInput[i].numOfRows == 1) ? 0 : k;
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; input[i] = colDataGetData(pInputData[i], rowIdx);
} else {
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[k];
}
ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -534,7 +528,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala
continue; continue;
} }
char *input = pInputData->pData + pInputData->varmeta.offset[i]; char *input = colDataGetData(pInput[0].columnData, i);
int32_t len = varDataLen(input); int32_t len = varDataLen(input);
if (type == TSDB_DATA_TYPE_VARCHAR) { if (type == TSDB_DATA_TYPE_VARCHAR) {
for (int32_t j = 0; j < len; ++j) { for (int32_t j = 0; j < len; ++j) {
@ -575,8 +569,8 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
colDataAppendNULL(pOutputData, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
char *input = pInputData->pData + pInputData->varmeta.offset[i];
char *input = colDataGetData(pInput[0].columnData, i);
int32_t len = varDataLen(input); int32_t len = varDataLen(input);
int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
trimFn(input, output, type, charLen); trimFn(input, output, type, charLen);
@ -606,7 +600,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t subLen = INT16_MAX; int32_t subLen = INT16_MAX;
if (inputNum == 3) { if (inputNum == 3) {
GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
if (subLen < 0) { //subLen cannot be negative if (subLen < 0 || subLen > INT16_MAX) { //subLen cannot be negative
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE; subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE;
@ -615,19 +609,16 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData; SColumnInfoData *pOutputData = pOutput->columnData;
char *input = pInputData->pData + pInputData->varmeta.offset[0];
char *output = NULL;
int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows; int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows;
char *outputBuf = taosMemoryCalloc(outputLen, 1); char *outputBuf = taosMemoryCalloc(outputLen, 1);
output = outputBuf; char *output = outputBuf;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataAppendNULL(pOutputData, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
char *input = colDataGetData(pInput[0].columnData, i);
int32_t len = varDataLen(input); int32_t len = varDataLen(input);
int32_t startPosBytes; int32_t startPosBytes;
@ -646,7 +637,6 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
varDataSetLen(output, resLen); varDataSetLen(output, resLen);
colDataAppend(pOutputData, i , output, false); colDataAppend(pOutputData, i , output, false);
input += varDataTLen(input);
output += varDataTLen(output); output += varDataTLen(output);
} }
@ -799,13 +789,13 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
char *input = pInput[0].columnData->pData;
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) { if (colDataIsNull_s(pInput[0].columnData, i)) {
colDataAppendNULL(pOutput->columnData, i); colDataAppendNULL(pOutput->columnData, i);
continue; continue;
} }
char *input = colDataGetData(pInput[0].columnData, i);
char fraction[20] = {0}; char fraction[20] = {0};
bool hasFraction = false; bool hasFraction = false;
NUM_TO_STRING(type, input, sizeof(fraction), fraction); NUM_TO_STRING(type, input, sizeof(fraction), fraction);
@ -822,7 +812,8 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / (1000 * 1000 * 1000); timeVal = timeVal / (1000 * 1000 * 1000);
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
hasFraction = true; hasFraction = true;
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
@ -852,7 +843,6 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
varDataSetLen(buf, len); varDataSetLen(buf, len);
colDataAppend(pOutput->columnData, i, buf, false); colDataAppend(pOutput->columnData, i, buf, false);
input += tDataTypes[type].bytes;
} }
pOutput->numOfRows = pInput->numOfRows; pOutput->numOfRows = pInput->numOfRows;
@ -864,12 +854,12 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
int32_t timePrec = GET_PARAM_PRECISON(pInput); int32_t timePrec = GET_PARAM_PRECISON(pInput);
char *input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0];
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) { if (colDataIsNull_s(pInput[0].columnData, i)) {
colDataAppendNULL(pOutput->columnData, i); colDataAppendNULL(pOutput->columnData, i);
continue; continue;
} }
char *input = colDataGetData(pInput[0].columnData, i);
int64_t timeVal = 0; int64_t timeVal = 0;
int32_t ret = convertStringToTimestamp(type, input, timePrec, &timeVal); int32_t ret = convertStringToTimestamp(type, input, timePrec, &timeVal);
@ -879,7 +869,6 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
} }
colDataAppend(pOutput->columnData, i, (char *)&timeVal, false); colDataAppend(pOutput->columnData, i, (char *)&timeVal, false);
input += varDataTLen(input);
} }
pOutput->numOfRows = pInput->numOfRows; pOutput->numOfRows = pInput->numOfRows;
@ -897,19 +886,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
char *input = NULL;
if (IS_VAR_DATA_TYPE(type)) {
input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0];
} else {
input = pInput[0].columnData->pData;
}
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) { if (colDataIsNull_s(pInput[0].columnData, i)) {
colDataAppendNULL(pOutput->columnData, i); colDataAppendNULL(pOutput->columnData, i);
continue; continue;
} }
char *input = colDataGetData(pInput[0].columnData, i);
if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */ if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */
int32_t ret = convertStringToTimestamp(type, input, TSDB_TIME_PRECISION_NANO, &timeVal); int32_t ret = convertStringToTimestamp(type, input, TSDB_TIME_PRECISION_NANO, &timeVal);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -959,7 +943,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS){ } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS){
timeVal = timeVal * factor; timeVal = timeVal * factor;
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
break; break;
} }
@ -973,7 +958,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor; timeVal = timeVal * factor;
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
break; break;
} }
@ -987,7 +973,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 60 * 60 * factor; timeVal = timeVal * factor / factor / 60 * 60 * factor;
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
break; break;
} }
@ -1001,7 +988,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 3600 * 3600 * factor; timeVal = timeVal * factor / factor / 3600 * 3600 * factor;
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
break; break;
} }
@ -1015,7 +1003,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 86400* 86400 * factor; timeVal = timeVal * factor / factor / 86400* 86400 * factor;
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
break; break;
} }
@ -1029,7 +1018,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 604800 * 604800* factor; timeVal = timeVal * factor / factor / 604800 * 604800* factor;
} else { } else {
assert(0); colDataAppendNULL(pOutput->columnData, i);
continue;
} }
break; break;
} }
@ -1068,11 +1058,6 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
} }
colDataAppend(pOutput->columnData, i, (char *)&timeVal, false); colDataAppend(pOutput->columnData, i, (char *)&timeVal, false);
if (IS_VAR_DATA_TYPE(type)) {
input += varDataTLen(input);
} else {
input += tDataTypes[type].bytes;
}
} }
pOutput->numOfRows = pInput->numOfRows; pOutput->numOfRows = pInput->numOfRows;
@ -1094,12 +1079,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) { type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (IS_VAR_DATA_TYPE(type)) {
input[k] = pInput[k].columnData->pData + pInput[k].columnData->varmeta.offset[0];
} else {
input[k] = pInput[k].columnData->pData;
}
} }
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
@ -1109,6 +1088,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
continue; continue;
} }
int32_t rowIdx = (pInput[k].numOfRows == 1) ? 0 : i;
input[k] = colDataGetData(pInput[k].columnData, rowIdx);
int32_t type = GET_PARAM_TYPE(&pInput[k]); int32_t type = GET_PARAM_TYPE(&pInput[k]);
if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */ if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */
int32_t ret = convertStringToTimestamp(type, input[k], TSDB_TIME_PRECISION_NANO, &timeVal[k]); int32_t ret = convertStringToTimestamp(type, input[k], TSDB_TIME_PRECISION_NANO, &timeVal[k]);
@ -1138,14 +1120,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
timeVal[k] = timeVal[k] * 1000; timeVal[k] = timeVal[k] * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal[k] = timeVal[k]; timeVal[k] = timeVal[k];
}
}
if (pInput[k].numOfRows != 1) {
if (IS_VAR_DATA_TYPE(type)) {
input[k] += varDataTLen(input[k]);
} else { } else {
input[k] += tDataTypes[type].bytes; colDataAppendNULL(pOutput->columnData, i);
continue;
} }
} }
} }

View File

@ -55,7 +55,7 @@
# ---- tmq # ---- tmq
./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic.sim
./test.sh -f tsim/tmq/basic1.sim #./test.sh -f tsim/tmq/basic1.sim
#./test.sh -f tsim/tmq/oneTopic.sim #./test.sh -f tsim/tmq/oneTopic.sim
#./test.sh -f tsim/tmq/multiTopic.sim #./test.sh -f tsim/tmq/multiTopic.sim

103
tests/script/tsim/tmq/consume.sh Executable file
View File

@ -0,0 +1,103 @@
#!/bin/bash
##################################################
#
# Do tmq test
#
##################################################
set +e
# set default value for parameters
EXEC_OPTON=start
DB_NAME=db
POLL_DELAY=5
VALGRIND=0
SIGNAL=SIGINT
while getopts "d:s:v:y:x:" arg
do
case $arg in
d)
DB_NAME=$OPTARG
;;
s)
EXEC_OPTON=$OPTARG
;;
v)
VALGRIND=1
;;
y)
POLL_DELAY=$OPTARG
;;
x)
SIGNAL=$OPTARG
;;
?)
echo "unkown argument"
;;
esac
done
SCRIPT_DIR=`pwd`
IN_TDINTERNAL="community"
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
cd ../../..
else
cd ../../
fi
TOP_DIR=`pwd`
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3`
else
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
fi
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
declare -x SIM_DIR=$TOP_DIR/sim
PROGRAM=$BUILD_DIR/build/bin/tmq_sim
PRG_DIR=$SIM_DIR/tsim
CFG_DIR=$PRG_DIR/cfg
LOG_DIR=$PRG_DIR/log
echo "------------------------------------------------------------------------"
echo "BUILD_DIR: $BUILD_DIR"
echo "SIM_DIR : $SIM_DIR"
echo "CFG_DIR : $CFG_DIR"
echo "PROGRAM: $PROGRAM
echo "CFG_DIR: $CFG_DIR
echo "POLL_DELAY: $POLL_DELAY
echo "DB_NAME: $DB_NAME
echo "------------------------------------------------------------------------"
if [ "$EXEC_OPTON" = "start" ]; then
if [ $VALGRIND -eq 1 ]; then
echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &
nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &
else
echo "nohup $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &"
nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME > /dev/null 2>&1 &
fi
else
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]
do
if [ "$SIGNAL" = "SIGKILL" ]; then
echo try to kill by signal SIGKILL
kill -9 $PID
else
echo try to kill by signal SIGINT
kill -SIGINT $PID
fi
sleep 1
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
done
fi

View File

@ -31,44 +31,46 @@
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
int32_t expectMsgCnt; TdThread thread;
int32_t consumeMsgCnt; int32_t consumerId;
TdThread thread;
int32_t ifCheckData;
int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int32_t checkresult;
char topicString[1024];
char keyString[1024];
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq;
tmq_list_t* topicList;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
// input from argvs // input from argvs
char dbName[32]; char dbName[32];
char topicString[256]; int32_t showMsgFlag;
char keyString[1024]; int32_t consumeDelay; // unit s
char topicString1[256]; int32_t numOfThread;
char keyString1[1024]; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t consumeMsgCnt;
int32_t checkMode;
// save result after parse agrvs
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
int32_t numOfTopic1;
char topics1[32][64];
int32_t numOfKey1;
char key1[32][64];
char value1[32][64];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
@ -81,30 +83,54 @@ static void printHelp() {
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
printf("%s%s\n", indent, "-d"); printf("%s%s\n", indent, "-d");
printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default ");
printf("%s%s\n", indent, "-k");
printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default ");
printf("%s%s\n", indent, "-t1");
printf("%s%s%s\n", indent, indent, "The topic1 string for cosumer, no default ");
printf("%s%s\n", indent, "-k1");
printf("%s%s%s\n", indent, indent, "The key1-value1 string for cosumer, no default ");
printf("%s%s\n", indent, "-g"); printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
printf("%s%s\n", indent, "-y"); printf("%s%s\n", indent, "-y");
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
printf("%s%s\n", indent, "-m");
printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt);
printf("%s%s\n", indent, "-j");
printf("%s%s%s%d\n", indent, indent, "check mode, default is s", g_stConfInfo.checkMode);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit -1;
};
g_fp = pFile;
time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(pFile, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, " Topics: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
}
taosFprintfFile(pFile, "\n");
taosFprintfFile(pFile, " Key: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
}
taosFprintfFile(pFile, "\n");
}
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n");
}
void parseArgument(int32_t argc, char* argv[]) { void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo)); memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0; g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.consumeDelay = 8000; g_stConfInfo.consumeDelay = 5;
g_stConfInfo.consumeMsgCnt = 0;
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
@ -114,37 +140,20 @@ void parseArgument(int32_t argc, char* argv[]) {
strcpy(g_stConfInfo.dbName, argv[++i]); strcpy(g_stConfInfo.dbName, argv[++i]);
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-t") == 0) {
strcpy(g_stConfInfo.topicString, argv[++i]);
} else if (strcmp(argv[i], "-k") == 0) {
strcpy(g_stConfInfo.keyString, argv[++i]);
} else if (strcmp(argv[i], "-t1") == 0) {
strcpy(g_stConfInfo.topicString1, argv[++i]);
} else if (strcmp(argv[i], "-k1") == 0) {
strcpy(g_stConfInfo.keyString1, argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) { } else if (strcmp(argv[i], "-g") == 0) {
g_stConfInfo.showMsgFlag = atol(argv[++i]); g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) { } else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]); g_stConfInfo.consumeDelay = atol(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0) {
g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
} else if (strcmp(argv[i], "-j") == 0) {
g_stConfInfo.checkMode = atol(argv[++i]);
} else { } else {
printf("%s unknow para: %s %s", GREEN, argv[++i], NC); printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1); exit(-1);
} }
} }
if (0 == g_stConfInfo.consumeMsgCnt) { #if 1
g_stConfInfo.consumeMsgCnt = 0x7fffffff;
}
#if 0
pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif #endif
} }
@ -171,74 +180,26 @@ void ltrim(char* str) {
// return str; // return str;
} }
void parseInputString() { static int running = 1;
// printf("topicString: %s\n", g_stConfInfo.topicString); static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) {
// printf("keyString: %s\n\n", g_stConfInfo.keyString); char buf[1024];
char* token; //printf("topic: %s\n", tmq_get_topic_name(msg));
const char delim[2] = ","; //printf("vg:%d\n", tmq_get_vgroup_id(msg));
const char ch = ':'; taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
token = strtok(g_stConfInfo.topicString, delim);
while (token != NULL) { while (1) {
// printf("%s\n", token ); TAOS_ROW row = taos_fetch_row(msg);
strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); if (row == NULL) break;
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); TAOS_FIELD* fields = taos_fetch_fields(msg);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); int32_t numOfFields = taos_field_count(msg);
g_stConfInfo.numOfTopic++; //taos_print_row(buf, row, fields, numOfFields);
//printf("%s\n", buf);
token = strtok(NULL, delim); //taosFprintfFile(g_fp, "%s\n", buf);
}
token = strtok(g_stConfInfo.topicString1, delim);
while (token != NULL) {
// printf("%s\n", token );
strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic1++;
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.keyString, delim);
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++;
}
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.keyString1, delim);
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr);
strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey1++;
}
token = strtok(NULL, delim);
} }
} }
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int queryDB(TAOS* taos, char* command) { int queryDB(TAOS* taos, char* command) {
TAOS_RES* pRes = taos_query(taos, command); TAOS_RES* pRes = taos_query(taos, command);
int code = taos_errno(pRes); int code = taos_errno(pRes);
@ -252,8 +213,7 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
tmq_t* build_consumer() { void build_consumer(SThreadInfo *pInfo) {
#if 0
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
@ -267,273 +227,229 @@ tmq_t* build_consumer() {
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
// tmq_conf_set(conf, "group.id", "tg2"); // tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
tmq_conf_set(conf, "td.connect.user", "root"); pInfo->tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_conf_set(conf, "td.connect.pass", "taosdata"); return;
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
} }
tmq_list_t* build_topic_list() { void build_topic_list(SThreadInfo *pInfo) {
tmq_list_t* topic_list = tmq_list_new(); pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) { for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics[i]); tmq_list_append(pInfo->topicList, pInfo->topics[i]);
} }
return topic_list; return;
} }
tmq_t* build_consumer_x() { int32_t saveConsumeResult(SThreadInfo *pInfo) {
#if 0
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
sprintf(sqlStr, "use %s", g_stConfInfo.dbName); // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)",
g_stConfInfo.dbName,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); return 0;
// tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) {
tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]);
}
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
} }
tmq_list_t* build_topic_list_x() { void loop_consume(SThreadInfo *pInfo) {
tmq_list_t* topic_list = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics1[i]);
}
return topic_list;
}
void loop_consume(tmq_t* tmq) {
tmq_resp_err_t err; tmq_resp_err_t err;
int64_t totalMsgs = 0;
//int64_t totalRows = 0;
int32_t totalMsgs = 0;
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++;
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/ msg_process(tmqMsg, totalMsgs, 0);
} }
tmq_message_destroy(tmqMsg); tmq_message_destroy(tmqMsg);
} else {
break;
}
}
err = tmq_consumer_close(tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
printf("{consume success: %d, %d}", totalMsgs, totalRows);
}
int32_t parallel_consume(tmq_t* tmq, int threadLable) {
tmq_resp_err_t err;
int32_t totalMsgs = 0;
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) {
totalMsgs++; totalMsgs++;
// printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); if (totalMsgs >= pInfo->expectMsgCnt) {
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/
}
tmq_message_destroy(tmqMsg);
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
break; break;
} }
} else { } else {
break; break;
} }
} }
err = tmq_consumer_close(tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
// printf("%d", totalMsgs); // output to sim for check result pInfo->consumeMsgCnt = totalMsgs;
return totalMsgs;
} }
void* threadFunc(void* param) { void *consumeThreadFunc(void *param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo *pInfo = (SThreadInfo *)param;
tmq_t* tmq = build_consumer_x(); build_consumer(pInfo);
tmq_list_t* topic_list = build_topic_list_x(); build_topic_list(pInfo);
if ((NULL == tmq) || (NULL == topic_list)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
loop_consume(pInfo);
// if (0 == g_stConfInfo.consumeMsgCnt) { err = tmq_unsubscribe(pInfo->tmq);
// loop_consume(tmq);
// } else {
pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
//}
err = tmq_unsubscribe(tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
// save consume result into consumeresult table
saveConsumeResult(pInfo);
return NULL; return NULL;
} }
int main(int32_t argc, char* argv[]) { void parseConsumeInfo() {
parseArgument(argc, argv); char* token;
parseInputString(); const char delim[2] = ",";
const char ch = ':';
int32_t numOfThreads = 1; for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
TdThreadAttr thattr; token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
taosThreadAttrInit(&thattr); while (token != NULL) {
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); // printf("%s\n", token );
SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
if (g_stConfInfo.numOfTopic1) { // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// pthread_create one thread to consume g_stConfInfo.stThreads[i].numOfTopic++;
for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].expectMsgCnt = 0; token = strtok(NULL, delim);
pInfo[i].consumeMsgCnt = 0; }
taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i));
token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++;
}
token = strtok(NULL, delim);
} }
} }
}
int32_t totalMsgs = 0; int32_t getConsumeInfo() {
tmq_t* tmq = build_consumer(); char sqlStr[1024] = {0};
tmq_list_t* topic_list = build_topic_list();
if ((NULL == tmq) || (NULL == topic_list)) { TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
return -1; assert(pConn != NULL);
}
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName);
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (err) { if (taos_errno(pRes) != 0) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1); exit(-1);
} }
if (0 == g_stConfInfo.numOfTopic1) { TAOS_ROW row = NULL;
loop_consume(tmq); int num_fields = taos_num_fields(pRes);
} else { TAOS_FIELD* fields = taos_fetch_fields(pRes);
totalMsgs = parallel_consume(tmq, 0);
} // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
err = tmq_unsubscribe(tmq); int32_t numOfThread = 0;
if (err) { while ((row = taos_fetch_row(pRes))) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); int32_t* lengths = taos_fetch_lengths(pRes);
exit(-1);
} for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
if (g_stConfInfo.numOfTopic1) { continue;
for (int32_t i = 0; i < numOfThreads; i++) { }
taosThreadJoin(pInfo[i].thread, NULL);
} if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
if (0 == g_stConfInfo.checkMode) { memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
printf("success"); memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else { } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
} } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
} else if (1 == g_stConfInfo.checkMode) { g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) { }
printf("success");
} else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else if (2 == g_stConfInfo.checkMode) {
if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
printf("success");
} else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else if (3 == g_stConfInfo.checkMode) {
if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) {
printf("success");
} else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else if (4 == g_stConfInfo.checkMode) {
if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) ||
((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ||
((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ||
((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
printf("success");
} else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else {
printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
numOfThread ++;
} }
g_stConfInfo.numOfThread = numOfThread;
taos_free_result(pRes);
parseConsumeInfo();
return 0; return 0;
} }
int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv);
getConsumeInfo();
initLogFile();
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
// pthread_create one thread to consume
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
}
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
}
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp);
return 0;
}