Merge remote-tracking branch 'origin/3.0' into fix/TD-20857
This commit is contained in:
commit
5b43a1aa41
|
@ -473,6 +473,7 @@ void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
|||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
char createUser[TSDB_USER_LEN];
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include "parser.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_TOPIC_VER_NUMBER 1
|
||||
#define MND_TOPIC_VER_NUMBER 2
|
||||
#define MND_TOPIC_RESERVE_SIZE 64
|
||||
|
||||
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
|
||||
|
@ -93,6 +93,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
int32_t dataPos = 0;
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
|
||||
|
@ -159,7 +160,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
|
||||
|
||||
if (sver != MND_TOPIC_VER_NUMBER) {
|
||||
if (sver != 1 && sver != 2) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
|
@ -174,6 +175,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
|
||||
if (sver >= 2) {
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_DECODE_OVER);
|
||||
}
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
|
||||
|
@ -358,11 +362,18 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
||||
const char *userName) {
|
||||
mInfo("topic:%s to create", pCreate->name);
|
||||
SMqTopicObj topicObj = {0};
|
||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
topicObj.createTime = taosGetTimestampMs();
|
||||
topicObj.updateTime = topicObj.createTime;
|
||||
topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
|
@ -574,11 +585,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
|
@ -634,7 +641,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC) != 0) {
|
||||
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
}
|
||||
|
@ -698,10 +705,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
}
|
||||
#endif
|
||||
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
|
||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||
if (pTrans == NULL) {
|
||||
|
|
|
@ -293,7 +293,7 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value,
|
|||
}
|
||||
|
||||
SDiskCfg cfg = {0};
|
||||
tstrncpy(cfg.dir, value, sizeof(cfg.dir));
|
||||
tstrncpy(cfg.dir, pItem->str, sizeof(cfg.dir));
|
||||
cfg.level = level ? atoi(level) : 0;
|
||||
cfg.primary = primary ? atoi(primary) : 1;
|
||||
void *ret = taosArrayPush(pItem->array, &cfg);
|
||||
|
@ -660,12 +660,13 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
|
||||
if (strcasecmp(name, "dataDir") == 0) {
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_VAR);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
|
||||
uInfo("load from env variables cfg success");
|
||||
|
@ -702,12 +703,13 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
|
||||
if (strcasecmp(name, "dataDir") == 0) {
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_CMD);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
|
||||
uInfo("load from env cmd cfg success");
|
||||
|
@ -766,12 +768,13 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
|
||||
if (strcasecmp(name, "dataDir") == 0) {
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
|
@ -825,12 +828,13 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
|
||||
if (strcasecmp(name, "dataDir") == 0) {
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
|
@ -989,12 +993,14 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
|
|||
paGetToken(value2 + vlen2 + 1, &value3, &vlen3);
|
||||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
|
||||
if (strcasecmp(name, "dataDir") == 0) {
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_APOLLO_URL);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
}
|
||||
tjsonDelete(pJson);
|
||||
|
|
|
@ -6,8 +6,8 @@ sql connect
|
|||
print =============== create db
|
||||
sql create database d1 vgroups 1;
|
||||
sql use d1
|
||||
sql create table stb (ts timestamp, i int) tags (j int)
|
||||
sql create topic topic_1 as select ts, i from stb
|
||||
sql create table d1_stb (ts timestamp, i int) tags (j int)
|
||||
sql create topic d1_topic_1 as select ts, i from d1_stb
|
||||
|
||||
sql create database d2 vgroups 1;
|
||||
sql create database d3 vgroups 1;
|
||||
|
@ -72,7 +72,6 @@ sql REVOKE read,write ON d1.* from user1;
|
|||
sql REVOKE read,write ON d2.* from user1;
|
||||
sql REVOKE read,write ON *.* from user1;
|
||||
|
||||
|
||||
print =============== create users
|
||||
sql create user u1 PASS 'taosdata'
|
||||
sql select * from information_schema.ins_users
|
||||
|
@ -92,7 +91,17 @@ sql_error drop database d1;
|
|||
sql_error drop database d2;
|
||||
|
||||
sql_error create stable d1.st (ts timestamp, i int) tags (j int)
|
||||
sql create stable d2.st (ts timestamp, i int) tags (j int)
|
||||
sql_error create topic topic_2 as select ts, i from stb
|
||||
sql use d2
|
||||
sql create table d2_stb (ts timestamp, i int) tags (j int)
|
||||
|
||||
# Insufficient privilege for operation
|
||||
sql_error create topic d2_topic_1 as select ts, i from d2_stb
|
||||
|
||||
sql use d1
|
||||
# Insufficient privilege for operation
|
||||
sql_error drop topic d1_topic_1
|
||||
sql create topic d1_topic_2 as select ts, i from d1_stb
|
||||
|
||||
sql drop topic d1_topic_2
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue