[TD-2657]<fix>: check the invalid sql while cq started.
This commit is contained in:
parent
fe94d26169
commit
c07a5a7d18
|
@ -6259,10 +6259,12 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
|
||||
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||
const char* msg1 = "invalid table name";
|
||||
const char* msg2 = "functions not allowed in CQ";
|
||||
const char* msg3 = "fill only available for interval query";
|
||||
const char* msg4 = "fill option not supported in stream computing";
|
||||
const char* msg5 = "sql too long"; // todo ADD support
|
||||
const char* msg6 = "from missing in subclause";
|
||||
const char* msg7 = "time interval is required";
|
||||
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
@ -6272,10 +6274,10 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
// if sql specifies db, use it, otherwise use default db
|
||||
SStrToken* pzTableName = &(pCreateTable->name);
|
||||
SStrToken* pName = &(pCreateTable->name);
|
||||
SQuerySQL* pQuerySql = pCreateTable->pSelect;
|
||||
|
||||
if (tscValidateName(pzTableName) != TSDB_CODE_SUCCESS) {
|
||||
if (tscValidateName(pName) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
|
@ -6314,15 +6316,19 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
// set interval value
|
||||
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
} else {
|
||||
if ((pQueryInfo->interval.interval > 0) &&
|
||||
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
}
|
||||
|
||||
if ((pQueryInfo->interval.interval > 0) &&
|
||||
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
||||
}
|
||||
|
||||
// set the created table[stream] name
|
||||
code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
|
||||
code = tscSetTableFullName(pTableMetaInfo, pName, pSql);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -399,11 +399,16 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
|||
tscSetRetryTimer(pStream, pSql, timer);
|
||||
}
|
||||
|
||||
static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
||||
static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
||||
int64_t minIntervalTime =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
|
||||
sprintf(pSql->cmd.payload, "the interval value is 0");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
|
||||
tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
|
||||
|
@ -443,6 +448,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
|||
pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor
|
||||
pQueryInfo->interval.sliding = 0;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
|
||||
|
@ -492,34 +499,19 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
|
|||
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
|
||||
}
|
||||
|
||||
static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
|
||||
if (pSql == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
pSql->res.code = code;
|
||||
|
||||
if (info != NULL) {
|
||||
strncpy(pCmd->payload, info, pCmd->payloadLen);
|
||||
}
|
||||
}
|
||||
|
||||
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
||||
SSqlStream* pStream = (SSqlStream*)param;
|
||||
SSqlObj* pSql = pStream->pSql;
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
setErrorInfo(pSql, code, pCmd->payload);
|
||||
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code);
|
||||
pSql->res.code = code;
|
||||
tscError("%p open stream failed, sql:%s, reason:%s, code:%s", pSql, pSql->sqlstr, pCmd->payload, tstrerror(code));
|
||||
|
||||
pStream->fp(pStream->param, NULL, NULL);
|
||||
return;
|
||||
}
|
||||
|
||||
registerSqlObj(pSql);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
@ -530,13 +522,22 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
|||
pStream->ctime = taosGetTimestamp(pStream->precision);
|
||||
pStream->etime = pQueryInfo->window.ekey;
|
||||
|
||||
tscAddIntoStreamList(pStream);
|
||||
if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) {
|
||||
pSql->res.code = code;
|
||||
|
||||
tscError("%p stream %p open failed, since the interval value is incorrect", pSql, pStream);
|
||||
pStream->fp(pStream->param, NULL, NULL);
|
||||
return;
|
||||
}
|
||||
|
||||
tscSetSlidingWindowInfo(pSql, pStream);
|
||||
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
|
||||
|
||||
int64_t starttime = tscGetLaunchTimestamp(pStream);
|
||||
pCmd->command = TSDB_SQL_SELECT;
|
||||
|
||||
registerSqlObj(pSql);
|
||||
tscAddIntoStreamList(pStream);
|
||||
|
||||
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
|
||||
|
||||
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
|
||||
|
|
Loading…
Reference in New Issue