From 8f199e0eb89291ddea340ca285583e7f10600797 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 17 Feb 2022 19:30:43 +0800 Subject: [PATCH] feature/qnode --- include/common/tname.h | 10 + include/common/ttypes.h | 2 - include/libs/function/function.h | 11 +- include/libs/function/functionMgt.h | 1 + include/libs/nodes/nodes.h | 1 - include/libs/nodes/querynodes.h | 1 + include/libs/scalar/scalar.h | 36 ++ source/libs/CMakeLists.txt | 3 +- source/libs/executor/CMakeLists.txt | 2 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 6 +- source/libs/executor/test/CMakeLists.txt | 2 +- source/libs/executor/test/executorTests.cpp | 2 + source/libs/function/inc/tunaryoperator.h | 6 +- source/libs/function/src/taggfunction.c | 5 +- source/libs/function/src/tfunction.c | 3 +- source/libs/function/src/tunaryoperator.c | 12 +- source/libs/nodes/src/nodesUtilFuncs.c | 1 + source/libs/scalar/CMakeLists.txt | 16 + source/libs/scalar/inc/filter.h | 347 ++++++++++++++++++ .../inc/tscalar.h => scalar/inc/sclInt.h} | 9 +- .../inc/sclfunc.h} | 8 +- .../tbinoperator.h => scalar/inc/sclvector.h} | 2 +- .../libs/{executor => scalar}/inc/tfilter.h | 12 +- source/libs/scalar/inc/tsclfunc.h | 45 +++ .../src/tfilter.c => scalar/src/filter.c} | 6 +- .../src/tscalar.c => scalar/src/scalar.c} | 114 +++--- .../src/sclfunc.c} | 5 +- .../tbinoperator.c => scalar/src/sclvector.c} | 30 +- source/libs/scalar/test/CMakeLists.txt | 18 + source/libs/scalar/test/scalarTests.cpp | 55 +++ 31 files changed, 670 insertions(+), 103 deletions(-) create mode 100644 include/libs/scalar/scalar.h create mode 100644 source/libs/scalar/CMakeLists.txt create mode 100644 source/libs/scalar/inc/filter.h rename source/libs/{function/inc/tscalar.h => scalar/inc/sclInt.h} (91%) rename source/libs/{function/inc/tscalarfunction.h => scalar/inc/sclfunc.h} (92%) rename source/libs/{function/inc/tbinoperator.h => scalar/inc/sclvector.h} (97%) rename source/libs/{executor => scalar}/inc/tfilter.h (97%) create mode 100644 source/libs/scalar/inc/tsclfunc.h rename source/libs/{executor/src/tfilter.c => scalar/src/filter.c} (99%) rename source/libs/{function/src/tscalar.c => scalar/src/scalar.c} (80%) rename source/libs/{function/src/tscalarfunction.c => scalar/src/sclfunc.c} (99%) rename source/libs/{function/src/tbinoperator.c => scalar/src/sclvector.c} (98%) create mode 100644 source/libs/scalar/test/CMakeLists.txt create mode 100644 source/libs/scalar/test/scalarTests.cpp diff --git a/include/common/tname.h b/include/common/tname.h index 12a0d34cb4..47028fbce1 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -16,6 +16,11 @@ #ifndef TDENGINE_TNAME_H #define TDENGINE_TNAME_H +#ifdef __cplusplus +extern "C" { +#endif + + #include "tdef.h" #include "tmsg.h" @@ -59,4 +64,9 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId); SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); +#ifdef __cplusplus +} +#endif + + #endif // TDENGINE_TNAME_H diff --git a/include/common/ttypes.h b/include/common/ttypes.h index afa605044a..5a1e442277 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -122,8 +122,6 @@ typedef struct { *(int32_t *)(_v) = (int32_t)(_data); \ break; \ default: \ - (void *)(_v) = (void *)(_data); \ - (void *)(_data) = NULL; \ break; \ } \ } while (0) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index dfcac7ca3f..bb5d51686d 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -226,7 +226,12 @@ typedef struct SAggFunctionInfo { int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId); } SAggFunctionInfo; -struct SScalarParam; +typedef struct SScalarParam { + void* data; + int32_t num; + int32_t type; + int32_t bytes; +} SScalarParam; typedef struct SScalarFunctionInfo { char name[FUNCTIONS_NAME_MAX_LENGTH]; @@ -285,10 +290,6 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num); bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); -struct SScalarFunctionSupport* createScalarFuncSupport(int32_t num); -void destroyScalarFuncSupport(struct SScalarFunctionSupport* pSupport, int32_t num); -struct SScalarFunctionSupport* getScalarFuncSupport(struct SScalarFunctionSupport* pSupport, int32_t index); - /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // fill api struct SFillInfo; diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 84b1f8d5bf..e77ccaa225 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "querynodes.h" +#include "function.h" typedef enum EFunctionType { // aggregate function diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 8a8e230359..068b80cf59 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -104,7 +104,6 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); SNode* nodesListGetNode(SNodeList* pList, int32_t index); void nodesDestroyList(SNodeList* pList); -void *nodesGetValueFromNode(SValueNode *pNode); typedef enum EDealRes { DEAL_RES_CONTINUE = 1, diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index cea0aa32a8..de6f8747e8 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -277,6 +277,7 @@ bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsTimeorderQuery(const SNode* pQuery); bool nodesIsTimelineQuery(const SNode* pQuery); +void *nodesGetValueFromNode(SValueNode *pNode); #ifdef __cplusplus } diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h new file mode 100644 index 0000000000..df83a0de35 --- /dev/null +++ b/include/libs/scalar/scalar.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_SCALAR_H +#define TDENGINE_SCALAR_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "function.h" +#include "nodes.h" + +typedef struct SFilterInfo SFilterInfo; + + +int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes); +int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst); + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_SCALAR_H diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index 049b69991f..a2454e4c5e 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -13,4 +13,5 @@ add_subdirectory(function) add_subdirectory(qcom) add_subdirectory(qworker) add_subdirectory(tfs) -add_subdirectory(nodes) \ No newline at end of file +add_subdirectory(nodes) +add_subdirectory(scalar) \ No newline at end of file diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 9b53cc1fbb..12a78134c3 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -13,7 +13,7 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor" # ) target_link_libraries(executor - PRIVATE os util common function parser planner qcom vnode + PRIVATE os util common function parser planner qcom vnode scalar nodes ) target_include_directories( diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 232b54554f..23acdf1c33 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -29,9 +29,9 @@ extern "C" { #include "executil.h" #include "executor.h" #include "planner.h" +#include "scalar.h" #include "taosdef.h" #include "tarray.h" -#include "tfilter.h" #include "thash.h" #include "tlockfree.h" #include "tpagedbuf.h" diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f5dc7a82b1..5508b5ecd7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2148,7 +2148,7 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT // NOTE: pTableCheckInfo need to update the query time range and the lastKey info pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput); + //pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput); if (pRuntimeEnv->scalarSup == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) { @@ -2174,7 +2174,7 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT return TSDB_CODE_SUCCESS; _clean: - destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput); + //destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput); tfree(pRuntimeEnv->pResultRowHashTable); tfree(pRuntimeEnv->keyBuf); tfree(pRuntimeEnv->prevRow); @@ -2212,7 +2212,7 @@ static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) { //qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId); - destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput); + //destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput); // destroyUdfInfo(pRuntimeEnv->pUdfInfo); destroyResultBuf(pRuntimeEnv->pResultBuf); doFreeQueryHandle(pRuntimeEnv); diff --git a/source/libs/executor/test/CMakeLists.txt b/source/libs/executor/test/CMakeLists.txt index ece84207c7..c24993eb89 100644 --- a/source/libs/executor/test/CMakeLists.txt +++ b/source/libs/executor/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(executorTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( executorTest - PUBLIC os util common transport gtest taos qcom executor function planner + PRIVATE os util common transport gtest taos qcom executor function planner scalar nodes ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index ebea6755d7..c9657607c2 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -32,6 +32,8 @@ #include "trpc.h" #include "stub.h" #include "executor.h" +#include "tmsg.h" +#include "tname.h" namespace { diff --git a/source/libs/function/inc/tunaryoperator.h b/source/libs/function/inc/tunaryoperator.h index 08cc6f69c6..cd40297e07 100644 --- a/source/libs/function/inc/tunaryoperator.h +++ b/source/libs/function/inc/tunaryoperator.h @@ -20,10 +20,10 @@ extern "C" { #endif -#include "tscalarfunction.h" +//#include "tscalarfunction.h" -typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput); -_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator); +//typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput); +//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator); #ifdef __cplusplus } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 441b6fe3e7..1aa5871468 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "tscalarfunction.h" #include "os.h" #include "taosdef.h" #include "tmsg.h" @@ -3221,6 +3220,7 @@ static void diff_function(SqlFunctionCtx *pCtx) { } } +#if 0 char *getArithColumnData(void *param, const char* name, int32_t colId) { SScalarFunctionSupport *pSupport = (SScalarFunctionSupport *)param; @@ -3235,10 +3235,11 @@ char *getArithColumnData(void *param, const char* name, int32_t colId) { assert(index >= 0); return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes; } +#endif static void arithmetic_function(SqlFunctionCtx *pCtx) { GET_RES_INFO(pCtx)->numOfRes += pCtx->size; - SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz; + //SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz; SScalarParam output = {0}; output.data = pCtx->pOutput; diff --git a/source/libs/function/src/tfunction.c b/source/libs/function/src/tfunction.c index 36c9e2513f..e302643c32 100644 --- a/source/libs/function/src/tfunction.c +++ b/source/libs/function/src/tfunction.c @@ -3,7 +3,6 @@ #include "function.h" #include "thash.h" #include "taggfunction.h" -#include "tscalarfunction.h" static SHashObj* functionHashTable = NULL; static SHashObj* udfHashTable = NULL; @@ -18,12 +17,14 @@ static void doInitFunctionHashTable() { taosHashPut(functionHashTable, aggFunc[i].name, len, (void*)&ptr, POINTER_BYTES); } +/* numOfEntries = tListLen(scalarFunc); for(int32_t i = 0; i < numOfEntries; ++i) { int32_t len = (int32_t) strlen(scalarFunc[i].name); SScalarFunctionInfo* ptr = &scalarFunc[i]; taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES); } +*/ udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true); } diff --git a/source/libs/function/src/tunaryoperator.c b/source/libs/function/src/tunaryoperator.c index 9651f98d08..957f0799c5 100644 --- a/source/libs/function/src/tunaryoperator.c +++ b/source/libs/function/src/tunaryoperator.c @@ -4,10 +4,10 @@ // TODO dynamic define these functions -_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) { - assert(0); -} +//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) { +// assert(0); +//} -bool isStringOperatorFn(int32_t op) { - return op == FUNCTION_LENGTH; -} +//bool isStringOperatorFn(int32_t op) { +// return op == FUNCTION_LENGTH; +//} diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8eef6aa364..520a3f82ca 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -16,6 +16,7 @@ #include "querynodes.h" #include "nodesShowStmts.h" #include "taoserror.h" +#include "taos.h" static SNode* makeNode(ENodeType type, size_t size) { SNode* p = calloc(1, size); diff --git a/source/libs/scalar/CMakeLists.txt b/source/libs/scalar/CMakeLists.txt new file mode 100644 index 0000000000..d2d02bc0dc --- /dev/null +++ b/source/libs/scalar/CMakeLists.txt @@ -0,0 +1,16 @@ +aux_source_directory(src SCALAR_SRC) + +add_library(scalar STATIC ${SCALAR_SRC}) +target_include_directories( + scalar + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scalar" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_link_libraries(scalar + PRIVATE os util common nodes function qcom + ) + +if(${BUILD_TEST}) + ADD_SUBDIRECTORY(test) +endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/libs/scalar/inc/filter.h b/source/libs/scalar/inc/filter.h new file mode 100644 index 0000000000..27665da0c8 --- /dev/null +++ b/source/libs/scalar/inc/filter.h @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_QFILTER_H +#define TDENGINE_QFILTER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "thash.h" +#include "tname.h" +#include "common.h" +#include "scalar.h" +#include "querynodes.h" + +#define FILTER_DEFAULT_GROUP_SIZE 4 +#define FILTER_DEFAULT_UNIT_SIZE 4 +#define FILTER_DEFAULT_FIELD_SIZE 4 +#define FILTER_DEFAULT_VALUE_SIZE 4 +#define FILTER_DEFAULT_GROUP_UNIT_SIZE 2 + +#define FILTER_DUMMY_EMPTY_OPTR 127 + +#define MAX_NUM_STR_SIZE 40 + +#define FILTER_RM_UNIT_MIN_ROWS 100 + +enum { + FLD_TYPE_COLUMN = 1, + FLD_TYPE_VALUE = 2, + FLD_TYPE_MAX = 3, + FLD_DESC_NO_FREE = 4, + FLD_DATA_NO_FREE = 8, + FLD_DATA_IS_HASH = 16, +}; + +enum { + MR_ST_START = 1, + MR_ST_FIN = 2, + MR_ST_ALL = 4, + MR_ST_EMPTY = 8, +}; + +enum { + RANGE_FLG_EXCLUDE = 1, + RANGE_FLG_INCLUDE = 2, + RANGE_FLG_NULL = 4, +}; + +enum { + FI_OPTION_NO_REWRITE = 1, + FI_OPTION_TIMESTAMP = 2, + FI_OPTION_NEED_UNIQE = 4, +}; + +enum { + FI_STATUS_ALL = 1, + FI_STATUS_EMPTY = 2, + FI_STATUS_REWRITE = 4, + FI_STATUS_CLONED = 8, +}; + +enum { + FI_STATUS_BLK_ALL = 1, + FI_STATUS_BLK_EMPTY = 2, + FI_STATUS_BLK_ACTIVE = 4, +}; + +enum { + RANGE_TYPE_UNIT = 1, + RANGE_TYPE_VAR_HASH = 2, + RANGE_TYPE_MR_CTX = 3, +}; + +typedef struct OptrStr { + uint16_t optr; + char *str; +} OptrStr; + +typedef struct SFilterRange { + int64_t s; + int64_t e; + char sflag; + char eflag; +} SFilterRange; + +typedef struct SFilterColRange { + uint16_t idx; //column field idx + bool isNull; + bool notNull; + bool isRange; + SFilterRange ra; +} SFilterColRange; + +typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t); +typedef int32_t(*filter_desc_compare_func)(const void *, const void *); +typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t); + +typedef struct SFilterRangeCompare { + int64_t s; + int64_t e; + rangeCompFunc func; +} SFilterRangeCompare; + +typedef struct SFilterRangeNode { + struct SFilterRangeNode* prev; + struct SFilterRangeNode* next; + union { + SFilterRange ra; + SFilterRangeCompare rc; + }; +} SFilterRangeNode; + +typedef struct SFilterRangeCtx { + int32_t type; + int32_t options; + int8_t status; + bool isnull; + bool notnull; + bool isrange; + int16_t colId; + __compar_fn_t pCompareFunc; + SFilterRangeNode *rf; //freed + SFilterRangeNode *rs; +} SFilterRangeCtx ; + +typedef struct SFilterVarCtx { + int32_t type; + int32_t options; + int8_t status; + bool isnull; + bool notnull; + bool isrange; + SHashObj *wild; + SHashObj *value; +} SFilterVarCtx; + +typedef struct SFilterField { + uint16_t flag; + void* desc; + void* data; +} SFilterField; + +typedef struct SFilterFields { + uint16_t size; + uint16_t num; + SFilterField *fields; +} SFilterFields; + +typedef struct SFilterFieldId { + uint16_t type; + uint16_t idx; +} SFilterFieldId; + +typedef struct SFilterGroup { + uint16_t unitSize; + uint16_t unitNum; + uint16_t *unitIdxs; + uint8_t *unitFlags; // !unit result +} SFilterGroup; + +typedef struct SFilterColInfo { + uint8_t type; + int32_t dataType; + void *info; +} SFilterColInfo; + +typedef struct SFilterGroupCtx { + uint16_t colNum; + uint16_t *colIdx; + SFilterColInfo *colInfo; +} SFilterGroupCtx; + +typedef struct SFilterColCtx { + uint16_t colIdx; + void* ctx; +} SFilterColCtx; + +typedef struct SFilterCompare { + uint8_t type; + uint8_t optr; + uint8_t optr2; +} SFilterCompare; + +typedef struct SFilterUnit { + SFilterCompare compare; + SFilterFieldId left; + SFilterFieldId right; + SFilterFieldId right2; +} SFilterUnit; + +typedef struct SFilterComUnit { + void *colData; + void *valData; + void *valData2; + uint16_t colId; + uint16_t dataSize; + uint8_t dataType; + uint8_t optr; + int8_t func; + int8_t rfunc; +} SFilterComUnit; + +typedef struct SFilterPCtx { + SHashObj *valHash; + SHashObj *unitHash; +} SFilterPCtx; + +typedef struct SFilterInfo { + uint32_t options; + uint32_t status; + uint16_t unitSize; + uint16_t unitNum; + uint16_t groupNum; + uint16_t colRangeNum; + SFilterFields fields[FLD_TYPE_MAX]; + SFilterGroup *groups; + uint16_t *cgroups; + SFilterUnit *units; + SFilterComUnit *cunits; + uint8_t *unitRes; // result + uint8_t *unitFlags; // got result + SFilterRangeCtx **colRange; + filter_exec_func func; + uint8_t blkFlag; + uint16_t blkGroupNum; + uint16_t *blkUnits; + int8_t *blkUnitRes; + + SFilterPCtx pctx; +} SFilterInfo; + +#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t)) + +#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR) +#define FILTER_NO_MERGE_OPTR(o) ((o) == TSDB_RELATION_ISNULL || (o) == TSDB_RELATION_NOTNULL || (o) == FILTER_DUMMY_EMPTY_OPTR) + +#define MR_EMPTY_RES(ctx) (ctx->rs == NULL) + +#define SET_AND_OPTR(ctx, o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { if (!(ctx)->isrange) { (ctx)->notnull = true; } } else if (o != FILTER_DUMMY_EMPTY_OPTR) { (ctx)->isrange = true; (ctx)->notnull = false; } } while (0) +#define SET_OR_OPTR(ctx,o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { (ctx)->notnull = true; (ctx)->isrange = false; } else if (o != FILTER_DUMMY_EMPTY_OPTR) { if (!(ctx)->notnull) { (ctx)->isrange = true; } } } while (0) +#define CHK_OR_OPTR(ctx) ((ctx)->isnull == true && (ctx)->notnull == true) +#define CHK_AND_OPTR(ctx) ((ctx)->isnull == true && (((ctx)->notnull == true) || ((ctx)->isrange == true))) + + +#define FILTER_GET_FLAG(st, f) (st & f) +#define FILTER_SET_FLAG(st, f) st |= (f) +#define FILTER_CLR_FLAG(st, f) st &= (~f) + +#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src) +#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint16_t *)(_t + 1) = idx1; *(uint16_t *)(_t + 3) = idx2; } while (0) +#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RANGE_FLG_EXCLUDE) || FILTER_GET_FLAG(eflag,RANGE_FLG_EXCLUDE)))) +#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0) + +#define RESET_RANGE(ctx, r) do { (r)->next = (ctx)->rf; (ctx)->rf = r; } while (0) +#define FREE_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = (r)->next; } else { (ctx)->rs = (r)->next;} if ((r)->next) { (r)->next->prev = (r)->prev; } RESET_RANGE(ctx, r); } while (0) +#define FREE_FROM_RANGE(ctx, r) do { SFilterRangeNode *_r = r; if ((_r)->prev) { (_r)->prev->next = NULL; } else { (ctx)->rs = NULL;} while (_r) {SFilterRangeNode *n = (_r)->next; RESET_RANGE(ctx, _r); _r = n; } } while (0) +#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0) +#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0) + +#define ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) +#define ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0) +#define ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) + +#define CHK_RETV(c) do { if (c) { return; } } while (0) +#define CHK_RET(c, r) do { if (c) { return r; } } while (0) +#define CHK_JMP(c) do { if (c) { goto _return; } } while (0) +#define CHK_LRETV(c,...) do { if (c) { qError(__VA_ARGS__); return; } } while (0) +#define CHK_LRET(c, r,...) do { if (c) { if (r) {qError(__VA_ARGS__); } else { qDebug(__VA_ARGS__); } return r; } } while (0) + +#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx])) +#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx])) +#define FILTER_GET_COL_FIELD_TYPE(fi) (((SSchema *)((fi)->desc))->type) +#define FILTER_GET_COL_FIELD_SIZE(fi) (((SSchema *)((fi)->desc))->bytes) +#define FILTER_GET_COL_FIELD_ID(fi) (((SSchema *)((fi)->desc))->colId) +#define FILTER_GET_COL_FIELD_DESC(fi) ((SSchema *)((fi)->desc)) +#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri)) +#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType) +#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data) +#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX) + +#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid]) +#define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left) +#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right) +#define FILTER_UNIT_RIGHT2_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right2) +#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type) +#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u)) +#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri) +#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u)) +#define FILTER_UNIT_COL_ID(i, u) FILTER_GET_COL_FIELD_ID(FILTER_UNIT_LEFT_FIELD(i, u)) +#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u)) +#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx) +#define FILTER_UNIT_OPTR(u) ((u)->compare.optr) +#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func) + +#define FILTER_UNIT_CLR_F(i) memset((i)->unitFlags, 0, (i)->unitNum * sizeof(*info->unitFlags)) +#define FILTER_UNIT_SET_F(i, idx) (i)->unitFlags[idx] = 1 +#define FILTER_UNIT_GET_F(i, idx) ((i)->unitFlags[idx]) +#define FILTER_UNIT_GET_R(i, idx) ((i)->unitRes[idx]) +#define FILTER_UNIT_SET_R(i, idx, v) (i)->unitRes[idx] = (v) + +#define FILTER_PUSH_UNIT(colInfo, u) do { (colInfo).type = RANGE_TYPE_UNIT; (colInfo).dataType = FILTER_UNIT_DATA_TYPE(u);taosArrayPush((SArray *)((colInfo).info), &u);} while (0) +#define FILTER_PUSH_VAR_HASH(colInfo, ha) do { (colInfo).type = RANGE_TYPE_VAR_HASH; (colInfo).info = ha;} while (0) +#define FILTER_PUSH_CTX(colInfo, ctx) do { (colInfo).type = RANGE_TYPE_MR_CTX; (colInfo).info = ctx;} while (0) + +#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint16_t) * n); memcpy(*(dst), src, sizeof(uint16_t) * n);} while (0) + +#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0) + + +#define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL) +#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY) + +#if 0 +extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options); +extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols); +extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock); +extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win); +extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar); +extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo); +extern void filterFreeInfo(SFilterInfo *info); +extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows); +#else +//REMOVE THESE!!!!!!!!!!!!!!!!!!!! +#include "function.h" +#endif +extern bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right); +extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_QFILTER_H diff --git a/source/libs/function/inc/tscalar.h b/source/libs/scalar/inc/sclInt.h similarity index 91% rename from source/libs/function/inc/tscalar.h rename to source/libs/scalar/inc/sclInt.h index f7f30e40c6..3cfea6890f 100644 --- a/source/libs/function/inc/tscalar.h +++ b/source/libs/scalar/inc/sclInt.h @@ -12,12 +12,15 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#ifndef TDENGINE_SCALAR_H -#define TDENGINE_SCALAR_H +#ifndef TDENGINE_SCALARINT_H +#define TDENGINE_SCALARINT_H #ifdef __cplusplus extern "C" { #endif +#include "common.h" +#include "thash.h" +#include "query.h" typedef struct SScalarCtx { int32_t code; @@ -45,4 +48,4 @@ typedef struct SScalarCtx { } #endif -#endif // TDENGINE_SCALAR_H +#endif // TDENGINE_SCALARINT_H \ No newline at end of file diff --git a/source/libs/function/inc/tscalarfunction.h b/source/libs/scalar/inc/sclfunc.h similarity index 92% rename from source/libs/function/inc/tscalarfunction.h rename to source/libs/scalar/inc/sclfunc.h index f36f5d4fcf..8e03470033 100644 --- a/source/libs/function/inc/tscalarfunction.h +++ b/source/libs/scalar/inc/sclfunc.h @@ -20,13 +20,7 @@ extern "C" { #endif #include "function.h" - -typedef struct SScalarParam { - void* data; - int32_t num; - int32_t type; - int32_t bytes; -} SScalarParam; +#include "scalar.h" typedef struct SScalarFunctionSupport { struct SExprInfo *pExprInfo; diff --git a/source/libs/function/inc/tbinoperator.h b/source/libs/scalar/inc/sclvector.h similarity index 97% rename from source/libs/function/inc/tbinoperator.h rename to source/libs/scalar/inc/sclvector.h index 678d6169ab..69800a54ea 100644 --- a/source/libs/function/inc/tbinoperator.h +++ b/source/libs/scalar/inc/sclvector.h @@ -20,7 +20,7 @@ extern "C" { #endif -#include "tscalarfunction.h" +#include "sclfunc.h" typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order); _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator); diff --git a/source/libs/executor/inc/tfilter.h b/source/libs/scalar/inc/tfilter.h similarity index 97% rename from source/libs/executor/inc/tfilter.h rename to source/libs/scalar/inc/tfilter.h index 55edf27949..27665da0c8 100644 --- a/source/libs/executor/inc/tfilter.h +++ b/source/libs/scalar/inc/tfilter.h @@ -22,7 +22,9 @@ extern "C" { #include "thash.h" #include "tname.h" -#include "function.h" +#include "common.h" +#include "scalar.h" +#include "querynodes.h" #define FILTER_DEFAULT_GROUP_SIZE 4 #define FILTER_DEFAULT_UNIT_SIZE 4 @@ -322,7 +324,7 @@ typedef struct SFilterInfo { #define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL) #define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY) - +#if 0 extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options); extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols); extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock); @@ -331,6 +333,12 @@ extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo); extern void filterFreeInfo(SFilterInfo *info); extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows); +#else +//REMOVE THESE!!!!!!!!!!!!!!!!!!!! +#include "function.h" +#endif +extern bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right); +extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr); #ifdef __cplusplus } diff --git a/source/libs/scalar/inc/tsclfunc.h b/source/libs/scalar/inc/tsclfunc.h new file mode 100644 index 0000000000..8e03470033 --- /dev/null +++ b/source/libs/scalar/inc/tsclfunc.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_TSCALARFUNCTION_H +#define TDENGINE_TSCALARFUNCTION_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "function.h" +#include "scalar.h" + +typedef struct SScalarFunctionSupport { + struct SExprInfo *pExprInfo; + int32_t numOfCols; + SColumnInfo *colList; + void *exprList; // client side used + int32_t offset; + char** data; +} SScalarFunctionSupport; + +extern struct SScalarFunctionInfo scalarFunc[8]; + +int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarParam* pOutput, + void* param, char* (*getSourceDataBlock)(void*, const char*, int32_t)); + + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCALARFUNCTION_H diff --git a/source/libs/executor/src/tfilter.c b/source/libs/scalar/src/filter.c similarity index 99% rename from source/libs/executor/src/tfilter.c rename to source/libs/scalar/src/filter.c index c42260b214..b680ac6778 100644 --- a/source/libs/executor/src/tfilter.c +++ b/source/libs/scalar/src/filter.c @@ -17,7 +17,7 @@ #include "thash.h" //#include "queryLog.h" #include "tcompare.h" -#include "tfilter.h" +#include "filter.h" OptrStr gOptrStr[] = { {TSDB_RELATION_INVALID, "invalid"}, @@ -273,6 +273,10 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) { return comparFn; } +__compar_fn_t filterGetCompFunc(int32_t type, int32_t optr) { + return gDataCompare[filterGetCompFuncIdx(type, optr)]; +} + static FORCE_INLINE int32_t filterCompareGroupCtx(const void *pLeft, const void *pRight) { SFilterGroupCtx *left = *((SFilterGroupCtx**)pLeft), *right = *((SFilterGroupCtx**)pRight); diff --git a/source/libs/function/src/tscalar.c b/source/libs/scalar/src/scalar.c similarity index 80% rename from source/libs/function/src/tscalar.c rename to source/libs/scalar/src/scalar.c index df61d628b3..c8783b1402 100644 --- a/source/libs/function/src/tscalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1,5 +1,10 @@ #include "nodes.h" -#include "tscalar.h" +#include "common.h" +#include "querynodes.h" +#include "function.h" +#include "functionMgt.h" +#include "sclvector.h" +#include "sclInt.h" int32_t sclGetOperatorParamNum(EOperatorType type) { if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type) { @@ -48,7 +53,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t SColumnRef *ref = (SColumnRef *)node; if (ref->slotId >= taosArrayGetSize(ctx->pSrc->pDataBlock)) { - sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, taosArrayGetSize(ctx->pSrc->pDataBlock)); + sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(ctx->pSrc->pDataBlock)); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -80,7 +85,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t } if (param->num > *rowNum) { - if (1 != param->num) && (1 < *rowNum) { + if ((1 != param->num) && (1 < *rowNum)) { sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->num); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -113,9 +118,9 @@ int32_t sclParamMoveNext(SScalarParam *params, int32_t num) { int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { int32_t code = 0; - *pParams = calloc(pParamList->length, sizeof(SScalarParam)); - if (NULL == *pParams) { - sclError("calloc %d failed", pParamList->length * sizeof(SScalarParam)); + SScalarParam *paramList = calloc(pParamList->length, sizeof(SScalarParam)); + if (NULL == paramList) { + sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam))); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -126,16 +131,18 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - SCL_ERR_JRET(sclInitParam(cell->pNode, &pParams[i], ctx, rowNum)); + SCL_ERR_JRET(sclInitParam(cell->pNode, ¶mList[i], ctx, rowNum)); cell = cell->pNext; } + *pParams = paramList; + return TSDB_CODE_SUCCESS; _return: - tfree(*pParams); + tfree(paramList); SCL_RET(code); } @@ -147,22 +154,24 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - *pParams = calloc(paramNum, sizeof(SScalarParam)); - if (NULL == *pParams) { - sclError("calloc %d failed", paramNum * sizeof(SScalarParam)); + SScalarParam *paramList = calloc(paramNum, sizeof(SScalarParam)); + if (NULL == paramList) { + sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam))); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCL_ERR_JRET(sclInitParam(node->pLeft, &pParams[0], ctx, rowNum)); + SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum)); if (paramNum > 1) { - SCL_ERR_JRET(sclInitParam(node->pRight, &pParams[1], ctx, rowNum)); + SCL_ERR_JRET(sclInitParam(node->pRight, ¶mList[1], ctx, rowNum)); } + *pParams = paramList; + return TSDB_CODE_SUCCESS; _return: - tfree(*pParams); + tfree(paramList); SCL_RET(code); } @@ -187,7 +196,7 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu output->type = node->node.resType.type; output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes)); if (NULL == output->data) { - sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes)); + sclError("calloc %d failed", (int32_t)(rowNum * sizeof(tDataTypes[output->type].bytes))); SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -236,7 +245,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o output->type = node->node.resType.type; output->data = calloc(rowNum, sizeof(bool)); if (NULL == output->data) { - sclError("calloc %d failed", (int32_t)rowNum * sizeof(bool)); + sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool))); SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -266,7 +275,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o _return: tfree(params); - CTG_RET(code); + SCL_RET(code); } int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) { @@ -277,9 +286,9 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp SCL_ERR_RET(sclInitOperatorParams(¶ms, node, ctx, &rowNum)); output->type = node->node.resType.type; - output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes)); + output->data = calloc(rowNum, tDataTypes[output->type].bytes); if (NULL == output->data) { - sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes)); + sclError("calloc %d failed", (int32_t)rowNum * tDataTypes[output->type].bytes); SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -305,7 +314,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp _return: tfree(params); - CTG_RET(code); + SCL_RET(code); } @@ -318,7 +327,7 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) { return DEAL_RES_ERROR; } - SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); sclFreeParam(&output); @@ -328,8 +337,13 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) { res->node.resType = node->node.resType; - SET_TYPED_DATA(nodesGetValueFromNode(res), output.type, output.data); - + if (IS_VAR_DATA_TYPE(output.type)) { + res->datum.p = output.data; + output.data = NULL; + } else { + memcpy(nodesGetValueFromNode(res), output.data, tDataTypes[output.type].bytes); + } + nodesDestroyNode(*pNode); *pNode = (SNode*)res; @@ -347,7 +361,7 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) { return DEAL_RES_ERROR; } - SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); sclFreeParam(&output); @@ -357,7 +371,12 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) { res->node.resType = node->node.resType; - SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data); + if (IS_VAR_DATA_TYPE(output.type)) { + res->datum.p = output.data; + output.data = NULL; + } else { + memcpy(nodesGetValueFromNode(res), output.data, tDataTypes[output.type].bytes); + } nodesDestroyNode(*pNode); *pNode = (SNode*)res; @@ -376,7 +395,7 @@ EDealRes sclRewriteOperator(SNode** pNode, void* pContext) { return DEAL_RES_ERROR; } - SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); sclFreeParam(&output); @@ -386,7 +405,12 @@ EDealRes sclRewriteOperator(SNode** pNode, void* pContext) { res->node.resType = node->node.resType; - SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data); + if (IS_VAR_DATA_TYPE(output.type)) { + res->datum.p = output.data; + output.data = NULL; + } else { + memcpy(nodesGetValueFromNode(res), output.data, tDataTypes[output.type].bytes); + } nodesDestroyNode(*pNode); *pNode = (SNode*)res; @@ -422,9 +446,9 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { } -EDealRes sclWalkFunction(SNode** pNode, void* pContext) { +EDealRes sclWalkFunction(SNode* pNode, void* pContext) { SScalarCtx *ctx = (SScalarCtx *)pContext; - SFunctionNode *node = (SFunctionNode *)*pNode; + SFunctionNode *node = (SFunctionNode *)pNode; SScalarParam output = {0}; ctx->code = sclExecFuncion(node, ctx, &output); @@ -432,7 +456,7 @@ EDealRes sclWalkFunction(SNode** pNode, void* pContext) { return DEAL_RES_ERROR; } - if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) { + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } @@ -441,9 +465,9 @@ EDealRes sclWalkFunction(SNode** pNode, void* pContext) { } -EDealRes sclWalkLogic(SNode** pNode, void* pContext) { +EDealRes sclWalkLogic(SNode* pNode, void* pContext) { SScalarCtx *ctx = (SScalarCtx *)pContext; - SLogicConditionNode *node = (SLogicConditionNode *)*pNode; + SLogicConditionNode *node = (SLogicConditionNode *)pNode; SScalarParam output = {0}; ctx->code = sclExecLogic(node, ctx, &output); @@ -451,7 +475,7 @@ EDealRes sclWalkLogic(SNode** pNode, void* pContext) { return DEAL_RES_ERROR; } - if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) { + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } @@ -460,9 +484,9 @@ EDealRes sclWalkLogic(SNode** pNode, void* pContext) { } -EDealRes sclWalkOperator(SNode** pNode, void* pContext) { +EDealRes sclWalkOperator(SNode* pNode, void* pContext) { SScalarCtx *ctx = (SScalarCtx *)pContext; - SOperatorNode *node = (SOperatorNode *)*pNode; + SOperatorNode *node = (SOperatorNode *)pNode; SScalarParam output = {0}; ctx->code = sclExecOperator(node, ctx, &output); @@ -470,7 +494,7 @@ EDealRes sclWalkOperator(SNode** pNode, void* pContext) { return DEAL_RES_ERROR; } - if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) { + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } @@ -479,24 +503,24 @@ EDealRes sclWalkOperator(SNode** pNode, void* pContext) { } -EDealRes sclCalcWalker(SNode** pNode, void* pContext) { - if (QUERY_NODE_VALUE == nodeType(*pNode)) { +EDealRes sclCalcWalker(SNode* pNode, void* pContext) { + if (QUERY_NODE_VALUE == nodeType(pNode)) { return DEAL_RES_CONTINUE; } - if (QUERY_NODE_FUNCTION == nodeType(*pNode)) { + if (QUERY_NODE_FUNCTION == nodeType(pNode)) { return sclWalkFunction(pNode, pContext); } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) { return sclWalkLogic(pNode, pContext); } - if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { + if (QUERY_NODE_OPERATOR == nodeType(pNode)) { return sclWalkOperator(pNode, pContext); } - sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode)); + sclError("invalid node type for calculating constants, type:%d", nodeType(pNode)); SScalarCtx *ctx = (SScalarCtx *)pContext; @@ -540,7 +564,7 @@ int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) { SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - nodesWalkNodePostOrder(&pNode, sclCalcWalker, (void *)&ctx); + nodesWalkNodePostOrder(pNode, sclCalcWalker, (void *)&ctx); if (ctx.code) { nodesDestroyNode(pNode); @@ -548,9 +572,9 @@ int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) { SCL_ERR_RET(ctx.code); } - SScalarParam *res = taosHashGet(ctx.pRes, &pNode, POINTER_BYTES); + SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); if (NULL == res) { - sclError("no res for calculating, node:%d, type:%d", pNode, nodeType(pNode)); + sclError("no res for calculating, node:%p, type:%d", pNode, nodeType(pNode)); SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } diff --git a/source/libs/function/src/tscalarfunction.c b/source/libs/scalar/src/sclfunc.c similarity index 99% rename from source/libs/function/src/tscalarfunction.c rename to source/libs/scalar/src/sclfunc.c index df8a654c96..ab2cc8b056 100644 --- a/source/libs/function/src/tscalarfunction.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1,6 +1,5 @@ -#include "tscalarfunction.h" -#include "tbinoperator.h" -#include "tunaryoperator.h" +#include "sclfunc.h" +#include "sclvector.h" static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) { dst->type = src->type; diff --git a/source/libs/function/src/tbinoperator.c b/source/libs/scalar/src/sclvector.c similarity index 98% rename from source/libs/function/src/tbinoperator.c rename to source/libs/scalar/src/sclvector.c index 31fc2813e2..07f32c87d8 100644 --- a/source/libs/function/src/tbinoperator.c +++ b/source/libs/scalar/src/sclvector.c @@ -16,8 +16,11 @@ #include "os.h" #include "ttypes.h" -#include "tbinoperator.h" +#include "sclvector.h" #include "tcompare.h" +#include "querynodes.h" +#include "filter.h" +#include "query.h" //GET_TYPED_DATA(v, double, pRight->type, (char *)&((right)[i])); @@ -152,8 +155,8 @@ int64_t getVectorBigintValue_FLOAT(void *src, int32_t index) { int64_t getVectorBigintValue_DOUBLE(void *src, int32_t index) { return (int64_t)*((double *)src + index); } -_getDoubleValue_fn_t getVectorBigintValueFn(int32_t srcType) { - _getDoubleValue_fn_t p = NULL; +_getBigintValue_fn_t getVectorBigintValueFn(int32_t srcType) { + _getBigintValue_fn_t p = NULL; if(srcType==TSDB_DATA_TYPE_TINYINT) { p = getVectorBigintValue_TINYINT; }else if(srcType==TSDB_DATA_TYPE_UTINYINT) { @@ -325,7 +328,7 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { if (len < 0){ qError("castConvert taosUcs4ToMbs error 1"); tfree(tmp); - return; + return TSDB_CODE_QRY_APP_ERROR; } tmp[len] = 0; @@ -391,7 +394,7 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { if (len < 0){ qError("castConvert taosUcs4ToMbs error 1"); tfree(tmp); - return; + return TSDB_CODE_QRY_APP_ERROR; } tmp[len] = 0; @@ -457,7 +460,7 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { if (len < 0){ qError("castConvert taosUcs4ToMbs error 1"); tfree(tmp); - return; + return TSDB_CODE_QRY_APP_ERROR; } tmp[len] = 0; @@ -560,13 +563,13 @@ int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* p paramOut2->num = param2->num; paramOut2->data = malloc(paramOut2->num * tDataTypes[paramOut2->type].bytes); if (NULL == paramOut2->data) { - tfree(paramOut1->data) + tfree(paramOut1->data); return TSDB_CODE_QRY_OUT_OF_MEMORY; } code = vectorConvertImpl(param2, paramOut2); if (code) { - tfree(paramOut1->data) + tfree(paramOut1->data); tfree(paramOut2->data); return code; } @@ -941,8 +944,7 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int8_t funcIdx = filterGetCompFuncIdx(pLeft->type, optr); - __compar_fn_t fp = gDataCompare[funcIdx]; + __compar_fn_t fp = filterGetCompFunc(pLeft->type, optr); bool res = false; bool *output=(bool *)out; @@ -1003,19 +1005,19 @@ void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t SScalarParam *param2 = NULL; int32_t type = 0; - if (pLeftOut->type) { + if (pLeftOut.type) { param1 = &pLeftOut; } else { param1 = pLeft; } - if (pRightOut->type) { + if (pRightOut.type) { param2 = &pRightOut; } else { param2 = pRight; } - vectorCompareImpl(pLeftOut, pRightOut, out, _ord, TSDB_RELATION_GREATER); + vectorCompareImpl(param1, param2, out, _ord, TSDB_RELATION_GREATER); } void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { @@ -1091,7 +1093,7 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; bool res = false; - bool *output=(bool *)out; + bool *output = (bool *)out; _getValueAddr_fn_t getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type); for (; i >= 0 && i < pLeft->num; i += step, output += 1) { diff --git a/source/libs/scalar/test/CMakeLists.txt b/source/libs/scalar/test/CMakeLists.txt new file mode 100644 index 0000000000..a9af1ece30 --- /dev/null +++ b/source/libs/scalar/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build scalar unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(scalarTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + scalarTest + PUBLIC os util common gtest qcom function nodes +) + +TARGET_INCLUDE_DIRECTORIES( + scalarTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scalar/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/scalar/inc" +) diff --git a/source/libs/scalar/test/scalarTests.cpp b/source/libs/scalar/test/scalarTests.cpp new file mode 100644 index 0000000000..38fe072f9e --- /dev/null +++ b/source/libs/scalar/test/scalarTests.cpp @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wformat" +#pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +#pragma GCC diagnostic ignored "-Wpointer-arith" + +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "tep.h" +#include "stub.h" +#include "addr_any.h" +#include "scalar.h" + +namespace { + +} + +TEST(scalarTest, func) { + +} + + +int main(int argc, char** argv) { + srand(time(NULL)); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#pragma GCC diagnostic pop