refact code
This commit is contained in:
parent
375847cad6
commit
d5a8aedb34
|
@ -57,6 +57,11 @@ enum {
|
|||
FILTER_NO_REWRITE = 4,
|
||||
};
|
||||
|
||||
enum {
|
||||
RANGE_TYPE_UNIT = 1,
|
||||
RANGE_TYPE_COL_RANGE,
|
||||
};
|
||||
|
||||
typedef struct OptrStr {
|
||||
uint16_t optr;
|
||||
char *str;
|
||||
|
@ -117,12 +122,18 @@ typedef struct SFilterGroup {
|
|||
uint8_t *unitFlags; // !unit result
|
||||
} SFilterGroup;
|
||||
|
||||
typedef struct SFilterColInfo {
|
||||
uint8_t type;
|
||||
void *info;
|
||||
} SFilterColInfo;
|
||||
|
||||
typedef struct SFilterGroupCtx {
|
||||
uint16_t num;
|
||||
int32_t *col;
|
||||
SArray *colRange;
|
||||
uint16_t colNum;
|
||||
uint16_t *colIdx;
|
||||
SArray **colInfo;
|
||||
} SFilterGroupCtx;
|
||||
|
||||
|
||||
typedef struct SFilterCompare {
|
||||
__compar_fn_t pCompareFunc;
|
||||
int32_t type;
|
||||
|
@ -195,6 +206,11 @@ typedef struct SFilterInfo {
|
|||
#define FILTER_GET_VAL_FIELD_DATA(fi) ((fi)->data)
|
||||
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
|
||||
|
||||
|
||||
#define FILTER_PUSH_UNIT(colInfo, u) do { SFilterColInfo* _info = malloc(sizeof(SFilterColInfo)); _info->type = RANGE_TYPE_UNIT; _info->info = u; taosArrayPush((SArray *)(colInfo), &_info);} while (0)
|
||||
#define FILTER_PUSH_RANGE(colInfo, cra) do { SFilterColInfo* _info = malloc(sizeof(SFilterColInfo)); _info->type = RANGE_TYPE_COL_RANGE; _info->info = cra; taosArrayPush((SArray *)(colInfo), &_info);} while (0)
|
||||
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint16_t) * n); memcpy(*(dst), src, sizeof(uint16_t) * n);} while (0)
|
||||
|
||||
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
|
||||
#define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left)
|
||||
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
|
||||
|
|
|
@ -58,6 +58,14 @@ filter_desc_compare_func gDescCompare [FLD_TYPE_MAX] = {
|
|||
};
|
||||
|
||||
|
||||
static FORCE_INLINE int32_t filterCompareGroupCtx(const void *pLeft, const void *pRight) {
|
||||
SFilterGroupCtx *left = *((SFilterGroupCtx**)pLeft), *right = *((SFilterGroupCtx**)pRight);
|
||||
if (left->colNum > right->colNum) return 1;
|
||||
if (left->colNum < right->colNum) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t filterInitUnitsFields(SFilterInfo *info) {
|
||||
info->unitSize = FILTER_DEFAULT_UNIT_SIZE;
|
||||
info->units = calloc(info->unitSize, sizeof(SFilterUnit));
|
||||
|
@ -886,6 +894,28 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg) {
|
|||
}
|
||||
}
|
||||
|
||||
void filterFreeGroupCtx(SFilterGroupCtx* gRes) {
|
||||
if (gRes == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tfree(gRes->colIdx);
|
||||
|
||||
int16_t i = 0, j = 0;
|
||||
|
||||
while (i < gRes->colNum) {
|
||||
if (gRes->colInfo[j]) {
|
||||
taosArrayDestroy(gRes->colInfo[j]);
|
||||
++i;
|
||||
}
|
||||
|
||||
++j;
|
||||
}
|
||||
|
||||
tfree(gRes->colInfo);
|
||||
tfree(gRes);
|
||||
}
|
||||
|
||||
void filterFreeInfo(SFilterInfo *info) {
|
||||
CHK_RETV(info == NULL);
|
||||
|
||||
|
@ -1038,72 +1068,50 @@ int32_t filterAddUnitRange(SFilterInfo *info, SFilterUnit* u, SFilterRMCtx *ctx,
|
|||
|
||||
|
||||
|
||||
int32_t filterProcessUnitsInOneGroup(SFilterInfo *info, SFilterGroup* g, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum) {
|
||||
int32_t filterMergeBetweenColumns(SFilterInfo *info, SFilterGroupCtx* gRes, uint16_t colIdx) {
|
||||
bool isnull = false, notnull = false, isrange = false;
|
||||
int32_t num = 0;
|
||||
SFilterRMCtx *cur = NULL;
|
||||
SFilterUnit* u1 = FILTER_GROUP_UNIT(info, g, id1);
|
||||
SFilterUnit* u2 = FILTER_GROUP_UNIT(info, g, id2);
|
||||
uint8_t optr1 = FILTER_UNIT_OPTR(u1);
|
||||
uint8_t optr2 = FILTER_UNIT_OPTR(u2);
|
||||
uint16_t cidx = FILTER_UNIT_COL_IDX(u1);
|
||||
SArray* colArray = gRes->colInfo[colIdx];
|
||||
int32_t size = (int32_t)taosArrayGetSize(colArray);
|
||||
SFilterRMCtx* cur = filterInitMergeRange(type, 0);
|
||||
|
||||
int32_t type = FILTER_UNIT_DATA_TYPE(u1);
|
||||
|
||||
SET_AND_OPTR(optr1);
|
||||
SET_AND_OPTR(optr2);
|
||||
for (uint32_t i = 0; i < size; ++i) {
|
||||
SFilterColInfo* colInfo = taosArrayGet(colArray, i);
|
||||
SFilterUnit* u = colInfo->info;
|
||||
int32_t type = FILTER_UNIT_DATA_TYPE(u);
|
||||
uint8_t optr = FILTER_UNIT_OPTR(u);
|
||||
|
||||
SET_AND_OPTR(optr);
|
||||
CHK_JMP(CHK_AND_OPTR());
|
||||
|
||||
cur = filterInitMergeRange(type, 0);
|
||||
|
||||
if (!FILTER_NO_MERGE_OPTR(optr1)) {
|
||||
filterAddUnitRange(info, u1, cur, TSDB_RELATION_AND);
|
||||
}
|
||||
|
||||
if (!FILTER_NO_MERGE_OPTR(optr2)) {
|
||||
filterAddUnitRange(info, u2, cur, TSDB_RELATION_AND);
|
||||
CHK_JMP(MR_EMPTY_RES(cur));
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = id2 + 1; i < g->unitNum; ++i) {
|
||||
SFilterUnit* u = FILTER_GROUP_UNIT(info, g, i);
|
||||
if (cidx != FILTER_UNIT_COL_IDX(u)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
++(*removedNum);
|
||||
optr2 = FILTER_UNIT_OPTR(u);
|
||||
SET_AND_OPTR(optr2);
|
||||
CHK_JMP(CHK_AND_OPTR());
|
||||
|
||||
if (!FILTER_NO_MERGE_OPTR(optr2)) {
|
||||
if (!FILTER_NO_MERGE_OPTR(optr)) {
|
||||
filterAddUnitRange(info, u, cur, TSDB_RELATION_AND);
|
||||
CHK_JMP(MR_EMPTY_RES(cur));
|
||||
}
|
||||
}
|
||||
|
||||
SFilterColRange cra = {0};
|
||||
cra.idx = cidx;
|
||||
SFilterColRange *cra = calloc(1, sizeof(*cra));
|
||||
cra->idx = colIdx;
|
||||
|
||||
filterGetMergeRangeNum(cur, &num);
|
||||
assert(num == 1 || num == 0);
|
||||
|
||||
if (num == 1) {
|
||||
filterGetMergeRangeRes(cur, &cra.ra);
|
||||
filterPostProcessRange(cur, &cra.ra, &cra.notNull);
|
||||
filterGetMergeRangeRes(cur, &cra->ra);
|
||||
filterPostProcessRange(cur, &cra->ra, &cra->notNull);
|
||||
} else {
|
||||
if (isnull) {
|
||||
cra.isNull = true;
|
||||
cra->isNull = true;
|
||||
}
|
||||
|
||||
if (notnull) {
|
||||
cra.notNull = true;
|
||||
cra->notNull = true;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(res, &cra);
|
||||
taosArrayClear(colArray);
|
||||
|
||||
FILTER_PUSH_RANGE(colArray, cra);
|
||||
|
||||
filterFreeMergeRange(cur);
|
||||
|
||||
|
@ -1111,9 +1119,8 @@ int32_t filterProcessUnitsInOneGroup(SFilterInfo *info, SFilterGroup* g, uint16_
|
|||
|
||||
_err_return:
|
||||
|
||||
g->unitNum = 0;
|
||||
|
||||
*removedNum = g->unitNum;
|
||||
taosArrayDestroy(colArray);
|
||||
gRes->colInfo[colIdx] = NULL;
|
||||
|
||||
filterFreeMergeRange(cur);
|
||||
|
||||
|
@ -1122,97 +1129,76 @@ _err_return:
|
|||
}
|
||||
|
||||
|
||||
int32_t filterProcessUnits(SFilterInfo *info, SFilterGroupCtx*** res) {
|
||||
int32_t *col = NULL;
|
||||
SFilterGroupCtx *gctx = NULL;
|
||||
SArray *colRange = NULL;
|
||||
bool gresUsed = false;
|
||||
bool colInit = false;
|
||||
uint16_t removedNum = 0;
|
||||
int32_t filterMergeBetweenUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t* gResNum) {
|
||||
bool emptyGroup = false;
|
||||
uint16_t *colIdx = malloc(info->fields[FLD_TYPE_COLUMN].num * sizeof(uint16_t));
|
||||
uint16_t colIdxi = 0;
|
||||
uint16_t gResIdx = 0;
|
||||
|
||||
for (uint16_t i = 0; i < info->groupNum; ++i) {
|
||||
SFilterGroup* g = info->groups + i;
|
||||
|
||||
if (g->unitNum <= 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
gresUsed = false;
|
||||
colInit = false;
|
||||
|
||||
removedNum = 0;
|
||||
gRes[gResIdx] = calloc(1, sizeof(SFilterGroupCtx));
|
||||
gRes[gResIdx]->colInfo = calloc(info->fields[FLD_TYPE_COLUMN].num, POINTER_BYTES);
|
||||
colIdxi = 0;
|
||||
emptyGroup = false;
|
||||
|
||||
for (uint16_t j = 0; j < g->unitNum; ++j) {
|
||||
SFilterUnit* u = FILTER_GROUP_UNIT(info, g, j);
|
||||
int32_t type = FILTER_UNIT_DATA_TYPE(u);
|
||||
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (colInit == false) {
|
||||
if (col == NULL) {
|
||||
col = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(int32_t));
|
||||
} else {
|
||||
memset(col, 0, info->fields[FLD_TYPE_COLUMN].num * sizeof(int32_t));
|
||||
}
|
||||
|
||||
colInit = true;
|
||||
}
|
||||
|
||||
uint16_t cidx = FILTER_UNIT_COL_IDX(u);
|
||||
|
||||
if (col[cidx] == 0) {
|
||||
col[cidx] = j + 1;
|
||||
} else if (col[cidx] == -1) {
|
||||
continue;
|
||||
} else {
|
||||
if (colRange == NULL) {
|
||||
colRange = taosArrayInit(4, sizeof(SFilterColRange));
|
||||
if (gRes[gResIdx]->colInfo[cidx] == NULL) {
|
||||
gRes[gResIdx]->colInfo[cidx] = (SArray *)taosArrayInit(4, POINTER_BYTES);
|
||||
colIdx[colIdxi++] = cidx;
|
||||
++gRes[gResIdx]->colNum;
|
||||
}
|
||||
|
||||
removedNum += 2;
|
||||
FILTER_PUSH_UNIT(gRes[gResIdx]->colInfo[cidx], u);
|
||||
}
|
||||
|
||||
filterProcessUnitsInOneGroup(info, g, col[cidx] - 1, j, colRange, &removedNum);
|
||||
if (colIdxi > 1) {
|
||||
qsort(colIdx, colIdxi, sizeof(uint16_t), compareUint16Val);
|
||||
}
|
||||
|
||||
col[cidx] = -1;
|
||||
for (uint16_t l = 0; l < colIdxi; ++l) {
|
||||
SArray* colArray = gRes[gResIdx]->colInfo[colIdx[l]];
|
||||
int32_t size = (int32_t)taosArrayGetSize(colArray);
|
||||
|
||||
if (g->unitNum == 0) {
|
||||
if (size <= 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SFilterColInfo* colInfo = taosArrayGet(colArray, 0);
|
||||
SFilterUnit* u = colInfo->info;
|
||||
int32_t type = FILTER_UNIT_DATA_TYPE(u);
|
||||
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
continue;
|
||||
}
|
||||
|
||||
filterMergeBetweenColumns(info, gRes[gResIdx], colIdx[l]);
|
||||
|
||||
if (gRes[gResIdx]->colInfo[colIdx[l]] == NULL) {
|
||||
emptyGroup = true;
|
||||
break;
|
||||
} else {
|
||||
gresUsed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (g->unitNum == 0) {
|
||||
if (gresUsed) {
|
||||
taosArrayClear(colRange);
|
||||
}
|
||||
if (emptyGroup) {
|
||||
filterFreeGroupCtx(gRes[gResIdx]);
|
||||
gRes[gResIdx] = NULL;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (gresUsed) {
|
||||
gctx = malloc(sizeof(*gctx));
|
||||
gctx->num = g->unitNum - removedNum + 1;
|
||||
gctx->col = col;
|
||||
gctx->colRange = colRange;
|
||||
|
||||
col = NULL;
|
||||
colRange = NULL;
|
||||
|
||||
if (*res == NULL) {
|
||||
*res = calloc(info->groupNum, sizeof(SFilterGroupCtx *));
|
||||
gRes[gResIdx]->colNum = colIdxi;
|
||||
FILTER_COPY_IDX(&gRes[gResIdx]->colIdx, colIdx, colIdxi);
|
||||
++gResIdx;
|
||||
}
|
||||
|
||||
(*res)[i] = gctx;
|
||||
}
|
||||
}
|
||||
tfree(colIdx);
|
||||
|
||||
tfree(col);
|
||||
if (colRange) {
|
||||
taosArrayDestroy(colRange);
|
||||
}
|
||||
*gResNum = gResIdx;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1220,7 +1206,7 @@ int32_t filterProcessUnits(SFilterInfo *info, SFilterGroupCtx*** res) {
|
|||
|
||||
|
||||
|
||||
int32_t filterProcessGroupsSameColumn(SFilterInfo *info, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum, SFilterGroupCtx** unitRes) {
|
||||
int32_t filterMergeTwoGroups(SFilterInfo *info, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum, SFilterGroupCtx** unitRes) {
|
||||
bool isnull = false, notnull = false;
|
||||
int32_t num = 0;
|
||||
SFilterColRange *cra = NULL;
|
||||
|
@ -1372,7 +1358,58 @@ _err_return:
|
|||
}
|
||||
|
||||
|
||||
int32_t filterProcessGroups(SFilterInfo *info, SFilterGroupCtx** unitRes, SFilterGroupCtx* groupRes) {
|
||||
int32_t filterMergeBetweenGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t *gResNum) {
|
||||
if (*gResNum <= 1) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qsort(gRes, *gResNum, POINTER_BYTES, filterCompareGroupCtx);
|
||||
|
||||
int32_t pEnd = 0, cStart = 0, cEnd = 0;
|
||||
uint16_t pColNum = 0, cColNum = 0;
|
||||
int32_t movedNum = 0;
|
||||
|
||||
cColNum = gRes[0]->colNum;
|
||||
|
||||
for (int32_t i = 1; i < *gResNum; ++i) {
|
||||
if (gRes[i]->colNum == cColNum) {
|
||||
continue;
|
||||
}
|
||||
|
||||
cEnd = i - 1;
|
||||
|
||||
movedNum = 0;
|
||||
if (pColNum > 0) {
|
||||
for (int32_t m = 0; m <= pEnd; ++m) {
|
||||
for (int32_t n = cStart; n <= cEnd; ++n) {
|
||||
filterMergeTwoGroups(&gRes[m], &gRes[n]);
|
||||
|
||||
if (gRes[n] == NULL) {
|
||||
memmove(&gRes[n], &gRes[n+1], (*gResNum-n-1) * POINTER_BYTES);
|
||||
--cEnd;
|
||||
--(*gResNum);
|
||||
++movedNum;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t m = cStart; m <= cEnd; ++m) {
|
||||
for (int32_t n = cStart + 1; n <= cEnd; ++n) {
|
||||
filterMergeTwoGroups(&gRes[m], &gRes[n]);
|
||||
}
|
||||
}
|
||||
|
||||
pColNum = cColNum;
|
||||
pEnd = cEnd;
|
||||
|
||||
i -= movedNum;
|
||||
cStart = i;
|
||||
cEnd = i;
|
||||
cColNum = gRes[i]->colNum;
|
||||
}
|
||||
|
||||
|
||||
int32_t *col = NULL;
|
||||
uint16_t cidx;
|
||||
uint16_t removedNum = 0;
|
||||
|
@ -1462,10 +1499,7 @@ int32_t filterProcessGroups(SFilterInfo *info, SFilterGroupCtx** unitRes, SFilte
|
|||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_err_return:
|
||||
tfree(col);
|
||||
if (colRange) {
|
||||
taosArrayDestroy(colRange);
|
||||
}
|
||||
tfree(gColNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1656,12 +1690,13 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx* gctx, SFilterGroupCtx*
|
|||
|
||||
|
||||
int32_t filterPreprocess(SFilterInfo *info) {
|
||||
SFilterGroupCtx** unitsRes = NULL;
|
||||
SFilterGroupCtx** gRes = calloc(info->groupNum, sizeof(SFilterGroupCtx *));
|
||||
int32_t gResNum = 0;
|
||||
SFilterGroupCtx groupRes = {0};
|
||||
|
||||
filterProcessUnits(info, &unitsRes);
|
||||
filterMergeBetweenUnits(info, gRes, &gResNum);
|
||||
|
||||
filterProcessGroups(info, unitsRes, &groupRes);
|
||||
filterMergeBetweenGroups(info, gRes, gResNum);
|
||||
|
||||
if (FILTER_GET_FLAG(info->flags, FILTER_ALL)) {
|
||||
qInfo("Final - FilterInfo: [ALL]");
|
||||
|
@ -1673,8 +1708,6 @@ int32_t filterPreprocess(SFilterInfo *info) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
filterCheckRangeCoverage(info, &groupRes, unitsRes);
|
||||
|
||||
//TODO GET COLUMN RANGE
|
||||
|
||||
filterRewrite(info, &groupRes, unitsRes);
|
||||
|
|
|
@ -1247,7 +1247,7 @@ if $data00 != @21-05-05 18:19:04.000@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select * from stb1 where (c1 > 60 and c2 > 40) or (c1 > 62 and c2 > 50);
|
||||
xxx sql select * from stb1 where (c1 > 60 and c2 > 40) or (c1 > 62 and c2 > 50);
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
|
Loading…
Reference in New Issue