diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4e9e5a8654..bcd3e294d1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -63,6 +63,9 @@ static SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int3 static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); static char* getAccountId(SSqlObj* pSql); +static bool serializeExprListToVariant(SArray* pList, tVariant **dest); +static int32_t validateParamOfRelationIn(tVariant *pVar, int32_t colType); + static bool has(SArray* pFieldList, int32_t startIdx, const char* name); static char* cloneCurrentDBName(SSqlObj* pSql); static int32_t getDelimiterIndex(SStrToken* pTableName); @@ -134,14 +137,79 @@ static bool validateDebugFlag(int32_t v); static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo); -static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) { +static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) { return pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0; } + int16_t getNewResColId(SSqlCmd* pCmd) { return pCmd->resColumnId--; } +// serialize expr in exprlist to binary +// formate "type | size | value" +bool serializeExprListToVariant(SArray* pList, tVariant **dst) { + bool ret = false; + if (!pList || pList->size <= 0) { + return ret; + } + + SBufferWriter bw = tbufInitWriter( NULL, false ); + tbufEnsureCapacity(&bw, 512); + + tSqlExprItem* item = (tSqlExprItem *)taosArrayGet(pList, 0); + int32_t size = pList->size; + int32_t type = item->pNode->token.type; + if (type < 0) { + tbufCloseWriter(&bw); + return ret; + } + + toTSDBType(item->pNode->token.type); + tbufWriteUint32(&bw, item->pNode->token.type); + tbufWriteInt32(&bw, size); + + for (int32_t i = 0; i < size; i++) { + tSqlExpr* pSub = ((tSqlExprItem*)(taosArrayGet(pList, i)))->pNode; + + // validate type in exprList + if (type != pSub->token.type) { + break; + } + tVariant var; + tVariantCreate(&var, &pSub->token); + if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT + || type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) { + tbufWriteInt64(&bw, var.i64); + } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) { + tbufWriteDouble(&bw, var.dKey); + } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + tbufWriteBinary(&bw, var.pz, var.nLen); + } + tVariantDestroy(&var); + + if (i == size - 1) { ret = true;} + } + + if (ret == true) { + if ((*dst = calloc(1, sizeof(tVariant))) != NULL) { + tVariantCreateFromBinary(*dst, tbufGetData(&bw, false), tbufTell(&bw), TSDB_DATA_TYPE_BINARY); + } else { + ret = false; + } + } + tbufCloseWriter(&bw); + return ret; +} + +static int32_t validateParamOfRelationIn(tVariant *pVar, int32_t colType) { + if (pVar->nType != TSDB_DATA_TYPE_BINARY) { + return -1; + } + SBufferReader br = tbufInitReader(pVar->pz, pVar->nLen, false); + return tbufReadUint32(&br) == colType ? 0: -1; +} + static uint8_t convertOptr(SStrToken *pToken) { switch (pToken->type) { case TK_LT: @@ -3205,7 +3273,27 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, retVal = tVariantDump(&pRight->value, (char*)&pColumnFilter->upperBndd, colType, false); // TK_GT,TK_GE,TK_EQ,TK_NE are based on the pColumn->lowerBndd - } else if (colType == TSDB_DATA_TYPE_BINARY) { + } else if (pExpr->tokenId == TK_IN) { + tVariant *pVal; + if (pRight->tokenId != TK_SET || !serializeExprListToVariant(pRight->pParam, &pVal)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); + } + if (validateParamOfRelationIn(pVal, colType) != TSDB_CODE_SUCCESS) { + tVariantDestroy(pVal); + free(pVal); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); + } + pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen + 1); + pColumnFilter->len = pVal->nLen; + pColumnFilter->filterstr = 1; + memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen); + //retVal = tVariantDump(pVal, (char *)(pColumnFilter->pz), TSDB_DATA_TYPE_BINARY, false); + + tVariantDestroy(pVal); + free(pVal); + + } + else if (colType == TSDB_DATA_TYPE_BINARY) { pColumnFilter->pz = (int64_t)calloc(1, bufLen * TSDB_NCHAR_SIZE); pColumnFilter->len = pRight->value.nLen; retVal = tVariantDump(&pRight->value, (char*)pColumnFilter->pz, colType, false); @@ -3253,6 +3341,9 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, case TK_NOTNULL: pColumnFilter->lowerRelOptr = TSDB_RELATION_NOTNULL; break; + case TK_IN: + pColumnFilter->lowerRelOptr = TSDB_RELATION_IN; + break; default: return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); } @@ -3368,7 +3459,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC && pExpr->tokenId != TK_ISNULL && pExpr->tokenId != TK_NOTNULL && pExpr->tokenId != TK_LIKE - ) { + && pExpr->tokenId != TK_IN) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } } else { diff --git a/src/common/inc/texpr.h b/src/common/inc/texpr.h index 275dd12fd7..2e49a69366 100644 --- a/src/common/inc/texpr.h +++ b/src/common/inc/texpr.h @@ -94,6 +94,8 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, char *(*cb)(void *, const char*, int32_t)); +void buildFilterSetFromBinary(void **q, const char *buf, int32_t len); + #ifdef __cplusplus } #endif diff --git a/src/common/src/texpr.c b/src/common/src/texpr.c index f2dd3890a1..0805eff9ff 100644 --- a/src/common/src/texpr.c +++ b/src/common/src/texpr.c @@ -466,6 +466,28 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { return expr; } +void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) { + SBufferReader br = tbufInitReader(buf, len, false); + uint32_t type = tbufReadUint32(&br); + SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false); + int dummy = -1; + int32_t sz = tbufReadInt32(&br); + for (int32_t i = 0; i < sz; i++) { + if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT || type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) { + int64_t val = tbufReadInt64(&br); + taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy)); + } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) { + double val = tbufReadDouble(&br); + taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy)); + } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + size_t t = 0; + const char *val = tbufReadBinary(&br, &t); + taosHashPut(pObj, (char *)val, t, &dummy, sizeof(dummy)); + } + } + *q = (void *)pObj; +} + tExprNode* exprdup(tExprNode* pNode) { if (pNode == NULL) { return NULL; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b5b4fb5854..27d1e14b45 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -113,6 +113,7 @@ typedef struct SColumnFilterElem { int16_t bytes; // column length __filter_func_t fp; SColumnFilterInfo filterInfo; + void *q; } SColumnFilterElem; typedef struct SSingleColumnFilterInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 25e7e446bd..350e7118a8 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6940,6 +6940,10 @@ int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfF } pSingleColFilter->bytes = pCols[i].bytes; + + if (lower == TSDB_RELATION_IN) { + buildFilterSetFromBinary(&pSingleColFilter->q, (char *)(pSingleColFilter->filterInfo.pz), pSingleColFilter->filterInfo.len); + } } j++; diff --git a/src/query/src/qFilterfunc.c b/src/query/src/qFilterfunc.c index dabce88423..7769ec4a57 100644 --- a/src/query/src/qFilterfunc.c +++ b/src/query/src/qFilterfunc.c @@ -253,6 +253,25 @@ bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type) { return true; } +bool inOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type) { + if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT || type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) { + int64_t minv = -1, maxv = -1; + GET_TYPED_DATA(minv, int64_t, type, minval); + GET_TYPED_DATA(maxv, int64_t, type, maxval); + if (minv == maxv) { + return NULL != taosHashGet((SHashObj *)pFilter->q, (char *)&minv, sizeof(minv)); + } + return true; + } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_DOUBLE) { + double v; + GET_TYPED_DATA(v, double, type, minval); + return NULL != taosHashGet((SHashObj *)pFilter->q, (char *)&v, sizeof(v)); + } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){ + return NULL != taosHashGet((SHashObj *)pFilter->q, varDataVal(minval), varDataLen(minval)); + } + + return false; +} /////////////////////////////////////////////////////////////////////////////// bool rangeFilter_ii(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) { @@ -389,6 +408,7 @@ bool (*filterOperators[])(SColumnFilterElem *pFilter, const char* minval, const likeOperator, isNullOperator, notNullOperator, + inOperator, }; bool (*rangeFilterOperators[])(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type) = { @@ -397,6 +417,7 @@ bool (*rangeFilterOperators[])(SColumnFilterElem *pFilter, const char* minval, c rangeFilter_ie, rangeFilter_ei, rangeFilter_ii, + }; __filter_func_t getFilterOperator(int32_t lowerOptr, int32_t upperOptr) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1545d44395..bd609a6708 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -3153,11 +3153,22 @@ void filterPrepare(void* expr, void* param) { pInfo->sch = *pSchema; pInfo->optr = pExpr->_node.optr; - pInfo->compare = getComparFunc(pSchema->type, pInfo->optr); + pInfo->compare = getComparFunc(pInfo->sch.type, pInfo->optr); pInfo->indexed = pTSSchema->columns->colId == pInfo->sch.colId; if (pInfo->optr == TSDB_RELATION_IN) { - pInfo->q = (char*) pCond->arr; + int dummy = -1; + SHashObj *pObj = NULL; + if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { + pObj = taosHashInit(256, taosGetDefaultHashFunction(pInfo->sch.type), true, false); + SArray *arr = (SArray *)(pCond->arr); + for (size_t i = 0; i < taosArrayGetSize(arr); i++) { + taosHashPut(pObj, (char *)taosArrayGet(arr, i), arr->elemSize, &dummy, sizeof(dummy)); + } + } else { + buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen); + } + pInfo->q = (char *)pObj; } else if (pCond != NULL) { uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE; if (size < (uint32_t)pSchema->bytes) { @@ -3328,6 +3339,20 @@ static bool tableFilterFp(const void* pNode, void* param) { } else if (pInfo->optr == TSDB_RELATION_NOTNULL) { return (val != NULL) && (!isNull(val, pInfo->sch.type)); } + } else if (pInfo->optr == TSDB_RELATION_IN) { + int type = pInfo->sch.type; + if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT || type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) { + int64_t v; + GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val); + return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v)); + } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_DOUBLE) { + double v; + GET_TYPED_DATA(v, double, pInfo->sch.type, val); + return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v)); + } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){ + return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val)); + } + } int32_t ret = 0; @@ -3670,14 +3695,18 @@ static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) { if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL || optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) { - pCond->start = calloc(1, sizeof(SEndPoint)); + pCond->start = calloc(1, sizeof(SEndPoint)); pCond->start->optr = queryColInfo->optr; - pCond->start->v = queryColInfo->q; + pCond->start->v = queryColInfo->q; } else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) { - pCond->end = calloc(1, sizeof(SEndPoint)); + pCond->end = calloc(1, sizeof(SEndPoint)); pCond->end->optr = queryColInfo->optr; - pCond->end->v = queryColInfo->q; - } else if (optr == TSDB_RELATION_IN || optr == TSDB_RELATION_LIKE) { + pCond->end->v = queryColInfo->q; + } else if (optr == TSDB_RELATION_IN) { + pCond->start = calloc(1, sizeof(SEndPoint)); + pCond->start->optr = queryColInfo->optr; + pCond->start->v = queryColInfo->q; + } else if (optr == TSDB_RELATION_LIKE) { assert(0); } @@ -3762,6 +3791,19 @@ static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SAr taosArrayPush(result, &info); } + } else if (optr == TSDB_RELATION_IN) { + while(tSkipListIterNext(iter)) { + SSkipListNode* pNode = tSkipListIterGet(iter); + + int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v); + if (ret != 0) { + break; + } + + STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; + taosArrayPush(result, &info); + } + } else { assert(0); } @@ -3855,7 +3897,7 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re param->setupInfoFn(pExpr, param->pExtInfo); tQueryInfo *pQueryInfo = pExpr->_node.info; - if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) { + if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE && pQueryInfo->optr != TSDB_RELATION_IN)) { queryIndexedColumn(pSkipList, pQueryInfo, result); } else { queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn); diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 09199eaee3..f91d13e6d0 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -17,6 +17,7 @@ #include "ttype.h" #include "tcompare.h" #include "tarray.h" +#include "hash.h" int32_t compareInt32Val(const void *pLeft, const void *pRight) { int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight); @@ -291,9 +292,12 @@ int32_t taosArrayCompareString(const void* a, const void* b) { return compareLenPrefixedStr(x, y); } -static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) { - const SArray* arr = (const SArray*) pRight; - return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1; +//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) { +// const SArray* arr = (const SArray*) pRight; +// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1; +//} +static int32_t compareFindItemInSet(const void *pLeft, const void* pRight) { + return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0; } static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { @@ -325,7 +329,7 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) { if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */ comparFn = compareStrPatternComp; } else if (optr == TSDB_RELATION_IN) { - comparFn = compareFindStrInArray; + comparFn = compareFindItemInSet; } else { /* normal relational comparFn */ comparFn = compareLenPrefixedStr; }