Merge pull request #24642 from taosdata/feat/TD-28247
feat:[TD-28247]add grant for subscribe and stream
This commit is contained in:
commit
4c22e03c1b
|
@ -48,6 +48,7 @@ typedef enum {
|
|||
TSDB_GRANT_CPU_CORES,
|
||||
TSDB_GRANT_STABLE,
|
||||
TSDB_GRANT_TABLE,
|
||||
TSDB_GRANT_SUBSCRIBE,
|
||||
} EGrantType;
|
||||
|
||||
int32_t grantCheck(EGrantType grant);
|
||||
|
|
|
@ -3754,7 +3754,12 @@ typedef struct {
|
|||
} SMqHbReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t noPrivilege;
|
||||
} STopicPrivilege;
|
||||
|
||||
typedef struct {
|
||||
SArray* topicPrivileges; // SArray<STopicPrivilege>
|
||||
} SMqHbRsp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -3773,18 +3778,6 @@ typedef struct {
|
|||
SVCreateTbReq cTbReq;
|
||||
} SVSubmitBlk;
|
||||
|
||||
typedef struct {
|
||||
int32_t flags;
|
||||
int32_t nBlocks;
|
||||
union {
|
||||
SArray* pArray;
|
||||
SVSubmitBlk* pBlocks;
|
||||
};
|
||||
} SVSubmitReq;
|
||||
|
||||
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
|
||||
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
|
@ -3893,6 +3886,10 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
|||
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
|
||||
|
||||
int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
|
||||
int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
|
||||
int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
|
||||
|
||||
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
|
||||
|
|
|
@ -155,6 +155,7 @@ typedef struct {
|
|||
char db[TSDB_DB_FNAME_LEN];
|
||||
SArray* vgs; // SArray<SMqClientVg>
|
||||
SSchemaWrapper schema;
|
||||
int8_t noPrivilege;
|
||||
} SMqClientTopic;
|
||||
|
||||
typedef struct {
|
||||
|
@ -739,6 +740,29 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
|||
|
||||
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if (pMsg) {
|
||||
SMqHbRsp rsp = {0};
|
||||
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
|
||||
|
||||
int64_t refId = *(int64_t*)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq != NULL) {
|
||||
taosWLockLatch(&tmq->lock);
|
||||
for(int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++){
|
||||
STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
|
||||
if(privilege->noPrivilege == 1){
|
||||
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
||||
for (int32_t j = 0; j < topicNumCur; j++) {
|
||||
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
|
||||
if(strcmp(pTopicCur->topicName, privilege->topic) == 0){
|
||||
tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
|
||||
pTopicCur->noPrivilege = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
}
|
||||
tDeatroySMqHbRsp(&rsp);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
|
@ -809,7 +833,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
|
||||
sendInfo->requestId = generateRequestId();
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = NULL;
|
||||
sendInfo->paramFreeFp = taosMemoryFree;
|
||||
sendInfo->param = taosMemoryMalloc(sizeof(int64_t));
|
||||
*(int64_t *)sendInfo->param = refId;
|
||||
sendInfo->fp = tmqHbCb;
|
||||
sendInfo->msgType = TDMT_MND_TMQ_HB;
|
||||
|
||||
|
@ -1705,7 +1731,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
for (int i = 0; i < numOfTopics; i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
|
||||
|
||||
if(pTopic->noPrivilege){
|
||||
tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
|
||||
continue;
|
||||
}
|
||||
for (int j = 0; j < numOfVg; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
|
||||
|
|
|
@ -18,6 +18,6 @@
|
|||
|
||||
#ifndef _GRANT
|
||||
|
||||
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
|
||||
int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
|
||||
|
||||
#endif
|
|
@ -6139,6 +6139,55 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) {
|
||||
taosArrayDestroy(pRsp->topicPrivileges);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pRsp->topicPrivileges);
|
||||
if (tEncodeI32(&encoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
STopicPrivilege *privilege = (STopicPrivilege *)taosArrayGet(pRsp->topicPrivileges, i);
|
||||
if (tEncodeCStr(&encoder, privilege->topic) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
int32_t sz = 0;
|
||||
if (tDecodeI32(&decoder, &sz) < 0) return -1;
|
||||
if (sz > 0) {
|
||||
pRsp->topicPrivileges = taosArrayInit(sz, sizeof(STopicPrivilege));
|
||||
if (NULL == pRsp->topicPrivileges) return -1;
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
STopicPrivilege *data = taosArrayReserve(pRsp->topicPrivileges, 1);
|
||||
if (tDecodeCStrTo(&decoder, data->topic) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1;
|
||||
}
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
|
||||
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
|
||||
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
|
||||
|
@ -6194,7 +6243,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
|||
if (NULL == pReq->topics) return -1;
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
|
||||
tDecodeCStrTo(&decoder, data->topicName);
|
||||
if (tDecodeCStrTo(&decoder, data->topicName) < 0) return -1;
|
||||
int32_t szVgs = 0;
|
||||
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
|
||||
if (szVgs > 0) {
|
||||
|
@ -7753,36 +7802,6 @@ static int32_t tDecodeSVSubmitBlk(SDecoder *pCoder, SVSubmitBlk *pBlock, int32_t
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSVSubmitReq(SEncoder *pCoder, const SVSubmitReq *pReq) {
|
||||
int32_t nBlocks = taosArrayGetSize(pReq->pArray);
|
||||
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
|
||||
if (tEncodeI32v(pCoder, nBlocks) < 0) return -1;
|
||||
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
|
||||
if (tEncodeSVSubmitBlk(pCoder, (SVSubmitBlk *)taosArrayGet(pReq->pArray, iBlock), pReq->flags) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
|
||||
if (tDecodeI32v(pCoder, &pReq->nBlocks) < 0) return -1;
|
||||
pReq->pBlocks = tDecoderMalloc(pCoder, sizeof(SVSubmitBlk) * pReq->nBlocks);
|
||||
if (pReq->pBlocks == NULL) return -1;
|
||||
for (int32_t iBlock = 0; iBlock < pReq->nBlocks; iBlock++) {
|
||||
if (tDecodeSVSubmitBlk(pCoder, pReq->pBlocks + iBlock, pReq->flags) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
|
|||
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
|
||||
int32_t mndCheckViewPrivilege(SMnode *pMnode, const char *user, EOperType operType, const char *pViewFName);
|
||||
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic);
|
||||
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName);
|
||||
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
|
||||
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
|
||||
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
|
||||
|
|
|
@ -101,8 +101,9 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
|
|||
goto FAILED;
|
||||
}
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||
code = -1;
|
||||
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
terrno = TSDB_CODE_MND_NO_RIGHTS;
|
||||
goto FAILED;
|
||||
}
|
||||
|
||||
|
@ -220,22 +221,53 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char* user){
|
||||
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
|
||||
if(rsp->topicPrivileges == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++){
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) { // terrno has been set by callee function
|
||||
continue;
|
||||
}
|
||||
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
|
||||
strcpy(data->topic, topic);
|
||||
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
|
||||
data->noPrivilege = 1;
|
||||
} else{
|
||||
data->noPrivilege = 0;
|
||||
}
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqHbReq req = {0};
|
||||
SMqHbRsp rsp = {0};
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
|
||||
if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
|
||||
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
||||
int64_t consumerId = req.consumerId;
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
code = -1;
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto end;
|
||||
}
|
||||
code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user);
|
||||
if(code != 0){
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
@ -280,9 +312,22 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
// encode rsp
|
||||
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp);
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tSerializeSMqHbRsp(buf, tlen, &rsp);
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
|
||||
end:
|
||||
tDeatroySMqHbRsp(&rsp);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDeatroySMqHbReq(&req);
|
||||
return code;
|
||||
}
|
||||
|
@ -500,6 +545,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
|
||||
if(grantCheck(TSDB_GRANT_SUBSCRIBE) < 0){
|
||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SCMSubscribeReq subscribe = {0};
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
|
||||
|
|
|
@ -30,9 +30,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op
|
|||
}
|
||||
|
||||
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
|
||||
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
|
||||
|
|
|
@ -644,6 +644,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
int32_t sqlLen = 0;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||
terrno = TSDB_CODE_GRANT_STREAM_LIMITED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SCMCreateStreamReq createStreamReq = {0};
|
||||
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
|
@ -1624,21 +1629,26 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
SMResumeStreamReq pauseReq = {0};
|
||||
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
|
||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMResumeStreamReq resumeReq = {0};
|
||||
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pStream = mndAcquireStream(pMnode, pauseReq.name);
|
||||
pStream = mndAcquireStream(pMnode, resumeReq.name);
|
||||
|
||||
if (pStream == NULL) {
|
||||
if (pauseReq.igNotExists) {
|
||||
mInfo("stream:%s not exist, not resume stream", pauseReq.name);
|
||||
if (resumeReq.igNotExists) {
|
||||
mInfo("stream:%s not exist, not resume stream", resumeReq.name);
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return 0;
|
||||
} else {
|
||||
mError("stream:%s not exist, failed to resume stream", pauseReq.name);
|
||||
mError("stream:%s not exist, failed to resume stream", resumeReq.name);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
@ -1663,7 +1673,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
|
||||
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1671,8 +1681,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
|
||||
|
||||
// set the resume action
|
||||
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
||||
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
|
@ -2130,7 +2140,6 @@ static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numO
|
|||
|
||||
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
||||
SStreamTaskCheckpointReq req = {0};
|
||||
|
||||
SDecoder decoder = {0};
|
||||
|
|
|
@ -182,7 +182,39 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
|
|||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
void* pIter = NULL;
|
||||
while(1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if(pStream->status != STREAM_STATUS__PAUSE){
|
||||
SMPauseStreamReq reqPause = {0};
|
||||
strcpy(reqPause.name, pStream->name);
|
||||
reqPause.igNotExists = 1;
|
||||
|
||||
int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
|
||||
void * pHead = rpcMallocCont(contLen);
|
||||
tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_PAUSE_STREAM,
|
||||
.pCont = pHead,
|
||||
.contLen = contLen,
|
||||
.info = *info,
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -192,6 +224,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||
|
||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
||||
|
||||
|
|
|
@ -1925,6 +1925,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
|||
return -1;
|
||||
}
|
||||
taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
||||
if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
|
||||
|
@ -1935,6 +1936,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
|||
return -1;
|
||||
}
|
||||
taosHashRemove(pNewUser->topics, pAlterReq->objname, len);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -292,6 +292,7 @@ fi
|
|||
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
|
||||
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_double.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
|
||||
|
|
|
@ -0,0 +1,184 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
import time
|
||||
|
||||
import taos
|
||||
from taos.tmq import *
|
||||
from util.cases import *
|
||||
from util.common import *
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.sqlset import *
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
clientCfgDict = {'debugFlag': 135}
|
||||
updatecfgDict = {'debugFlag': 143, 'clientCfg':clientCfgDict}
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
self.setsql = TDSetSql()
|
||||
self.stbname = 'stb'
|
||||
self.user_name = 'test'
|
||||
self.binary_length = 20 # the length of binary for column_dict
|
||||
self.nchar_length = 20 # the length of nchar for column_dict
|
||||
self.dbnames = ['db1']
|
||||
self.column_dict = {
|
||||
'ts': 'timestamp',
|
||||
'col1': 'float',
|
||||
'col2': 'int',
|
||||
'col3': 'float',
|
||||
}
|
||||
|
||||
self.tag_dict = {
|
||||
't1': 'int',
|
||||
't2': f'binary({self.binary_length})'
|
||||
}
|
||||
|
||||
self.tag_list = [
|
||||
f'1, "Beijing"',
|
||||
f'2, "Shanghai"',
|
||||
f'3, "Guangzhou"',
|
||||
f'4, "Shenzhen"'
|
||||
]
|
||||
|
||||
self.values_list = [
|
||||
f'now, 9.1, 200, 0.3'
|
||||
]
|
||||
|
||||
self.tbnum = 4
|
||||
self.topic_name = 'topic1'
|
||||
|
||||
|
||||
def prepare_data(self):
|
||||
for db in self.dbnames:
|
||||
tdSql.execute(f"create database {db} vgroups 1")
|
||||
tdSql.execute(f"use {db}")
|
||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||
for i in range(self.tbnum):
|
||||
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||
for j in self.values_list:
|
||||
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||
|
||||
def checkUserPrivileges(self, rowCnt):
|
||||
tdSql.query("show user privileges")
|
||||
tdSql.checkRows(rowCnt)
|
||||
|
||||
def streamTest(self):
|
||||
tdSql.execute("create stream s1 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
|
||||
time.sleep(2)
|
||||
tdSql.query("select * from so1")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.execute("insert into stb_0(ts,col2) values(now, 332)")
|
||||
time.sleep(2)
|
||||
tdSql.query("select * from so1")
|
||||
tdSql.checkRows(5)
|
||||
|
||||
time.sleep(2)
|
||||
tdSql.query("select * from information_schema.ins_stream_tasks")
|
||||
tdSql.checkData(0, 5, 'ready')
|
||||
|
||||
print(time.time())
|
||||
while 1:
|
||||
t = time.time()
|
||||
if t > 1706254434 :
|
||||
break
|
||||
else:
|
||||
print("time:%d" %(t))
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
|
||||
|
||||
time.sleep(10)
|
||||
tdSql.query("select * from information_schema.ins_stream_tasks")
|
||||
tdSql.checkData(0, 5, 'paused')
|
||||
tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)")
|
||||
tdSql.query("select * from so1")
|
||||
tdSql.checkRows(5)
|
||||
|
||||
tdSql.error("resume stream s1")
|
||||
|
||||
def consumeTest(self):
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": self.user_name,
|
||||
"td.connect.pass": "test",
|
||||
"auto.offset.reset": "earliest"
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
tdLog.debug("test subscribe topic created by other user")
|
||||
exceptOccured = False
|
||||
try:
|
||||
consumer.subscribe([self.topic_name])
|
||||
except TmqError:
|
||||
exceptOccured = True
|
||||
|
||||
if not exceptOccured:
|
||||
tdLog.exit(f"has no privilege, should except")
|
||||
|
||||
self.checkUserPrivileges(1)
|
||||
tdLog.debug("test subscribe topic privilege granted by other user")
|
||||
tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}')
|
||||
self.checkUserPrivileges(2)
|
||||
|
||||
exceptOccured = False
|
||||
try:
|
||||
consumer.subscribe([self.topic_name])
|
||||
except TmqError:
|
||||
exceptOccured = True
|
||||
|
||||
if exceptOccured:
|
||||
tdLog.exit(f"has privilege, should not except")
|
||||
|
||||
cnt = 0
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
cnt += 1
|
||||
if cnt == 1:
|
||||
if not res:
|
||||
tdLog.exit(f"grant privilege, should get res")
|
||||
elif cnt == 2:
|
||||
if res:
|
||||
tdLog.exit(f"revoke privilege, should get NULL")
|
||||
else:
|
||||
break
|
||||
|
||||
tdLog.debug("test subscribe topic privilege revoked by other user")
|
||||
tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}')
|
||||
self.checkUserPrivileges(1)
|
||||
time.sleep(5)
|
||||
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
def create_user(self):
|
||||
tdSql.execute(f'create topic {self.topic_name} as database {self.dbnames[0]}')
|
||||
tdSql.execute(f'create user {self.user_name} pass "test"')
|
||||
|
||||
def run(self):
|
||||
self.prepare_data()
|
||||
self.create_user()
|
||||
self.consumeTest()
|
||||
# self.streamTest()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue