feature/qnode
This commit is contained in:
parent
76d5298ba1
commit
8f199e0eb8
|
@ -16,6 +16,11 @@
|
||||||
#ifndef TDENGINE_TNAME_H
|
#ifndef TDENGINE_TNAME_H
|
||||||
#define TDENGINE_TNAME_H
|
#define TDENGINE_TNAME_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
|
||||||
|
@ -59,4 +64,9 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
||||||
|
|
||||||
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name);
|
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#endif // TDENGINE_TNAME_H
|
#endif // TDENGINE_TNAME_H
|
||||||
|
|
|
@ -122,8 +122,6 @@ typedef struct {
|
||||||
*(int32_t *)(_v) = (int32_t)(_data); \
|
*(int32_t *)(_v) = (int32_t)(_data); \
|
||||||
break; \
|
break; \
|
||||||
default: \
|
default: \
|
||||||
(void *)(_v) = (void *)(_data); \
|
|
||||||
(void *)(_data) = NULL; \
|
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
|
@ -226,7 +226,12 @@ typedef struct SAggFunctionInfo {
|
||||||
int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
|
int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
|
||||||
} SAggFunctionInfo;
|
} SAggFunctionInfo;
|
||||||
|
|
||||||
struct SScalarParam;
|
typedef struct SScalarParam {
|
||||||
|
void* data;
|
||||||
|
int32_t num;
|
||||||
|
int32_t type;
|
||||||
|
int32_t bytes;
|
||||||
|
} SScalarParam;
|
||||||
|
|
||||||
typedef struct SScalarFunctionInfo {
|
typedef struct SScalarFunctionInfo {
|
||||||
char name[FUNCTIONS_NAME_MAX_LENGTH];
|
char name[FUNCTIONS_NAME_MAX_LENGTH];
|
||||||
|
@ -285,10 +290,6 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num);
|
||||||
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
|
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
|
||||||
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
|
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
|
||||||
|
|
||||||
struct SScalarFunctionSupport* createScalarFuncSupport(int32_t num);
|
|
||||||
void destroyScalarFuncSupport(struct SScalarFunctionSupport* pSupport, int32_t num);
|
|
||||||
struct SScalarFunctionSupport* getScalarFuncSupport(struct SScalarFunctionSupport* pSupport, int32_t index);
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// fill api
|
// fill api
|
||||||
struct SFillInfo;
|
struct SFillInfo;
|
||||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
|
#include "function.h"
|
||||||
|
|
||||||
typedef enum EFunctionType {
|
typedef enum EFunctionType {
|
||||||
// aggregate function
|
// aggregate function
|
||||||
|
|
|
@ -104,7 +104,6 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode);
|
||||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
||||||
SNode* nodesListGetNode(SNodeList* pList, int32_t index);
|
SNode* nodesListGetNode(SNodeList* pList, int32_t index);
|
||||||
void nodesDestroyList(SNodeList* pList);
|
void nodesDestroyList(SNodeList* pList);
|
||||||
void *nodesGetValueFromNode(SValueNode *pNode);
|
|
||||||
|
|
||||||
typedef enum EDealRes {
|
typedef enum EDealRes {
|
||||||
DEAL_RES_CONTINUE = 1,
|
DEAL_RES_CONTINUE = 1,
|
||||||
|
|
|
@ -277,6 +277,7 @@ bool nodesIsJsonOp(const SOperatorNode* pOp);
|
||||||
|
|
||||||
bool nodesIsTimeorderQuery(const SNode* pQuery);
|
bool nodesIsTimeorderQuery(const SNode* pQuery);
|
||||||
bool nodesIsTimelineQuery(const SNode* pQuery);
|
bool nodesIsTimelineQuery(const SNode* pQuery);
|
||||||
|
void *nodesGetValueFromNode(SValueNode *pNode);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef TDENGINE_SCALAR_H
|
||||||
|
#define TDENGINE_SCALAR_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "function.h"
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
typedef struct SFilterInfo SFilterInfo;
|
||||||
|
|
||||||
|
|
||||||
|
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes);
|
||||||
|
int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst);
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_SCALAR_H
|
|
@ -13,4 +13,5 @@ add_subdirectory(function)
|
||||||
add_subdirectory(qcom)
|
add_subdirectory(qcom)
|
||||||
add_subdirectory(qworker)
|
add_subdirectory(qworker)
|
||||||
add_subdirectory(tfs)
|
add_subdirectory(tfs)
|
||||||
add_subdirectory(nodes)
|
add_subdirectory(nodes)
|
||||||
|
add_subdirectory(scalar)
|
|
@ -13,7 +13,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
|
||||||
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
|
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
|
||||||
# )
|
# )
|
||||||
target_link_libraries(executor
|
target_link_libraries(executor
|
||||||
PRIVATE os util common function parser planner qcom vnode
|
PRIVATE os util common function parser planner qcom vnode scalar nodes
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
|
|
|
@ -29,9 +29,9 @@ extern "C" {
|
||||||
#include "executil.h"
|
#include "executil.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
|
#include "scalar.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tfilter.h"
|
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
#include "tpagedbuf.h"
|
#include "tpagedbuf.h"
|
||||||
|
|
|
@ -2148,7 +2148,7 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT
|
||||||
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
|
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
|
||||||
pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput);
|
//pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput);
|
||||||
|
|
||||||
if (pRuntimeEnv->scalarSup == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL ||
|
if (pRuntimeEnv->scalarSup == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL ||
|
||||||
pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) {
|
pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) {
|
||||||
|
@ -2174,7 +2174,7 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_clean:
|
_clean:
|
||||||
destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput);
|
//destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput);
|
||||||
tfree(pRuntimeEnv->pResultRowHashTable);
|
tfree(pRuntimeEnv->pResultRowHashTable);
|
||||||
tfree(pRuntimeEnv->keyBuf);
|
tfree(pRuntimeEnv->keyBuf);
|
||||||
tfree(pRuntimeEnv->prevRow);
|
tfree(pRuntimeEnv->prevRow);
|
||||||
|
@ -2212,7 +2212,7 @@ static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId);
|
//qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId);
|
||||||
|
|
||||||
destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput);
|
//destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput);
|
||||||
// destroyUdfInfo(pRuntimeEnv->pUdfInfo);
|
// destroyUdfInfo(pRuntimeEnv->pUdfInfo);
|
||||||
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
||||||
doFreeQueryHandle(pRuntimeEnv);
|
doFreeQueryHandle(pRuntimeEnv);
|
||||||
|
|
|
@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
|
||||||
TARGET_LINK_LIBRARIES(
|
TARGET_LINK_LIBRARIES(
|
||||||
executorTest
|
executorTest
|
||||||
PUBLIC os util common transport gtest taos qcom executor function planner
|
PRIVATE os util common transport gtest taos qcom executor function planner scalar nodes
|
||||||
)
|
)
|
||||||
|
|
||||||
TARGET_INCLUDE_DIRECTORIES(
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
|
|
@ -32,6 +32,8 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "tname.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
|
|
@ -20,10 +20,10 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "tscalarfunction.h"
|
//#include "tscalarfunction.h"
|
||||||
|
|
||||||
typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput);
|
//typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput);
|
||||||
_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator);
|
//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tscalarfunction.h"
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
@ -3221,6 +3220,7 @@ static void diff_function(SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
char *getArithColumnData(void *param, const char* name, int32_t colId) {
|
char *getArithColumnData(void *param, const char* name, int32_t colId) {
|
||||||
SScalarFunctionSupport *pSupport = (SScalarFunctionSupport *)param;
|
SScalarFunctionSupport *pSupport = (SScalarFunctionSupport *)param;
|
||||||
|
|
||||||
|
@ -3235,10 +3235,11 @@ char *getArithColumnData(void *param, const char* name, int32_t colId) {
|
||||||
assert(index >= 0);
|
assert(index >= 0);
|
||||||
return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
|
return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static void arithmetic_function(SqlFunctionCtx *pCtx) {
|
static void arithmetic_function(SqlFunctionCtx *pCtx) {
|
||||||
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
|
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
|
||||||
SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz;
|
//SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz;
|
||||||
|
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
output.data = pCtx->pOutput;
|
output.data = pCtx->pOutput;
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
#include "tscalarfunction.h"
|
|
||||||
|
|
||||||
static SHashObj* functionHashTable = NULL;
|
static SHashObj* functionHashTable = NULL;
|
||||||
static SHashObj* udfHashTable = NULL;
|
static SHashObj* udfHashTable = NULL;
|
||||||
|
@ -18,12 +17,14 @@ static void doInitFunctionHashTable() {
|
||||||
taosHashPut(functionHashTable, aggFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
|
taosHashPut(functionHashTable, aggFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
numOfEntries = tListLen(scalarFunc);
|
numOfEntries = tListLen(scalarFunc);
|
||||||
for(int32_t i = 0; i < numOfEntries; ++i) {
|
for(int32_t i = 0; i < numOfEntries; ++i) {
|
||||||
int32_t len = (int32_t) strlen(scalarFunc[i].name);
|
int32_t len = (int32_t) strlen(scalarFunc[i].name);
|
||||||
SScalarFunctionInfo* ptr = &scalarFunc[i];
|
SScalarFunctionInfo* ptr = &scalarFunc[i];
|
||||||
taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
|
taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true);
|
udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,10 +4,10 @@
|
||||||
|
|
||||||
|
|
||||||
// TODO dynamic define these functions
|
// TODO dynamic define these functions
|
||||||
_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) {
|
//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) {
|
||||||
assert(0);
|
// assert(0);
|
||||||
}
|
//}
|
||||||
|
|
||||||
bool isStringOperatorFn(int32_t op) {
|
//bool isStringOperatorFn(int32_t op) {
|
||||||
return op == FUNCTION_LENGTH;
|
// return op == FUNCTION_LENGTH;
|
||||||
}
|
//}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "nodesShowStmts.h"
|
#include "nodesShowStmts.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
static SNode* makeNode(ENodeType type, size_t size) {
|
static SNode* makeNode(ENodeType type, size_t size) {
|
||||||
SNode* p = calloc(1, size);
|
SNode* p = calloc(1, size);
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
aux_source_directory(src SCALAR_SRC)
|
||||||
|
|
||||||
|
add_library(scalar STATIC ${SCALAR_SRC})
|
||||||
|
target_include_directories(
|
||||||
|
scalar
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scalar"
|
||||||
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(scalar
|
||||||
|
PRIVATE os util common nodes function qcom
|
||||||
|
)
|
||||||
|
|
||||||
|
if(${BUILD_TEST})
|
||||||
|
ADD_SUBDIRECTORY(test)
|
||||||
|
endif(${BUILD_TEST})
|
|
@ -0,0 +1,347 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_QFILTER_H
|
||||||
|
#define TDENGINE_QFILTER_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tname.h"
|
||||||
|
#include "common.h"
|
||||||
|
#include "scalar.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
|
||||||
|
#define FILTER_DEFAULT_GROUP_SIZE 4
|
||||||
|
#define FILTER_DEFAULT_UNIT_SIZE 4
|
||||||
|
#define FILTER_DEFAULT_FIELD_SIZE 4
|
||||||
|
#define FILTER_DEFAULT_VALUE_SIZE 4
|
||||||
|
#define FILTER_DEFAULT_GROUP_UNIT_SIZE 2
|
||||||
|
|
||||||
|
#define FILTER_DUMMY_EMPTY_OPTR 127
|
||||||
|
|
||||||
|
#define MAX_NUM_STR_SIZE 40
|
||||||
|
|
||||||
|
#define FILTER_RM_UNIT_MIN_ROWS 100
|
||||||
|
|
||||||
|
enum {
|
||||||
|
FLD_TYPE_COLUMN = 1,
|
||||||
|
FLD_TYPE_VALUE = 2,
|
||||||
|
FLD_TYPE_MAX = 3,
|
||||||
|
FLD_DESC_NO_FREE = 4,
|
||||||
|
FLD_DATA_NO_FREE = 8,
|
||||||
|
FLD_DATA_IS_HASH = 16,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
MR_ST_START = 1,
|
||||||
|
MR_ST_FIN = 2,
|
||||||
|
MR_ST_ALL = 4,
|
||||||
|
MR_ST_EMPTY = 8,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
RANGE_FLG_EXCLUDE = 1,
|
||||||
|
RANGE_FLG_INCLUDE = 2,
|
||||||
|
RANGE_FLG_NULL = 4,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
FI_OPTION_NO_REWRITE = 1,
|
||||||
|
FI_OPTION_TIMESTAMP = 2,
|
||||||
|
FI_OPTION_NEED_UNIQE = 4,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
FI_STATUS_ALL = 1,
|
||||||
|
FI_STATUS_EMPTY = 2,
|
||||||
|
FI_STATUS_REWRITE = 4,
|
||||||
|
FI_STATUS_CLONED = 8,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
FI_STATUS_BLK_ALL = 1,
|
||||||
|
FI_STATUS_BLK_EMPTY = 2,
|
||||||
|
FI_STATUS_BLK_ACTIVE = 4,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
RANGE_TYPE_UNIT = 1,
|
||||||
|
RANGE_TYPE_VAR_HASH = 2,
|
||||||
|
RANGE_TYPE_MR_CTX = 3,
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef struct OptrStr {
|
||||||
|
uint16_t optr;
|
||||||
|
char *str;
|
||||||
|
} OptrStr;
|
||||||
|
|
||||||
|
typedef struct SFilterRange {
|
||||||
|
int64_t s;
|
||||||
|
int64_t e;
|
||||||
|
char sflag;
|
||||||
|
char eflag;
|
||||||
|
} SFilterRange;
|
||||||
|
|
||||||
|
typedef struct SFilterColRange {
|
||||||
|
uint16_t idx; //column field idx
|
||||||
|
bool isNull;
|
||||||
|
bool notNull;
|
||||||
|
bool isRange;
|
||||||
|
SFilterRange ra;
|
||||||
|
} SFilterColRange;
|
||||||
|
|
||||||
|
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
|
||||||
|
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
|
||||||
|
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t);
|
||||||
|
|
||||||
|
typedef struct SFilterRangeCompare {
|
||||||
|
int64_t s;
|
||||||
|
int64_t e;
|
||||||
|
rangeCompFunc func;
|
||||||
|
} SFilterRangeCompare;
|
||||||
|
|
||||||
|
typedef struct SFilterRangeNode {
|
||||||
|
struct SFilterRangeNode* prev;
|
||||||
|
struct SFilterRangeNode* next;
|
||||||
|
union {
|
||||||
|
SFilterRange ra;
|
||||||
|
SFilterRangeCompare rc;
|
||||||
|
};
|
||||||
|
} SFilterRangeNode;
|
||||||
|
|
||||||
|
typedef struct SFilterRangeCtx {
|
||||||
|
int32_t type;
|
||||||
|
int32_t options;
|
||||||
|
int8_t status;
|
||||||
|
bool isnull;
|
||||||
|
bool notnull;
|
||||||
|
bool isrange;
|
||||||
|
int16_t colId;
|
||||||
|
__compar_fn_t pCompareFunc;
|
||||||
|
SFilterRangeNode *rf; //freed
|
||||||
|
SFilterRangeNode *rs;
|
||||||
|
} SFilterRangeCtx ;
|
||||||
|
|
||||||
|
typedef struct SFilterVarCtx {
|
||||||
|
int32_t type;
|
||||||
|
int32_t options;
|
||||||
|
int8_t status;
|
||||||
|
bool isnull;
|
||||||
|
bool notnull;
|
||||||
|
bool isrange;
|
||||||
|
SHashObj *wild;
|
||||||
|
SHashObj *value;
|
||||||
|
} SFilterVarCtx;
|
||||||
|
|
||||||
|
typedef struct SFilterField {
|
||||||
|
uint16_t flag;
|
||||||
|
void* desc;
|
||||||
|
void* data;
|
||||||
|
} SFilterField;
|
||||||
|
|
||||||
|
typedef struct SFilterFields {
|
||||||
|
uint16_t size;
|
||||||
|
uint16_t num;
|
||||||
|
SFilterField *fields;
|
||||||
|
} SFilterFields;
|
||||||
|
|
||||||
|
typedef struct SFilterFieldId {
|
||||||
|
uint16_t type;
|
||||||
|
uint16_t idx;
|
||||||
|
} SFilterFieldId;
|
||||||
|
|
||||||
|
typedef struct SFilterGroup {
|
||||||
|
uint16_t unitSize;
|
||||||
|
uint16_t unitNum;
|
||||||
|
uint16_t *unitIdxs;
|
||||||
|
uint8_t *unitFlags; // !unit result
|
||||||
|
} SFilterGroup;
|
||||||
|
|
||||||
|
typedef struct SFilterColInfo {
|
||||||
|
uint8_t type;
|
||||||
|
int32_t dataType;
|
||||||
|
void *info;
|
||||||
|
} SFilterColInfo;
|
||||||
|
|
||||||
|
typedef struct SFilterGroupCtx {
|
||||||
|
uint16_t colNum;
|
||||||
|
uint16_t *colIdx;
|
||||||
|
SFilterColInfo *colInfo;
|
||||||
|
} SFilterGroupCtx;
|
||||||
|
|
||||||
|
typedef struct SFilterColCtx {
|
||||||
|
uint16_t colIdx;
|
||||||
|
void* ctx;
|
||||||
|
} SFilterColCtx;
|
||||||
|
|
||||||
|
typedef struct SFilterCompare {
|
||||||
|
uint8_t type;
|
||||||
|
uint8_t optr;
|
||||||
|
uint8_t optr2;
|
||||||
|
} SFilterCompare;
|
||||||
|
|
||||||
|
typedef struct SFilterUnit {
|
||||||
|
SFilterCompare compare;
|
||||||
|
SFilterFieldId left;
|
||||||
|
SFilterFieldId right;
|
||||||
|
SFilterFieldId right2;
|
||||||
|
} SFilterUnit;
|
||||||
|
|
||||||
|
typedef struct SFilterComUnit {
|
||||||
|
void *colData;
|
||||||
|
void *valData;
|
||||||
|
void *valData2;
|
||||||
|
uint16_t colId;
|
||||||
|
uint16_t dataSize;
|
||||||
|
uint8_t dataType;
|
||||||
|
uint8_t optr;
|
||||||
|
int8_t func;
|
||||||
|
int8_t rfunc;
|
||||||
|
} SFilterComUnit;
|
||||||
|
|
||||||
|
typedef struct SFilterPCtx {
|
||||||
|
SHashObj *valHash;
|
||||||
|
SHashObj *unitHash;
|
||||||
|
} SFilterPCtx;
|
||||||
|
|
||||||
|
typedef struct SFilterInfo {
|
||||||
|
uint32_t options;
|
||||||
|
uint32_t status;
|
||||||
|
uint16_t unitSize;
|
||||||
|
uint16_t unitNum;
|
||||||
|
uint16_t groupNum;
|
||||||
|
uint16_t colRangeNum;
|
||||||
|
SFilterFields fields[FLD_TYPE_MAX];
|
||||||
|
SFilterGroup *groups;
|
||||||
|
uint16_t *cgroups;
|
||||||
|
SFilterUnit *units;
|
||||||
|
SFilterComUnit *cunits;
|
||||||
|
uint8_t *unitRes; // result
|
||||||
|
uint8_t *unitFlags; // got result
|
||||||
|
SFilterRangeCtx **colRange;
|
||||||
|
filter_exec_func func;
|
||||||
|
uint8_t blkFlag;
|
||||||
|
uint16_t blkGroupNum;
|
||||||
|
uint16_t *blkUnits;
|
||||||
|
int8_t *blkUnitRes;
|
||||||
|
|
||||||
|
SFilterPCtx pctx;
|
||||||
|
} SFilterInfo;
|
||||||
|
|
||||||
|
#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t))
|
||||||
|
|
||||||
|
#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR)
|
||||||
|
#define FILTER_NO_MERGE_OPTR(o) ((o) == TSDB_RELATION_ISNULL || (o) == TSDB_RELATION_NOTNULL || (o) == FILTER_DUMMY_EMPTY_OPTR)
|
||||||
|
|
||||||
|
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
|
||||||
|
|
||||||
|
#define SET_AND_OPTR(ctx, o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { if (!(ctx)->isrange) { (ctx)->notnull = true; } } else if (o != FILTER_DUMMY_EMPTY_OPTR) { (ctx)->isrange = true; (ctx)->notnull = false; } } while (0)
|
||||||
|
#define SET_OR_OPTR(ctx,o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { (ctx)->notnull = true; (ctx)->isrange = false; } else if (o != FILTER_DUMMY_EMPTY_OPTR) { if (!(ctx)->notnull) { (ctx)->isrange = true; } } } while (0)
|
||||||
|
#define CHK_OR_OPTR(ctx) ((ctx)->isnull == true && (ctx)->notnull == true)
|
||||||
|
#define CHK_AND_OPTR(ctx) ((ctx)->isnull == true && (((ctx)->notnull == true) || ((ctx)->isrange == true)))
|
||||||
|
|
||||||
|
|
||||||
|
#define FILTER_GET_FLAG(st, f) (st & f)
|
||||||
|
#define FILTER_SET_FLAG(st, f) st |= (f)
|
||||||
|
#define FILTER_CLR_FLAG(st, f) st &= (~f)
|
||||||
|
|
||||||
|
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
|
||||||
|
#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint16_t *)(_t + 1) = idx1; *(uint16_t *)(_t + 3) = idx2; } while (0)
|
||||||
|
#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RANGE_FLG_EXCLUDE) || FILTER_GET_FLAG(eflag,RANGE_FLG_EXCLUDE))))
|
||||||
|
#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0)
|
||||||
|
|
||||||
|
#define RESET_RANGE(ctx, r) do { (r)->next = (ctx)->rf; (ctx)->rf = r; } while (0)
|
||||||
|
#define FREE_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = (r)->next; } else { (ctx)->rs = (r)->next;} if ((r)->next) { (r)->next->prev = (r)->prev; } RESET_RANGE(ctx, r); } while (0)
|
||||||
|
#define FREE_FROM_RANGE(ctx, r) do { SFilterRangeNode *_r = r; if ((_r)->prev) { (_r)->prev->next = NULL; } else { (ctx)->rs = NULL;} while (_r) {SFilterRangeNode *n = (_r)->next; RESET_RANGE(ctx, _r); _r = n; } } while (0)
|
||||||
|
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
|
||||||
|
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
|
||||||
|
|
||||||
|
#define ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
|
||||||
|
#define ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
|
||||||
|
#define ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
|
||||||
|
|
||||||
|
#define CHK_RETV(c) do { if (c) { return; } } while (0)
|
||||||
|
#define CHK_RET(c, r) do { if (c) { return r; } } while (0)
|
||||||
|
#define CHK_JMP(c) do { if (c) { goto _return; } } while (0)
|
||||||
|
#define CHK_LRETV(c,...) do { if (c) { qError(__VA_ARGS__); return; } } while (0)
|
||||||
|
#define CHK_LRET(c, r,...) do { if (c) { if (r) {qError(__VA_ARGS__); } else { qDebug(__VA_ARGS__); } return r; } } while (0)
|
||||||
|
|
||||||
|
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
|
||||||
|
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
|
||||||
|
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SSchema *)((fi)->desc))->type)
|
||||||
|
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SSchema *)((fi)->desc))->bytes)
|
||||||
|
#define FILTER_GET_COL_FIELD_ID(fi) (((SSchema *)((fi)->desc))->colId)
|
||||||
|
#define FILTER_GET_COL_FIELD_DESC(fi) ((SSchema *)((fi)->desc))
|
||||||
|
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri))
|
||||||
|
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType)
|
||||||
|
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
|
||||||
|
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
|
||||||
|
|
||||||
|
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
|
||||||
|
#define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left)
|
||||||
|
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
|
||||||
|
#define FILTER_UNIT_RIGHT2_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right2)
|
||||||
|
#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type)
|
||||||
|
#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u))
|
||||||
|
#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri)
|
||||||
|
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
|
||||||
|
#define FILTER_UNIT_COL_ID(i, u) FILTER_GET_COL_FIELD_ID(FILTER_UNIT_LEFT_FIELD(i, u))
|
||||||
|
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
|
||||||
|
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
|
||||||
|
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
|
||||||
|
#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func)
|
||||||
|
|
||||||
|
#define FILTER_UNIT_CLR_F(i) memset((i)->unitFlags, 0, (i)->unitNum * sizeof(*info->unitFlags))
|
||||||
|
#define FILTER_UNIT_SET_F(i, idx) (i)->unitFlags[idx] = 1
|
||||||
|
#define FILTER_UNIT_GET_F(i, idx) ((i)->unitFlags[idx])
|
||||||
|
#define FILTER_UNIT_GET_R(i, idx) ((i)->unitRes[idx])
|
||||||
|
#define FILTER_UNIT_SET_R(i, idx, v) (i)->unitRes[idx] = (v)
|
||||||
|
|
||||||
|
#define FILTER_PUSH_UNIT(colInfo, u) do { (colInfo).type = RANGE_TYPE_UNIT; (colInfo).dataType = FILTER_UNIT_DATA_TYPE(u);taosArrayPush((SArray *)((colInfo).info), &u);} while (0)
|
||||||
|
#define FILTER_PUSH_VAR_HASH(colInfo, ha) do { (colInfo).type = RANGE_TYPE_VAR_HASH; (colInfo).info = ha;} while (0)
|
||||||
|
#define FILTER_PUSH_CTX(colInfo, ctx) do { (colInfo).type = RANGE_TYPE_MR_CTX; (colInfo).info = ctx;} while (0)
|
||||||
|
|
||||||
|
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint16_t) * n); memcpy(*(dst), src, sizeof(uint16_t) * n);} while (0)
|
||||||
|
|
||||||
|
#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0)
|
||||||
|
|
||||||
|
|
||||||
|
#define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL)
|
||||||
|
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
|
||||||
|
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||||
|
extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock);
|
||||||
|
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
|
||||||
|
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
|
||||||
|
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
|
||||||
|
extern void filterFreeInfo(SFilterInfo *info);
|
||||||
|
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
|
||||||
|
#else
|
||||||
|
//REMOVE THESE!!!!!!!!!!!!!!!!!!!!
|
||||||
|
#include "function.h"
|
||||||
|
#endif
|
||||||
|
extern bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right);
|
||||||
|
extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_QFILTER_H
|
|
@ -12,12 +12,15 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#ifndef TDENGINE_SCALAR_H
|
#ifndef TDENGINE_SCALARINT_H
|
||||||
#define TDENGINE_SCALAR_H
|
#define TDENGINE_SCALARINT_H
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
#include "common.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
typedef struct SScalarCtx {
|
typedef struct SScalarCtx {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
@ -45,4 +48,4 @@ typedef struct SScalarCtx {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif // TDENGINE_SCALAR_H
|
#endif // TDENGINE_SCALARINT_H
|
|
@ -20,13 +20,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
#include "scalar.h"
|
||||||
typedef struct SScalarParam {
|
|
||||||
void* data;
|
|
||||||
int32_t num;
|
|
||||||
int32_t type;
|
|
||||||
int32_t bytes;
|
|
||||||
} SScalarParam;
|
|
||||||
|
|
||||||
typedef struct SScalarFunctionSupport {
|
typedef struct SScalarFunctionSupport {
|
||||||
struct SExprInfo *pExprInfo;
|
struct SExprInfo *pExprInfo;
|
|
@ -20,7 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "tscalarfunction.h"
|
#include "sclfunc.h"
|
||||||
|
|
||||||
typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order);
|
typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order);
|
||||||
_bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator);
|
_bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator);
|
|
@ -22,7 +22,9 @@ extern "C" {
|
||||||
|
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "function.h"
|
#include "common.h"
|
||||||
|
#include "scalar.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
|
||||||
#define FILTER_DEFAULT_GROUP_SIZE 4
|
#define FILTER_DEFAULT_GROUP_SIZE 4
|
||||||
#define FILTER_DEFAULT_UNIT_SIZE 4
|
#define FILTER_DEFAULT_UNIT_SIZE 4
|
||||||
|
@ -322,7 +324,7 @@ typedef struct SFilterInfo {
|
||||||
#define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL)
|
#define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL)
|
||||||
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
|
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
|
||||||
|
|
||||||
|
#if 0
|
||||||
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
|
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
|
||||||
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
|
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||||
extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock);
|
extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock);
|
||||||
|
@ -331,6 +333,12 @@ extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows,
|
||||||
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
|
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
|
||||||
extern void filterFreeInfo(SFilterInfo *info);
|
extern void filterFreeInfo(SFilterInfo *info);
|
||||||
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
|
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
|
||||||
|
#else
|
||||||
|
//REMOVE THESE!!!!!!!!!!!!!!!!!!!!
|
||||||
|
#include "function.h"
|
||||||
|
#endif
|
||||||
|
extern bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right);
|
||||||
|
extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef TDENGINE_TSCALARFUNCTION_H
|
||||||
|
#define TDENGINE_TSCALARFUNCTION_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "function.h"
|
||||||
|
#include "scalar.h"
|
||||||
|
|
||||||
|
typedef struct SScalarFunctionSupport {
|
||||||
|
struct SExprInfo *pExprInfo;
|
||||||
|
int32_t numOfCols;
|
||||||
|
SColumnInfo *colList;
|
||||||
|
void *exprList; // client side used
|
||||||
|
int32_t offset;
|
||||||
|
char** data;
|
||||||
|
} SScalarFunctionSupport;
|
||||||
|
|
||||||
|
extern struct SScalarFunctionInfo scalarFunc[8];
|
||||||
|
|
||||||
|
int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarParam* pOutput,
|
||||||
|
void* param, char* (*getSourceDataBlock)(void*, const char*, int32_t));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_TSCALARFUNCTION_H
|
|
@ -17,7 +17,7 @@
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
//#include "queryLog.h"
|
//#include "queryLog.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tfilter.h"
|
#include "filter.h"
|
||||||
|
|
||||||
OptrStr gOptrStr[] = {
|
OptrStr gOptrStr[] = {
|
||||||
{TSDB_RELATION_INVALID, "invalid"},
|
{TSDB_RELATION_INVALID, "invalid"},
|
||||||
|
@ -273,6 +273,10 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
|
||||||
return comparFn;
|
return comparFn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
__compar_fn_t filterGetCompFunc(int32_t type, int32_t optr) {
|
||||||
|
return gDataCompare[filterGetCompFuncIdx(type, optr)];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t filterCompareGroupCtx(const void *pLeft, const void *pRight) {
|
static FORCE_INLINE int32_t filterCompareGroupCtx(const void *pLeft, const void *pRight) {
|
||||||
SFilterGroupCtx *left = *((SFilterGroupCtx**)pLeft), *right = *((SFilterGroupCtx**)pRight);
|
SFilterGroupCtx *left = *((SFilterGroupCtx**)pLeft), *right = *((SFilterGroupCtx**)pRight);
|
|
@ -1,5 +1,10 @@
|
||||||
#include "nodes.h"
|
#include "nodes.h"
|
||||||
#include "tscalar.h"
|
#include "common.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
#include "function.h"
|
||||||
|
#include "functionMgt.h"
|
||||||
|
#include "sclvector.h"
|
||||||
|
#include "sclInt.h"
|
||||||
|
|
||||||
int32_t sclGetOperatorParamNum(EOperatorType type) {
|
int32_t sclGetOperatorParamNum(EOperatorType type) {
|
||||||
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type) {
|
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type) {
|
||||||
|
@ -48,7 +53,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
|
|
||||||
SColumnRef *ref = (SColumnRef *)node;
|
SColumnRef *ref = (SColumnRef *)node;
|
||||||
if (ref->slotId >= taosArrayGetSize(ctx->pSrc->pDataBlock)) {
|
if (ref->slotId >= taosArrayGetSize(ctx->pSrc->pDataBlock)) {
|
||||||
sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, taosArrayGetSize(ctx->pSrc->pDataBlock));
|
sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(ctx->pSrc->pDataBlock));
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,7 +85,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
if (param->num > *rowNum) {
|
if (param->num > *rowNum) {
|
||||||
if (1 != param->num) && (1 < *rowNum) {
|
if ((1 != param->num) && (1 < *rowNum)) {
|
||||||
sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->num);
|
sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->num);
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -113,9 +118,9 @@ int32_t sclParamMoveNext(SScalarParam *params, int32_t num) {
|
||||||
|
|
||||||
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) {
|
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*pParams = calloc(pParamList->length, sizeof(SScalarParam));
|
SScalarParam *paramList = calloc(pParamList->length, sizeof(SScalarParam));
|
||||||
if (NULL == *pParams) {
|
if (NULL == paramList) {
|
||||||
sclError("calloc %d failed", pParamList->length * sizeof(SScalarParam));
|
sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam)));
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,16 +131,18 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC
|
||||||
SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCL_ERR_JRET(sclInitParam(cell->pNode, &pParams[i], ctx, rowNum));
|
SCL_ERR_JRET(sclInitParam(cell->pNode, ¶mList[i], ctx, rowNum));
|
||||||
|
|
||||||
cell = cell->pNext;
|
cell = cell->pNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pParams = paramList;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
tfree(*pParams);
|
tfree(paramList);
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,22 +154,24 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pParams = calloc(paramNum, sizeof(SScalarParam));
|
SScalarParam *paramList = calloc(paramNum, sizeof(SScalarParam));
|
||||||
if (NULL == *pParams) {
|
if (NULL == paramList) {
|
||||||
sclError("calloc %d failed", paramNum * sizeof(SScalarParam));
|
sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCL_ERR_JRET(sclInitParam(node->pLeft, &pParams[0], ctx, rowNum));
|
SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum));
|
||||||
if (paramNum > 1) {
|
if (paramNum > 1) {
|
||||||
SCL_ERR_JRET(sclInitParam(node->pRight, &pParams[1], ctx, rowNum));
|
SCL_ERR_JRET(sclInitParam(node->pRight, ¶mList[1], ctx, rowNum));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pParams = paramList;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
tfree(*pParams);
|
tfree(paramList);
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +196,7 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
|
||||||
output->type = node->node.resType.type;
|
output->type = node->node.resType.type;
|
||||||
output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes));
|
output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes));
|
||||||
if (NULL == output->data) {
|
if (NULL == output->data) {
|
||||||
sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes));
|
sclError("calloc %d failed", (int32_t)(rowNum * sizeof(tDataTypes[output->type].bytes)));
|
||||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,7 +245,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
output->type = node->node.resType.type;
|
output->type = node->node.resType.type;
|
||||||
output->data = calloc(rowNum, sizeof(bool));
|
output->data = calloc(rowNum, sizeof(bool));
|
||||||
if (NULL == output->data) {
|
if (NULL == output->data) {
|
||||||
sclError("calloc %d failed", (int32_t)rowNum * sizeof(bool));
|
sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
|
||||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +275,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
tfree(params);
|
tfree(params);
|
||||||
CTG_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
||||||
|
@ -277,9 +286,9 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
SCL_ERR_RET(sclInitOperatorParams(¶ms, node, ctx, &rowNum));
|
SCL_ERR_RET(sclInitOperatorParams(¶ms, node, ctx, &rowNum));
|
||||||
|
|
||||||
output->type = node->node.resType.type;
|
output->type = node->node.resType.type;
|
||||||
output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes));
|
output->data = calloc(rowNum, tDataTypes[output->type].bytes);
|
||||||
if (NULL == output->data) {
|
if (NULL == output->data) {
|
||||||
sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes));
|
sclError("calloc %d failed", (int32_t)rowNum * tDataTypes[output->type].bytes);
|
||||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -305,7 +314,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
tfree(params);
|
tfree(params);
|
||||||
CTG_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -318,7 +327,7 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
if (NULL == res) {
|
if (NULL == res) {
|
||||||
sclError("make value node failed");
|
sclError("make value node failed");
|
||||||
sclFreeParam(&output);
|
sclFreeParam(&output);
|
||||||
|
@ -328,8 +337,13 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
|
||||||
|
|
||||||
res->node.resType = node->node.resType;
|
res->node.resType = node->node.resType;
|
||||||
|
|
||||||
SET_TYPED_DATA(nodesGetValueFromNode(res), output.type, output.data);
|
if (IS_VAR_DATA_TYPE(output.type)) {
|
||||||
|
res->datum.p = output.data;
|
||||||
|
output.data = NULL;
|
||||||
|
} else {
|
||||||
|
memcpy(nodesGetValueFromNode(res), output.data, tDataTypes[output.type].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
nodesDestroyNode(*pNode);
|
nodesDestroyNode(*pNode);
|
||||||
*pNode = (SNode*)res;
|
*pNode = (SNode*)res;
|
||||||
|
|
||||||
|
@ -347,7 +361,7 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
if (NULL == res) {
|
if (NULL == res) {
|
||||||
sclError("make value node failed");
|
sclError("make value node failed");
|
||||||
sclFreeParam(&output);
|
sclFreeParam(&output);
|
||||||
|
@ -357,7 +371,12 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
|
||||||
|
|
||||||
res->node.resType = node->node.resType;
|
res->node.resType = node->node.resType;
|
||||||
|
|
||||||
SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data);
|
if (IS_VAR_DATA_TYPE(output.type)) {
|
||||||
|
res->datum.p = output.data;
|
||||||
|
output.data = NULL;
|
||||||
|
} else {
|
||||||
|
memcpy(nodesGetValueFromNode(res), output.data, tDataTypes[output.type].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
nodesDestroyNode(*pNode);
|
nodesDestroyNode(*pNode);
|
||||||
*pNode = (SNode*)res;
|
*pNode = (SNode*)res;
|
||||||
|
@ -376,7 +395,7 @@ EDealRes sclRewriteOperator(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
if (NULL == res) {
|
if (NULL == res) {
|
||||||
sclError("make value node failed");
|
sclError("make value node failed");
|
||||||
sclFreeParam(&output);
|
sclFreeParam(&output);
|
||||||
|
@ -386,7 +405,12 @@ EDealRes sclRewriteOperator(SNode** pNode, void* pContext) {
|
||||||
|
|
||||||
res->node.resType = node->node.resType;
|
res->node.resType = node->node.resType;
|
||||||
|
|
||||||
SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data);
|
if (IS_VAR_DATA_TYPE(output.type)) {
|
||||||
|
res->datum.p = output.data;
|
||||||
|
output.data = NULL;
|
||||||
|
} else {
|
||||||
|
memcpy(nodesGetValueFromNode(res), output.data, tDataTypes[output.type].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
nodesDestroyNode(*pNode);
|
nodesDestroyNode(*pNode);
|
||||||
*pNode = (SNode*)res;
|
*pNode = (SNode*)res;
|
||||||
|
@ -422,9 +446,9 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
EDealRes sclWalkFunction(SNode** pNode, void* pContext) {
|
EDealRes sclWalkFunction(SNode* pNode, void* pContext) {
|
||||||
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
||||||
SFunctionNode *node = (SFunctionNode *)*pNode;
|
SFunctionNode *node = (SFunctionNode *)pNode;
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
|
|
||||||
ctx->code = sclExecFuncion(node, ctx, &output);
|
ctx->code = sclExecFuncion(node, ctx, &output);
|
||||||
|
@ -432,7 +456,7 @@ EDealRes sclWalkFunction(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) {
|
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
|
||||||
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -441,9 +465,9 @@ EDealRes sclWalkFunction(SNode** pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
EDealRes sclWalkLogic(SNode** pNode, void* pContext) {
|
EDealRes sclWalkLogic(SNode* pNode, void* pContext) {
|
||||||
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
||||||
SLogicConditionNode *node = (SLogicConditionNode *)*pNode;
|
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
|
|
||||||
ctx->code = sclExecLogic(node, ctx, &output);
|
ctx->code = sclExecLogic(node, ctx, &output);
|
||||||
|
@ -451,7 +475,7 @@ EDealRes sclWalkLogic(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) {
|
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
|
||||||
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -460,9 +484,9 @@ EDealRes sclWalkLogic(SNode** pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
EDealRes sclWalkOperator(SNode** pNode, void* pContext) {
|
EDealRes sclWalkOperator(SNode* pNode, void* pContext) {
|
||||||
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
||||||
SOperatorNode *node = (SOperatorNode *)*pNode;
|
SOperatorNode *node = (SOperatorNode *)pNode;
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
|
|
||||||
ctx->code = sclExecOperator(node, ctx, &output);
|
ctx->code = sclExecOperator(node, ctx, &output);
|
||||||
|
@ -470,7 +494,7 @@ EDealRes sclWalkOperator(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) {
|
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
|
||||||
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -479,24 +503,24 @@ EDealRes sclWalkOperator(SNode** pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
EDealRes sclCalcWalker(SNode** pNode, void* pContext) {
|
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_VALUE == nodeType(*pNode)) {
|
if (QUERY_NODE_VALUE == nodeType(pNode)) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
|
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
|
||||||
return sclWalkFunction(pNode, pContext);
|
return sclWalkFunction(pNode, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) {
|
||||||
return sclWalkLogic(pNode, pContext);
|
return sclWalkLogic(pNode, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
|
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
|
||||||
return sclWalkOperator(pNode, pContext);
|
return sclWalkOperator(pNode, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode));
|
sclError("invalid node type for calculating constants, type:%d", nodeType(pNode));
|
||||||
|
|
||||||
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
SScalarCtx *ctx = (SScalarCtx *)pContext;
|
||||||
|
|
||||||
|
@ -540,7 +564,7 @@ int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) {
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesWalkNodePostOrder(&pNode, sclCalcWalker, (void *)&ctx);
|
nodesWalkNodePostOrder(pNode, sclCalcWalker, (void *)&ctx);
|
||||||
|
|
||||||
if (ctx.code) {
|
if (ctx.code) {
|
||||||
nodesDestroyNode(pNode);
|
nodesDestroyNode(pNode);
|
||||||
|
@ -548,9 +572,9 @@ int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) {
|
||||||
SCL_ERR_RET(ctx.code);
|
SCL_ERR_RET(ctx.code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SScalarParam *res = taosHashGet(ctx.pRes, &pNode, POINTER_BYTES);
|
SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
|
||||||
if (NULL == res) {
|
if (NULL == res) {
|
||||||
sclError("no res for calculating, node:%d, type:%d", pNode, nodeType(pNode));
|
sclError("no res for calculating, node:%p, type:%d", pNode, nodeType(pNode));
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
#include "tscalarfunction.h"
|
#include "sclfunc.h"
|
||||||
#include "tbinoperator.h"
|
#include "sclvector.h"
|
||||||
#include "tunaryoperator.h"
|
|
||||||
|
|
||||||
static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) {
|
static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) {
|
||||||
dst->type = src->type;
|
dst->type = src->type;
|
|
@ -16,8 +16,11 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "tbinoperator.h"
|
#include "sclvector.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
#include "filter.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
//GET_TYPED_DATA(v, double, pRight->type, (char *)&((right)[i]));
|
//GET_TYPED_DATA(v, double, pRight->type, (char *)&((right)[i]));
|
||||||
|
|
||||||
|
@ -152,8 +155,8 @@ int64_t getVectorBigintValue_FLOAT(void *src, int32_t index) {
|
||||||
int64_t getVectorBigintValue_DOUBLE(void *src, int32_t index) {
|
int64_t getVectorBigintValue_DOUBLE(void *src, int32_t index) {
|
||||||
return (int64_t)*((double *)src + index);
|
return (int64_t)*((double *)src + index);
|
||||||
}
|
}
|
||||||
_getDoubleValue_fn_t getVectorBigintValueFn(int32_t srcType) {
|
_getBigintValue_fn_t getVectorBigintValueFn(int32_t srcType) {
|
||||||
_getDoubleValue_fn_t p = NULL;
|
_getBigintValue_fn_t p = NULL;
|
||||||
if(srcType==TSDB_DATA_TYPE_TINYINT) {
|
if(srcType==TSDB_DATA_TYPE_TINYINT) {
|
||||||
p = getVectorBigintValue_TINYINT;
|
p = getVectorBigintValue_TINYINT;
|
||||||
}else if(srcType==TSDB_DATA_TYPE_UTINYINT) {
|
}else if(srcType==TSDB_DATA_TYPE_UTINYINT) {
|
||||||
|
@ -325,7 +328,7 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
|
||||||
if (len < 0){
|
if (len < 0){
|
||||||
qError("castConvert taosUcs4ToMbs error 1");
|
qError("castConvert taosUcs4ToMbs error 1");
|
||||||
tfree(tmp);
|
tfree(tmp);
|
||||||
return;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp[len] = 0;
|
tmp[len] = 0;
|
||||||
|
@ -391,7 +394,7 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
|
||||||
if (len < 0){
|
if (len < 0){
|
||||||
qError("castConvert taosUcs4ToMbs error 1");
|
qError("castConvert taosUcs4ToMbs error 1");
|
||||||
tfree(tmp);
|
tfree(tmp);
|
||||||
return;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp[len] = 0;
|
tmp[len] = 0;
|
||||||
|
@ -457,7 +460,7 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
|
||||||
if (len < 0){
|
if (len < 0){
|
||||||
qError("castConvert taosUcs4ToMbs error 1");
|
qError("castConvert taosUcs4ToMbs error 1");
|
||||||
tfree(tmp);
|
tfree(tmp);
|
||||||
return;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp[len] = 0;
|
tmp[len] = 0;
|
||||||
|
@ -560,13 +563,13 @@ int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* p
|
||||||
paramOut2->num = param2->num;
|
paramOut2->num = param2->num;
|
||||||
paramOut2->data = malloc(paramOut2->num * tDataTypes[paramOut2->type].bytes);
|
paramOut2->data = malloc(paramOut2->num * tDataTypes[paramOut2->type].bytes);
|
||||||
if (NULL == paramOut2->data) {
|
if (NULL == paramOut2->data) {
|
||||||
tfree(paramOut1->data)
|
tfree(paramOut1->data);
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vectorConvertImpl(param2, paramOut2);
|
code = vectorConvertImpl(param2, paramOut2);
|
||||||
if (code) {
|
if (code) {
|
||||||
tfree(paramOut1->data)
|
tfree(paramOut1->data);
|
||||||
tfree(paramOut2->data);
|
tfree(paramOut2->data);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -941,8 +944,7 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _
|
||||||
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) {
|
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) {
|
||||||
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
|
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
|
||||||
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
||||||
int8_t funcIdx = filterGetCompFuncIdx(pLeft->type, optr);
|
__compar_fn_t fp = filterGetCompFunc(pLeft->type, optr);
|
||||||
__compar_fn_t fp = gDataCompare[funcIdx];
|
|
||||||
bool res = false;
|
bool res = false;
|
||||||
|
|
||||||
bool *output=(bool *)out;
|
bool *output=(bool *)out;
|
||||||
|
@ -1003,19 +1005,19 @@ void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
|
||||||
SScalarParam *param2 = NULL;
|
SScalarParam *param2 = NULL;
|
||||||
|
|
||||||
int32_t type = 0;
|
int32_t type = 0;
|
||||||
if (pLeftOut->type) {
|
if (pLeftOut.type) {
|
||||||
param1 = &pLeftOut;
|
param1 = &pLeftOut;
|
||||||
} else {
|
} else {
|
||||||
param1 = pLeft;
|
param1 = pLeft;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRightOut->type) {
|
if (pRightOut.type) {
|
||||||
param2 = &pRightOut;
|
param2 = &pRightOut;
|
||||||
} else {
|
} else {
|
||||||
param2 = pRight;
|
param2 = pRight;
|
||||||
}
|
}
|
||||||
|
|
||||||
vectorCompareImpl(pLeftOut, pRightOut, out, _ord, TSDB_RELATION_GREATER);
|
vectorCompareImpl(param1, param2, out, _ord, TSDB_RELATION_GREATER);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
|
void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
|
||||||
|
@ -1091,7 +1093,7 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
|
||||||
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
||||||
bool res = false;
|
bool res = false;
|
||||||
|
|
||||||
bool *output=(bool *)out;
|
bool *output = (bool *)out;
|
||||||
_getValueAddr_fn_t getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type);
|
_getValueAddr_fn_t getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type);
|
||||||
|
|
||||||
for (; i >= 0 && i < pLeft->num; i += step, output += 1) {
|
for (; i >= 0 && i < pLeft->num; i += step, output += 1) {
|
|
@ -0,0 +1,18 @@
|
||||||
|
|
||||||
|
MESSAGE(STATUS "build scalar unit test")
|
||||||
|
|
||||||
|
# GoogleTest requires at least C++11
|
||||||
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(scalarTest ${SOURCE_LIST})
|
||||||
|
TARGET_LINK_LIBRARIES(
|
||||||
|
scalarTest
|
||||||
|
PUBLIC os util common gtest qcom function nodes
|
||||||
|
)
|
||||||
|
|
||||||
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
scalarTest
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scalar/"
|
||||||
|
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/scalar/inc"
|
||||||
|
)
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <tglobal.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
#pragma GCC diagnostic ignored "-Wformat"
|
||||||
|
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
|
||||||
|
#pragma GCC diagnostic ignored "-Wpointer-arith"
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
|
#include "taos.h"
|
||||||
|
#include "tdef.h"
|
||||||
|
#include "tvariant.h"
|
||||||
|
#include "tep.h"
|
||||||
|
#include "stub.h"
|
||||||
|
#include "addr_any.h"
|
||||||
|
#include "scalar.h"
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(scalarTest, func) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
srand(time(NULL));
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
#pragma GCC diagnostic pop
|
Loading…
Reference in New Issue