Merge pull request #18731 from taosdata/enh/TD-20611
enh: allow any user to create a topic
This commit is contained in:
commit
b30e411f8b
|
@ -473,6 +473,7 @@ void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
char createUser[TSDB_USER_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define MND_TOPIC_VER_NUMBER 1
|
#define MND_TOPIC_VER_NUMBER 2
|
||||||
#define MND_TOPIC_RESERVE_SIZE 64
|
#define MND_TOPIC_RESERVE_SIZE 64
|
||||||
|
|
||||||
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
|
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
|
||||||
|
@ -93,6 +93,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||||
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
|
||||||
|
@ -159,7 +160,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
|
||||||
|
|
||||||
if (sver != MND_TOPIC_VER_NUMBER) {
|
if (sver != 1 && sver != 2) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto TOPIC_DECODE_OVER;
|
goto TOPIC_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -174,6 +175,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
|
||||||
|
if (sver >= 2) {
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_DECODE_OVER);
|
||||||
|
}
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
|
||||||
|
@ -358,11 +362,18 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
||||||
|
const char *userName) {
|
||||||
mInfo("topic:%s to create", pCreate->name);
|
mInfo("topic:%s to create", pCreate->name);
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
|
||||||
|
|
||||||
|
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
topicObj.createTime = taosGetTimestampMs();
|
topicObj.createTime = taosGetTimestampMs();
|
||||||
topicObj.updateTime = topicObj.createTime;
|
topicObj.updateTime = topicObj.createTime;
|
||||||
topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
@ -574,11 +585,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC) != 0) {
|
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -634,7 +641,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC) != 0) {
|
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) {
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -698,10 +705,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
|
||||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
|
|
@ -7,7 +7,7 @@ print =============== create db
|
||||||
sql create database d1 vgroups 1;
|
sql create database d1 vgroups 1;
|
||||||
sql use d1
|
sql use d1
|
||||||
sql create table stb (ts timestamp, i int) tags (j int)
|
sql create table stb (ts timestamp, i int) tags (j int)
|
||||||
sql create topic topic_1 as select ts, i from stb
|
# sql create topic topic_1 as select ts, i from stb
|
||||||
|
|
||||||
sql create database d2 vgroups 1;
|
sql create database d2 vgroups 1;
|
||||||
sql create database d3 vgroups 1;
|
sql create database d3 vgroups 1;
|
||||||
|
@ -93,6 +93,6 @@ sql_error drop database d2;
|
||||||
|
|
||||||
sql_error create stable d1.st (ts timestamp, i int) tags (j int)
|
sql_error create stable d1.st (ts timestamp, i int) tags (j int)
|
||||||
sql create stable d2.st (ts timestamp, i int) tags (j int)
|
sql create stable d2.st (ts timestamp, i int) tags (j int)
|
||||||
sql_error create topic topic_2 as select ts, i from stb
|
#sql create topic topic_2 as select ts, i from stb
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue