fix: add checks for stream query
This commit is contained in:
parent
cb7be9a2f0
commit
536c6bdbc5
|
@ -16,8 +16,8 @@
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "types.h"
|
|
||||||
#include "tgrant.h"
|
#include "tgrant.h"
|
||||||
|
#include "types.h"
|
||||||
|
|
||||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = {
|
||||||
{.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "wal_seg_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userFuncSchema[] = {
|
static const SSysDbTableSchema userFuncSchema[] = {
|
||||||
|
@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
|
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
|
||||||
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
|
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
|
||||||
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
|
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
|
||||||
// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
|
// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
|
||||||
// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
|
// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
|
||||||
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
|
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
|
||||||
{TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)},
|
{TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)},
|
||||||
{TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
{TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
||||||
|
|
|
@ -4708,6 +4708,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
|
||||||
return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq);
|
return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
|
||||||
|
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
|
||||||
|
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
|
||||||
|
!isPartitionByTbname(pSelect->pPartitionByList);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||||
if (NULL != pStmt->pOptions->pWatermark &&
|
if (NULL != pStmt->pOptions->pWatermark &&
|
||||||
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
|
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
|
||||||
|
@ -4723,14 +4729,19 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
|
||||||
|
if (QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) ||
|
||||||
|
TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||||
|
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
||||||
|
|
|
@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||||
return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
|
return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||||
return stbSplIsMultiTbScanChild(streamQuery, pNode);
|
return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
|
|
Loading…
Reference in New Issue