fix: add checks for stream query
This commit is contained in:
parent
67e2f901fc
commit
fdf977f935
|
@ -1254,19 +1254,20 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
if (isSelectStmt(pCxt->pCurrStmt)) {
|
if (isSelectStmt(pCxt->pCurrStmt)) {
|
||||||
// select percentile() without from clause is also valid
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
|
||||||
if (NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) {
|
"%s is only supported in single table query", pFunc->functionName);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
SNode* pTable = ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable;
|
|
||||||
if (QUERY_NODE_REAL_TABLE == nodeType(pTable) &&
|
|
||||||
(TSDB_CHILD_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType ||
|
|
||||||
TSDB_NORMAL_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType)) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
|
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||||
"%s is only supported in single table query", pFunc->functionName);
|
SNode* pTable = pSelect->pFromTable;
|
||||||
|
// select percentile() without from clause is also valid
|
||||||
|
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
|
||||||
|
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||||
|
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) ||
|
||||||
|
NULL != pSelect->pPartitionByList) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
|
||||||
|
"%s is only supported in single table query", pFunc->functionName);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isStar(SNode* pNode) {
|
static bool isStar(SNode* pNode) {
|
||||||
|
@ -2509,9 +2510,31 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
|
static bool isPartitionByTbname(SNodeList* pPartitionByList) {
|
||||||
|
if (1 != LIST_LENGTH(pPartitionByList)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SNode* pPartKey = nodesListGetNode(pPartitionByList, 0);
|
||||||
|
return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
|
if (!pCxt->createStream) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
|
||||||
|
!isPartitionByTbname(pSelect->pPartitionByList)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
|
SStateWindowNode* pState = (SStateWindowNode*)pSelect->pWindow;
|
||||||
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
|
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
|
||||||
// todo check for "function not support for state_window"
|
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||||
|
pCxt->errCode = checkStateWindowForStream(pCxt, pSelect);
|
||||||
|
}
|
||||||
return pCxt->errCode;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2522,14 +2545,13 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNod
|
||||||
if (PRIMARYKEY_TIMESTAMP_COL_ID != pSession->pCol->colId) {
|
if (PRIMARYKEY_TIMESTAMP_COL_ID != pSession->pCol->colId) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_COL);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_COL);
|
||||||
}
|
}
|
||||||
// todo check for "function not support for session"
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
switch (nodeType(pSelect->pWindow)) {
|
switch (nodeType(pSelect->pWindow)) {
|
||||||
case QUERY_NODE_STATE_WINDOW:
|
case QUERY_NODE_STATE_WINDOW:
|
||||||
return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow);
|
return translateStateWindow(pCxt, pSelect);
|
||||||
case QUERY_NODE_SESSION_WINDOW:
|
case QUERY_NODE_SESSION_WINDOW:
|
||||||
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
|
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
|
||||||
case QUERY_NODE_INTERVAL_WINDOW:
|
case QUERY_NODE_INTERVAL_WINDOW:
|
||||||
|
@ -4708,7 +4730,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY);
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
||||||
|
|
Loading…
Reference in New Issue