diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index d9a7cea411..7dc13ce5f3 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -336,7 +336,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * } if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) { QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); - taosMemoryFree(msg); + rpcFreeCont(msg); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index e26bc59d47..2023d38777 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -101,6 +101,11 @@ typedef int32_t (*filter_desc_compare_func)(const void *, const void *); typedef bool (*filter_exec_func)(void *, int32_t, SColumnInfoData *, SColumnDataAgg *, int16_t, int32_t *); typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char *, void **); +typedef struct SFilterDataInfo { + int32_t idx; + void* addr; +} SFilterDataInfo; + typedef struct SFilterRangeCompare { int64_t s; int64_t e; @@ -294,9 +299,9 @@ struct SFilterInfo { #define CHK_OR_OPTR(ctx) ((ctx)->isnull == true && (ctx)->notnull == true) #define CHK_AND_OPTR(ctx) ((ctx)->isnull == true && (((ctx)->notnull == true) || ((ctx)->isrange == true))) -#define FILTER_GET_FLAG(st, f) (st & f) -#define FILTER_SET_FLAG(st, f) st |= (f) -#define FILTER_CLR_FLAG(st, f) st &= (~f) +#define FILTER_GET_FLAG(st, f) ((st) & (f)) +#define FILTER_SET_FLAG(st, f) (st) |= (f) +#define FILTER_CLR_FLAG(st, f) (st) &= (~f) #define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src) #define FLT_PACKAGE_UNIT_HASH_KEY(v, op1, op2, lidx, ridx, ridx2) \ diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 45931c209c..1cf4bd9923 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -963,16 +963,17 @@ int32_t filterGetFiledByDesc(SFilterFields *fields, int32_t type, void *v) { return -1; } -int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t dataLen) { +int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t dataLen, bool *sameBuf) { if (type == FLD_TYPE_VALUE) { if (info->pctx.valHash == false) { qError("value hash is empty"); return -1; } - void *hv = taosHashGet(info->pctx.valHash, v, dataLen); - if (hv) { - return *(int32_t *)hv; + SFilterDataInfo *dInfo = taosHashGet(info->pctx.valHash, v, dataLen); + if (dInfo) { + *sameBuf = (dInfo->addr == v); + return dInfo->idx; } } @@ -982,9 +983,10 @@ int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t d // In the params, we should use void *data instead of void **data, there is no need to use taosMemoryFreeClear(*data) to // set *data = 0 Besides, fields data value is a pointer, so dataLen should be POINTER_BYTES for better. int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type, SFilterFieldId *fid, int32_t dataLen, - bool freeIfExists) { + bool freeIfExists, int16_t *srcFlag) { int32_t idx = -1; uint32_t *num; + bool sameBuf = false; num = &info->fields[type].num; @@ -992,7 +994,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type, if (type == FLD_TYPE_COLUMN) { idx = filterGetFiledByDesc(&info->fields[type], type, desc); } else if (data && (*data) && dataLen > 0 && FILTER_GET_FLAG(info->options, FLT_OPTION_NEED_UNIQE)) { - idx = filterGetFiledByData(info, type, *data, dataLen); + idx = filterGetFiledByData(info, type, *data, dataLen, &sameBuf); } } @@ -1020,11 +1022,17 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); } - taosHashPut(info->pctx.valHash, *data, dataLen, &idx, sizeof(idx)); + SFilterDataInfo dInfo = {idx, *data}; + taosHashPut(info->pctx.valHash, *data, dataLen, &dInfo, sizeof(dInfo)); + if (srcFlag) { + FILTER_SET_FLAG(*srcFlag, FLD_DATA_NO_FREE); + } } - } else { - if (data && freeIfExists) { + } else if (type != FLD_TYPE_COLUMN && data) { + if (freeIfExists) { taosMemoryFreeClear(*data); + } else if (sameBuf) { + FILTER_SET_FLAG(*srcFlag, FLD_DATA_NO_FREE); } } @@ -1035,7 +1043,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type, } static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) { - filterAddField(info, field->desc, &field->data, FILTER_GET_TYPE(field->flag), fid, 0, false); + filterAddField(info, field->desc, &field->data, FILTER_GET_TYPE(field->flag), fid, 0, false, NULL); FILTER_SET_FLAG(field->flag, FLD_DATA_NO_FREE); @@ -1064,7 +1072,7 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, SNode *node, SFilterFieldId *f v = node; } - filterAddField(info, v, NULL, type, fid, 0, true); + filterAddField(info, v, NULL, type, fid, 0, true, NULL); return TSDB_CODE_SUCCESS; } @@ -1195,7 +1203,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) { len = tDataTypes[type].bytes; - filterAddField(info, NULL, (void **)&out.columnData->pData, FLD_TYPE_VALUE, &right, len, true); + filterAddField(info, NULL, (void **)&out.columnData->pData, FLD_TYPE_VALUE, &right, len, true, NULL); out.columnData->pData = NULL; } else { void *data = taosMemoryCalloc(1, tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes); // reserved space for simple_copy @@ -1203,7 +1211,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) { FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } memcpy(data, nodesGetValueFromNode(valueNode), tDataTypes[type].bytes); - filterAddField(info, NULL, (void **)&data, FLD_TYPE_VALUE, &right, len, true); + filterAddField(info, NULL, (void **)&data, FLD_TYPE_VALUE, &right, len, true, NULL); } filterAddUnit(info, OP_TYPE_EQUAL, &left, &right, &uidx); @@ -1234,28 +1242,26 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit *u uint8_t type = FILTER_UNIT_DATA_TYPE(u); uint16_t flag = 0; - filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0, false); + filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0, false, NULL); SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u); if (u->right.type == FLD_TYPE_VALUE) { void *data = FILTER_UNIT_VAL_DATA(src, u); + SFilterField *rField = FILTER_UNIT_RIGHT_FIELD(src, u); + if (IS_VAR_DATA_TYPE(type)) { if (FILTER_UNIT_OPTR(u) == OP_TYPE_IN) { filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, POINTER_BYTES, - false); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right. + false, &rField->flag); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right. t = FILTER_GET_FIELD(dst, right); FILTER_SET_FLAG(t->flag, FLD_DATA_IS_HASH); } else { - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, varDataTLen(data), false); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, varDataTLen(data), false, &rField->flag); } } else { - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, false); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, false, &rField->flag); } - - flag = FLD_DATA_NO_FREE; - t = FILTER_UNIT_RIGHT_FIELD(src, u); - FILTER_SET_FLAG(t->flag, flag); } else { pright = NULL; } @@ -1313,17 +1319,17 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan if (func(&ra->s, &ra->e) == 0) { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->s); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); filterAddUnit(dst, OP_TYPE_EQUAL, &left, &right, &uidx); filterAddUnitToGroup(g, uidx); return TSDB_CODE_SUCCESS; } else { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->s); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); void *data2 = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data2, &ra->e); - filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true, NULL); filterAddUnitImpl( dst, FILTER_GET_FLAG(ra->sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL, &left, @@ -1337,7 +1343,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan if (!FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->s); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL, &left, &right, &uidx); filterAddUnitToGroup(g, uidx); @@ -1346,7 +1352,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan if (!FILTER_GET_FLAG(ra->eflag, RANGE_FLG_NULL)) { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &ra->e); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_LOWER_THAN : OP_TYPE_LOWER_EQUAL, &left, &right, &uidx); filterAddUnitToGroup(g, uidx); @@ -1393,16 +1399,16 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan if (func(&r->ra.s, &r->ra.e) == 0) { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &r->ra.s); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); filterAddUnit(dst, OP_TYPE_EQUAL, &left, &right, &uidx); filterAddUnitToGroup(g, uidx); } else { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &r->ra.s); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); void *data2 = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data2, &r->ra.e); - filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true, NULL); filterAddUnitImpl( dst, FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL, &left, @@ -1421,7 +1427,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan if (!FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_NULL)) { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &r->ra.s); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL, &left, &right, &uidx); filterAddUnitToGroup(g, uidx); @@ -1430,7 +1436,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan if (!FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_NULL)) { void *data = taosMemoryMalloc(sizeof(int64_t)); SIMPLE_COPY_VALUES(data, &r->ra.e); - filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true); + filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL); filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_LOWER_THAN : OP_TYPE_LOWER_EQUAL, &left, &right, &uidx); filterAddUnitToGroup(g, uidx);