From ee5b67a965c4d30ce9415193050273f0f61bf1f7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 1 May 2022 22:20:24 +0800 Subject: [PATCH] enh(index): support index filter --- source/libs/executor/CMakeLists.txt | 4 +- source/libs/executor/src/indexoperator.c | 181 ++++++++++++++++------- source/libs/index/src/index.c | 35 ++--- 3 files changed, 147 insertions(+), 73 deletions(-) diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 3dfef9b59f..bfa54be71f 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # ) target_link_libraries(executor - PRIVATE os util common function parser planner qcom vnode scalar nodes + PRIVATE os util common function parser planner qcom vnode scalar nodes index ) target_include_directories( @@ -19,4 +19,4 @@ target_include_directories( #if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) -#endif(${BUILD_TEST}) \ No newline at end of file +#endif(${BUILD_TEST}) diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index 1f8e73b80a..911ba22fe6 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -15,7 +15,9 @@ #include "indexoperator.h" #include "executorimpl.h" +#include "index.h" #include "nodes.h" +#include "tdatablock.h" typedef struct SIFCtx { int32_t code; @@ -48,11 +50,19 @@ typedef struct SIFCtx { } while (0) typedef struct SIFParam { - SArray * result; SHashObj *pFilter; + + SArray *result; + char * condValue; + + col_id_t colId; + + int64_t suid; // add later + char dbName[TSDB_DB_NAME_LEN]; + char colName[TSDB_COL_NAME_LEN]; } SIFParam; -typedef int32_t (*sif_func_t)(SNode *left, SNode *rigth, SIFParam *output); +typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output); // construct tag filter operator later static void destroyTagFilterOperatorInfo(void *param) { STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; @@ -60,7 +70,10 @@ static void destroyTagFilterOperatorInfo(void *param) { static void sifFreeParam(SIFParam *param) { if (param == NULL) return; + taosArrayDestroy(param->result); + taosMemoryFree(param->condValue); + taosHashCleanup(param->pFilter); } static int32_t sifGetOperParamNum(EOperatorType ty) { @@ -71,15 +84,70 @@ static int32_t sifGetOperParamNum(EOperatorType ty) { } return 2; } +static int32_t sifValidateColumn(SColumnNode *cn) { + // add more check + if (cn == NULL) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + if (cn->colType != COLUMN_TYPE_TAG) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t sifGetValueFromNode(SNode *node, char **value) { + // covert data From snode; + SValueNode *vn = (SValueNode *)node; + + char * pData = nodesGetValueFromNode(vn); + SDataType *pType = &vn->node.resType; + int32_t type = pType->type; + int32_t valLen = 0; + + if (IS_VAR_DATA_TYPE(type)) { + int32_t dataLen = varDataTLen(pData); + if (type == TSDB_DATA_TYPE_JSON) { + if (*pData == TSDB_DATA_TYPE_NULL) { + dataLen = 0; + } else if (*pData == TSDB_DATA_TYPE_NCHAR) { + dataLen = varDataTLen(pData + CHAR_BYTES); + } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { + dataLen = LONG_BYTES; + } else if (*pData == TSDB_DATA_TYPE_BOOL) { + dataLen = CHAR_BYTES; + } + dataLen += CHAR_BYTES; + } + valLen = dataLen; + } else { + valLen = pType->bytes; + } + char *tv = taosMemoryCalloc(1, valLen + 1); + if (tv == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + memcpy(tv, pData, valLen); + *value = tv; + + return TSDB_CODE_SUCCESS; +} + static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { switch (nodeType(node)) { case QUERY_NODE_VALUE: { SValueNode *vn = (SValueNode *)node; - + SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue)); + param->colId = -1; break; } case QUERY_NODE_COLUMN: { SColumnNode *cn = (SColumnNode *)node; + /*only support tag column*/ + SIF_ERR_RET(sifValidateColumn(cn)); + param->colId = cn->colId; + memcpy(param->dbName, cn->dbName, sizeof(cn->dbName)); + memcpy(param->colName, cn->colName, sizeof(cn->colName)); break; } @@ -89,7 +157,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { qError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList)); SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + SIF_ERR_RET(scalarGenerateSetFromList((void **)¶m->pFilter, node, nl->dataType.type)); if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { taosHashCleanup(param->pFilter); qError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); @@ -163,58 +231,63 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu qError("index-filter not support buildin function"); return TSDB_CODE_QRY_INVALID_INPUT; } - -static int32_t sifLessThanFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifLessEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifGreaterThanFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifGreaterEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later +static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { + SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST); return TSDB_CODE_SUCCESS; } -static int32_t sifEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; +static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_LOWER_THAN; + return sifIndex(left, right, id, output); } -static int32_t sifNotEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifInFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifNotInFunc(SNode *left, SNode *right, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifLikeFunc(SNode *left, SNode *right, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; -} -static int32_t sifNotLikeFunc(SNode *left, SNode *right, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; +static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_LOWER_EQUAL; + return sifIndex(left, right, id, output); } -static int32_t sifMatchFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; +static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_GREATER_THAN; + return sifIndex(left, right, id, output); } -static int32_t sifNotMatchFunc(SNode *left, SNode *rigth, SIFParam *output) { - // impl later - return TSDB_CODE_SUCCESS; +static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_GREATER_EQUAL; + return sifIndex(left, right, id, output); } -static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) { + +static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_EQUAL; + return sifIndex(left, right, id, output); +} +static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_NOT_EQUAL; + return sifIndex(left, right, id, output); +} +static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_IN; + return sifIndex(left, right, id, output); +} +static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_NOT_IN; + return sifIndex(left, right, id, output); +} +static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_LIKE; + return sifIndex(left, right, id, output); +} +static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_NOT_LIKE; + return sifIndex(left, right, id, output); +} + +static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_MATCH; + return sifIndex(left, right, id, output); +} +static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { + int id = OP_TYPE_NMATCH; + return sifIndex(left, right, id, output); +} +static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) { // add more except return TSDB_CODE_QRY_INVALID_INPUT; } @@ -252,17 +325,18 @@ static sif_func_t sifGetOperFn(int32_t funcId) { return sifDefaultFunc; } static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { - int32_t code = 0; - SIFParam *params = NULL; - SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx)); - + int32_t code = 0; int32_t nParam = sifGetOperParamNum(node->opType); if (nParam <= 1) { SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + + SIFParam *params = NULL; + SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx)); + sif_func_t operFn = sifGetOperFn(node->opType); - return operFn(node->pLeft, node->pRight, output); + return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output); _return: taosMemoryFree(params); SIF_RET(code); @@ -335,7 +409,6 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { if (ctx->code) { return DEAL_RES_ERROR; } - if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 7d52abcd1b..aeb5e01175 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -258,13 +258,13 @@ void indexOptsDestroy(SIndexOpts* opts) { * */ SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { - SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery)); - if (p == NULL) { + SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery)); + if (mtq == NULL) { return NULL; } - p->opera = opera; - p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); - return p; + mtq->opera = opera; + mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery)); + return mtq; } void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) { for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) { @@ -282,23 +282,24 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) { - SIndexTerm* t = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm))); - if (t == NULL) { + SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm))); + if (tm == NULL) { return NULL; } - t->suid = suid; - t->operType = oper; - t->colType = colType; + tm->suid = suid; + tm->operType = oper; + tm->colType = colType; - t->colName = (char*)taosMemoryCalloc(1, nColName + 1); - memcpy(t->colName, colName, nColName); - t->nColName = nColName; + tm->colName = (char*)taosMemoryCalloc(1, nColName + 1); + memcpy(tm->colName, colName, nColName); + tm->nColName = nColName; - t->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); - memcpy(t->colVal, colVal, nColVal); - t->nColVal = nColVal; - return t; + tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); + memcpy(tm->colVal, colVal, nColVal); + tm->nColVal = nColVal; + + return tm; } void indexTermDestroy(SIndexTerm* p) { taosMemoryFree(p->colName);