From d77c190ec6c906f02391c46e8af0a930b18ed834 Mon Sep 17 00:00:00 2001 From: wpan Date: Thu, 22 Jul 2021 09:44:15 +0800 Subject: [PATCH] support filter duplicated unit --- src/query/inc/qFilter.h | 6 +- src/query/src/qFilter.c | 168 ++++++++++++++++++++++++++++------------ 2 files changed, 124 insertions(+), 50 deletions(-) diff --git a/src/query/inc/qFilter.h b/src/query/inc/qFilter.h index ebf0fda55b..a383b15473 100644 --- a/src/query/inc/qFilter.h +++ b/src/query/inc/qFilter.h @@ -21,10 +21,12 @@ extern "C" { #endif #include "texpr.h" +#include "hash.h" #define FILTER_DEFAULT_GROUP_SIZE 4 #define FILTER_DEFAULT_UNIT_SIZE 4 #define FILTER_DEFAULT_FIELD_SIZE 4 +#define FILTER_DEFAULT_VALUE_SIZE 4 #define FILTER_DEFAULT_GROUP_UNIT_SIZE 2 enum { @@ -175,7 +177,7 @@ typedef struct SFilterInfo { SFilterUnit *units; uint8_t *unitRes; // result uint8_t *unitFlags; // got result - SFilterPCtx *pctx; + SFilterPCtx pctx; } SFilterInfo; #define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t)) @@ -196,7 +198,7 @@ typedef struct SFilterInfo { #define FILTER_CLR_FLAG(st, f) st &= (~f) #define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src) - +#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint16_t *)(_t + 1) = idx1; *(uint16_t *)(_t + 3) = idx2; } while (0) #define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RA_EXCLUDE) || FILTER_GET_FLAG(eflag,RA_EXCLUDE)))) #define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0) diff --git a/src/query/src/qFilter.c b/src/query/src/qFilter.c index 99355103b9..26ecfa0ac7 100644 --- a/src/query/src/qFilter.c +++ b/src/query/src/qFilter.c @@ -576,7 +576,7 @@ int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) { return TSDB_CODE_SUCCESS; } -int32_t filterGetFiled(SFilterFields* fields, int32_t type, void *v) { +int32_t filterGetFiledByDesc(SFilterFields* fields, int32_t type, void *v) { for (uint16_t i = 0; i < fields->num; ++i) { if (0 == gDescCompare[type](fields->fields[i].desc, v)) { return i; @@ -586,14 +586,36 @@ int32_t filterGetFiled(SFilterFields* fields, int32_t type, void *v) { return -1; } -int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, SFilterFieldId *fid) { + +int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t dataLen) { + if (type == FLD_TYPE_VALUE) { + if (info->pctx.valHash == false) { + qError("value hash is empty"); + return -1; + } + + void *hv = taosHashGet(info->pctx.valHash, v, dataLen); + if (hv) { + return *(int32_t *)hv; + } + } + + return -1; +} + + +int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, SFilterFieldId *fid, int32_t dataLen) { int32_t idx = -1; uint16_t *num; num = &info->fields[type].num; - if (*num > 0 && type != FLD_TYPE_VALUE) { - idx = filterGetFiled(&info->fields[type], type, desc); + if (*num > 0) { + if (type == FLD_TYPE_COLUMN) { + idx = filterGetFiledByDesc(&info->fields[type], type, desc); + } else if (dataLen > 0 && FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) { + idx = filterGetFiledByData(info, type, data, dataLen); + } } if (idx < 0) { @@ -607,6 +629,14 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, info->fields[type].fields[idx].desc = desc; info->fields[type].fields[idx].data = data; ++(*num); + + if (data && dataLen > 0 && FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) { + if (info->pctx.valHash == NULL) { + info->pctx.valHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_VALUE_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); + } + + taosHashPut(info->pctx.valHash, data, dataLen, &idx, sizeof(idx)); + } } fid->type = type; @@ -616,7 +646,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, } static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) { - filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid); + filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid, 0); FILTER_SET_FLAG(field->flag, FLD_DESC_NO_FREE); FILTER_SET_FLAG(field->flag, FLD_DATA_NO_FREE); @@ -642,12 +672,26 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, tExprNode *node, SFilterFieldI node->pVal = NULL; } - filterAddField(info, v, NULL, type, fid); + filterAddField(info, v, NULL, type, fid, 0); return TSDB_CODE_SUCCESS; } -int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFilterFieldId *right) { +int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFilterFieldId *right, uint16_t *uidx) { + if (FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) { + if (info->pctx.unitHash == NULL) { + info->pctx.unitHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_UNIT_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, false); + } else { + int64_t v = 0; + FILTER_PACKAGE_UNIT_HASH_KEY(&v, optr, left->idx, right ? right->idx : -1); + void *hu = taosHashGet(info->pctx.unitHash, &v, sizeof(v)); + if (hu) { + *uidx = *(uint16_t *)hu; + return TSDB_CODE_SUCCESS; + } + } + } + if (info->unitNum >= info->unitSize) { uint16_t psize = info->unitSize; info->unitSize += FILTER_DEFAULT_UNIT_SIZE; @@ -674,6 +718,14 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi assert(FILTER_GET_FLAG(col->flag, FLD_TYPE_COLUMN)); info->units[info->unitNum].compare.type = FILTER_GET_COL_FIELD_TYPE(col); + + *uidx = info->unitNum; + + if (FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) { + int64_t v = 0; + FILTER_PACKAGE_UNIT_HASH_KEY(&v, optr, left->idx, right ? right->idx : -1); + taosHashPut(info->pctx.unitHash, &v, sizeof(v), uidx, sizeof(*uidx)); + } ++info->unitNum; @@ -702,8 +754,10 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g tVariant* var = tree->_node.pRight->pVal; int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(info, left)); + int32_t len = 0; + uint16_t uidx = 0; - if (tree->_node.optr == TSDB_RELATION_IN && (!IS_VAR_DATA_TYPE(type))) { + if (tree->_node.optr == TSDB_RELATION_IN && (!IS_VAR_DATA_TYPE(type))) { void *data = NULL; convertFilterSetFromBinary((void **)&data, var->pz, var->nLen, type); CHK_LRET(data == NULL, TSDB_CODE_QRY_APP_ERROR, "failed to convert in param"); @@ -714,21 +768,23 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g void *fdata = NULL; if (IS_VAR_DATA_TYPE(type)) { - uint32_t len = taosHashGetDataKeyLen((SHashObj *)data, p); + len = (int32_t)taosHashGetDataKeyLen((SHashObj *)data, p); fdata = malloc(len + VARSTR_HEADER_SIZE); varDataLen(fdata) = len; memcpy(varDataVal(fdata), key, len); + len += VARSTR_HEADER_SIZE; } else { fdata = malloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(fdata, key); + len = tDataTypes[type].bytes; } - filterAddField(info, NULL, fdata, FLD_TYPE_VALUE, &right); + filterAddField(info, NULL, fdata, FLD_TYPE_VALUE, &right, len); - filterAddUnit(info, TSDB_RELATION_EQUAL, &left, &right); + filterAddUnit(info, TSDB_RELATION_EQUAL, &left, &right, &uidx); SFilterGroup fgroup = {0}; - filterAddUnitToGroup(&fgroup, info->unitNum - 1); + filterAddUnitToGroup(&fgroup, uidx); taosArrayPush(group, &fgroup); @@ -737,10 +793,10 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g } else { filterAddFieldFromNode(info, tree->_node.pRight, &right); - filterAddUnit(info, tree->_node.optr, &left, &right); + filterAddUnit(info, tree->_node.optr, &left, &right, &uidx); SFilterGroup fgroup = {0}; - filterAddUnitToGroup(&fgroup, info->unitNum - 1); + filterAddUnitToGroup(&fgroup, uidx); taosArrayPush(group, &fgroup); } @@ -749,15 +805,25 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g } -int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u) { +int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u, uint16_t *uidx) { SFilterFieldId left, right, *pright = &right; + int32_t type = FILTER_UNIT_DATA_TYPE(u); - filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left); + filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0); SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u); FILTER_SET_FLAG(t->flag, FLD_DESC_NO_FREE); if (u->right.type == FLD_TYPE_VALUE) { - filterAddField(dst, NULL, FILTER_UNIT_VAL_DATA(src, u), FLD_TYPE_VALUE, &right); + void *data = FILTER_UNIT_VAL_DATA(src, u); + if (IS_VAR_DATA_TYPE(type)) { + if (FILTER_UNIT_OPTR(u) == TSDB_RELATION_IN) { + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, 0); + } else { + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, varDataTLen(data)); + } + } else { + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + } t = FILTER_UNIT_RIGHT_FIELD(src, u); FILTER_SET_FLAG(t->flag, FLD_DATA_NO_FREE); } else { @@ -765,29 +831,32 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u } - return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, pright); + return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, pright, uidx); } int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMCtx *ctx, uint16_t cidx, SFilterGroup *g, int32_t optr, SArray *res) { SFilterFieldId left, right; + uint16_t uidx = 0; SFilterField *col = FILTER_GET_COL_FIELD(src, cidx); filterAddColFieldFromField(dst, col, &left); + int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left)); + if (optr == TSDB_RELATION_AND) { if (ctx->isnull) { assert(ctx->notnull == false && ctx->isrange == false); - filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL, &uidx); + filterAddUnitToGroup(g, uidx); return TSDB_CODE_SUCCESS; } if (ctx->notnull) { assert(ctx->isnull == false && ctx->isrange == false); - filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL, &uidx); + filterAddUnitToGroup(g, uidx); return TSDB_CODE_SUCCESS; } @@ -803,14 +872,13 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC assert(!((FILTER_GET_FLAG(ra->sflag, RA_NULL)) && (FILTER_GET_FLAG(ra->eflag, RA_NULL)))); if ((!FILTER_GET_FLAG(ra->sflag, RA_NULL)) && (!FILTER_GET_FLAG(ra->eflag, RA_NULL))) { - int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left)); __compar_fn_t func = getComparFunc(type, 0); if (func(&ra->s, &ra->e) == 0) { void *data = malloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->s); - filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right); - filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right, &uidx); + filterAddUnitToGroup(g, uidx); return TSDB_CODE_SUCCESS; } } @@ -818,17 +886,17 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC if (!FILTER_GET_FLAG(ra->sflag, RA_NULL)) { void *data = malloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->s); - filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right); - filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right, &uidx); + filterAddUnitToGroup(g, uidx); } if (!FILTER_GET_FLAG(ra->eflag, RA_NULL)) { void *data = malloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->e); - filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right); - filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right, &uidx); + filterAddUnitToGroup(g, uidx); } return TSDB_CODE_SUCCESS; @@ -842,8 +910,8 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC assert(ctx->isnull || ctx->notnull || ctx->isrange); if (ctx->isnull) { - filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL, &uidx); + filterAddUnitToGroup(g, uidx); taosArrayPush(res, g); } @@ -851,8 +919,8 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC assert(!ctx->isrange); memset(g, 0, sizeof(*g)); - filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL, &uidx); + filterAddUnitToGroup(g, uidx); taosArrayPush(res, g); } @@ -868,14 +936,13 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC memset(g, 0, sizeof(*g)); if ((!FILTER_GET_FLAG(r->ra.sflag, RA_NULL)) &&(!FILTER_GET_FLAG(r->ra.eflag, RA_NULL))) { - int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left)); __compar_fn_t func = getComparFunc(type, 0); if (func(&r->ra.s, &r->ra.e) == 0) { void *data = malloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &r->ra.s); - filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right); - filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right); - filterAddUnitToGroup(g, dst->unitNum - 1); + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right, &uidx); + filterAddUnitToGroup(g, uidx); taosArrayPush(res, g); @@ -885,15 +952,19 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC } if (!FILTER_GET_FLAG(r->ra.sflag, RA_NULL)) { - filterAddField(dst, NULL, &r->ra.s, FLD_TYPE_VALUE, &right); - filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right); - filterAddUnitToGroup(g, dst->unitNum - 1); + void *data = malloc(sizeof(int64_t)); + SIMPLE_COPY_VALUES(data, &r->ra.s); + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right, &uidx); + filterAddUnitToGroup(g, uidx); } if (!FILTER_GET_FLAG(r->ra.eflag, RA_NULL)) { - filterAddField(dst, NULL, &r->ra.e, FLD_TYPE_VALUE, &right); - filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right); - filterAddUnitToGroup(g, dst->unitNum - 1); + void *data = malloc(sizeof(int64_t)); + SIMPLE_COPY_VALUES(data, &r->ra.e); + filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes); + filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right, &uidx); + filterAddUnitToGroup(g, uidx); } assert (g->unitNum > 0); @@ -1667,6 +1738,7 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t gResNum SFilterGroupCtx *res = NULL; SFilterColInfo *colInfo = NULL; int32_t optr = 0; + uint16_t uidx = 0; memset(info, 0, sizeof(*info)); @@ -1690,8 +1762,8 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t gResNum for (int32_t n = 0; n < usize; ++n) { SFilterUnit* u = taosArrayGetP((SArray *)colInfo->info, n); - filterAddUnitFromUnit(info, &oinfo, u); - filterAddUnitToGroup(&ng, info->unitNum - 1); + filterAddUnitFromUnit(info, &oinfo, u, &uidx); + filterAddUnitToGroup(&ng, uidx); } continue;