support topic operations

This commit is contained in:
dapan1121 2021-03-08 17:11:14 +08:00
parent 3dfb225d20
commit 7baa3f6d00
8 changed files with 2059 additions and 1948 deletions

View File

@ -5614,6 +5614,8 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDbInfo* pCreateDb) {
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update; pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast; pMsg->cacheLastRow = pCreateDb->cachelast;
pMsg->dbType = pCreateDb->dbType;
pMsg->partitions = htons(pCreateDb->partitions);
} }
int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql) { int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql) {
@ -6244,6 +6246,15 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
val = htons(pCreate->partitions);
if (pCreate->dbType == TSDB_DB_TYPE_TOPIC &&
(val < TSDB_MIN_DB_PARTITON_OPTION || val > TSDB_MAX_DB_PARTITON_OPTION)) {
snprintf(msg, tListLen(msg), "invalid topic option partition: %d valid range: [%d, %d]", val,
TSDB_MIN_DB_PARTITON_OPTION, TSDB_MAX_DB_PARTITON_OPTION);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -1055,7 +1055,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateDbMsg); pCmd->payloadLen = sizeof(SCreateDbMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
pCmd->msgType = (pInfo->pMiscInfo->dbOpt.dbType == TSDB_DB_TYPE_DEFAULT) ? TSDB_MSG_TYPE_CM_CREATE_DB : TSDB_MSG_TYPE_CM_CREATE_TP;
SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload; SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload;
@ -1187,7 +1188,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB; pCmd->msgType = (pInfo->pMiscInfo->dbType == TSDB_DB_TYPE_DEFAULT) ? TSDB_MSG_TYPE_CM_DROP_DB : TSDB_MSG_TYPE_CM_DROP_TP;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1514,7 +1515,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SAlterDbMsg); pCmd->payloadLen = sizeof(SAlterDbMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; pCmd->msgType = (pInfo->pMiscInfo->dbOpt.dbType == TSDB_DB_TYPE_DEFAULT) ? TSDB_MSG_TYPE_CM_ALTER_DB : TSDB_MSG_TYPE_CM_ALTER_TP;
SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg* )pCmd->payload; SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg* )pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);

View File

@ -62,170 +62,174 @@
#define TK_BITNOT 43 #define TK_BITNOT 43
#define TK_SHOW 44 #define TK_SHOW 44
#define TK_DATABASES 45 #define TK_DATABASES 45
#define TK_MNODES 46 #define TK_TOPICS 46
#define TK_DNODES 47 #define TK_MNODES 47
#define TK_ACCOUNTS 48 #define TK_DNODES 48
#define TK_USERS 49 #define TK_ACCOUNTS 49
#define TK_MODULES 50 #define TK_USERS 50
#define TK_QUERIES 51 #define TK_MODULES 51
#define TK_CONNECTIONS 52 #define TK_QUERIES 52
#define TK_STREAMS 53 #define TK_CONNECTIONS 53
#define TK_VARIABLES 54 #define TK_STREAMS 54
#define TK_SCORES 55 #define TK_VARIABLES 55
#define TK_GRANTS 56 #define TK_SCORES 56
#define TK_VNODES 57 #define TK_GRANTS 57
#define TK_IPTOKEN 58 #define TK_VNODES 58
#define TK_DOT 59 #define TK_IPTOKEN 59
#define TK_CREATE 60 #define TK_DOT 60
#define TK_TABLE 61 #define TK_CREATE 61
#define TK_DATABASE 62 #define TK_TABLE 62
#define TK_TABLES 63 #define TK_DATABASE 63
#define TK_STABLES 64 #define TK_TABLES 64
#define TK_VGROUPS 65 #define TK_STABLES 65
#define TK_DROP 66 #define TK_VGROUPS 66
#define TK_STABLE 67 #define TK_DROP 67
#define TK_DNODE 68 #define TK_STABLE 68
#define TK_USER 69 #define TK_TOPIC 69
#define TK_ACCOUNT 70 #define TK_DNODE 70
#define TK_USE 71 #define TK_USER 71
#define TK_DESCRIBE 72 #define TK_ACCOUNT 72
#define TK_ALTER 73 #define TK_USE 73
#define TK_PASS 74 #define TK_DESCRIBE 74
#define TK_PRIVILEGE 75 #define TK_ALTER 75
#define TK_LOCAL 76 #define TK_PASS 76
#define TK_IF 77 #define TK_PRIVILEGE 77
#define TK_EXISTS 78 #define TK_LOCAL 78
#define TK_PPS 79 #define TK_IF 79
#define TK_TSERIES 80 #define TK_EXISTS 80
#define TK_DBS 81 #define TK_PPS 81
#define TK_STORAGE 82 #define TK_TSERIES 82
#define TK_QTIME 83 #define TK_DBS 83
#define TK_CONNS 84 #define TK_STORAGE 84
#define TK_STATE 85 #define TK_QTIME 85
#define TK_KEEP 86 #define TK_CONNS 86
#define TK_CACHE 87 #define TK_STATE 87
#define TK_REPLICA 88 #define TK_KEEP 88
#define TK_QUORUM 89 #define TK_CACHE 89
#define TK_DAYS 90 #define TK_REPLICA 90
#define TK_MINROWS 91 #define TK_QUORUM 91
#define TK_MAXROWS 92 #define TK_DAYS 92
#define TK_BLOCKS 93 #define TK_MINROWS 93
#define TK_CTIME 94 #define TK_MAXROWS 94
#define TK_WAL 95 #define TK_BLOCKS 95
#define TK_FSYNC 96 #define TK_CTIME 96
#define TK_COMP 97 #define TK_WAL 97
#define TK_PRECISION 98 #define TK_FSYNC 98
#define TK_UPDATE 99 #define TK_COMP 99
#define TK_CACHELAST 100 #define TK_PRECISION 100
#define TK_LP 101 #define TK_UPDATE 101
#define TK_RP 102 #define TK_CACHELAST 102
#define TK_UNSIGNED 103 #define TK_PARTITIONS 103
#define TK_TAGS 104 #define TK_LP 104
#define TK_USING 105 #define TK_RP 105
#define TK_COMMA 106 #define TK_UNSIGNED 106
#define TK_AS 107 #define TK_TAGS 107
#define TK_NULL 108 #define TK_USING 108
#define TK_SELECT 109 #define TK_COMMA 109
#define TK_UNION 110 #define TK_AS 110
#define TK_ALL 111 #define TK_NULL 111
#define TK_DISTINCT 112 #define TK_SELECT 112
#define TK_FROM 113 #define TK_UNION 113
#define TK_VARIABLE 114 #define TK_ALL 114
#define TK_INTERVAL 115 #define TK_DISTINCT 115
#define TK_FILL 116 #define TK_FROM 116
#define TK_SLIDING 117 #define TK_VARIABLE 117
#define TK_ORDER 118 #define TK_INTERVAL 118
#define TK_BY 119 #define TK_FILL 119
#define TK_ASC 120 #define TK_SLIDING 120
#define TK_DESC 121 #define TK_ORDER 121
#define TK_GROUP 122 #define TK_BY 122
#define TK_HAVING 123 #define TK_ASC 123
#define TK_LIMIT 124 #define TK_DESC 124
#define TK_OFFSET 125 #define TK_GROUP 125
#define TK_SLIMIT 126 #define TK_HAVING 126
#define TK_SOFFSET 127 #define TK_LIMIT 127
#define TK_WHERE 128 #define TK_OFFSET 128
#define TK_NOW 129 #define TK_SLIMIT 129
#define TK_RESET 130 #define TK_SOFFSET 130
#define TK_QUERY 131 #define TK_WHERE 131
#define TK_ADD 132 #define TK_NOW 132
#define TK_COLUMN 133 #define TK_RESET 133
#define TK_TAG 134 #define TK_QUERY 134
#define TK_CHANGE 135 #define TK_ADD 135
#define TK_SET 136 #define TK_COLUMN 136
#define TK_KILL 137 #define TK_TAG 137
#define TK_CONNECTION 138 #define TK_CHANGE 138
#define TK_STREAM 139 #define TK_SET 139
#define TK_COLON 140 #define TK_KILL 140
#define TK_ABORT 141 #define TK_CONNECTION 141
#define TK_AFTER 142 #define TK_STREAM 142
#define TK_ATTACH 143 #define TK_COLON 143
#define TK_BEFORE 144 #define TK_ABORT 144
#define TK_BEGIN 145 #define TK_AFTER 145
#define TK_CASCADE 146 #define TK_ATTACH 146
#define TK_CLUSTER 147 #define TK_BEFORE 147
#define TK_CONFLICT 148 #define TK_BEGIN 148
#define TK_COPY 149 #define TK_CASCADE 149
#define TK_DEFERRED 150 #define TK_CLUSTER 150
#define TK_DELIMITERS 151 #define TK_CONFLICT 151
#define TK_DETACH 152 #define TK_COPY 152
#define TK_EACH 153 #define TK_DEFERRED 153
#define TK_END 154 #define TK_DELIMITERS 154
#define TK_EXPLAIN 155 #define TK_DETACH 155
#define TK_FAIL 156 #define TK_EACH 156
#define TK_FOR 157 #define TK_END 157
#define TK_IGNORE 158 #define TK_EXPLAIN 158
#define TK_IMMEDIATE 159 #define TK_FAIL 159
#define TK_INITIALLY 160 #define TK_FOR 160
#define TK_INSTEAD 161 #define TK_IGNORE 161
#define TK_MATCH 162 #define TK_IMMEDIATE 162
#define TK_KEY 163 #define TK_INITIALLY 163
#define TK_OF 164 #define TK_INSTEAD 164
#define TK_RAISE 165 #define TK_MATCH 165
#define TK_REPLACE 166 #define TK_KEY 166
#define TK_RESTRICT 167 #define TK_OF 167
#define TK_ROW 168 #define TK_RAISE 168
#define TK_STATEMENT 169 #define TK_REPLACE 169
#define TK_TRIGGER 170 #define TK_RESTRICT 170
#define TK_VIEW 171 #define TK_ROW 171
#define TK_COUNT 172 #define TK_STATEMENT 172
#define TK_SUM 173 #define TK_TRIGGER 173
#define TK_AVG 174 #define TK_VIEW 174
#define TK_MIN 175 #define TK_COUNT 175
#define TK_MAX 176 #define TK_SUM 176
#define TK_FIRST 177 #define TK_AVG 177
#define TK_LAST 178 #define TK_MIN 178
#define TK_TOP 179 #define TK_MAX 179
#define TK_BOTTOM 180 #define TK_FIRST 180
#define TK_STDDEV 181 #define TK_LAST 181
#define TK_PERCENTILE 182 #define TK_TOP 182
#define TK_APERCENTILE 183 #define TK_BOTTOM 183
#define TK_LEASTSQUARES 184 #define TK_STDDEV 184
#define TK_HISTOGRAM 185 #define TK_PERCENTILE 185
#define TK_DIFF 186 #define TK_APERCENTILE 186
#define TK_SPREAD 187 #define TK_LEASTSQUARES 187
#define TK_TWA 188 #define TK_HISTOGRAM 188
#define TK_INTERP 189 #define TK_DIFF 189
#define TK_LAST_ROW 190 #define TK_SPREAD 190
#define TK_RATE 191 #define TK_TWA 191
#define TK_IRATE 192 #define TK_INTERP 192
#define TK_SUM_RATE 193 #define TK_LAST_ROW 193
#define TK_SUM_IRATE 194 #define TK_RATE 194
#define TK_AVG_RATE 195 #define TK_IRATE 195
#define TK_AVG_IRATE 196 #define TK_SUM_RATE 196
#define TK_TBID 197 #define TK_SUM_IRATE 197
#define TK_SEMI 198 #define TK_AVG_RATE 198
#define TK_NONE 199 #define TK_AVG_IRATE 199
#define TK_PREV 200 #define TK_TBID 200
#define TK_LINEAR 201 #define TK_SEMI 201
#define TK_IMPORT 202 #define TK_NONE 202
#define TK_METRIC 203 #define TK_PREV 203
#define TK_TBNAME 204 #define TK_LINEAR 204
#define TK_JOIN 205 #define TK_IMPORT 205
#define TK_METRICS 206 #define TK_METRIC 206
#define TK_INSERT 207 #define TK_TBNAME 207
#define TK_INTO 208 #define TK_JOIN 208
#define TK_VALUES 209 #define TK_METRICS 209
#define TK_INSERT 210
#define TK_INTO 211
#define TK_VALUES 212

View File

@ -125,6 +125,8 @@ typedef struct SCreateDbInfo {
int8_t update; int8_t update;
int8_t cachelast; int8_t cachelast;
SArray *keep; SArray *keep;
int8_t dbType;
int16_t partitions;
} SCreateDbInfo; } SCreateDbInfo;
typedef struct SCreateAcctInfo { typedef struct SCreateAcctInfo {
@ -155,6 +157,7 @@ typedef struct SUserInfo {
typedef struct SMiscInfo { typedef struct SMiscInfo {
SArray *a; // SArray<SStrToken> SArray *a; // SArray<SStrToken>
bool existsCheck; bool existsCheck;
int16_t dbType;
int16_t tableType; int16_t tableType;
SUserInfo user; SUserInfo user;
union { union {
@ -265,7 +268,7 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
void SqlInfoDestroy(SSqlInfo *pInfo); void SqlInfoDestroy(SSqlInfo *pInfo);
void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...); void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...);
void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck,int16_t tableType); void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck,int16_t dbType,int16_t tableType);
void setShowOptions(SSqlInfo *pInfo, int32_t type, SStrToken* prefix, SStrToken* pPatterns); void setShowOptions(SSqlInfo *pInfo, int32_t type, SStrToken* prefix, SStrToken* pPatterns);
void setCreateDbInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pToken, SCreateDbInfo *pDB, SStrToken *pIgExists); void setCreateDbInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pToken, SCreateDbInfo *pDB, SStrToken *pIgExists);
@ -276,6 +279,7 @@ void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip);
void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege); void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege);
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo); void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo);
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo);
// prefix show db.tables; // prefix show db.tables;
void setDbName(SStrToken *pCpxName, SStrToken *pDb); void setDbName(SStrToken *pCpxName, SStrToken *pDb);

View File

@ -821,3 +821,4 @@ cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); s
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES. METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES.

View File

@ -805,7 +805,7 @@ void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
va_end(va); va_end(va);
} }
void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck, int16_t tableType) { void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck, int16_t dbType, int16_t tableType) {
pInfo->type = type; pInfo->type = type;
if (pInfo->pMiscInfo == NULL) { if (pInfo->pMiscInfo == NULL) {
@ -816,6 +816,7 @@ void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrTo
taosArrayPush(pInfo->pMiscInfo->a, pToken); taosArrayPush(pInfo->pMiscInfo->a, pToken);
pInfo->pMiscInfo->existsCheck = (existsCheck->n == 1); pInfo->pMiscInfo->existsCheck = (existsCheck->n == 1);
pInfo->pMiscInfo->dbType = dbType;
pInfo->pMiscInfo->tableType = tableType; pInfo->pMiscInfo->tableType = tableType;
} }
@ -936,5 +937,19 @@ void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo) {
pDBInfo->update = -1; pDBInfo->update = -1;
pDBInfo->cachelast = 0; pDBInfo->cachelast = 0;
pDBInfo->dbType = TSDB_DB_TYPE_DEFAULT;
pDBInfo->partitions = TSDB_DEFAULT_DB_PARTITON_OPTION;
memset(&pDBInfo->precision, 0, sizeof(SStrToken)); memset(&pDBInfo->precision, 0, sizeof(SStrToken));
} }
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo) {
pDBInfo->dbType = TSDB_DB_TYPE_TOPIC;
pDBInfo->partitions = TSDB_DEFAULT_DB_PARTITON_OPTION;
setDefaultCreateDbOption(pDBInfo);
}

View File

@ -241,6 +241,9 @@ static SKeyword keywordTable[] = {
{"AVG_IRATE", TK_AVG_IRATE}, {"AVG_IRATE", TK_AVG_IRATE},
{"CACHELAST", TK_CACHELAST}, {"CACHELAST", TK_CACHELAST},
{"DISTINCT", TK_DISTINCT}, {"DISTINCT", TK_DISTINCT},
{"PARTITIONS", TK_PARTITIONS},
{"TOPIC", TK_TOPIC},
{"TOPICS", TK_TOPICS}
}; };
static const char isIdChar[] = { static const char isIdChar[] = {

File diff suppressed because it is too large Load Diff