fix:test case error
This commit is contained in:
parent
bd2bef2428
commit
ce2a3e4be2
|
@ -19,7 +19,7 @@
|
||||||
#ifndef _GRANT
|
#ifndef _GRANT
|
||||||
|
|
||||||
int32_t grantCheck(EGrantType grant) {
|
int32_t grantCheck(EGrantType grant) {
|
||||||
if(taosGetTimestampMs() < 1706252996000) {
|
if(taosGetTimestampMs() < 1706254434000) {
|
||||||
uError("receivee no expired");
|
uError("receivee no expired");
|
||||||
return 0;
|
return 0;
|
||||||
} else{
|
} else{
|
||||||
|
|
|
@ -3050,24 +3050,23 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if(pStream->status != STREAM_STATUS__PAUSE){
|
if(pStream->status != STREAM_STATUS__PAUSE){
|
||||||
SMPauseStreamReq *reqPause = rpcMallocCont(sizeof(SMPauseStreamReq));
|
SMPauseStreamReq reqPause = {0};
|
||||||
if (reqPause == NULL) {
|
strcpy(reqPause.name, pStream->name);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
reqPause.igNotExists = 1;
|
||||||
sdbRelease(pSdb, pStream);
|
|
||||||
return -1;
|
int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
|
||||||
}
|
void * pHead = rpcMallocCont(contLen);
|
||||||
strcpy(reqPause->name, pStream->name);
|
tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
|
||||||
reqPause->igNotExists = 1;
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_PAUSE_STREAM,
|
.msgType = TDMT_MND_PAUSE_STREAM,
|
||||||
.pCont = reqPause,
|
.pCont = pHead,
|
||||||
.contLen = sizeof(SMPauseStreamReq),
|
.contLen = contLen,
|
||||||
.info = *info,
|
.info = *info,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause->name, reqPause->name);
|
mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
|
|
|
@ -92,7 +92,7 @@ class TDTestCase:
|
||||||
print(time.time())
|
print(time.time())
|
||||||
while 1:
|
while 1:
|
||||||
t = time.time()
|
t = time.time()
|
||||||
if t > 1706252996 :
|
if t > 1706254434 :
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print("time:%d" %(t))
|
print("time:%d" %(t))
|
||||||
|
@ -100,12 +100,16 @@ class TDTestCase:
|
||||||
|
|
||||||
|
|
||||||
tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
|
tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
tdSql.query("select * from information_schema.ins_stream_tasks")
|
tdSql.query("select * from information_schema.ins_stream_tasks")
|
||||||
tdSql.checkData(0, 5, 'pause')
|
tdSql.checkData(0, 5, 'paused')
|
||||||
tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)")
|
tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)")
|
||||||
tdSql.query("select * from so1")
|
tdSql.query("select * from so1")
|
||||||
tdSql.checkRows(5)
|
tdSql.checkRows(5)
|
||||||
|
|
||||||
|
tdSql.error("resume stream s1")
|
||||||
|
|
||||||
def consumeTest(self):
|
def consumeTest(self):
|
||||||
consumer_dict = {
|
consumer_dict = {
|
||||||
"group.id": "g1",
|
"group.id": "g1",
|
||||||
|
|
Loading…
Reference in New Issue