Merge pull request #8937 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
This commit is contained in:
Haojun Liao 2021-12-06 14:49:15 +08:00 committed by GitHub
commit d4a834e248
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 977 additions and 795 deletions

View File

@ -55,10 +55,18 @@ typedef struct SDataBlockInfo {
int64_t uid;
} SDataBlockInfo;
typedef struct SConstantItem {
SColumnInfo info;
int32_t startIndex; // run-length-encoding to save the space for multiple rows
int32_t endIndex;
SVariant value;
} SConstantItem;
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info;
SArray *pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
} SSDataBlock;
typedef struct SColumnInfoData {
@ -82,7 +90,7 @@ typedef struct SLimit {
typedef struct SOrder {
uint32_t order;
int32_t orderColId;
SColumn col;
} SOrder;
typedef struct SGroupbyExpr {

View File

@ -281,7 +281,7 @@ typedef struct SSchema {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
int16_t colId;
int16_t bytes;
int32_t bytes;
} SSchema;
//#endif

View File

@ -54,6 +54,7 @@ extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[];
extern int64_t tsMaxVnodeQueuedBytes;
extern int tsCompatibleModel; // 2.0 compatible model
//query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing

View File

@ -229,7 +229,7 @@ typedef struct SScalarFunctionInfo {
typedef struct SMultiFunctionsDesc {
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool agg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
@ -261,6 +261,7 @@ int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction);
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
bool qIsAggregateFunction(const char* functionName);
bool qIsSelectivityFunction(const char* functionName);
tExprNode* exprTreeFromBinary(const void* data, size_t size);

View File

@ -86,7 +86,7 @@ typedef struct SQueryStmtInfo {
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SOrder order;
SArray * order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
@ -108,10 +108,10 @@ typedef struct SQueryStmtInfo {
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
struct SQueryStmtInfo *pDownstream;
SMultiFunctionsDesc info;
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum;
SMultiFunctionsDesc info;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
@ -176,6 +176,7 @@ typedef struct SSourceParam {
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, const char* funcName, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize);
int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo);
STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);

View File

@ -23,6 +23,35 @@ extern "C" {
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_StateWindow = 22,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
// OP_DummyInput = 16, //TODO remove it after fully refactor.
// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
// OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
OP_Exchange = 26,
};
struct SEpSet;
struct SQueryPlanNode;
struct SQueryDistPlanNode;

View File

@ -75,6 +75,11 @@ int32_t tsCompressMsgSize = -1;
*/
int32_t tsCompressColData = -1;
/*
* denote if 3.0 query pattern compatible for 2.0
*/
int32_t tsCompatibleModel = 1;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;

View File

@ -279,34 +279,6 @@ enum {
OP_EXEC_DONE = 3,
};
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
};
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not

View File

@ -289,7 +289,7 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeE
return;
}
int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId;
int32_t orderId = pRuntimeEnv->pQueryAttr->order.col.info.colId;
if (orderId <= 0) {
return;
}
@ -1914,7 +1914,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->param[3].i = functionId;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i = pQueryAttr->order.orderColId;
pCtx->param[1].i = pQueryAttr->order.col.info.colId;
} else if (functionId == FUNCTION_INTERP) {
pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
if (pQueryAttr->fillVal != NULL) {
@ -2013,162 +2013,162 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
int32_t* op = taosArrayGet(pOperator, i);
switch (*op) {
case OP_TagScan: {
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
break;
}
case OP_MultiTableTimeInterval: {
pRuntimeEnv->proot =
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break;
}
case OP_AllMultiTableTimeInterval: {
pRuntimeEnv->proot =
createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break;
}
case OP_TimeWindow: {
pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_AllTimeWindow: {
pRuntimeEnv->proot =
createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_Groupby: {
pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_SessionWindow: {
pRuntimeEnv->proot =
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_MultiTableAggregate: {
pRuntimeEnv->proot =
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break;
}
case OP_Aggregate: {
pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_Project: { // TODO refactor to remove arith operator.
SOperatorInfo* prev = pRuntimeEnv->proot;
if (i == 0) {
pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor
setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
}
} else {
prev = pRuntimeEnv->proot;
assert(pQueryAttr->pExpr2 != NULL);
pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
}
break;
}
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
break;
}
case OP_Filter: { // todo refactor
int32_t numOfFilterCols = 0;
if (pQueryAttr->stableQuery) {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
} else {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
}
break;
}
case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
break;
}
case OP_MultiwayMergeSort: {
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
break;
}
case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
bool multigroupResult = pQueryAttr->multigroupResult;
if (pQueryAttr->multigroupResult) {
multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
}
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
break;
}
case OP_SLimit: {
int32_t num = pRuntimeEnv->proot->numOfOutput;
SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
break;
}
case OP_Distinct: {
pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
break;
}
case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
break;
}
// case OP_TagScan: {
// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// break;
// }
// case OP_MultiTableTimeInterval: {
// pRuntimeEnv->proot =
// createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// break;
// }
// case OP_AllMultiTableTimeInterval: {
// pRuntimeEnv->proot =
// createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// break;
// }
// case OP_TimeWindow: {
// pRuntimeEnv->proot =
// createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
// if (opType != OP_DummyInput && opType != OP_Join) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// }
// break;
// }
// case OP_AllTimeWindow: {
// pRuntimeEnv->proot =
// createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
// if (opType != OP_DummyInput && opType != OP_Join) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// }
// break;
// }
// case OP_Groupby: {
// pRuntimeEnv->proot =
// createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
//
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
// if (opType != OP_DummyInput) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// }
// break;
// }
// case OP_SessionWindow: {
// pRuntimeEnv->proot =
// createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
// if (opType != OP_DummyInput) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// }
// break;
// }
// case OP_MultiTableAggregate: {
// pRuntimeEnv->proot =
// createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// break;
// }
// case OP_Aggregate: {
// pRuntimeEnv->proot =
// createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
//
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
// if (opType != OP_DummyInput && opType != OP_Join) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// }
// break;
// }
//
// case OP_Project: { // TODO refactor to remove arith operator.
// SOperatorInfo* prev = pRuntimeEnv->proot;
// if (i == 0) {
// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor
// setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
// }
// } else {
// prev = pRuntimeEnv->proot;
// assert(pQueryAttr->pExpr2 != NULL);
// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
// }
// break;
// }
//
// case OP_StateWindow: {
// pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
// if (opType != OP_DummyInput) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
// }
// break;
// }
//
// case OP_Limit: {
// pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
// break;
// }
//
// case OP_Filter: { // todo refactor
// int32_t numOfFilterCols = 0;
// if (pQueryAttr->stableQuery) {
// SColumnInfo* pColInfo =
// extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
// pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
// freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
// } else {
// SColumnInfo* pColInfo =
// extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
// pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
// freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
// }
//
// break;
// }
//
// case OP_Fill: {
// SOperatorInfo* pInfo = pRuntimeEnv->proot;
// pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
// break;
// }
//
// case OP_MultiwayMergeSort: {
// pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
// break;
// }
//
// case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
// bool multigroupResult = pQueryAttr->multigroupResult;
// if (pQueryAttr->multigroupResult) {
// multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
// }
//
// pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
// pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
// break;
// }
//
// case OP_SLimit: {
// int32_t num = pRuntimeEnv->proot->numOfOutput;
// SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
// pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
// break;
// }
//
// case OP_Distinct: {
// pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// break;
// }
//
// case OP_Order: {
// pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
// break;
// }
default: {
assert(0);
@ -4557,22 +4557,22 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo);
switch(tbScanner) {
case OP_TableBlockInfoScan: {
pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
break;
}
case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
break;
}
case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
break;
}
case OP_TableScan: {
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
break;
}
// case OP_TableBlockInfoScan: {
// pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
// break;
// }
// case OP_TableSeqScan: {
// pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
// break;
// }
// case OP_DataBlocksOptScan: {
// pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
// break;
// }
// case OP_TableScan: {
// pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
// break;
// }
default: { // do nothing
break;
}
@ -4881,7 +4881,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan;
// pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -4905,7 +4905,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = OP_TableSeqScan;
// pOperator->operatorType = OP_TableSeqScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -4930,7 +4930,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockInfoScanOperator";
pOperator->operatorType = OP_TableBlockInfoScan;
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -4946,7 +4946,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr
pTableScanInfo->numOfOutput = pDownstream->numOfOutput;
#if 0
if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) {
SAggOperatorInfo* pAggInfo = pDownstream->info;
@ -4995,6 +4995,8 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else {
assert(0);
}
#endif
}
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
@ -5009,7 +5011,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan;
// pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false;
pOptr->info = pInfo;
@ -5161,7 +5163,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GlobalAggregate";
pOperator->operatorType = OP_GlobalAggregate;
// pOperator->operatorType = OP_GlobalAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -5205,7 +5207,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiwaySortOperator";
pOperator->operatorType = OP_MultiwayMergeSort;
// pOperator->operatorType = OP_MultiwayMergeSort;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -5312,7 +5314,7 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "InMemoryOrder";
pOperator->operatorType = OP_Order;
// pOperator->operatorType = OP_Order;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -5358,10 +5360,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
if (upstream->operatorType == OP_DataBlocksOptScan) {
STableScanInfo* pScanInfo = upstream->info;
order = getTableScanOrder(pScanInfo);
}
// if (upstream->operatorType == OP_DataBlocksOptScan) {
// STableScanInfo* pScanInfo = upstream->info;
// order = getTableScanOrder(pScanInfo);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
@ -5413,10 +5415,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
if (upstream->operatorType == OP_DataBlocksOptScan) {
STableScanInfo* pScanInfo = upstream->info;
order = getTableScanOrder(pScanInfo);
}
// if (upstream->operatorType == OP_DataBlocksOptScan) {
// STableScanInfo* pScanInfo = upstream->info;
// order = getTableScanOrder(pScanInfo);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
@ -6268,7 +6270,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate";
pOperator->operatorType = OP_Aggregate;
// pOperator->operatorType = OP_Aggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -6363,7 +6365,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableAggregate";
pOperator->operatorType = OP_MultiTableAggregate;
// pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -6393,7 +6395,7 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator";
pOperator->operatorType = OP_Project;
// pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -6452,7 +6454,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "FilterOperator";
pOperator->operatorType = OP_Filter;
// pOperator->operatorType = OP_Filter;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->numOfOutput = numOfOutput;
@ -6473,7 +6475,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "LimitOperator";
pOperator->operatorType = OP_Limit;
// pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->exec = doLimit;
@ -6494,7 +6496,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = OP_TimeWindow;
// pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
@ -6519,7 +6521,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AllTimeIntervalAggOperator";
pOperator->operatorType = OP_AllTimeWindow;
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
@ -6543,7 +6545,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "StateWindowOperator";
pOperator->operatorType = OP_StateWindow;
// pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
@ -6568,7 +6570,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = OP_SessionWindow;
// pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
@ -6591,7 +6593,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableTimeIntervalOperator";
pOperator->operatorType = OP_MultiTableTimeInterval;
// pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
@ -6615,7 +6617,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AllMultiTableTimeIntervalOperator";
pOperator->operatorType = OP_AllMultiTableTimeInterval;
// pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
@ -6651,7 +6653,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Groupby;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
@ -6690,7 +6692,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Fill;
// pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
@ -6738,7 +6740,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator";
pOperator->operatorType = OP_SLimit;
// pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->exec = doSLimit;
@ -6894,7 +6896,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTableTagScan";
pOperator->operatorType = OP_TagScan;
// pOperator->operatorType = OP_TagScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@ -7035,7 +7037,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Distinct;
// pOperator->operatorType = OP_Distinct;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
@ -8034,7 +8036,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->limit.limit = pQueryMsg->limit;
pQueryAttr->limit.offset = pQueryMsg->offset;
pQueryAttr->order.order = pQueryMsg->order;
pQueryAttr->order.orderColId = pQueryMsg->orderColId;
pQueryAttr->order.col.info.colId = pQueryMsg->orderColId;
pQueryAttr->pExpr1 = pExprs;
pQueryAttr->pExpr2 = pSecExprs;
pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput;

View File

@ -4589,7 +4589,7 @@ SAggFunctionInfo aggFunc[35] = {{
},
{
// 16
"ts",
"dummy",
FUNCTION_TYPE_AGG,
FUNCTION_TS,
FUNCTION_TS,

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "function.h"
#include "os.h"
#include "exception.h"
@ -550,6 +551,15 @@ tExprNode* exprdup(tExprNode* pNode) {
} else if (pNode->nodeType == TEXPR_COL_NODE) {
pCloned->pSchema = calloc(1, sizeof(SSchema));
*pCloned->pSchema = *pNode->pSchema;
} else if (pNode->nodeType == TEXPR_FUNCTION_NODE) {
strcpy(pCloned->_function.functionName, pNode->_function.functionName);
int32_t num = pNode->_function.num;
pCloned->_function.num = num;
pCloned->_function.pChild = calloc(num, POINTER_BYTES);
for(int32_t i = 0; i < num; ++i) {
pCloned->_function.pChild[i] = exprdup(pNode->_function.pChild[i]);
}
}
pCloned->nodeType = pNode->nodeType;

View File

@ -54,6 +54,18 @@ bool qIsAggregateFunction(const char* functionName) {
return !scalarfunc;
}
bool qIsSelectivityFunction(const char* functionName) {
assert(functionName != NULL);
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
size_t len = strlen(functionName);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, functionName, len);
if (pInfo != NULL) {
return ((*pInfo)->status | FUNCSTATE_SELECTIVITY) != 0;
}
return false;
}
SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
@ -79,16 +91,17 @@ void qRemoveUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
bool isTagsQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int16_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
// todo handle count(tbname) query
if (strcmp(f, "project") != 0 && strcmp(f, "count") != 0) {
return false;
}
// "select count(tbname)" query
// if (functId == FUNCTION_COUNT && pExpr->base.colpDesc->colId == TSDB_TBNAME_COLUMN_INDEX) {
// continue;
// }
if (f != FUNCTION_TAGPRJ && f != FUNCTION_TID_TAG) {
return false;
}
}
return true;
@ -113,23 +126,13 @@ bool isTagsQuery(SArray* pFunctionIdList) {
bool isProjectionQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS_DUMMY) {
continue;
}
if (f != FUNCTION_PRJ &&
f != FUNCTION_TAGPRJ &&
f != FUNCTION_TAG &&
f != FUNCTION_TS &&
f != FUNCTION_ARITHM &&
f != FUNCTION_DIFF &&
f != FUNCTION_DERIVATIVE) {
return false;
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
return true;
}
}
return true;
return false;
}
bool isDiffDerivativeQuery(SArray* pFunctionIdList) {
@ -182,30 +185,19 @@ bool isArithmeticQueryOnAggResult(SArray* pFunctionIdList) {
return false;
}
bool isGroupbyColumn(SArray* pFunctionIdList) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// int32_t numOfCols = getNumOfColumns(pTableMetaInfo->pTableMeta);
//
// SGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr;
// for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
// SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
// if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < numOfCols) { // group by normal columns
// return true;
// }
// }
return false;
bool isGroupbyColumn(SGroupbyExpr* pGroupby) {
return !pGroupby->groupbyTag;
}
bool isTopBotQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
continue;
}
if (f == FUNCTION_TOP || f == FUNCTION_BOTTOM) {
if (strcmp(f, "top") == 0 || strcmp(f, "bottom") == 0) {
return true;
}
}
@ -284,49 +276,26 @@ bool needReverseScan(SArray* pFunctionIdList) {
return false;
}
bool isSimpleAggregateRv(SArray* pFunctionIdList) {
// if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) {
// return false;
// }
//
// if (tscIsDiffDerivQuery(pQueryInfo)) {
// return false;
// }
//
// size_t numOfExprs = getNumOfExprs(pQueryInfo);
// for (int32_t i = 0; i < numOfExprs; ++i) {
// SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
// if (pExpr == NULL) {
// continue;
// }
//
// int32_t functionId = pExpr->base.functionId;
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
// if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
// return true;
// }
//
// continue;
// }
//
// if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY) {
// continue;
// }
//
// if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) ||
// (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_TS_COMP)) {
// return true;
// }
// }
bool isAgg(SArray* pFunctionIdList) {
size_t size = taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < size; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
return false;
}
if (qIsAggregateFunction(f)) {
return true;
}
}
return false;
}
bool isBlockDistQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, 0);
return (num == 1 && f == FUNCTION_BLKINFO);
char* f = *(char**) taosArrayGet(pFunctionIdList, 0);
return (num == 1 && strcmp(f, "block_dist") == 0);
}
bool isTwoStageSTableQuery(SArray* pFunctionIdList, int32_t tableIndex) {
@ -432,13 +401,14 @@ bool hasTagValOutput(SArray* pFunctionIdList) {
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) {
assert(pFunctionIdList != NULL);
pDesc->blockDistribution = isBlockDistQuery(pFunctionIdList);
if (pDesc->blockDistribution) {
return;
}
pDesc->projectionQuery = isProjectionQuery(pFunctionIdList);
pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList);
// pDesc->projectionQuery = isProjectionQuery(pFunctionIdList);
// pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList);
pDesc->interpQuery = isInterpQuery(pFunctionIdList);
pDesc->topbotQuery = isTopBotQuery(pFunctionIdList);
pDesc->agg = isAgg(pFunctionIdList);
}

View File

@ -294,7 +294,10 @@ SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSe
SAlterTableInfo * tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableType);
SCreatedTableInfo createNewChildTableInfo(SToken *pTableName, SArray *pTagNames, SArray *pTagVals, SToken *pToken,
SToken *igExists);
/*!
* test
* @param pSqlNode
*/
void destroyAllSqlNode(struct SSubclause *pSqlNode);
void destroySqlNode(SSqlNode *pSql);
void freeCreateTableInfo(void* p);

View File

@ -59,8 +59,7 @@ SArray *tListItemAppendToken(SArray *pList, SToken *pAliasToken, uint8_t sortOrd
if (pAliasToken) {
SListItem item;
assert(0);
// taosVariantCreate(&item.pVar, pAliasToken);
taosVariantCreate(&item.pVar, pAliasToken->z, pAliasToken->n, pAliasToken->type);
item.sortOrder = sortOrder;
taosArrayPush(pList, &item);

View File

@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <astGenerator.h>
#include <function.h>
#include "astGenerator.h"
#include "function.h"
@ -35,6 +34,11 @@
#define COLUMN_INDEX_INITIAL_VAL (-2)
#define COLUMN_INDEX_INITIALIZER { COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
static int32_t resColId = 5000;
int32_t getNewResColId() {
return resColId++;
}
static int32_t validateSelectNodeList(SQueryStmtInfo* pQueryInfo, SArray* pSelNodeList, bool outerQuery, SMsgBuf* pMsgBuf);
static int32_t extractFunctionParameterInfo(SQueryStmtInfo* pQueryInfo, int32_t tokenId, STableMetaInfo** pTableMetaInfo, SSchema* columnSchema,
tExprNode** pNode, SColumnIndex* pIndex, tSqlExprItem* pParamElem, SMsgBuf* pMsgBuf);
@ -698,6 +702,8 @@ static int32_t parseSlidingClause(SQueryStmtInfo* pQueryInfo, SToken* pSliding,
return TSDB_CODE_SUCCESS;
}
static void setTsOutputExprInfo(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, int32_t outputIndex, int32_t tableIndex);
// validate the interval info
int32_t validateIntervalNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) {
const char* msg1 = "sliding cannot be used without interval";
@ -715,11 +721,6 @@ int32_t validateIntervalNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMs
}
}
// orderby column not set yet, set it to be the primary timestamp column
if (pQueryInfo->order.orderColId == INT32_MIN) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_ID;
}
// interval is not null
SToken *t = &pSqlNode->interval.interval;
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval,
@ -748,6 +749,13 @@ int32_t validateIntervalNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMs
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (tsCompatibleModel) {
SExprInfo* pFirstExpr = getExprInfo(pQueryInfo, 0);
if (pFirstExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE || strcasecmp(pFirstExpr->pExpr->_function.functionName, "dummy") != 0) {
setTsOutputExprInfo(pQueryInfo, pTableMetaInfo, 0, 0);
}
}
// It is a time window query
pQueryInfo->info.timewindow = true;
return TSDB_CODE_SUCCESS;
@ -917,8 +925,6 @@ int32_t validateLimitNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBu
}
}
static void setTsOutputExprInfo(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, int32_t outputIndex, int32_t tableIndex);
int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid column name in orderby clause";
const char* msg2 = "too many order by columns";
@ -929,6 +935,8 @@ int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsg
return TSDB_CODE_SUCCESS;
}
pQueryInfo->order = taosArrayInit(4, sizeof(SOrder));
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, 0);
SArray* pSortOrder = pSqlNode->pSortOrder;
@ -939,40 +947,54 @@ int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsg
* for super table query, the order option must be less than 3.
*/
size_t size = taosArrayGetSize(pSortOrder);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) {
if ((UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) && (pQueryInfo->info.projectionQuery)) {
if (size > 1) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
} else {
if (size > 2) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
}
// handle the first part of order by
SVariant* pVar = taosArrayGet(pSortOrder, 0);
SSchema s = {0};
if (pVar->nType == TSDB_DATA_TYPE_BINARY) {
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
SToken columnName = {pVar->nLen, pVar->nType, pVar->pz};
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
bool found = false;
for(int32_t i = 0; i < taosArrayGetSize(pSortOrder); ++i) {
SListItem* pItem = taosArrayGet(pSortOrder, i);
s = *(SSchema*) getOneColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
} else { // order by [1|2|3]
if (pVar->i > getNumOfFields(&pQueryInfo->fieldsInfo)) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
SVariant* pVar = &pItem->pVar;
if (pVar->nType == TSDB_DATA_TYPE_BINARY) {
SOrder order = {0};
SExprInfo* pExprInfo = getExprInfo(pQueryInfo, pVar->i);
s = pExprInfo->base.resSchema;
// find the orde column among the result field.
for (int32_t j = 0; j < getNumOfFields(&pQueryInfo->fieldsInfo); ++j) {
SInternalField* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.internalField, j);
SSchema* pSchema = &pInfo->pExpr->base.resSchema;
if (strcasecmp(pVar->pz, pSchema->name) == 0) {
setColumn(&order.col, pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, TSDB_COL_TMP, pSchema);
order.order = pItem->sortOrder;
taosArrayPush(pQueryInfo->order, &order);
found = true;
break;
}
}
if (!found) {
return buildInvalidOperationMsg(pMsgBuf, "invalid order by column");
}
} else { // order by [1|2|3]
if (pVar->i > getNumOfFields(&pQueryInfo->fieldsInfo)) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
int32_t index = pVar->i - 1;
SExprInfo* pExprInfo = getExprInfo(pQueryInfo, index);
SOrder c = {0};
setColumn(&c.col, pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, TSDB_COL_TMP, &pExprInfo->base.resSchema);
c.order = pItem->sortOrder;
taosArrayPush(pQueryInfo->order, &c);
}
}
SListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = s.colId;
return TSDB_CODE_SUCCESS;
}
@ -1237,16 +1259,17 @@ int32_t checkForInvalidOrderby(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, S
#endif
static int32_t checkFillQueryRange(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
const char* msg3 = "start(end) time of time range required or time range too large";
const char* msg1 = "start(end) time of time range required or time range too large";
if (pQueryInfo->interval.interval == 0) {
return TSDB_CODE_SUCCESS;
}
bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER);
if (initialWindows) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
// TODO disable this check temporarily
// bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER);
// if (initialWindows) {
// return buildInvalidOperationMsg(pMsgBuf, msg1);
// }
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
@ -1256,7 +1279,7 @@ static int32_t checkFillQueryRange(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf)
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
@ -1373,6 +1396,9 @@ int32_t validateFillNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf
return TSDB_CODE_SUCCESS;
}
static void pushDownAggFuncExprInfo(SQueryStmtInfo* pQueryInfo);
static void addColumnNodeFromLowerLevel(SQueryStmtInfo* pQueryInfo);
int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0));
@ -1524,11 +1550,6 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// set order by info
if (validateOrderbyNode(pQueryInfo, pSqlNode, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// set interval value
if (validateIntervalNode(pQueryInfo, pSqlNode, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
@ -1553,6 +1574,11 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
return TSDB_CODE_SUCCESS;
}
// set order by info
if (validateOrderbyNode(pQueryInfo, pSqlNode, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if ((code = validateLimitNode(pQueryInfo, pSqlNode, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code;
}
@ -1562,9 +1588,159 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
}
}
pushDownAggFuncExprInfo(pQueryInfo);
// addColumnNodeFromLowerLevel(pQueryInfo);
for(int32_t i = 0; i < 1; ++i) {
SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]);
extractFunctionDesc(functionList, &pQueryInfo->info);
if ((code = checkForInvalidExpr(pQueryInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS; // Does not build query message here
}
static bool isTagOrPrimaryTs(SExprInfo* pExprInfo) {
if (pExprInfo->pExpr->nodeType != TEXPR_COL_NODE) {
return false;
}
assert(pExprInfo->base.pColumns->info.colId == pExprInfo->pExpr->pSchema->colId);
return (TSDB_COL_IS_TAG(pExprInfo->base.pColumns->flag) || pExprInfo->pExpr->pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
}
// todo extract the table column in expression
static bool isGroupbyCol(SExprInfo* pExprInfo, SGroupbyExpr* pGroupbyExpr) {
assert(pExprInfo != NULL && pGroupbyExpr != NULL);
int32_t nodeType = pExprInfo->pExpr->nodeType;
assert(nodeType == TEXPR_COL_NODE || nodeType == TEXPR_BINARYEXPR_NODE);
for(int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) {
SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pCol->info.colId == pExprInfo->pExpr->pSchema->colId) {
return true;
}
}
return false;
}
static bool isAllAggExpr(SArray* pList) {
assert(pList != NULL);
for (int32_t k = 0; k < taosArrayGetSize(pList); ++k) {
SExprInfo* p = taosArrayGetP(pList, k);
if (p->pExpr->nodeType != TEXPR_FUNCTION_NODE || !qIsAggregateFunction(p->pExpr->_function.functionName)) {
return false;
}
}
return true;
}
static bool isAllProjectExpr(SArray *pList) {
assert(pList != NULL);
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SExprInfo* p = taosArrayGetP(pList, i);
if (p->pExpr->nodeType == TEXPR_FUNCTION_NODE && !qIsAggregateFunction(p->pExpr->_function.functionName)) {
return false;
}
}
return true;
}
static SExprInfo* createColumnNodeFromAggFunc(SSchema* pSchema);
static void pushDownAggFuncExprInfo(SQueryStmtInfo* pQueryInfo) {
assert(pQueryInfo != NULL);
size_t level = getExprFunctionLevel(pQueryInfo);
for(int32_t i = 0; i < level - 1; ++i) {
SArray* p = pQueryInfo->exprList[i];
// If direct lower level expressions are all aggregate function, check if current function can be push down or not
SArray* pNext = pQueryInfo->exprList[i + 1];
if (!isAllAggExpr(pNext)) {
continue;
}
for (int32_t j = 0; j < taosArrayGetSize(p); ++j) {
SExprInfo* pExpr = taosArrayGetP(p, j);
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE && qIsAggregateFunction(pExpr->pExpr->_function.functionName)) {
bool canPushDown = true;
for (int32_t k = 0; k < taosArrayGetSize(pNext); ++k) {
SExprInfo* pNextLevelExpr = taosArrayGetP(pNext, k);
// pExpr depends on the output of the down level, so it can not be push downwards
if (pExpr->base.pColumns->info.colId == pNextLevelExpr->base.resSchema.colId) {
canPushDown = false;
break;
}
}
if (canPushDown) {
taosArrayInsert(pNext, j, &pExpr);
taosArrayRemove(p, j);
// Add the project function of the current level, to output the calculated result
SExprInfo* pNew = createColumnNodeFromAggFunc(&pExpr->base.resSchema);
taosArrayInsert(p, j, &pNew);
}
}
}
}
}
// todo change the logic plan data
static void addColumnNodeFromLowerLevel(SQueryStmtInfo* pQueryInfo) {
assert(pQueryInfo != NULL);
size_t level = getExprFunctionLevel(pQueryInfo);
for (int32_t i = 0; i < level - 1; ++i) {
SArray* p = pQueryInfo->exprList[i];
if (isAllAggExpr(p)) {
continue;
}
// If direct lower level expressions are all aggregate function, check if current function can be push down or not
SArray* pNext = pQueryInfo->exprList[i + 1];
if (isAllAggExpr(pNext)) {
continue;
}
for (int32_t j = 0; j < taosArrayGetSize(pNext); ++j) {
SExprInfo* pExpr = taosArrayGetP(p, j);
bool exists = false;
for (int32_t k = 0; k < taosArrayGetSize(p); ++k) {
SExprInfo* pNextLevelExpr = taosArrayGetP(pNext, k);
// pExpr depends on the output of the down level, so it can not be push downwards
if (pExpr->base.pColumns->info.colId == pNextLevelExpr->base.resSchema.colId) {
exists = true;
break;
}
}
if (!exists) {
SExprInfo* pNew = calloc(1, sizeof(SExprInfo));
pNew->pExpr = exprdup(pExpr->pExpr);
memcpy(&pNew->base, &pExpr->base, sizeof(SSqlExpr));
int32_t pos = taosArrayGetSize(p);
// Add the project function of the current level, to output the calculated result
taosArrayInsert(p, pos - 1, &pExpr);
}
}
}
}
int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
assert(pQueryInfo != NULL && pMsgBuf != NULL);
@ -1583,9 +1759,18 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
// 1. invalid sql:
// select top(col, k) from table_name [interval(1d)|session(ts, 1d)|statewindow(col)] order by k asc
// order by normal column is not supported
int32_t colId = pQueryInfo->order.orderColId;
if (pQueryInfo->info.timewindow && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
if (pQueryInfo->order != NULL) {
size_t numOfOrder = taosArrayGetSize(pQueryInfo->order);
if (numOfOrder > 1) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
if (numOfOrder > 0) {
SColumn* pOrderCol = taosArrayGet(pQueryInfo->order, 0);
if (pQueryInfo->info.timewindow && pOrderCol->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
}
}
// select top(col, k) from table_name interval(10s) fill(prev)
@ -1593,6 +1778,41 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
// select top(col, k), count(*) from table_name
size_t size = getNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
if (pExpr->pExpr->nodeType == TEXPR_COL_NODE) {
if (!isTagOrPrimaryTs(pExpr) && !isGroupbyCol(pExpr, &pQueryInfo->groupbyExpr)) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause");
}
} else if (pExpr->pExpr->nodeType == TEXPR_BINARYEXPR_NODE) {
continue;
// todo extract all column node in tree, and check for each node
continue;
}
// dummy column is also the placeholder for primary timestamp column in the result.
const char* functionName = pExpr->pExpr->_function.functionName;
if (strcmp(functionName, "top") != 0 && strcmp(functionName, "bottom") != 0 && strcmp(functionName, "dummy") != 0) {
if (qIsAggregateFunction(functionName)) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause");
}
// the primary key is valid
if (pExpr->pExpr->nodeType == TEXPR_COL_NODE) {
if (pExpr->pExpr->pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
continue;
}
}
continue;
}
}
}
/*
@ -1603,7 +1823,11 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
size_t size = getNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
int32_t functionId = getExprFunctionId(pExpr);
if (pExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE) {
continue;
}
int32_t functionId = getExprFunctionId(pExpr);
if (functionId == FUNCTION_COUNT && TSDB_COL_IS_TAG(pExpr->base.pColumns->flag)) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
@ -1667,11 +1891,35 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
pQueryInfo->info.groupbyColumn) {
return buildInvalidOperationMsg(pMsgBuf, msg9);
}
}
static int32_t resColId = 5000;
int32_t getNewResColId() {
return resColId++;
/*
* 9. invalid sql:
* select count(*), col_name from table_name
*/
if (pQueryInfo->info.agg) {
bool isSelectivity = false;
if (pQueryInfo->info.projectionQuery) {
size_t size = getNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE) {
if (!isSelectivity) {
isSelectivity = qIsSelectivityFunction(pExpr->pExpr->_function.functionName);
}
continue;
}
if (isSelectivity && isTagOrPrimaryTs(pExpr)) {
continue;
}
if (!isGroupbyCol(pExpr, &pQueryInfo->groupbyExpr)) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select");
}
}
}
}
}
int32_t addResColumnInfo(SQueryStmtInfo* pQueryInfo, int32_t outputIndex, SSchema* pSchema, SExprInfo* pSqlExpr) {
@ -1787,7 +2035,6 @@ static int32_t checkForAliasName(SMsgBuf* pMsgBuf, char* aliasName) {
return TSDB_CODE_SUCCESS;
}
static int32_t validateComplexExpr(tSqlExpr* pExpr, SQueryStmtInfo* pQueryInfo, SArray* pColList, int32_t* type, SMsgBuf* pMsgBuf);
static int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStmtInfo* pQueryInfo, SArray* pCols, bool* keepTableCols, SMsgBuf* pMsgBuf);
static int64_t getTickPerSecond(SVariant* pVariant, int32_t precision, int64_t* tickPerSec, SMsgBuf *pMsgBuf) {
@ -1820,7 +2067,7 @@ static void setTsOutputExprInfo(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTab
SSourceParam param = {0};
addIntoSourceParam(&param, NULL, &col);
SExprInfo* pExpr = createExprInfo(pTableMetaInfo, "ts_dummy", &param, &s, TSDB_KEYSIZE);
SExprInfo* pExpr = createExprInfo(pTableMetaInfo, "dummy", &param, &s, TSDB_KEYSIZE);
strncpy(pExpr->base.token, "ts", tListLen(pExpr->base.token));
SArray* pExprList = getCurrentExprList(pQueryInfo);
@ -2600,15 +2847,15 @@ static int32_t validateExprLeafFunctionNode(SQueryStmtInfo* pQueryInfo, tSqlExpr
return TSDB_CODE_SUCCESS;
}
int32_t validateScalarFunctionParamNum(tSqlExprItem* pItem, SMsgBuf* pMsgBuf) {
static int32_t validateScalarFunctionParamNum(tSqlExpr* pSqlExpr, int32_t functionId, SMsgBuf* pMsgBuf) {
int32_t code = TSDB_CODE_SUCCESS;
switch (pItem->functionId) {
switch (functionId) {
case FUNCTION_CEIL: {
code = checkForkParam(pItem->pNode, 1, pMsgBuf);
code = checkForkParam(pSqlExpr, 1, pMsgBuf);
break;
}
case FUNCTION_LENGTH: {
code = checkForkParam(pItem->pNode, 1, pMsgBuf);
code = checkForkParam(pSqlExpr, 1, pMsgBuf);
break;
}
}
@ -2616,67 +2863,54 @@ int32_t validateScalarFunctionParamNum(tSqlExprItem* pItem, SMsgBuf* pMsgBuf) {
return code;
}
int32_t validateScalarFunctionParam(SQueryStmtInfo* pQueryInfo, tSqlExpr* pExpr, SArray* pList, int32_t* exprType, SMsgBuf* pMsgBuf) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pParamList = pExpr->Expr.paramList;
*exprType = NORMAL_ARITHMETIC;
for (int32_t i = 0; i < 1; ++i) {
tSqlExprItem* pParamElem = taosArrayGet(pParamList, i);
tSqlExpr* pSqlExpr = pParamElem->pNode;
int32_t type = pSqlExpr->type;
if (type == SQL_NODE_VALUE) {
// do nothing for scalar function, or maybe the evaluation can be done here
} else if (type == SQL_NODE_SQLFUNCTION) {
code = validateExprLeafFunctionNode(pQueryInfo, pSqlExpr, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else if (type == SQL_NODE_EXPR) {
code = validateComplexExpr(pSqlExpr, pQueryInfo, pList, exprType, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else if (type == SQL_NODE_TABLE_COLUMN) {
code = validateExprLeafColumnNode(pQueryInfo, &pSqlExpr->columnName, pList, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
SExprInfo* doAddProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, SColumnIndex* pColIndex, const char* aliasName, int32_t colId) {
STableMeta* pTableMeta = getMetaInfo(pQueryInfo, pColIndex->tableIndex)->pTableMeta;
SSchema* pSchema = getOneColumnSchema(pTableMeta, pColIndex->columnIndex);
SColumnIndex index = *pColIndex;
char* funcName = NULL;
if (TSDB_COL_IS_TAG(index.type)) {
int32_t numOfCols = getNumOfColumns(pTableMeta);
index.columnIndex = pColIndex->columnIndex - numOfCols;
funcName = "project_tag";
} else {
index.columnIndex = pColIndex->columnIndex;
funcName = "project_col";
}
// todo merge with the addScalarExprAndResColumn
int32_t doAddOneProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, SSchema* pSchema, const char* aliasName,
int32_t colId, SMsgBuf* pMsgBuf) {
const char* name = (aliasName == NULL)? pSchema->name:aliasName;
SSchema s = createSchema(pSchema->type, pSchema->bytes, colId, name);
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex);
SColumn c = createColumn(pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema);
SArray* pColumnList = taosArrayInit(4, sizeof(SColumn));
SToken colNameToken = {.z = pSchema->name, .n = strlen(pSchema->name)};
SSourceParam param = {0};
addIntoSourceParam(&param, NULL, &c);
tSqlExpr sqlNode = {0};
sqlNode.type = SQL_NODE_TABLE_COLUMN;
sqlNode.columnName = colNameToken;
return doAddOneExprInfo(pQueryInfo, funcName, &param, outputColIndex, pTableMetaInfo, &s, 0, s.name, true);
tExprNode* pNode = NULL;
bool keepTableCols = true;
int32_t ret = sqlExprToExprNode(&pNode, &sqlNode, pQueryInfo, pColumnList, &keepTableCols, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(pNode, NULL);
return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause");
}
SExprInfo* pExpr = createBinaryExprInfo(pNode, &s);
tstrncpy(pExpr->base.resSchema.name, name, tListLen(pExpr->base.resSchema.name));
tstrncpy(pExpr->base.token, name, tListLen(pExpr->base.token));
SArray* pExprList = getCurrentExprList(pQueryInfo);
addExprInfo(pExprList, outputColIndex, pExpr, pQueryInfo->exprListLevelIndex);
// extract columns according to the tExprNode tree
size_t num = taosArrayGetSize(pColumnList);
pExpr->base.pColumns = calloc(num, sizeof(SColumn));
for (int32_t i = 0; i < num; ++i) {
SColumn* pCol = taosArrayGet(pColumnList, i);
pExpr->base.pColumns[i] = *pCol;
}
pExpr->base.numOfCols = num;
if (pQueryInfo->exprListLevelIndex == 0) {
int32_t exists = getNumOfFields(&pQueryInfo->fieldsInfo);
addResColumnInfo(pQueryInfo, exists, &pExpr->base.resSchema, pExpr);
}
pQueryInfo->info.projectionQuery = true;
return TSDB_CODE_SUCCESS;
}
static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos) {
static int32_t doAddMultipleProjectExprAndResColumns(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos, SMsgBuf* pMsgBuf) {
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, pIndex->tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
@ -2688,8 +2922,8 @@ static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColu
}
for (int32_t j = 0; j < numOfTotalColumns; ++j) {
pIndex->columnIndex = j;
doAddProjectCol(pQueryInfo, startPos + j, pIndex, NULL, getNewResColId());
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo->pTableMeta, j);
doAddOneProjectCol(pQueryInfo, startPos + j, pSchema, NULL, getNewResColId(), pMsgBuf);
}
return numOfTotalColumns;
@ -2728,11 +2962,9 @@ static SSchema createConstantColumnSchema(SVariant* pVal, const SToken* exprStr,
}
static int32_t handleTbnameProjection(SQueryStmtInfo* pQueryInfo, tSqlExprItem* pItem, SColumnIndex* pIndex, int32_t startPos, bool outerQuery, SMsgBuf* pMsgBuf) {
const char* msg3 = "tbname not allowed in outer query";
const char* msg1 = "tbname not allowed in outer query";
SSchema colSchema = {0};
char* funcName = NULL;
if (outerQuery) { // todo??
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, pIndex->tableIndex);
@ -2749,36 +2981,20 @@ static int32_t handleTbnameProjection(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
}
if (!existed) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
colSchema = pSchema[pIndex->columnIndex];
funcName = "project_col";
} else {
colSchema = *getTbnameColumnSchema();
funcName = "project_tag";
}
SSchema resultSchema = colSchema;
resultSchema.colId = getNewResColId();
char rawName[TSDB_COL_NAME_LEN] = {0};
setTokenAndResColumnName(pItem, resultSchema.name, rawName, sizeof(colSchema.name) - 1);
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, pIndex->tableIndex);
SColumn c = createColumn(pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, pIndex->type, &colSchema);
SSourceParam param = {0};
addIntoSourceParam(&param, NULL, &c);
doAddOneExprInfo(pQueryInfo, "project_tag", &param, startPos, pTableMetaInfo, &colSchema, 0, rawName, true);
return TSDB_CODE_SUCCESS;
return doAddOneProjectCol(pQueryInfo, startPos, &colSchema, pItem->aliasName, getNewResColId(), pMsgBuf);
}
int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, SMsgBuf* pMsgBuf) {
const char* msg1 = "tag for normal table query is not allowed";
const char* msg2 = "invalid column name";
const char* msg3 = "tbname not allowed in outer query";
if (checkForAliasName(pMsgBuf, pItem->aliasName) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
@ -2798,11 +3014,11 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
if (index.tableIndex == COLUMN_INDEX_INITIAL_VAL) { // all table columns are required.
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
index.tableIndex = i;
int32_t inc = doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos);
int32_t inc = doAddMultipleProjectExprAndResColumns(pQueryInfo, &index, startPos, pMsgBuf);
startPos += inc;
}
} else {
doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos);
doAddMultipleProjectExprAndResColumns(pQueryInfo, &index, startPos, pMsgBuf);
}
// add the primary timestamp column even though it is not required by user
@ -2841,7 +3057,8 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem*
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
doAddProjectCol(pQueryInfo, startPos, &index, pItem->aliasName, getNewResColId());
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
doAddOneProjectCol(pQueryInfo, startPos, pSchema, pItem->aliasName, getNewResColId(), pMsgBuf);
}
// add the primary timestamp column even though it is not required by user
@ -2888,57 +3105,6 @@ static int32_t validateExprLeafNode(tSqlExpr* pExpr, SQueryStmtInfo* pQueryInfo,
return TSDB_CODE_SUCCESS;
}
int32_t validateComplexExpr(tSqlExpr * pExpr, SQueryStmtInfo* pQueryInfo, SArray* pColList, int32_t* type, SMsgBuf* pMsgBuf) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
if (pExpr->type == SQL_NODE_SQLFUNCTION) {
return validateScalarFunctionParam(pQueryInfo, pExpr, pColList, type, pMsgBuf);
}
tSqlExpr* pLeft = pExpr->pLeft;
if (pLeft->type == SQL_NODE_EXPR) {
code = validateComplexExpr(pLeft, pQueryInfo, pColList, type, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
code = validateExprLeafNode(pLeft, pQueryInfo, pColList, type, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
tSqlExpr* pRight = pExpr->pRight;
if (pRight->type == SQL_NODE_EXPR) {
code = validateComplexExpr(pRight, pQueryInfo, pColList, type, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
code = validateExprLeafNode(pRight, pQueryInfo, pColList, type, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// check divide by 0
if (pExpr->tokenId == TK_DIVIDE && pRight->type == SQL_NODE_VALUE) {
int32_t type1 = pRight->value.nType;
const char* msg1 = "invalid expr (divide by 0)";
if (type1 == TSDB_DATA_TYPE_DOUBLE && pRight->value.d < DBL_EPSILON) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
} else if (type1 == TSDB_DATA_TYPE_INT && pRight->value.i == 0) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
return TSDB_CODE_SUCCESS;
}
static uint64_t findTmpSourceColumnInNextLevel(SQueryStmtInfo* pQueryInfo, tExprNode *pExpr) {
// This function must be a aggregate function, so it must be in the next level
pQueryInfo->exprListLevelIndex += 1;
@ -2972,31 +3138,60 @@ static tExprNode* doCreateColumnNode(SQueryStmtInfo* pQueryInfo, SColumnIndex* p
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
tExprNode* pExpr = calloc(1, sizeof(tExprNode));
pExpr->nodeType = TEXPR_COL_NODE;
pExpr->pSchema = calloc(1, sizeof(SSchema));
SSchema* pSchema = getOneColumnSchema(pTableMeta, pIndex->columnIndex);
pExpr->nodeType = TEXPR_COL_NODE;
pExpr->pSchema = calloc(1, sizeof(SSchema));
SSchema* pSchema = NULL;
if (pIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pSchema = getTbnameColumnSchema();
} else {
pSchema = getOneColumnSchema(pTableMeta, pIndex->columnIndex);
}
*(SSchema*)(pExpr->pSchema) = *pSchema;
if (keepTableCols) {
if (keepTableCols && TSDB_COL_IS_NORMAL_COL(pIndex->type)) {
SColumn c = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, pIndex->type, pExpr->pSchema);
taosArrayPush(pCols, &c);
}
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
SSchema* pTsSchema = getOneColumnSchema(pTableMeta, 0);
insertPrimaryTsColumn(pQueryInfo->colList, pTsSchema->name, pTableMeta->uid);
if (TSDB_COL_IS_NORMAL_COL(pIndex->type)) {
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
SSchema* pTsSchema = getOneColumnSchema(pTableMeta, 0);
insertPrimaryTsColumn(pQueryInfo->colList, pTsSchema->name, pTableMeta->uid);
} else {
columnListInsert(pTableMetaInfo->tagColList, pTableMeta->uid, pSchema, TSDB_COL_TAG);
}
return pExpr;
}
static SExprInfo* createColumnNodeFromAggFunc(SSchema* pSchema) {
tExprNode* pExprNode = calloc(1, sizeof(tExprNode));
pExprNode->nodeType = TEXPR_COL_NODE;
pExprNode->pSchema = calloc(1, sizeof(SSchema));
*(SSchema*)(pExprNode->pSchema) = *pSchema;
SExprInfo* pExpr = calloc(1, sizeof(SExprInfo));
if (pExpr == NULL) {
return NULL;
}
pExpr->pExpr = pExprNode;
memcpy(&pExpr->base.resSchema, pSchema, sizeof(SSchema));
return pExpr;
}
static int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SMsgBuf* pMsgBuf);
static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode** p, SArray* pCols,
static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode*** p, SArray* pCols,
bool* keepTableCols, const tSqlExpr* pSqlExpr, SMsgBuf* pMsgBuf) {
SArray* pParamList = pSqlExpr->Expr.paramList;
if (pParamList != NULL) {
*num = taosArrayGetSize(pParamList);
p = calloc((*num), POINTER_BYTES);
(*p) = calloc((*num), POINTER_BYTES);
for (int32_t i = 0; i < (*num); ++i) {
tSqlExprItem* pItem = taosArrayGet(pParamList, i);
@ -3006,7 +3201,7 @@ static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_
return ret;
}
int32_t code = sqlExprToExprNode(&p[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf);
int32_t code = sqlExprToExprNode(&(*p)[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3017,10 +3212,10 @@ static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_
}
*num = 1;
p = calloc(*num, POINTER_BYTES);
(*p) = calloc(*num, POINTER_BYTES);
SColumnIndex index = {.type = TSDB_COL_NORMAL, .tableIndex = 0, .columnIndex = 0};
p[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols);
(*p)[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols);
}
return TSDB_CODE_SUCCESS;
@ -3080,7 +3275,8 @@ int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SM
}
int32_t tokenId = pSqlExpr->tokenId;
if (pRight->type == SQL_NODE_VALUE && pRight->value.nType == TSDB_DATA_TYPE_DOUBLE && pRight->value.d == 0 && tokenId == TK_DIVIDE) {
if (pRight->type == SQL_NODE_VALUE && (pRight->value.nType == TSDB_DATA_TYPE_DOUBLE || pRight->value.nType == TSDB_DATA_TYPE_INT) &&
pRight->value.d == 0 && tokenId == TK_DIVIDE) {
return buildInvalidOperationMsg(pMsgBuf, "invalid expression (divided by 0)");
}
@ -3098,6 +3294,20 @@ int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SM
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
} else if (pSqlExpr->type == SQL_NODE_SQLFUNCTION) {
bool scalar = false;
int32_t functionId = qIsBuiltinFunction(pSqlExpr->Expr.operand.z, pSqlExpr->Expr.operand.n, &scalar);
if (functionId < 0) {
return buildInvalidOperationMsg(pMsgBuf, "invalid function name");
}
// do check the parameter number for scalar function
if (scalar) {
int32_t ret = validateScalarFunctionParamNum((tSqlExpr*) pSqlExpr, functionId, pMsgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, "invalid number of function parameters");
}
}
}
return TSDB_CODE_SUCCESS;
@ -3143,7 +3353,7 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm
int32_t num = 0;
tExprNode** p = NULL;
int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, p, pCols, keepTableCols, pSqlExpr, pMsgBuf);
int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, &p, pCols, keepTableCols, pSqlExpr, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3203,6 +3413,9 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm
(*pExpr)->pSchema = calloc(1, sizeof(SSchema));
strncpy((*pExpr)->pSchema->name, pSqlExpr->exprToken.z, pSqlExpr->exprToken.n);
// it must be the aggregate function
assert(qIsAggregateFunction((*pExpr)->pSchema->name));
uint64_t uid = findTmpSourceColumnInNextLevel(pQueryInfo, *pExpr);
if (!(*keepTableCols)) {
SColumn c = createColumn(uid, NULL, TSDB_COL_TMP, (*pExpr)->pSchema);
@ -3265,26 +3478,6 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm
return TSDB_CODE_SUCCESS;
}
static int32_t multiColumnListInsert(SQueryStmtInfo* pQueryInfo, SArray* pColumnList, SMsgBuf* pMsgBuf) {
const char* msg1 = "tag can not be used in expression";
SColumn* p1 = taosArrayGet(pColumnList, 0);
size_t numOfNode = taosArrayGetSize(pColumnList);
for(int32_t k = 0; k < numOfNode; ++k) {
SColumn* p = taosArrayGet(pColumnList, k);
if (TSDB_COL_IS_TAG(p->flag)) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
SSchema s = createSchema(p->info.type, p->info.bytes, p->info.colId, p->name);
columnListInsert(pQueryInfo->colList, p->uid, &s, p->flag);
}
insertPrimaryTsColumn(pQueryInfo->colList, NULL, p1->uid);
return TSDB_CODE_SUCCESS;
}
static int32_t addScalarExprAndResColumn(SQueryStmtInfo* pQueryInfo, int32_t exprIndex, tSqlExprItem* pItem, SMsgBuf* pMsgBuf) {
SArray* pColumnList = taosArrayInit(4, sizeof(SColumn));
SSchema s = createSchema(TSDB_DATA_TYPE_DOUBLE, sizeof(double), getNewResColId(), "");
@ -3379,14 +3572,14 @@ int32_t validateSelectNodeList(SQueryStmtInfo* pQueryInfo, SArray* pSelNodeList,
if (type == SQL_NODE_SQLFUNCTION) {
bool scalarFunc = false;
pItem->functionId = qIsBuiltinFunction(pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n, &scalarFunc);
if (pItem->functionId == FUNCTION_INVALID_ID) {
int32_t functionId = FUNCTION_INVALID_ID;
bool valid = qIsValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n, &functionId);
if (!valid) {
if (pItem->functionId == FUNCTION_INVALID_ID) { // temporarily disable the udf
// int32_t functionId = FUNCTION_INVALID_ID;
// bool valid = qIsValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n, &functionId);
// if (!valid) {
return buildInvalidOperationMsg(pMsgBuf, msg5);
}
// }
pItem->functionId = functionId;
// pItem->functionId = functionId;
}
if (scalarFunc) { // scalar function
@ -3401,7 +3594,7 @@ int32_t validateSelectNodeList(SQueryStmtInfo* pQueryInfo, SArray* pSelNodeList,
}
} else if (type == SQL_NODE_TABLE_COLUMN || type == SQL_NODE_VALUE) {
// use the dynamic array list to decide if the function is valid or not
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
if ((code = addProjectionExprAndResColumn(pQueryInfo, pItem, outerQuery, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code;
}
@ -3920,12 +4113,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
validateSqlNode(p, pQueryInfo, &buf);
}
SArray* functionList = extractFunctionList(pQueryInfo->exprList[0]);
extractFunctionDesc(functionList, &pQueryInfo->info);
if ((code = checkForInvalidExpr(pQueryInfo, &buf)) != TSDB_CODE_SUCCESS) {
return code;
}
return code;
}

View File

@ -223,7 +223,7 @@ void addExprInfoParam(SSqlExpr* pExpr, char* argument, int32_t type, int32_t byt
}
int32_t getExprFunctionId(SExprInfo *pExprInfo) {
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_FUNCTION_NODE);
return 0;
}
@ -324,10 +324,17 @@ SArray* extractFunctionList(SArray* pExprInfoList) {
assert(pExprInfoList != NULL);
size_t len = taosArrayGetSize(pExprInfoList);
SArray* p = taosArrayInit(len, sizeof(int32_t));
SArray* p = taosArrayInit(len, POINTER_BYTES);
for(int32_t i = 0; i < len; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pExprInfoList, i);
taosArrayPush(p, &pExprInfo->pExpr->_function.functionName);
if (pExprInfo->pExpr->nodeType == TEXPR_FUNCTION_NODE) {
char* name = strdup(pExprInfo->pExpr->_function.functionName);
taosArrayPush(p, &name);
} else {
char* name = strdup("project");
taosArrayPush(p, &name);
}
}
return p;
@ -350,11 +357,16 @@ bool tscHasColumnFilter(SQueryStmtInfo* pQueryInfo) {
return false;
}
//void tscClearInterpInfo(SQueryStmtInfo* pQueryInfo) {
// if (!tscIsPointInterpQuery(pQueryInfo)) {
// return;
// }
//
// pQueryInfo->fillType = TSDB_FILL_NONE;
// tfree(pQueryInfo->fillVal);
//}
int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo) {
int32_t n = 10;
int32_t level = 0;
for(int32_t i = 0; i < n; ++i) {
SArray* pList = pQueryInfo->exprList[i];
if (taosArrayGetSize(pList) > 0) {
level += 1;
}
}
return level;
}

View File

@ -16,6 +16,7 @@
#include <function.h>
#include <gtest/gtest.h>
#include <iostream>
#include "tglobal.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
@ -398,6 +399,7 @@ TEST(testCase, function_Test5) {
TEST(testCase, function_Test10) {
sqlCheck("select c from `t.1abc`", true);
sqlCheck("select length(c) from `t.1abc`", true);
sqlCheck("select length(sum(col)) from `t.1abc`", true);
sqlCheck("select sum(length(a+b)) from `t.1abc`", true);
sqlCheck("select sum(sum(a+b)) from `t.1abc`", false);
sqlCheck("select sum(length(a) + length(b)) from `t.1abc`", true);
@ -406,12 +408,27 @@ TEST(testCase, function_Test10) {
sqlCheck("select cov(a, b) from `t.1abc`", true);
sqlCheck("select sum(length(a) + count(b)) from `t.1abc`", false);
sqlCheck("select concat(sum(a), count(b)) from `t.1abc`", true);
sqlCheck("select concat(concat(a,b), concat(a,b)) from `t.1abc`", true);
sqlCheck("select length(length(length(a))) from `t.1abc`", true);
sqlCheck("select count() from `t.1abc`", false);
sqlCheck("select block_dist() from `t.1abc`", true);
sqlCheck("select block_dist(a) from `t.1abc`", false);
sqlCheck("select count(*) from `t.1abc` interval(1s) group by a", false);
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
sqlCheck("select length119(a,b) from `t.1abc`", false);
sqlCheck("select length(a, b) from `t.1abc`", false);
sqlCheck("select block_dist() + 20 from `t.1abc`", true);
sqlCheck("select count(b), c from `t.1abc`", false);
sqlCheck("select top(a, 20), count(b) from `t.1abc`", false);
sqlCheck("select top(a, 20), b from `t.1abc`", false);
sqlCheck("select top(a, 20), a+20 from `t.1abc`", true);
// sqlCheck("select top(a, 20), bottom(a, 10) from `t.1abc`", false);
// sqlCheck("select last_row(*), count(b) from `t.1abc`", false);
// sqlCheck("select last_row(a, b) + 20 from `t.1abc`", false);
// sqlCheck("select last_row(count(*)) from `t.1abc`", false);
}
TEST(testCase, function_Test6) {
@ -441,9 +458,14 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
ASSERT_EQ(taosArrayGetSize(pExprList), 5);
if (tsCompatibleModel) {
ASSERT_EQ(taosArrayGetSize(pExprList), 6);
} else {
ASSERT_EQ(taosArrayGetSize(pExprList), 5);
}
SExprInfo* p1 = (SExprInfo*)taosArrayGetP(pExprList, 0);
int32_t index = tsCompatibleModel? 1:0;
SExprInfo* p1 = (SExprInfo*)taosArrayGetP(pExprList, index);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 0);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
@ -461,9 +483,12 @@ TEST(testCase, function_Test6) {
ASSERT_STREQ(pParam->pSchema->name, "t.1abc.a+b");
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 5);
SExprInfo* p2 = (SExprInfo*)taosArrayGetP(pExprList, 1);
int32_t numOfResCol = tsCompatibleModel? 6:5;
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, numOfResCol);
index = tsCompatibleModel? 2:1;
SExprInfo* p2 = (SExprInfo*)taosArrayGetP(pExprList, index);
ASSERT_EQ(p2->base.pColumns->uid, 110);
ASSERT_EQ(p2->base.numOfParams, 0);
ASSERT_EQ(p2->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
@ -511,9 +536,10 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
ASSERT_EQ(taosArrayGetSize(pExprList), 2);
ASSERT_EQ(taosArrayGetSize(pExprList), 3);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 0);
int32_t index = tsCompatibleModel? 1:0;
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, index);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 0);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_BIGINT);
@ -537,7 +563,9 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(pParam->pSchema->colId, p2->base.resSchema.colId);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
int32_t numOfCols = tsCompatibleModel? 3:2;
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, numOfCols);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);

View File

@ -15,6 +15,7 @@
#include <function.h>
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
@ -63,7 +64,6 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
setSchema(&pSchema[1], TSDB_DATA_TYPE_INT, 4, "a", 1);
setSchema(&pSchema[2], TSDB_DATA_TYPE_DOUBLE, 8, "b", 2);
setSchema(&pSchema[3], TSDB_DATA_TYPE_DOUBLE, 8, "col", 3);
}
void generateLogicplan(const char* sql) {
@ -132,7 +132,9 @@ TEST(testCase, planner_test) {
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
ASSERT_EQ(taosArrayGetSize(pExprList), 2);
int32_t num = tsCompatibleModel? 2:1;
ASSERT_EQ(taosArrayGetSize(pExprList), num);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p1->base.pColumns->uid, 110);
@ -172,6 +174,7 @@ TEST(testCase, displayPlan) {
generateLogicplan("select count(A+B) from `t.1abc` group by a");
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
@ -179,15 +182,18 @@ TEST(testCase, displayPlan) {
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
// order by + group by column + limit offset + fill
// order by + group by column + limit offset
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
// fill
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
// union + union all
// join
// union
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
// Projection(cols: [a+b #5059]) filters:(nil)
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)

View File

@ -26,18 +26,26 @@ extern "C" {
#include "taosmsg.h"
typedef struct SQueryNodeBasicInfo {
int32_t type;
char *name;
int32_t type; // operator type
char *name; // operator name
} SQueryNodeBasicInfo;
typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not
int32_t phase; // merge|partial
int32_t type; // operator type
char *name; // operator name
SEpSet *sourceEp; // data source epset
} SQueryDistPlanNodeInfo;
typedef struct SQueryTableInfo {
char *tableName;
uint64_t uid;
char *tableName;
uint64_t uid;
STimeWindow window;
} SQueryTableInfo;
typedef struct SQueryPlanNode {
SQueryNodeBasicInfo info;
SQueryTableInfo tableInfo;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SArray *pExpr; // the query functions or sql aggregations
@ -51,9 +59,49 @@ typedef struct SQueryPlanNode {
typedef struct SQueryDistPlanNode {
SQueryNodeBasicInfo info;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SArray *pExpr; // the query functions or sql aggregations
int32_t numOfExpr; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources.
} SQueryDistPlanNode;
typedef struct SQueryCostSummary {
int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed
int64_t cputime; // total cpu cost, not execute elapsed time
int64_t loadRemoteDataDuration; // remote io time
int64_t loadNativeDataDuration; // native disk io time
uint64_t loadNativeData; // blocks + SMA + header files
uint64_t loadRemoteData; // remote data acquired by exchange operator.
uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it
int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue.
uint64_t totalRows;
uint64_t loadRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockAgg;
uint32_t skipBlocks;
uint64_t resultSize; // generated result size in Kb.
} SQueryCostSummary;
typedef struct SQueryTask {
uint64_t queryId; // query id
uint64_t taskId; // task id
SQueryDistPlanNode *pNode; // operator tree
uint64_t status; // task status
SQueryCostSummary summary; // task execution summary
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
} SQueryTask;
#ifdef __cplusplus
}
#endif

View File

@ -27,7 +27,7 @@
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNIONALL 10
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
@ -46,14 +46,13 @@ typedef struct SJoinCond {
static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo);
int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0;
}
int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
exprInfoPushDown((struct SQueryStmtInfo*) pQueryInfo);
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
assert(taosArrayGetSize(upstream) == 1);
@ -95,17 +94,12 @@ int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQuery
//======================================================================================================================
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo, void* pExtInfo) {
SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) {
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
pNode->info.type = type;
pNode->info.name = strdup(name);
if (pTableInfo->uid != 0 && pTableInfo->tableName) { // it is a true table
pNode->tableInfo.uid = pTableInfo->uid;
pNode->tableInfo.tableName = strdup(pTableInfo->tableName);
}
pNode->numOfExpr = numOfOutput;
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
@ -120,9 +114,10 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
switch(type) {
case QNODE_TABLESCAN: {
STimeWindow* window = calloc(1, sizeof(STimeWindow));
memcpy(window, pExtInfo, sizeof(STimeWindow));
pNode->pExtInfo = window;
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName);
pNode->pExtInfo = info;
break;
}
@ -168,6 +163,12 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
memcpy(pNode->pExtInfo, pExtInfo, sizeof(SLimit));
break;
}
case QNODE_SORT: {
pNode->pExtInfo = taosArrayDup(pExtInfo);
break;
}
default:
break;
}
@ -179,21 +180,20 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SArray* pExprs, SArray* tableCols) {
if (pQueryInfo->info.onlyTagQuery) {
int32_t num = (int32_t) taosArrayGetSize(pExprs);
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL);
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL);
if (pQueryInfo->info.distinct) {
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL);
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
}
return pNode;
}
STimeWindow* window = &pQueryInfo->window;
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info, window);
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
if (pQueryInfo->info.projectionQuery) {
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, info, NULL);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL);
} else {
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
@ -210,54 +210,24 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
pExpr[i] = p;
}
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, info, NULL);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
tfree(pExpr);
}
return pNode;
}
static int32_t getFunctionLevel(SQueryStmtInfo* pQueryInfo) {
int32_t n = 10;
int32_t level = 0;
for(int32_t i = 0; i < n; ++i) {
SArray* pList = pQueryInfo->exprList[i];
if (taosArrayGetSize(pList) > 0) {
level += 1;
}
}
return level;
}
static SQueryPlanNode* createOneQueryPlanNode(SArray* p, SQueryPlanNode* pNode, SExprInfo* pExpr, SQueryTableInfo* info) {
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE) {
bool aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName);
if (aggregateFunc) {
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
return createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, numOfOutput, info, NULL);
} else {
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
return createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, numOfOutput, info, NULL);
}
} else {
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
return createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, numOfOutput, info, NULL);
}
}
static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
// group by column not by tag
size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
// check for aggregation
int32_t level = getFunctionLevel(pQueryInfo);
int32_t level = getExprFunctionLevel(pQueryInfo);
for(int32_t i = level - 1; i >= 0; --i) {
SArray* p = pQueryInfo->exprList[i];
size_t num = taosArrayGetSize(p);
size_t num = taosArrayGetSize(p);
bool aggregateFunc = false;
for(int32_t j = 0; j < num; ++j) {
SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0);
@ -273,37 +243,43 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer
if (aggregateFunc) {
if (pQueryInfo->interval.interval > 0) {
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->interval);
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval);
} else if (pQueryInfo->sessionWindow.gap > 0) {
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->sessionWindow);
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow);
} else if (pQueryInfo->stateWindow.col.info.colId > 0) {
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->stateWindow);
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow);
} else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) {
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, info, &pQueryInfo->groupbyExpr);
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr);
} else {
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, info, NULL);
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
}
} else {
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, info, NULL);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
}
}
if (pQueryInfo->havingFieldNum > 0) {
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL);
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillEssInfo* pInfo = calloc(1, sizeof(SFillEssInfo));
pInfo->fillType = pQueryInfo->fillType;
pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t));
pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t));
memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr);
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, NULL, 0, info, pInfo);
SArray* p = pQueryInfo->exprList[0]; // top expression in select clause
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo);
}
if (pQueryInfo->order != NULL) {
SArray* pList = pQueryInfo->exprList[0];
pNode = createQueryNode(QNODE_SORT, "Sort", &pNode, 1, pList->pData, taosArrayGetSize(pList), pQueryInfo->order);
}
if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) {
pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, info, &pQueryInfo->limit);
pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit);
}
return pNode;
@ -341,44 +317,6 @@ static bool isAllAggExpr(SArray* pList) {
return true;
}
static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo) {
assert(pQueryInfo != NULL);
size_t level = getFunctionLevel(pQueryInfo);
for(int32_t i = 0; i < level - 1; ++i) {
SArray* p = pQueryInfo->exprList[i];
SArray* pNext = pQueryInfo->exprList[i + 1];
if (!isAllAggExpr(pNext)) {
continue;
}
for (int32_t j = 0; j < taosArrayGetSize(p); ++j) {
SExprInfo* pExpr = taosArrayGetP(p, j);
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE && qIsAggregateFunction(pExpr->pExpr->_function.functionName)) {
bool canPushDown = true;
for (int32_t k = 0; k < taosArrayGetSize(pNext); ++k) {
SExprInfo* pNextLevelExpr = taosArrayGetP(pNext, k);
if (pExpr->base.pColumns->info.colId == pNextLevelExpr->base.resSchema.colId) {
// pExpr is dependent on the output of the under layer, so it can not be push downwards
canPushDown = false;
break;
}
}
if (canPushDown) {
taosArrayInsert(pNext, j, &pExpr);
taosArrayRemove(p, j);
// todo add the project function in level of "i"
}
}
}
}
}
SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SArray* upstream = NULL;
@ -429,7 +367,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SQueryTableInfo info = {0};
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
pQueryInfo->exprList[0]->pData, num, &info, NULL);
pQueryInfo->exprList[0]->pData, num, NULL);
// 4. add the aggregation or projection execution node
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
@ -449,8 +387,6 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree(pQueryNode->pExtInfo);
tfree(pQueryNode->pSchema);
tfree(pQueryNode->info.name);
tfree(pQueryNode->tableInfo.tableName);
// dropAllExprInfo(pQueryNode->pExpr);
if (pQueryNode->pPrevNodes != NULL) {
@ -477,13 +413,13 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
switch(pQueryNode->info.type) {
case QNODE_TABLESCAN: {
STimeWindow* win = (STimeWindow*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64 " cols: ",
pQueryNode->tableInfo.tableName, pQueryNode->tableInfo.uid, win->skey, win->ekey);
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid,
pInfo->window.skey, pInfo->window.ekey);
assert(len1 > 0);
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i);
len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId);
@ -499,90 +435,47 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
}
case QNODE_PROJECT: {
len1 = sprintf(buf + len, "cols: ");
len1 = sprintf(buf + len, "cols:");
assert(len1 > 0);
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* p = &pExprInfo->base;
len1 = sprintf(buf + len, "[%s #%d]", p->resSchema.name, p->resSchema.colId);
assert(len1 > 0);
len += len1;
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len, ", ");
len += len1;
}
}
len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len, ")");
len += len1;
//todo print filter info
// todo print filter info
len1 = sprintf(buf + len, " filters:(nil)\n");
len += len1;
break;
}
case QNODE_AGGREGATE: {
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* pExpr = &pExprInfo->base;
len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len, ", ");
len += len1;
}
}
len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len, ")\n");
len += len1;
break;
}
case QNODE_TIMEWINDOW: {
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* pExpr = &pExprInfo->base;
len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
len1 = sprintf(buf + len,") ");
len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len, ") ");
len += len1;
SInterval* pInterval = pQueryNode->pExtInfo;
// todo dynamic return the time precision
len1 = sprintf(buf + len, "interval:%" PRId64 "(%s), sliding:%" PRId64 "(%s), offset:%" PRId64 "(%s)\n",
pInterval->interval, TSDB_TIME_PRECISION_MILLI_STR, pInterval->sliding, TSDB_TIME_PRECISION_MILLI_STR,
pInterval->offset, TSDB_TIME_PRECISION_MILLI_STR);
pInterval->interval, TSDB_TIME_PRECISION_MILLI_STR, pInterval->sliding,
TSDB_TIME_PRECISION_MILLI_STR, pInterval->offset, TSDB_TIME_PRECISION_MILLI_STR);
len += len1;
break;
}
case QNODE_STATEWINDOW: {
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* pExpr = &pExprInfo->base;
len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
len1 = sprintf(buf + len,") ");
len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len, ") ");
len += len1;
SColumn* pCol = pQueryNode->pExtInfo;
@ -592,44 +485,25 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
}
case QNODE_SESSIONWINDOW: {
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* pExpr = &pExprInfo->base;
len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len,") ");
len1 = sprintf(buf + len, ") ");
len += len1;
struct SSessionWindow* ps = pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "col:[%s #%d], gap:%"PRId64" (ms) \n", ps->col.name, ps->col.info.colId, ps->gap);
len1 = sprintf(buf + len, "col:[%s #%d], gap:%" PRId64 " (ms) \n", ps->col.name, ps->col.info.colId, ps->gap);
len += len1;
break;
}
case QNODE_GROUPBY: { // todo hide the invisible column
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* pExpr = &pExprInfo->base;
len1 = sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
len += len1;
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
case QNODE_GROUPBY: {
len = printExprInfo(buf, pQueryNode, len);
SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo;
len1 = sprintf(buf + len,") groupby_col: ");
len1 = sprintf(buf + len, ") groupby_col: ");
len += len1;
for(int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) {
SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i);
len1 = sprintf(buf + len, "[%s #%d] ", pCol->name, pCol->info.colId);
len += len1;
@ -641,58 +515,64 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
case QNODE_FILL: {
SFillEssInfo* pEssInfo = pQueryNode->pExtInfo;
len1 = sprintf(buf + len,"%d", pEssInfo->fillType);
len1 = sprintf(buf + len, "%d", pEssInfo->fillType);
len += len1;
if (pEssInfo->fillType == TSDB_FILL_SET_VALUE) {
len1 = sprintf(buf + len,", val:");
len1 = sprintf(buf + len, ", val:");
len += len1;
// todo get the correct fill data type
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
len1 = sprintf(buf + len,"%"PRId64, pEssInfo->val[i]);
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
len1 = sprintf(buf + len, "%" PRId64, pEssInfo->val[i]);
len += len1;
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len,", ");
len1 = sprintf(buf + len, ", ");
len += len1;
}
}
}
len1 = sprintf(buf + len,")\n");
len1 = sprintf(buf + len, ")\n");
len += len1;
break;
}
case QNODE_LIMIT: {
SLimit* pVal = pQueryNode->pExtInfo;
len1 = sprintf(buf + len,"limit: %"PRId64", offset: %"PRId64")\n", pVal->limit, pVal->offset);
len1 = sprintf(buf + len, "limit: %" PRId64 ", offset: %" PRId64 ")\n", pVal->limit, pVal->offset);
len += len1;
break;
}
case QNODE_DISTINCT:
case QNODE_TAGSCAN: {
len1 = sprintf(buf + len,"cols: ");
len1 = sprintf(buf + len, "cols: ");
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSchema* resSchema = &pExprInfo->base.resSchema;
len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len, ")\n");
len += len1;
break;
}
case QNODE_SORT: {
len1 = sprintf(buf + len, "cols:");
len += len1;
SArray* pSort = pQueryNode->pExtInfo;
for (int32_t i = 0; i < taosArrayGetSize(pSort); ++i) {
SOrder* p = taosArrayGet(pSort, i);
len1 = sprintf(buf + len, " [%s #%d %s]", p->col.name, p->col.info.colId, p->order == TSDB_ORDER_ASC? "ASC":"DESC");
len1 = sprintf(buf + len,"[%s #%d]", resSchema->name, resSchema->colId);
len += len1;
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
len1 = sprintf(buf + len,")\n");
len1 = sprintf(buf + len, ")\n");
len += len1;
break;
}
@ -707,6 +587,26 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
return len;
}
int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len) {
int32_t len1 = 0;
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
SSqlExpr* pExpr = &pExprInfo->base;
len1 = sprintf(buf + len, "%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
assert(len1 > 0);
len += len1;
if (i < pQueryNode->numOfExpr - 1) {
len1 = sprintf(buf + len, ", ");
len += len1;
}
}
return len;
}
int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen);

View File

@ -63,7 +63,7 @@ typedef struct SConvertFunc {
int32_t execFuncId;
} SConvertFunc;
static SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex, int32_t colId);
static SExprInfo* doAddOneProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex, int32_t colId);
static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo);
static char* getAccountId(SSqlObj* pSql);
@ -1890,7 +1890,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32
}
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSqlExprItem* pItem, int32_t colId) {
SExprInfo* pExpr = doAddProjectCol(pQueryInfo, pIndex->columnIndex, pIndex->tableIndex, colId);
SExprInfo* pExpr = doAddOneProjectCol(pQueryInfo, pIndex->columnIndex, pIndex->tableIndex, colId);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
@ -2157,7 +2157,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
return TSDB_CODE_SUCCESS;
}
SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex, int32_t colId) {
SExprInfo* doAddOneProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex, int32_t colId) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
@ -2218,7 +2218,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
}
for (int32_t j = 0; j < numOfTotalColumns; ++j) {
SExprInfo* pExpr = doAddProjectCol(pQueryInfo, j, pIndex->tableIndex, getNewResColId(pCmd));
SExprInfo* pExpr = doAddOneProjectCol(pQueryInfo, j, pIndex->tableIndex, getNewResColId(pCmd));
tstrncpy(pExpr->base.aliasName, pSchema[j].name, sizeof(pExpr->base.aliasName));
pIndex->columnIndex = j;