feat(stream):add max delay check
This commit is contained in:
parent
446ce05a4f
commit
9aaab9c3b0
|
@ -10609,6 +10609,19 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
||||||
"Non window query only support scalar function, aggregate function is not allowed");
|
"Non window query only support scalar function, aggregate function is not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL != pStmt->pOptions->pDelay) {
|
||||||
|
SValueNode* pVal = (SValueNode*)pStmt->pOptions->pDelay;
|
||||||
|
int64_t minDelay = 0;
|
||||||
|
char* str = "5s";
|
||||||
|
if (DEAL_RES_ERROR != translateValue(pCxt, pVal) && TSDB_CODE_SUCCESS ==
|
||||||
|
parseNatualDuration(str, strlen(str), &minDelay, &pVal->unit, pVal->node.resType.precision, false)) {
|
||||||
|
if (pVal->datum.i < minDelay) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
|
"stream max delay must be bigger than 5 session");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -477,6 +477,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
|
||||||
if (!pStr) {
|
if (!pStr) {
|
||||||
if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
||||||
(*pWinCode) = TSDB_CODE_FAILED;
|
(*pWinCode) = TSDB_CODE_FAILED;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
|
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
|
||||||
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
||||||
|
|
|
@ -133,4 +133,17 @@ if $data13 != -111 then
|
||||||
goto loop1
|
goto loop1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 2====================
|
||||||
|
|
||||||
|
sql create database test vgroups 1 ;
|
||||||
|
sql use test;
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql_error create stream streams1 trigger max_delay 4000a ignore update 0 ignore expired 0 into streamtST1 as select _wstart, count(*) from st interval(5s);
|
||||||
|
sql_error create stream streams2 trigger max_delay 4s ignore update 0 ignore expired 0 into streamtST2 as select _wstart, count(*) from st interval(5s);
|
||||||
|
sql create stream streams3 trigger max_delay 5000a ignore update 0 ignore expired 0 into streamtST3 as select _wstart, count(*) from st interval(5s);
|
||||||
|
sql create stream streams4 trigger max_delay 5s ignore update 0 ignore expired 0 into streamtST4 as select _wstart, count(*) from st interval(5s);
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue