fix:add test case

This commit is contained in:
wangmm0220 2024-01-26 15:05:31 +08:00
parent 474514ab66
commit bd2bef2428
6 changed files with 61 additions and 12 deletions

View File

@ -762,7 +762,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
} }
tDeatroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
} }

View File

@ -18,6 +18,14 @@
#ifndef _GRANT #ifndef _GRANT
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; } int32_t grantCheck(EGrantType grant) {
if(taosGetTimestampMs() < 1706252996000) {
uError("receivee no expired");
return 0;
} else{
uError("receivee expired");
return -1;
}
}
#endif #endif

View File

@ -101,7 +101,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
goto FAILED; goto FAILED;
} }
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
code = TSDB_CODE_MND_NO_RIGHTS; code = TSDB_CODE_MND_NO_RIGHTS;
terrno = TSDB_CODE_MND_NO_RIGHTS; terrno = TSDB_CODE_MND_NO_RIGHTS;
goto FAILED; goto FAILED;

View File

@ -1907,9 +1907,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
if (pStream == NULL) { if (pStream == NULL) {
if (pauseReq.igNotExists) { if (pauseReq.igNotExists) {
mInfo("stream:%s, not exist, if exist is set", pauseReq.name); mInfo("stream:%s, not exist 1, if exist is set", pauseReq.name);
return 0; return 0;
} else { } else {
mInfo("stream:%s, not exist 2, if exist is set,%p,%d,%p", pauseReq.name, pReq->pCont, pReq->contLen, pReq);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1; return -1;
} }
@ -3066,6 +3067,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
}; };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause->name, reqPause->name);
} }
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
@ -3099,7 +3101,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); mDebug("receivee stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);

View File

@ -3923,7 +3923,7 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p
} }
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList)) { !hasPartitionByTbname(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query1");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -7467,7 +7467,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || NULL == ((SSelectStmt*)pStmt->pQuery)->pFromTable || if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || NULL == ((SSelectStmt*)pStmt->pQuery)->pFromTable ||
QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) { QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query2");
} }
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
@ -7486,7 +7486,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code)); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code));
} }
if (TSDB_VIEW_TABLE == tableType) { if (TSDB_VIEW_TABLE == tableType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query3");
} }
#endif #endif
@ -7721,7 +7721,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query4");
} }
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,

View File

@ -23,7 +23,7 @@ from util.sqlset import *
class TDTestCase: class TDTestCase:
clientCfgDict = {'debugFlag': 135} clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} updatecfgDict = {'debugFlag': 143, 'clientCfg':clientCfgDict}
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
@ -71,6 +71,41 @@ class TDTestCase:
for j in self.values_list: for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values({j})') tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
def checkUserPrivileges(self, rowCnt):
tdSql.query("show user privileges")
tdSql.checkRows(rowCnt)
def streamTest(self):
tdSql.execute("create stream s1 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
time.sleep(2)
tdSql.query("select * from so1")
tdSql.checkRows(4)
tdSql.execute("insert into stb_0(ts,col2) values(now, 332)")
time.sleep(2)
tdSql.query("select * from so1")
tdSql.checkRows(5)
time.sleep(2)
tdSql.query("select * from information_schema.ins_stream_tasks")
tdSql.checkData(0, 5, 'ready')
print(time.time())
while 1:
t = time.time()
if t > 1706252996 :
break
else:
print("time:%d" %(t))
time.sleep(1)
tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
tdSql.query("select * from information_schema.ins_stream_tasks")
tdSql.checkData(0, 5, 'pause')
tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)")
tdSql.query("select * from so1")
tdSql.checkRows(5)
def consumeTest(self): def consumeTest(self):
consumer_dict = { consumer_dict = {
"group.id": "g1", "group.id": "g1",
@ -90,8 +125,10 @@ class TDTestCase:
if not exceptOccured: if not exceptOccured:
tdLog.exit(f"has no privilege, should except") tdLog.exit(f"has no privilege, should except")
checkUserPrivileges(1)
tdLog.debug("test subscribe topic privilege granted by other user") tdLog.debug("test subscribe topic privilege granted by other user")
tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}') tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}')
checkUserPrivileges(2)
exceptOccured = False exceptOccured = False
try: try:
@ -118,6 +155,7 @@ class TDTestCase:
tdLog.debug("test subscribe topic privilege revoked by other user") tdLog.debug("test subscribe topic privilege revoked by other user")
tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}') tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}')
checkUserPrivileges(1)
time.sleep(5) time.sleep(5)
finally: finally:
@ -130,8 +168,9 @@ class TDTestCase:
def run(self): def run(self):
self.prepare_data() self.prepare_data()
self.create_user() self.create_user()
self.consumeTest() #self.consumeTest()
self.streamTest()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)