feat: add privileges
This commit is contained in:
parent
aab4cb98dc
commit
7c8cf28b37
|
@ -171,14 +171,19 @@ typedef enum _mgmt_table {
|
|||
|
||||
#define TSDB_ALTER_USER_PASSWD 0x1
|
||||
#define TSDB_ALTER_USER_SUPERUSER 0x2
|
||||
#define TSDB_ALTER_USER_ENABLE 0x3
|
||||
#define TSDB_ALTER_USER_SYSINFO 0x4
|
||||
#define TSDB_ALTER_USER_PRIVILEGES 0x5
|
||||
#define TSDB_ALTER_USER_ADD_WHITE_LIST 0x6
|
||||
#define TSDB_ALTER_USER_DROP_WHITE_LIST 0x7
|
||||
|
||||
#define ALTER_USER_PRIVILEGE_ADD_READ_DB(_type, _priv) (_type) == TSDB_ALTER_USER_PRIVILEGES && (_priv)
|
||||
#define TSDB_ALTER_USER_ADD_READ_DB 0x3
|
||||
#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4
|
||||
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5
|
||||
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6
|
||||
#define TSDB_ALTER_USER_ADD_ALL_DB 0x7
|
||||
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8
|
||||
#define TSDB_ALTER_USER_ENABLE 0x9
|
||||
#define TSDB_ALTER_USER_SYSINFO 0xA
|
||||
#define TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC 0xB
|
||||
#define TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC 0xC
|
||||
#define TSDB_ALTER_USER_ADD_READ_TABLE 0xD
|
||||
|
@ -187,10 +192,7 @@ typedef enum _mgmt_table {
|
|||
#define TSDB_ALTER_USER_REMOVE_WRITE_TABLE 0x10
|
||||
#define TSDB_ALTER_USER_ADD_ALL_TABLE 0x11
|
||||
#define TSDB_ALTER_USER_REMOVE_ALL_TABLE 0x12
|
||||
#define TSDB_ALTER_USER_ADD_WHITE_LIST 0x13
|
||||
#define TSDB_ALTER_USER_DROP_WHITE_LIST 0x14
|
||||
|
||||
#define TSDB_ALTER_USER_PRIVILEGES 0x2
|
||||
|
||||
#define TSDB_KILL_MSG_LEN 30
|
||||
|
||||
|
@ -959,6 +961,7 @@ typedef struct {
|
|||
int32_t tagCondLen;
|
||||
int32_t numIpRanges;
|
||||
SIpV4Range* pIpRanges;
|
||||
int64_t privileges;
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
} SAlterUserReq;
|
||||
|
|
|
@ -56,6 +56,7 @@ extern "C" {
|
|||
#define PRIVILEGE_TYPE_READ BIT_FLAG_MASK(1)
|
||||
#define PRIVILEGE_TYPE_WRITE BIT_FLAG_MASK(2)
|
||||
#define PRIVILEGE_TYPE_SUBSCRIBE BIT_FLAG_MASK(3)
|
||||
#define PRIVILEGE_TYPE_ALTER BIT_FLAG_MASK(4)
|
||||
|
||||
typedef struct SDatabaseOptions {
|
||||
ENodeType type;
|
||||
|
|
|
@ -1691,6 +1691,7 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq)
|
|||
if (tEncodeU32(&encoder, pReq->pIpRanges[i].ip) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pReq->pIpRanges[i].mask) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI64(&encoder, pReq->privileges) < 0) return -1;
|
||||
ENCODESQL();
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -1728,6 +1729,7 @@ int32_t tDeserializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq
|
|||
if (tDecodeU32(&decoder, &(pReq->pIpRanges[i].ip)) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &(pReq->pIpRanges[i].mask)) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI64(&decoder, &pReq->privileges) < 0) return -1;
|
||||
DECODESQL();
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ priv_type_list(A) ::= priv_type_list(B) NK_COMMA priv_type(C).
|
|||
%destructor priv_type { }
|
||||
priv_type(A) ::= READ. { A = PRIVILEGE_TYPE_READ; }
|
||||
priv_type(A) ::= WRITE. { A = PRIVILEGE_TYPE_WRITE; }
|
||||
priv_type(A) ::= ALTER. { A = PRIVILEGE_TYPE_ALTER; }
|
||||
|
||||
%type priv_level { STokenPair }
|
||||
%destructor priv_level { }
|
||||
|
|
|
@ -705,7 +705,7 @@ static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreat
|
|||
}
|
||||
|
||||
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
|
||||
int32_t code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE,
|
||||
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE,
|
||||
pCxt->pMetaCache);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -6593,6 +6593,20 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
|
|||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||
}
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)((SSelectStmt*)pStmt->pQuery)->pFromTable;
|
||||
SName name;
|
||||
int32_t code = getTableMetaImpl(
|
||||
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
|
||||
&(pRealTable->pMeta), true);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code));
|
||||
}
|
||||
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||
}
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -7641,17 +7655,9 @@ static int32_t translateGrantTagCond(STranslateContext* pCxt, SGrantStmt* pStmt,
|
|||
|
||||
static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
|
||||
SAlterUserReq req = {0};
|
||||
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
|
||||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
|
||||
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
|
||||
req.alterType = ('\0' == pStmt->tabName[0] ? TSDB_ALTER_USER_ADD_ALL_DB : TSDB_ALTER_USER_ADD_ALL_TABLE);
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
|
||||
req.alterType = ('\0' == pStmt->tabName[0] ? TSDB_ALTER_USER_ADD_READ_DB : TSDB_ALTER_USER_ADD_READ_TABLE);
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
|
||||
req.alterType = ('\0' == pStmt->tabName[0] ? TSDB_ALTER_USER_ADD_WRITE_DB : TSDB_ALTER_USER_ADD_WRITE_TABLE);
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
|
||||
req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC;
|
||||
}
|
||||
req.alterType = TSDB_ALTER_USER_PRIVILEGES;
|
||||
req.privileges = pStmt->privileges;
|
||||
|
||||
strcpy(req.user, pStmt->userName);
|
||||
sprintf(req.objname, "%d.%s", pCxt->pParseCxt->acctId, pStmt->objName);
|
||||
sprintf(req.tabName, "%s", pStmt->tabName);
|
||||
|
|
|
@ -17,6 +17,7 @@ exe:
|
|||
gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS)
|
||||
gcc $(CFLAGS) ./passwdTest.c -o $(ROOT)passwdTest $(LFLAGS)
|
||||
gcc $(CFLAGS) ./whiteListTest.c -o $(ROOT)whiteListTest $(LFLAGS)
|
||||
gcc $(CFLAGS) ./tmqViewTest.c -o $(ROOT)tmqViewTest $(LFLAGS)
|
||||
|
||||
clean:
|
||||
rm $(ROOT)batchprepare
|
||||
|
@ -25,3 +26,4 @@ clean:
|
|||
rm $(ROOT)insertSameTs
|
||||
rm $(ROOT)passwdTest
|
||||
rm $(ROOT)whiteListTest
|
||||
rm $(ROOT)tmqViewTest
|
||||
|
|
|
@ -0,0 +1,341 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
const char* topic_name = "topicname";
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
int32_t rows = 0;
|
||||
|
||||
const char* topicName = tmq_get_topic_name(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
// int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("precision: %d, row content: %s\n", precision, buf);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
static int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes;
|
||||
// drop database if exists
|
||||
printf("create database\n");
|
||||
pRes = taos_query(pConn, "drop topic topicname");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create database
|
||||
pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create super table
|
||||
printf("create super table\n");
|
||||
pRes = taos_query(
|
||||
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create sub tables
|
||||
printf("create sub tables\n");
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// insert data
|
||||
printf("insert data into sub tables\n");
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create or replace view tmqdb.view1 as select ts, c1, t1 from tmqdb.stb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create view, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
|
||||
END:
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic topicname as select * from tmqdb.view1 where c1 > 1 and c1 <= 22");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
tmq_conf_res_t code;
|
||||
tmq_t* tmq = NULL;
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "client.id", "user defined name");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
_end:
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
int32_t code = tmq_list_append(topicList, topic_name);
|
||||
if (code) {
|
||||
tmq_list_destroy(topicList);
|
||||
return NULL;
|
||||
}
|
||||
return topicList;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t* tmq) {
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 5000;
|
||||
while (running) {
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
totalRows += msg_process(tmqmsg);
|
||||
taos_free_result(tmqmsg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
void consume_repeatly(tmq_t* tmq) {
|
||||
int32_t numOfAssignment = 0;
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
|
||||
}
|
||||
|
||||
// seek to the earliest offset
|
||||
for(int32_t i = 0; i < numOfAssignment; ++i) {
|
||||
tmq_topic_assignment* p = &pAssign[i];
|
||||
|
||||
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code));
|
||||
}
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
// let's do it again
|
||||
basic_consume_loop(tmq);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int32_t code;
|
||||
|
||||
if (init_env() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_topic() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_t* tmq = build_consumer();
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "build_consumer() fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if (NULL == topic_list) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((code = tmq_subscribe(tmq, topic_list))) {
|
||||
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
||||
}
|
||||
|
||||
tmq_list_destroy(topic_list);
|
||||
|
||||
basic_consume_loop(tmq);
|
||||
|
||||
consume_repeatly(tmq);
|
||||
|
||||
code = tmq_consumer_close(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
|
||||
} else {
|
||||
fprintf(stderr, "Consumer closed\n");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -1,20 +1,34 @@
|
|||
sql connect
|
||||
sql use test;
|
||||
sql use testa;
|
||||
|
||||
sql create view view1 as select * from st;
|
||||
sql create view view1 as select * from sta1;
|
||||
sql drop view view1;
|
||||
|
||||
sql create or replace view view2 as select f from ct1;
|
||||
sql create or replace view view2 as select f from cta11;
|
||||
sql drop view if exists view2;
|
||||
sql drop view if exists view3;
|
||||
sql_error drop view view2;
|
||||
sql_error drop view view3;
|
||||
|
||||
sql create view view3 as select avg(f) from st2;
|
||||
sql create or replace view view3 as select f fa from st;
|
||||
sql_error create view view3 as select * from st2;
|
||||
sql create view view3 as select avg(f) from sta2;
|
||||
sql create or replace view view3 as select f fa from sta2;
|
||||
sql_error create view view3 as select * from sta2;
|
||||
sql create view view4 as select * from view3;
|
||||
sql create or replace view view4 as select fa from view3;
|
||||
sql drop view view3;
|
||||
sql_error create view view5 as select * from view3;
|
||||
sql drop view view4;
|
||||
|
||||
sql create view testa.view1 as select * from testa.sta1;
|
||||
sql create view testa.view2 as select * from testb.stb2;
|
||||
sql create view testb.view1 as select * from testb.stb1;
|
||||
sql create view testb.view2 as select * from testa.sta2;
|
||||
sql drop view view1;
|
||||
sql drop view view2;
|
||||
sql drop view testb.view1;
|
||||
sql drop view testb.view2;
|
||||
|
||||
sql_error create view view1 as show tables;
|
||||
sql_error create view view1 as desc sta1;
|
||||
sql_error create view view1 as select * from st;
|
||||
sql_error create view view1 as select count(*) from sta1 group by f interval(1s);
|
||||
|
|
|
@ -1,152 +1,79 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
sql drop database if exists test
|
||||
sql create database test;
|
||||
sql use test;
|
||||
sql use testa;
|
||||
|
||||
sql create table st(ts timestamp, f int) tags (t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 1);
|
||||
sql insert into ct2 using st tags(2) values(now, 2);
|
||||
sql insert into ct3 using st tags(3) values(now, 3);
|
||||
sql insert into ct4 using st tags(4) values(now, 4);
|
||||
sql create view view1 as select * from sta1;
|
||||
sql select * from view1 order by ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-10-16 09:10:11.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 100111 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create table st2(ts timestamp, f int) tags (t int);
|
||||
sql insert into ct21 using st2 tags(1) values(now, 1);
|
||||
sql insert into ct22 using st2 tags(2) values(now, 2);
|
||||
sql insert into ct23 using st2 tags(3) values(now, 3);
|
||||
sql insert into ct24 using st2 tags(4) values(now, 4);
|
||||
sql create or replace view view1 as select 1, 2;
|
||||
sql select * from view1;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create or replace view view1 as select tbname as a, f from sta1;
|
||||
sql select cast(avg(f) as int) b from view1 group by a having avg(f) > 100111 order by b;
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 100112 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 100113 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 100114 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#sql create or replace view view1 as select tbname, avg(f) from sta1 partition by tbname;
|
||||
#sql select * from view1 partition by view1.tbname;
|
||||
|
||||
sql create or replace view view1 as select * from sta1;
|
||||
sql create or replace view testb.view2 as select * from testb.stb1;
|
||||
sql_error select avg(t1.f), avg(t2.f) from view1 t1, view2 t2 where t1.ts = t2.ts and t1.f < 100114;
|
||||
sql select avg(t1.f), avg(t2.f) from view1 t1, testb.view2 t2 where t1.ts = t2.ts and t1.f < 100114;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 100112.000000000 then
|
||||
print $data00
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 110112.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql drop view testb.view2;
|
||||
sql create or replace view view2 as select * from sta2;
|
||||
sql select avg(view1.f), avg(view2.f) from view1, view2 where view1.ts = view2.ts and view1.f < 100114;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 100112.000000000 then
|
||||
print $data00
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 100222.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create view view1 as select * from st;
|
||||
sql drop view view1;
|
||||
sql create or replace view view2 as select f from ct1;
|
||||
sql drop view if exists view2;
|
||||
sql drop view if exists view3;
|
||||
sql_error drop view view2;
|
||||
sql_error drop view view3;
|
||||
sql drop view view2;
|
||||
|
||||
|
||||
sql select tbname, 1 from st group by tbname order by tbname;
|
||||
print $rows $data00 $data10 $data20
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @ct1@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @ct2@ then
|
||||
return -1
|
||||
endi
|
||||
sql select tbname, 1 from st group by tbname slimit 0, 1;
|
||||
print $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql select tbname, 1 from st group by tbname slimit 2, 2;
|
||||
print $rows $data00 $data10
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
sql select tbname, 1 from st group by tbname order by tbname slimit 0, 1;
|
||||
print $rows $data00 $data10 $data20
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create table stt1(ts timestamp, f int) tags (t int, b varchar(10));
|
||||
sql insert into ctt11 using stt1 tags(1, '1aa') values(now, 1);
|
||||
sql insert into ctt12 using stt1 tags(2, '1bb') values(now, 2);
|
||||
sql insert into ctt13 using stt1 tags(3, '1cc') values(now, 3);
|
||||
sql insert into ctt14 using stt1 tags(4, '1dd') values(now, 4);
|
||||
sql insert into ctt14 values(now, 5);
|
||||
|
||||
sql create table stt2(ts timestamp, f int) tags (t int, b varchar(10));
|
||||
sql insert into ctt21 using stt2 tags(1, '2aa') values(now, 1);
|
||||
sql insert into ctt22 using stt2 tags(2, '2bb') values(now, 2);
|
||||
sql insert into ctt23 using stt2 tags(3, '2cc') values(now, 3);
|
||||
sql insert into ctt24 using stt2 tags(4, '2dd') values(now, 4);
|
||||
|
||||
sql select tags t, b from stt1 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data10 $data11 $data20 $data21 $data30 $data31
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @1dd@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t, b from stt2 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data10 $data11 $data20 $data21 $data30 $data31
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @2dd@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b,f from stt1 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32 $data40 $data41 $data42
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data42 != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags tbname,t,b from stt1 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @ctt14@ then
|
||||
return -1
|
||||
endi
|
||||
if $data32 != @1dd@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b from stt1 where t=1
|
||||
print $rows
|
||||
print $data00 $data01
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @1@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @1aa@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b from stt1 where tbname='ctt11'
|
||||
print $rows
|
||||
print $data00 $data01
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @1@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @1aa@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b from ctt11
|
||||
print $rows
|
||||
print $data00 $data01
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @1@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @1aa@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -1,25 +1,8 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
sql drop database if exists test
|
||||
sql create database test;
|
||||
sql use test;
|
||||
|
||||
sql create table st(ts timestamp, f int) tags (t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 1);
|
||||
sql insert into ct2 using st tags(2) values(now, 2);
|
||||
sql insert into ct3 using st tags(3) values(now, 3);
|
||||
sql insert into ct4 using st tags(4) values(now, 4);
|
||||
|
||||
sql create table st2(ts timestamp, f int) tags (t int);
|
||||
sql insert into ct21 using st2 tags(1) values(now, 1);
|
||||
sql insert into ct22 using st2 tags(2) values(now, 2);
|
||||
sql insert into ct23 using st2 tags(3) values(now, 3);
|
||||
sql insert into ct24 using st2 tags(4) values(now, 4);
|
||||
|
||||
sql create view view1 as select * from st;
|
||||
sql drop view view1;
|
||||
sql show views;
|
||||
|
||||
sql create or replace view view2 as select f from ct1;
|
||||
sql drop view if exists view2;
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
sql connect
|
||||
sql use testa;
|
||||
|
||||
sql create view view1 as select * from sta1;
|
||||
sql_error CREATE STREAM s1 INTO s1t AS SELECT _wstart, count(*) FROM view1 PARTITION BY f INTERVAL(1m);
|
||||
|
||||
sql drop view view1;
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -2,23 +2,40 @@ system sh/stop_dnodes.sh
|
|||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
sql drop database if exists test
|
||||
sql create database test;
|
||||
sql use test;
|
||||
sql drop database if exists testa
|
||||
sql create database testa vgroups 3;
|
||||
sql use testa;
|
||||
|
||||
sql create table st(ts timestamp, f int) tags (t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 1);
|
||||
sql insert into ct2 using st tags(2) values(now, 2);
|
||||
sql insert into ct3 using st tags(3) values(now, 3);
|
||||
sql insert into ct4 using st tags(4) values(now, 4);
|
||||
sql create table sta1(ts timestamp, f int) tags (t int);
|
||||
sql insert into cta11 using sta1 tags(1) values('2023-10-16 09:10:11', 100111);
|
||||
sql insert into cta12 using sta1 tags(2) values('2023-10-16 09:10:12', 100112);
|
||||
sql insert into cta13 using sta1 tags(3) values('2023-10-16 09:10:13', 100113);
|
||||
sql insert into cta14 using sta1 tags(4) values('2023-10-16 09:10:14', 100114);
|
||||
|
||||
sql create table st2(ts timestamp, f int) tags (t int);
|
||||
sql insert into ct21 using st2 tags(1) values(now, 1);
|
||||
sql insert into ct22 using st2 tags(2) values(now, 2);
|
||||
sql insert into ct23 using st2 tags(3) values(now, 3);
|
||||
sql insert into ct24 using st2 tags(4) values(now, 4);
|
||||
sql create table sta2(ts timestamp, f int) tags (t int);
|
||||
sql insert into cta21 using sta2 tags(1) values('2023-10-16 09:10:11', 100221);
|
||||
sql insert into cta22 using sta2 tags(2) values('2023-10-16 09:10:12', 100222);
|
||||
sql insert into cta23 using sta2 tags(3) values('2023-10-16 09:10:13', 100223);
|
||||
sql insert into cta24 using sta2 tags(4) values('2023-10-16 09:10:14', 100224);
|
||||
|
||||
sql drop database if exists testb
|
||||
sql create database testb vgroups 1;
|
||||
sql use testb;
|
||||
|
||||
sql create table stb1(ts timestamp, f int) tags (t int);
|
||||
sql insert into ctb11 using stb1 tags(1) values('2023-10-16 09:10:11', 110111);
|
||||
sql insert into ctb12 using stb1 tags(2) values('2023-10-16 09:10:12', 110112);
|
||||
sql insert into ctb13 using stb1 tags(3) values('2023-10-16 09:10:13', 110113);
|
||||
sql insert into ctb14 using stb1 tags(4) values('2023-10-16 09:10:14', 110114);
|
||||
|
||||
sql create table stb2(ts timestamp, f int) tags (t int);
|
||||
sql insert into ctb21 using stb2 tags(1) values('2023-10-16 09:10:11', 110221);
|
||||
sql insert into ctb22 using stb2 tags(2) values('2023-10-16 09:10:12', 110222);
|
||||
sql insert into ctb23 using stb2 tags(3) values('2023-10-16 09:10:13', 110223);
|
||||
sql insert into ctb24 using stb2 tags(4) values('2023-10-16 09:10:14', 110224);
|
||||
|
||||
run tsim/view/create_drop_view.sim
|
||||
run tsim/view/query_view.sim
|
||||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
@ -26,5 +43,6 @@ system sh/exec.sh -n dnode1 -s start
|
|||
print ================== server restart completed
|
||||
|
||||
run tsim/view/create_drop_view.sim
|
||||
run tsim/view/query_view.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue