From bd2bef2428ea0f1b462261ce6fde0cae558210de Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 Jan 2024 15:05:31 +0800 Subject: [PATCH] fix:add test case --- source/client/src/clientTmq.c | 2 +- source/common/src/tgrant.c | 10 ++++- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 6 ++- source/libs/parser/src/parTranslater.c | 8 ++-- .../0-others/subscribe_stream_privilege.py | 45 +++++++++++++++++-- 6 files changed, 61 insertions(+), 12 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 79611b7eee..8b424a7bf7 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -762,7 +762,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { } taosWUnLockLatch(&tmq->lock); } - + tDeatroySMqHbRsp(&rsp); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c index 74a59fd580..eb0e677b37 100644 --- a/source/common/src/tgrant.c +++ b/source/common/src/tgrant.c @@ -18,6 +18,14 @@ #ifndef _GRANT -int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; } +int32_t grantCheck(EGrantType grant) { + if(taosGetTimestampMs() < 1706252996000) { + uError("receivee no expired"); + return 0; + } else{ + uError("receivee expired"); + return -1; + } +} #endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2c8a193121..ffa0fbda12 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -101,7 +101,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * goto FAILED; } - if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) { code = TSDB_CODE_MND_NO_RIGHTS; terrno = TSDB_CODE_MND_NO_RIGHTS; goto FAILED; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bf92a51ed8..79e39df581 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1907,9 +1907,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + mInfo("stream:%s, not exist 1, if exist is set", pauseReq.name); return 0; } else { + mInfo("stream:%s, not exist 2, if exist is set,%p,%d,%p", pauseReq.name, pReq->pCont, pReq->contLen, pReq); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -3066,6 +3067,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause->name, reqPause->name); } sdbRelease(pSdb, pStream); @@ -3099,7 +3101,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + mDebug("receivee stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execInfo.lock); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d246641576..5aaa1a815c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3923,7 +3923,7 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p } if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && !hasPartitionByTbname(pSelect->pPartitionByList)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query1"); } return TSDB_CODE_SUCCESS; } @@ -7467,7 +7467,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || NULL == ((SSelectStmt*)pStmt->pQuery)->pFromTable || QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query2"); } #ifdef TD_ENTERPRISE @@ -7486,7 +7486,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code)); } if (TSDB_VIEW_TABLE == tableType) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query3"); } #endif @@ -7721,7 +7721,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query4"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py index 5f40450af4..44778d39d8 100644 --- a/tests/system-test/0-others/subscribe_stream_privilege.py +++ b/tests/system-test/0-others/subscribe_stream_privilege.py @@ -23,7 +23,7 @@ from util.sqlset import * class TDTestCase: clientCfgDict = {'debugFlag': 135} - updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} + updatecfgDict = {'debugFlag': 143, 'clientCfg':clientCfgDict} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) @@ -71,6 +71,41 @@ class TDTestCase: for j in self.values_list: tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + def checkUserPrivileges(self, rowCnt): + tdSql.query("show user privileges") + tdSql.checkRows(rowCnt) + + def streamTest(self): + tdSql.execute("create stream s1 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname") + time.sleep(2) + tdSql.query("select * from so1") + tdSql.checkRows(4) + tdSql.execute("insert into stb_0(ts,col2) values(now, 332)") + time.sleep(2) + tdSql.query("select * from so1") + tdSql.checkRows(5) + + time.sleep(2) + tdSql.query("select * from information_schema.ins_stream_tasks") + tdSql.checkData(0, 5, 'ready') + + print(time.time()) + while 1: + t = time.time() + if t > 1706252996 : + break + else: + print("time:%d" %(t)) + time.sleep(1) + + + tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname") + tdSql.query("select * from information_schema.ins_stream_tasks") + tdSql.checkData(0, 5, 'pause') + tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)") + tdSql.query("select * from so1") + tdSql.checkRows(5) + def consumeTest(self): consumer_dict = { "group.id": "g1", @@ -90,8 +125,10 @@ class TDTestCase: if not exceptOccured: tdLog.exit(f"has no privilege, should except") + checkUserPrivileges(1) tdLog.debug("test subscribe topic privilege granted by other user") tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}') + checkUserPrivileges(2) exceptOccured = False try: @@ -118,6 +155,7 @@ class TDTestCase: tdLog.debug("test subscribe topic privilege revoked by other user") tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}') + checkUserPrivileges(1) time.sleep(5) finally: @@ -130,8 +168,9 @@ class TDTestCase: def run(self): self.prepare_data() self.create_user() - self.consumeTest() - + #self.consumeTest() + self.streamTest() + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__)