From 2798ff824cf4e56eb9ef86377e36a19d5a70e169 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 25 Jan 2024 16:07:09 +0800 Subject: [PATCH 1/7] feat:[TD-28247]add grant for subscribe and stream --- include/common/tgrant.h | 1 + include/common/tmsg.h | 24 ++- source/client/src/clientTmq.c | 33 +++- source/common/src/tmsg.c | 83 ++++++---- source/dnode/mnode/impl/inc/mndPrivilege.h | 1 - source/dnode/mnode/impl/src/mndConsumer.c | 59 +++++++- source/dnode/mnode/impl/src/mndPrivilege.c | 3 - source/dnode/mnode/impl/src/mndStream.c | 67 +++++++-- source/dnode/mnode/impl/src/mndUser.c | 2 + .../0-others/subscribe_stream_privilege.py | 142 ++++++++++++++++++ 10 files changed, 351 insertions(+), 64 deletions(-) create mode 100644 tests/system-test/0-others/subscribe_stream_privilege.py diff --git a/include/common/tgrant.h b/include/common/tgrant.h index f06fca8014..cfb1401698 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -48,6 +48,7 @@ typedef enum { TSDB_GRANT_CPU_CORES, TSDB_GRANT_STABLE, TSDB_GRANT_TABLE, + TSDB_GRANT_SUBSCRIBE, } EGrantType; int32_t grantCheck(EGrantType grant); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2ee48a18e0..05e2738f7c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3360,6 +3360,7 @@ typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; int8_t igUntreated; + int8_t suspend; } SMResumeStreamReq; int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq); @@ -3754,7 +3755,12 @@ typedef struct { } SMqHbReq; typedef struct { - int8_t reserved; + char topic[TSDB_TOPIC_FNAME_LEN]; + int8_t noPrivilege; +} STopicPrivilege; + +typedef struct { + SArray* topicPrivileges; // SArray } SMqHbRsp; typedef struct { @@ -3773,18 +3779,6 @@ typedef struct { SVCreateTbReq cTbReq; } SVSubmitBlk; -typedef struct { - int32_t flags; - int32_t nBlocks; - union { - SArray* pArray; - SVSubmitBlk* pBlocks; - }; -} SVSubmitReq; - -int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq); -int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq); - typedef struct { SMsgHead header; uint64_t sId; @@ -3893,6 +3887,10 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeatroySMqHbReq(SMqHbReq* pReq); +int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); +int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); +int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp); + int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 69681b9ae0..79611b7eee 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -155,6 +155,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; SArray* vgs; // SArray SSchemaWrapper schema; + int8_t noPrivilege; } SMqClientTopic; typedef struct { @@ -739,6 +740,29 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { if (pMsg) { + SMqHbRsp rsp = {0}; + tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); + + int64_t refId = *(int64_t*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + if (tmq != NULL) { + taosWLockLatch(&tmq->lock); + for(int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++){ + STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); + if(privilege->noPrivilege == 1){ + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t j = 0; j < topicNumCur; j++) { + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); + if(strcmp(pTopicCur->topicName, privilege->topic) == 0){ + tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); + pTopicCur->noPrivilege = 1; + } + } + } + } + taosWUnLockLatch(&tmq->lock); + } + taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } @@ -809,7 +833,9 @@ void tmqSendHbReq(void* param, void* tmrId) { sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; - sendInfo->param = NULL; + sendInfo->paramFreeFp = taosMemoryFree; + sendInfo->param = taosMemoryMalloc(sizeof(int64_t)); + *(int64_t *)sendInfo->param = refId; sendInfo->fp = tmqHbCb; sendInfo->msgType = TDMT_MND_TMQ_HB; @@ -1705,7 +1731,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); int32_t numOfVg = taosArrayGetSize(pTopic->vgs); - + if(pTopic->noPrivilege){ + tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName); + continue; + } for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c9e2908e8a..a395884538 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6139,6 +6139,55 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { return 0; } +int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) { + taosArrayDestroy(pRsp->topicPrivileges); + return 0; +} + +int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + int32_t sz = taosArrayGetSize(pRsp->topicPrivileges); + if (tEncodeI32(&encoder, sz) < 0) return -1; + for (int32_t i = 0; i < sz; ++i) { + STopicPrivilege *privilege = (STopicPrivilege *)taosArrayGet(pRsp->topicPrivileges, i); + if (tEncodeCStr(&encoder, privilege->topic) < 0) return -1; + if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t sz = 0; + if (tDecodeI32(&decoder, &sz) < 0) return -1; + if (sz > 0) { + pRsp->topicPrivileges = taosArrayInit(sz, sizeof(STopicPrivilege)); + if (NULL == pRsp->topicPrivileges) return -1; + for (int32_t i = 0; i < sz; ++i) { + STopicPrivilege *data = taosArrayReserve(pRsp->topicPrivileges, 1); + if (tDecodeCStrTo(&decoder, data->topic) < 0) return -1; + if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1; + } + } + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tDeatroySMqHbReq(SMqHbReq *pReq) { for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) { TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i); @@ -6194,7 +6243,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { if (NULL == pReq->topics) return -1; for (int32_t i = 0; i < sz; ++i) { TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1); - tDecodeCStrTo(&decoder, data->topicName); + if (tDecodeCStrTo(&decoder, data->topicName) < 0) return -1; int32_t szVgs = 0; if (tDecodeI32(&decoder, &szVgs) < 0) return -1; if (szVgs > 0) { @@ -7753,36 +7802,6 @@ static int32_t tDecodeSVSubmitBlk(SDecoder *pCoder, SVSubmitBlk *pBlock, int32_t return 0; } -int32_t tEncodeSVSubmitReq(SEncoder *pCoder, const SVSubmitReq *pReq) { - int32_t nBlocks = taosArrayGetSize(pReq->pArray); - - if (tStartEncode(pCoder) < 0) return -1; - - if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1; - if (tEncodeI32v(pCoder, nBlocks) < 0) return -1; - for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) { - if (tEncodeSVSubmitBlk(pCoder, (SVSubmitBlk *)taosArrayGet(pReq->pArray, iBlock), pReq->flags) < 0) return -1; - } - - tEndEncode(pCoder); - return 0; -} - -int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { - if (tStartDecode(pCoder) < 0) return -1; - - if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1; - if (tDecodeI32v(pCoder, &pReq->nBlocks) < 0) return -1; - pReq->pBlocks = tDecoderMalloc(pCoder, sizeof(SVSubmitBlk) * pReq->nBlocks); - if (pReq->pBlocks == NULL) return -1; - for (int32_t iBlock = 0; iBlock < pReq->nBlocks; iBlock++) { - if (tDecodeSVSubmitBlk(pCoder, pReq->pBlocks + iBlock, pReq->flags) < 0) return -1; - } - - tEndDecode(pCoder); - return 0; -} - static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) { if (tStartEncode(pEncoder) < 0) return -1; @@ -8914,6 +8933,7 @@ int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStr if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1; + if (tEncodeI8(&encoder, pReq->suspend) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -8928,6 +8948,7 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igUntreated) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->suspend) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndPrivilege.h b/source/dnode/mnode/impl/inc/mndPrivilege.h index 4a8fb20715..6f74ea3b36 100644 --- a/source/dnode/mnode/impl/inc/mndPrivilege.h +++ b/source/dnode/mnode/impl/inc/mndPrivilege.h @@ -30,7 +30,6 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname); int32_t mndCheckViewPrivilege(SMnode *pMnode, const char *user, EOperType operType, const char *pViewFName); int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic); -int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName); int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname); int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4db000287c..14e3df2af9 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -102,7 +102,8 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * } if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { - code = -1; + code = TSDB_CODE_MND_NO_RIGHTS; + terrno = TSDB_CODE_MND_NO_RIGHTS; goto FAILED; } @@ -220,22 +221,52 @@ FAIL: return -1; } +static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char* user){ + rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege)); + if(rsp->topicPrivileges == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + for(int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++){ + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic); + if (pTopic == NULL) { // terrno has been set by callee function + continue; + } + STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1); + if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) { + data->noPrivilege = 1; + } else{ + data->noPrivilege = 0; + } + mndReleaseTopic(pMnode, pTopic); + } + return 0; +} + static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t code = 0; SMnode *pMnode = pMsg->info.node; SMqHbReq req = {0}; + SMqHbRsp rsp = {0}; + SMqConsumerObj *pConsumer = NULL; - if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) { + if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } int64_t consumerId = req.consumerId; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); + pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer:0x%" PRIx64 " not exist", consumerId); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - code = -1; + code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; + goto end; + } + code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user); + if(code != 0){ goto end; } @@ -280,9 +311,22 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { mndReleaseSubscribe(pMnode, pSub); } - mndReleaseConsumer(pMnode, pConsumer); + // encode rsp + int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp); + void *buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + + tSerializeSMqHbRsp(&buf, tlen, &rsp); + pMsg->info.rsp = buf; + pMsg->info.rspLen = tlen; end: + tDeatroySMqHbRsp(&rsp); + mndReleaseConsumer(pMnode, pConsumer); tDeatroySMqHbReq(&req); return code; } @@ -500,6 +544,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; + if(grantCheck(TSDB_GRANT_SUBSCRIBE) < 0){ + terrno = TSDB_CODE_GRANT_EXPIRED; + return -1; + } + SCMSubscribeReq subscribe = {0}; tDeserializeSCMSubscribeReq(msgStr, &subscribe); diff --git a/source/dnode/mnode/impl/src/mndPrivilege.c b/source/dnode/mnode/impl/src/mndPrivilege.c index d4c0a6b36b..13a80cb1a6 100644 --- a/source/dnode/mnode/impl/src/mndPrivilege.c +++ b/source/dnode/mnode/impl/src/mndPrivilege.c @@ -30,9 +30,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op } int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; } -int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) { - return 0; -} int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 441305f282..bf92a51ed8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -808,6 +808,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { int32_t sqlLen = 0; terrno = TSDB_CODE_SUCCESS; + if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + terrno = TSDB_CODE_GRANT_STREAM_LIMITED; + return -1; + } + SCMCreateStreamReq createStreamReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -2034,17 +2039,22 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - SMResumeStreamReq pauseReq = {0}; - if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { + if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + terrno = TSDB_CODE_GRANT_EXPIRED; + return -1; + } + + SMResumeStreamReq resumeReq = {0}; + if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } - pStream = mndAcquireStream(pMnode, pauseReq.name); + pStream = mndAcquireStream(pMnode, resumeReq.name); if (pStream == NULL) { - if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + if (resumeReq.igNotExists) { + mInfo("stream:%s, not exist, if exist is set", resumeReq.name); sdbRelease(pMnode->pSdb, pStream); return 0; } else { @@ -2072,12 +2082,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME); if (pTrans == NULL) { - mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr()); + mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name); + mInfo("trans:%d used to resume stream:%s", pTrans->id, resumeReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2089,8 +2099,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { - mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); + if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { + mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; @@ -3030,6 +3040,39 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } +int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void* pIter = NULL; + while(1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + if(pStream->status != STREAM_STATUS__PAUSE){ + SMPauseStreamReq *reqPause = rpcMallocCont(sizeof(SMPauseStreamReq)); + if (reqPause == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pSdb, pStream); + return -1; + } + strcpy(reqPause->name, pStream->name); + reqPause->igNotExists = 1; + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_PAUSE_STREAM, + .pCont = reqPause, + .contLen = sizeof(SMPauseStreamReq), + .info = *info, + }; + + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } + + sdbRelease(pSdb, pStream); + } + return 0; +} + int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; @@ -3039,6 +3082,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int64_t streamId = 0; int32_t transId = 0; + if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + if(suspendAllStreams(pMnode, &pReq->info) < 0){ + return -1; + } + } + SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 0e3b544508..5e5a3626a4 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1925,6 +1925,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode return -1; } taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN); + mndReleaseTopic(pMnode, pTopic); } if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) { @@ -1935,6 +1936,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode return -1; } taosHashRemove(pNewUser->topics, pAlterReq->objname, len); + mndReleaseTopic(pMnode, pTopic); } return TSDB_CODE_SUCCESS; diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py new file mode 100644 index 0000000000..881060ee3c --- /dev/null +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -0,0 +1,142 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +import time + +import taos +from taos.tmq import * +from util.cases import * +from util.common import * +from util.log import * +from util.sql import * +from util.sqlset import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + self.stbname = 'stb' + self.user_name = 'test' + self.binary_length = 20 # the length of binary for column_dict + self.nchar_length = 20 # the length of nchar for column_dict + self.dbnames = ['db1'] + self.column_dict = { + 'ts': 'timestamp', + 'col1': 'float', + 'col2': 'int', + 'col3': 'float', + } + + self.tag_dict = { + 't1': 'int', + 't2': f'binary({self.binary_length})' + } + + self.tag_list = [ + f'1, "Beijing"', + f'2, "Shanghai"', + f'3, "Guangzhou"', + f'4, "Shenzhen"' + ] + + self.values_list = [ + f'now, 9.1, 200, 0.3' + ] + + self.tbnum = 4 + self.topic_name = 'topic1' + + + def prepare_data(self): + for db in self.dbnames: + tdSql.execute(f"create database {db}") + tdSql.execute(f"use {db}") + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + + def consumeTest(self): + consumer_dict = { + "group.id": "g1", + "td.connect.user": self.user_name, + "td.connect.pass": "test", + "auto.offset.reset": "earliest" + } + consumer = Consumer(consumer_dict) + + tdLog.debug("test subscribe topic created by other user") + exceptOccured = False + try: + consumer.subscribe([self.topic_name]) + except TmqError: + exceptOccured = True + + if not exceptOccured: + tdLog.exit(f"has no privilege, should except") + + tdLog.debug("test subscribe topic privilege granted by other user") + tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}') + + exceptOccured = False + try: + consumer.subscribe([self.topic_name]) + except TmqError: + exceptOccured = True + + if exceptOccured: + tdLog.exit(f"has privilege, should not except") + + cnt = 0 + try: + while True: + res = consumer.poll(1) + cnt += 1 + if cnt == 1: + if not res: + tdLog.exit(f"grant privilege, should get res") + elif cnt == 2: + if res: + time.sleep(1000) + tdLog.exit(f"revoke privilege, should get NULL") + + else: + break + + tdLog.debug("test subscribe topic privilege revoked by other user") + tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}') + time.sleep(5) + + finally: + consumer.close() + time.sleep(1000) + + def create_user(self): + tdSql.execute(f'create topic {self.topic_name} as database {self.dbnames[0]}') + tdSql.execute(f'create user {self.user_name} pass "test"') + + def run(self): + self.prepare_data() + self.create_user() + self.consumeTest() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From 474514ab6682e8a6b276b983cf21836c641b616e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 25 Jan 2024 17:29:55 +0800 Subject: [PATCH 2/7] fix:test case error --- source/dnode/mnode/impl/src/mndConsumer.c | 3 ++- .../0-others/subscribe_stream_privilege.py | 11 +++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 14e3df2af9..2c8a193121 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -234,6 +234,7 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR continue; } STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1); + strcpy(data->topic, topic); if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) { data->noPrivilege = 1; } else{ @@ -320,7 +321,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { goto end; } - tSerializeSMqHbRsp(&buf, tlen, &rsp); + tSerializeSMqHbRsp(buf, tlen, &rsp); pMsg->info.rsp = buf; pMsg->info.rspLen = tlen; diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py index 881060ee3c..5f40450af4 100644 --- a/tests/system-test/0-others/subscribe_stream_privilege.py +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -22,6 +22,8 @@ from util.sqlset import * class TDTestCase: + clientCfgDict = {'debugFlag': 135} + updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) @@ -61,7 +63,7 @@ class TDTestCase: def prepare_data(self): for db in self.dbnames: - tdSql.execute(f"create database {db}") + tdSql.execute(f"create database {db} vgroups 1") tdSql.execute(f"use {db}") tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) for i in range(self.tbnum): @@ -110,11 +112,9 @@ class TDTestCase: tdLog.exit(f"grant privilege, should get res") elif cnt == 2: if res: - time.sleep(1000) tdLog.exit(f"revoke privilege, should get NULL") - - else: - break + else: + break tdLog.debug("test subscribe topic privilege revoked by other user") tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}') @@ -122,7 +122,6 @@ class TDTestCase: finally: consumer.close() - time.sleep(1000) def create_user(self): tdSql.execute(f'create topic {self.topic_name} as database {self.dbnames[0]}') From bd2bef2428ea0f1b462261ce6fde0cae558210de Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 Jan 2024 15:05:31 +0800 Subject: [PATCH 3/7] fix:add test case --- source/client/src/clientTmq.c | 2 +- source/common/src/tgrant.c | 10 ++++- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 6 ++- source/libs/parser/src/parTranslater.c | 8 ++-- .../0-others/subscribe_stream_privilege.py | 45 +++++++++++++++++-- 6 files changed, 61 insertions(+), 12 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 79611b7eee..8b424a7bf7 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -762,7 +762,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { } taosWUnLockLatch(&tmq->lock); } - + tDeatroySMqHbRsp(&rsp); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c index 74a59fd580..eb0e677b37 100644 --- a/source/common/src/tgrant.c +++ b/source/common/src/tgrant.c @@ -18,6 +18,14 @@ #ifndef _GRANT -int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; } +int32_t grantCheck(EGrantType grant) { + if(taosGetTimestampMs() < 1706252996000) { + uError("receivee no expired"); + return 0; + } else{ + uError("receivee expired"); + return -1; + } +} #endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2c8a193121..ffa0fbda12 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -101,7 +101,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * goto FAILED; } - if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) { code = TSDB_CODE_MND_NO_RIGHTS; terrno = TSDB_CODE_MND_NO_RIGHTS; goto FAILED; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bf92a51ed8..79e39df581 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1907,9 +1907,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + mInfo("stream:%s, not exist 1, if exist is set", pauseReq.name); return 0; } else { + mInfo("stream:%s, not exist 2, if exist is set,%p,%d,%p", pauseReq.name, pReq->pCont, pReq->contLen, pReq); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -3066,6 +3067,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause->name, reqPause->name); } sdbRelease(pSdb, pStream); @@ -3099,7 +3101,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + mDebug("receivee stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execInfo.lock); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d246641576..5aaa1a815c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3923,7 +3923,7 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p } if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && !hasPartitionByTbname(pSelect->pPartitionByList)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query1"); } return TSDB_CODE_SUCCESS; } @@ -7467,7 +7467,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || NULL == ((SSelectStmt*)pStmt->pQuery)->pFromTable || QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query2"); } #ifdef TD_ENTERPRISE @@ -7486,7 +7486,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code)); } if (TSDB_VIEW_TABLE == tableType) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query3"); } #endif @@ -7721,7 +7721,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query4"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py index 5f40450af4..44778d39d8 100644 --- a/tests/system-test/0-others/subscribe_stream_privilege.py +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -23,7 +23,7 @@ from util.sqlset import * class TDTestCase: clientCfgDict = {'debugFlag': 135} - updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} + updatecfgDict = {'debugFlag': 143, 'clientCfg':clientCfgDict} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) @@ -71,6 +71,41 @@ class TDTestCase: for j in self.values_list: tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + def checkUserPrivileges(self, rowCnt): + tdSql.query("show user privileges") + tdSql.checkRows(rowCnt) + + def streamTest(self): + tdSql.execute("create stream s1 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname") + time.sleep(2) + tdSql.query("select * from so1") + tdSql.checkRows(4) + tdSql.execute("insert into stb_0(ts,col2) values(now, 332)") + time.sleep(2) + tdSql.query("select * from so1") + tdSql.checkRows(5) + + time.sleep(2) + tdSql.query("select * from information_schema.ins_stream_tasks") + tdSql.checkData(0, 5, 'ready') + + print(time.time()) + while 1: + t = time.time() + if t > 1706252996 : + break + else: + print("time:%d" %(t)) + time.sleep(1) + + + tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname") + tdSql.query("select * from information_schema.ins_stream_tasks") + tdSql.checkData(0, 5, 'pause') + tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)") + tdSql.query("select * from so1") + tdSql.checkRows(5) + def consumeTest(self): consumer_dict = { "group.id": "g1", @@ -90,8 +125,10 @@ class TDTestCase: if not exceptOccured: tdLog.exit(f"has no privilege, should except") + checkUserPrivileges(1) tdLog.debug("test subscribe topic privilege granted by other user") tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}') + checkUserPrivileges(2) exceptOccured = False try: @@ -118,6 +155,7 @@ class TDTestCase: tdLog.debug("test subscribe topic privilege revoked by other user") tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}') + checkUserPrivileges(1) time.sleep(5) finally: @@ -130,8 +168,9 @@ class TDTestCase: def run(self): self.prepare_data() self.create_user() - self.consumeTest() - + #self.consumeTest() + self.streamTest() + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From ce2a3e4be2a9113d4daf55bb5ba22089578b5270 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 Jan 2024 15:35:29 +0800 Subject: [PATCH 4/7] fix:test case error --- source/common/src/tgrant.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 21 +++++++++---------- .../0-others/subscribe_stream_privilege.py | 8 +++++-- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c index eb0e677b37..6251131005 100644 --- a/source/common/src/tgrant.c +++ b/source/common/src/tgrant.c @@ -19,7 +19,7 @@ #ifndef _GRANT int32_t grantCheck(EGrantType grant) { - if(taosGetTimestampMs() < 1706252996000) { + if(taosGetTimestampMs() < 1706254434000) { uError("receivee no expired"); return 0; } else{ diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 79e39df581..4b753fb5a6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -3050,24 +3050,23 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ if (pIter == NULL) break; if(pStream->status != STREAM_STATUS__PAUSE){ - SMPauseStreamReq *reqPause = rpcMallocCont(sizeof(SMPauseStreamReq)); - if (reqPause == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbRelease(pSdb, pStream); - return -1; - } - strcpy(reqPause->name, pStream->name); - reqPause->igNotExists = 1; + SMPauseStreamReq reqPause = {0}; + strcpy(reqPause.name, pStream->name); + reqPause.igNotExists = 1; + + int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause); + void * pHead = rpcMallocCont(contLen); + tSerializeSMPauseStreamReq(pHead, contLen, &reqPause); SRpcMsg rpcMsg = { .msgType = TDMT_MND_PAUSE_STREAM, - .pCont = reqPause, - .contLen = sizeof(SMPauseStreamReq), + .pCont = pHead, + .contLen = contLen, .info = *info, }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause->name, reqPause->name); + mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name); } sdbRelease(pSdb, pStream); diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py index 44778d39d8..9656d0e5e8 100644 --- a/tests/system-test/0-others/subscribe_stream_privilege.py +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -92,7 +92,7 @@ class TDTestCase: print(time.time()) while 1: t = time.time() - if t > 1706252996 : + if t > 1706254434 : break else: print("time:%d" %(t)) @@ -100,12 +100,16 @@ class TDTestCase: tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname") + + time.sleep(10) tdSql.query("select * from information_schema.ins_stream_tasks") - tdSql.checkData(0, 5, 'pause') + tdSql.checkData(0, 5, 'paused') tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)") tdSql.query("select * from so1") tdSql.checkRows(5) + tdSql.error("resume stream s1") + def consumeTest(self): consumer_dict = { "group.id": "g1", From 388e89096c301666fbe2018136bccf81b0ec0b67 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 Jan 2024 15:51:37 +0800 Subject: [PATCH 5/7] fix:test case error --- source/common/src/tgrant.c | 10 +--------- tests/parallel_test/cases.task | 1 + .../system-test/0-others/subscribe_stream_privilege.py | 4 ++-- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c index 6251131005..f212d71362 100644 --- a/source/common/src/tgrant.c +++ b/source/common/src/tgrant.c @@ -18,14 +18,6 @@ #ifndef _GRANT -int32_t grantCheck(EGrantType grant) { - if(taosGetTimestampMs() < 1706254434000) { - uError("receivee no expired"); - return 0; - } else{ - uError("receivee expired"); - return -1; - } -} +int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;} #endif \ No newline at end of file diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 79bec1ec76..97f4e533db 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -292,6 +292,7 @@ fi ,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_double.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py index 9656d0e5e8..d85d87dc3a 100644 --- a/tests/system-test/0-others/subscribe_stream_privilege.py +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -172,8 +172,8 @@ class TDTestCase: def run(self): self.prepare_data() self.create_user() - #self.consumeTest() - self.streamTest() + self.consumeTest() + # self.streamTest() def stop(self): tdSql.close() From 7c828d35064ad666f60ed3b7fa36ada705ac8e42 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 Jan 2024 16:42:26 +0800 Subject: [PATCH 6/7] feat:[TD-28247]add grant for subscribe and stream --- include/common/tmsg.h | 1 - source/common/src/tmsg.c | 2 -- source/libs/parser/src/parTranslater.c | 8 ++++---- tests/parallel_test/cases.task | 2 +- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 05e2738f7c..5cf18fbb66 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3360,7 +3360,6 @@ typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; int8_t igUntreated; - int8_t suspend; } SMResumeStreamReq; int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a395884538..3f1cfbc87f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8933,7 +8933,6 @@ int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStr if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1; - if (tEncodeI8(&encoder, pReq->suspend) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -8948,7 +8947,6 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igUntreated) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->suspend) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5aaa1a815c..d246641576 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3923,7 +3923,7 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p } if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && !hasPartitionByTbname(pSelect->pPartitionByList)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query1"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } return TSDB_CODE_SUCCESS; } @@ -7467,7 +7467,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || NULL == ((SSelectStmt*)pStmt->pQuery)->pFromTable || QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query2"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } #ifdef TD_ENTERPRISE @@ -7486,7 +7486,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code)); } if (TSDB_VIEW_TABLE == tableType) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query3"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } #endif @@ -7721,7 +7721,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query4"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 97f4e533db..0dfbdf3038 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -292,7 +292,7 @@ fi ,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py -,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py +,,n,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_double.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py From d66d5335bb791860bcec0e69dd6a55c51673937d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 Jan 2024 09:38:14 +0800 Subject: [PATCH 7/7] fix:python error --- tests/system-test/0-others/subscribe_stream_privilege.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py index d85d87dc3a..b477af9f57 100644 --- a/tests/system-test/0-others/subscribe_stream_privilege.py +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -129,10 +129,10 @@ class TDTestCase: if not exceptOccured: tdLog.exit(f"has no privilege, should except") - checkUserPrivileges(1) + self.checkUserPrivileges(1) tdLog.debug("test subscribe topic privilege granted by other user") tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}') - checkUserPrivileges(2) + self.checkUserPrivileges(2) exceptOccured = False try: @@ -159,7 +159,7 @@ class TDTestCase: tdLog.debug("test subscribe topic privilege revoked by other user") tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}') - checkUserPrivileges(1) + self.checkUserPrivileges(1) time.sleep(5) finally: