feature/qnode

This commit is contained in:
dapan1121 2022-02-23 18:32:10 +08:00
parent 3eec053ec6
commit fe1b4dcde0
5 changed files with 425 additions and 125 deletions

View File

@ -68,8 +68,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes));
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);

View File

@ -37,8 +37,8 @@ typedef struct SFilterColumnParam{
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param, filer_get_col_from_id fp);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param, filer_get_col_from_id fp);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);

View File

@ -26,6 +26,7 @@ extern "C" {
#include "scalar.h"
#include "querynodes.h"
#include "query.h"
#include "tep.h"
#define FILTER_DEFAULT_GROUP_SIZE 4
#define FILTER_DEFAULT_UNIT_SIZE 4
@ -199,7 +200,7 @@ typedef struct SFilterUnit {
} SFilterUnit;
typedef struct SFilterComUnit {
void *colData;
void *colData; // pointer to SColumnInfoData
void *valData;
void *valData2;
uint16_t colId;
@ -282,7 +283,7 @@ typedef struct SFilterInfo {
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define FLT_IS_COMPARISON_OPERATOR(_op) ((_op) >= OP_TYPE_GREATER_THAN && (_op) < OP_TYPE_IS_NOT_NULL)
#define FLT_IS_COMPARISON_OPERATOR(_op) ((_op) >= OP_TYPE_GREATER_THAN && (_op) < OP_TYPE_IS_NOT_UNKNOWN)
#define fltFatal(...) qFatal(__VA_ARGS__)
#define fltError(...) qError(__VA_ARGS__)
@ -305,7 +306,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnRefNode *)((fi)->desc))->columnId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnRefNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnRefNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SColumnRefNode *)((fi)->desc))->dataType.bytes * (ri))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) (colDataGet(((SColumnInfoData *)(fi)->data), (ri)))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)

View File

@ -914,7 +914,7 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, SNode *node, SFilterFieldId *f
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (nodeType(node) != QUERY_NODE_COLUMN_REF && nodeType(node) != QUERY_NODE_VALUE) {
if (nodeType(node) != QUERY_NODE_COLUMN_REF && nodeType(node) != QUERY_NODE_VALUE && nodeType(node) != QUERY_NODE_NODE_LIST) {
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
@ -968,8 +968,10 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
SFilterField *val = FILTER_UNIT_RIGHT_FIELD(info, u);
assert(FILTER_GET_FLAG(val->flag, FLD_TYPE_VALUE));
} else {
if(optr != OP_TYPE_IS_NULL && optr != OP_TYPE_IS_NOT_NULL && optr != FILTER_DUMMY_EMPTY_OPTR){
return -1;
int32_t paramNum = scalarGetOperatorParamNum(optr);
if (1 != paramNum) {
fltError("invalid right field in unit, operator:%s, rightType:%d", gOptrStr[optr].str, u->right.type);
return TSDB_CODE_QRY_APP_ERROR;
}
}
@ -1016,7 +1018,6 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
if (node->opType == OP_TYPE_IN && (!IS_VAR_DATA_TYPE(type))) {
SNodeListNode *listNode = (SNodeListNode *)node->pRight;
void *fdata = NULL;
SListCell *cell = listNode->pNodeList->pHead;
SScalarParam in = {.num = 1}, out = {.num = 1, .type = type};
@ -1036,7 +1037,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
len = tDataTypes[type].bytes;
filterAddField(info, NULL, &fdata, FLD_TYPE_VALUE, &right, len, true);
filterAddField(info, NULL, &out.data, FLD_TYPE_VALUE, &right, len, true);
filterAddUnit(info, OP_TYPE_EQUAL, &left, &right, &uidx);
@ -1337,6 +1338,8 @@ EDealRes fltTreeToGroup(SNode* pNode, void* pContext) {
return DEAL_RES_IGNORE_CHILD;
}
ctx->code = TSDB_CODE_QRY_APP_ERROR;
fltError("invalid condition type, type:%d", node->condType);
return DEAL_RES_ERROR;
@ -1736,7 +1739,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
for (uint32_t i = 0; i < info->unitNum; ++i) {
SFilterUnit* unit = &info->units[i];
if (unit->right.type != FLD_TYPE_VALUE) {
assert(unit->compare.optr == OP_TYPE_IS_NULL || unit->compare.optr == OP_TYPE_IS_NOT_NULL || unit->compare.optr == FILTER_DUMMY_EMPTY_OPTR);
assert(unit->compare.optr == FILTER_DUMMY_EMPTY_OPTR || scalarGetOperatorParamNum(unit->compare.optr) == 1);
continue;
}
@ -1792,13 +1795,15 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
}
if(type != TSDB_DATA_TYPE_JSON){
bool converted = false;
char extInfo = 0;
SScalarParam in = {.data = nodesGetValueFromNode(var), .num = 1, .type = dType->type, .bytes = dType->bytes};
SScalarParam out = {.data = fi->data, .num = 1, .type = type};
if (vectorConvertImpl(&in, &out)) {
qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION;
if (dType->type == type) {
assignVal(fi->data, nodesGetValueFromNode(var), dType->bytes, type);
} else {
SScalarParam in = {.data = nodesGetValueFromNode(var), .num = 1, .type = dType->type, .bytes = dType->bytes};
SScalarParam out = {.data = fi->data, .num = 1, .type = type};
if (vectorConvertImpl(&in, &out)) {
qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
}
@ -1809,7 +1814,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
int32_t len = taosUcs4ToMbs(varDataVal(fi->data), varDataLen(fi->data), varDataVal(newValData));
if (len < 0){
qError("filterInitValFieldData taosUcs4ToMbs error 1");
return TSDB_CODE_FAILED;
return TSDB_CODE_QRY_APP_ERROR;
}
varDataSetLen(newValData, len);
varDataCopy(fi->data, newValData);
@ -2565,7 +2570,8 @@ int32_t filterUpdateComUnits(SFilterInfo *info) {
for (uint32_t i = 0; i < info->unitNum; ++i) {
SFilterUnit *unit = &info->units[i];
info->cunits[i].colData = FILTER_UNIT_COL_DATA(info, unit, 0);
SFilterField *col = FILTER_UNIT_LEFT_FIELD(info, unit);
info->cunits[i].colData = col->data;
}
return TSDB_CODE_SUCCESS;
@ -2770,7 +2776,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
uint32_t unitNum = *(unitIdx++);
for (uint32_t u = 0; u < unitNum; ++u) {
SFilterComUnit *cunit = &info->cunits[*(unitIdx + u)];
void *colData = (char *)cunit->colData + cunit->dataSize * i;
void *colData = colDataGet((SColumnInfoData *)cunit->colData, i);
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
@ -2868,7 +2874,7 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i);
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
if (!colData){ // for json->'key' is null
(*p)[i] = 1;
@ -2902,7 +2908,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i);
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
if (!colData) { // for json->'key' is not null
@ -2929,7 +2935,6 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
uint16_t dataSize = info->cunits[0].dataSize;
char *colData = (char *)info->cunits[0].colData;
rangeCompFunc rfunc = gRangeCompare[info->cunits[0].rfunc];
void *valData = info->cunits[0].valData;
void *valData2 = info->cunits[0].valData2;
@ -2943,10 +2948,11 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
*p = calloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) {
for (int32_t i = 0; i < numOfRows; ++i) {
void *colData = colDataGet((SColumnInfoData *)info->cunits[0].colData, i);
if (colData == NULL || isNull(colData, info->cunits[0].dataType)) {
all = false;
colData += dataSize;
continue;
}
@ -2955,8 +2961,6 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
if ((*p)[i] == 0) {
all = false;
}
colData += dataSize;
}
return all;
@ -2976,7 +2980,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i);
if (colData == NULL || isNull(colData, info->cunits[uidx].dataType)) {
(*p)[i] = 0;
all = false;
@ -3027,7 +3031,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
for (uint32_t u = 0; u < group->unitNum; ++u) {
uint32_t uidx = group->unitIdxs[u];
SFilterComUnit *cunit = &info->cunits[uidx];
void *colData = (char *)cunit->colData + cunit->dataSize * i;
void *colData = colDataGet((SColumnInfoData *)(cunit->colData), i);
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
@ -3483,10 +3487,6 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
if (stat->scalarMode) {
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode) || QUERY_NODE_COLUMN_REF == nodeType(*pNode)) {
return DEAL_RES_CONTINUE;
}
@ -3570,18 +3570,19 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
}
int32_t fltOptimizeNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t filterGetDataFromColId(void *param, int32_t id, void **data) {
int32_t fltGetDataFromColId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SFilterColumnParam *)param)->numOfCols;
SArray* pDataBlock = ((SFilterColumnParam *)param)->pDataBlock;
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, j);
if (id == pColInfo->info.colId) {
*data = pColInfo->pData;
*data = pColInfo;
break;
}
}
@ -3589,13 +3590,28 @@ int32_t filterGetDataFromColId(void *param, int32_t id, void **data) {
return TSDB_CODE_SUCCESS;
}
int32_t fltGetDataFromSlotId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SFilterColumnParam *)param)->numOfCols;
SArray* pDataBlock = ((SFilterColumnParam *)param)->pDataBlock;
if (id < 0 || id >= numOfCols || id >= taosArrayGetSize(pDataBlock)) {
fltError("invalid slot id, id:%d, numOfCols:%d, arraySize:%d", id, numOfCols, (int32_t)taosArrayGetSize(pDataBlock));
return TSDB_CODE_QRY_APP_ERROR;
}
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, id);
*data = pColInfo;
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
return fltSetColFieldDataImpl(info, param, fp, false);
return TSDB_CODE_SUCCESS;
}
int32_t filterSetDataFromColId(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
return fltSetColFieldDataImpl(info, param, fp, true);
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) {
return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false);
}
int32_t filterSetDataFromColId(SFilterInfo *info, void *param) {
return fltSetColFieldDataImpl(info, param, fltGetDataFromColId, true);
}
@ -3623,6 +3639,8 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options)
SFltTreeStat stat = {0};
FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat));
info->scalarMode = stat.scalarMode;
if (!info->scalarMode) {
FLT_ERR_JRET(fltInitFromNode(pNode, info, options));
} else {
@ -3647,7 +3665,15 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pSrc, &output));
*p = output.data;
return TSDB_CODE_SUCCESS;
int8_t *r = output.data;
for (int32_t i = 0; i < output.num; ++i) {
if (0 == *(r+i)) {
return false;
}
}
return true;
}
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols);

View File

@ -184,9 +184,26 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) {
}
TEST(timerangeTest, greater) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL;
bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0};
int64_t tsmall = 222, tbig = 333;
flttMakeColRefNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode1, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
ASSERT_EQ(code, 0);
STimeWindow win = {0};
code = filterGetTimeRange(filter, &win);
ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, INT64_MAX);
}
TEST(timerangeTest, greater_and_lower) {
flttInitLogFile();
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL;
bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0};
@ -213,12 +230,12 @@ TEST(timerangeTest, greater_and_lower) {
ASSERT_EQ(win.ekey, tbig);
}
#if 0
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5};
double rightv= 2.5;
bool eRes[5] = {false, false, true, true, true};
int8_t eRes[5] = {0, 0, 1, 1, 1};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
@ -226,16 +243,97 @@ TEST(columnTest, smallint_column_greater_double_value) {
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_DOUBLE, &rightv);
flttMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pLeft, pRight);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
stat.colId = ((SColumnRefNode *)pLeft)->columnId;
stat.max = 10;
stat.min = 5;
stat.numOfNull = 0;
bool keep = filterRangeExecute(filter, &stat, 1, rowNum);
ASSERT_EQ(keep, true);
stat.max = 1;
stat.min = -1;
keep = filterRangeExecute(filter, &stat, 1, rowNum);
ASSERT_EQ(keep, true);
stat.max = 10;
stat.min = 5;
stat.numOfNull = rowNum;
keep = filterRangeExecute(filter, &stat, 1, rowNum);
ASSERT_EQ(keep, true);
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(columnTest, int_column_greater_smallint_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int32_t leftv[5]= {1, 3, 5, 7, 9};
int16_t rightv= 4;
int8_t eRes[5] = {0, 0, 1, 1, 1};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
flttMakeColRefNode(&pLeft, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, leftv);
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_SMALLINT, &rightv);
flttMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pLeft, pRight);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
SColumnDataAgg stat = {0};
stat.colId = ((SColumnRefNode *)pLeft)->columnId;
stat.max = 10;
stat.min = 5;
stat.numOfNull = 0;
bool keep = filterRangeExecute(filter, &stat, 1, rowNum);
ASSERT_EQ(keep, true);
stat.max = 1;
stat.min = -1;
keep = filterRangeExecute(filter, &stat, 1, rowNum);
ASSERT_EQ(keep, false);
stat.max = 10;
stat.min = 5;
stat.numOfNull = rowNum;
keep = filterRangeExecute(filter, &stat, 1, rowNum);
ASSERT_EQ(keep, false);
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(columnTest, int_column_in_double_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
int32_t leftv[5] = {1, 2, 3, 4, 5};
@ -254,17 +352,32 @@ TEST(columnTest, int_column_in_double_list) {
nodesListAppend(list, pRight);
flttMakeListNode(&listNode,list, TSDB_DATA_TYPE_INT);
flttMakeOpNode(&opNode, OP_TYPE_IN, TSDB_DATA_TYPE_BOOL, pLeft, listNode);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(columnTest, binary_column_in_binary_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
bool eRes[5] = {true, true, false, false, false};
@ -303,16 +416,28 @@ TEST(columnTest, binary_column_in_binary_list) {
flttMakeListNode(&listNode,list, TSDB_DATA_TYPE_BINARY);
flttMakeOpNode(&opNode, OP_TYPE_IN, TSDB_DATA_TYPE_BOOL, pLeft, listNode);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(columnTest, binary_column_like_binary) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
char rightv[64] = {0};
@ -336,13 +461,24 @@ TEST(columnTest, binary_column_like_binary) {
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, rightv);
flttMakeOpNode(&opNode, OP_TYPE_LIKE, TSDB_DATA_TYPE_BOOL, pLeft, pRight);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
@ -368,13 +504,24 @@ TEST(columnTest, binary_column_is_null) {
flttMakeOpNode(&opNode, OP_TYPE_IS_NULL, TSDB_DATA_TYPE_BOOL, pLeft, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
@ -399,51 +546,101 @@ TEST(columnTest, binary_column_is_not_null) {
flttMakeOpNode(&opNode, OP_TYPE_IS_NOT_NULL, TSDB_DATA_TYPE_BOOL, pLeft, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(logicTest, and_or_and) {
}
TEST(logicTest, or_and_or) {
}
TEST(opTest, smallint_column_greater_int_column) {
flttInitLogFile();
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5] = {1, -6, -2, 11, 101};
int32_t rightv[5]= {0, -5, -4, 23, 100};
bool eRes[5] = {true, false, true, false, true};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColRefNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeColRefNode(&pRight, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, rightv);
flttMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pLeft, pRight);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(opTest, smallint_value_add_int_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int32_t leftv = 1;
int16_t rightv[5]= {0, -5, -4, 23, 100};
double eRes[5] = {1.0, -4, -3, 24, 101};
int16_t rightv[5]= {0, -1, -4, -1, 100};
bool eRes[5] = {true, false, true, false, true};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeValueNode(&pLeft, TSDB_DATA_TYPE_INT, &leftv);
flttMakeColRefNode(&pRight, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, rightv);
flttMakeOpNode(&opNode, OP_TYPE_ADD, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
flttMakeOpNode(&opNode, OP_TYPE_IS_TRUE, TSDB_DATA_TYPE_BOOL, opNode, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((double *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(opTest, bigint_column_multi_binary_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int64_t leftv[5]= {1, 2, 3, 4, 5};
@ -453,21 +650,33 @@ TEST(opTest, bigint_column_multi_binary_column) {
rightv[i][4] = '0' + i;
varDataSetLen(rightv[i], 3);
}
double eRes[5] = {0, 2, 6, 12, 20};
bool eRes[5] = {false, true, true, true, true};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColRefNode(&pLeft, &src, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), rowNum, leftv);
flttMakeColRefNode(&pRight, &src, TSDB_DATA_TYPE_BINARY, 5, rowNum, rightv);
flttMakeOpNode(&opNode, OP_TYPE_MULTI, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
flttMakeOpNode(&opNode, OP_TYPE_IS_TRUE, TSDB_DATA_TYPE_BOOL, opNode, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((double *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
@ -480,68 +689,107 @@ TEST(opTest, smallint_column_and_binary_column) {
rightv[i][4] = '0' + i;
varDataSetLen(rightv[i], 3);
}
int64_t eRes[5] = {0, 0, 2, 0, 4};
bool eRes[5] = {false, false, true, false, true};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColRefNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeColRefNode(&pRight, &src, TSDB_DATA_TYPE_BINARY, 5, rowNum, rightv);
flttMakeOpNode(&opNode, OP_TYPE_BIT_AND, TSDB_DATA_TYPE_BIGINT, pLeft, pRight);
flttMakeOpNode(&opNode, OP_TYPE_IS_TRUE, TSDB_DATA_TYPE_BOOL, opNode, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BIGINT);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((int64_t *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(opTest, smallint_column_or_float_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5};
float rightv[5]= {2.0, 3.0, 4.1, 5.2, 6.0};
int64_t eRes[5] = {3, 3, 7, 5, 7};
int16_t leftv[5]= {1, 2, 0, 4, 5};
float rightv[5]= {2.0, 3.0, 0, 5.2, 6.0};
bool eRes[5] = {true, true, false, true, true};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColRefNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeColRefNode(&pRight, &src, TSDB_DATA_TYPE_FLOAT, sizeof(float), rowNum, rightv);
flttMakeOpNode(&opNode, OP_TYPE_BIT_OR, TSDB_DATA_TYPE_BIGINT, pLeft, pRight);
flttMakeOpNode(&opNode, OP_TYPE_IS_TRUE, TSDB_DATA_TYPE_BOOL, opNode, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BIGINT);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((int64_t *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(opTest, smallint_column_or_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5};
int16_t leftv[5]= {0, 2, 3, 0, -1};
double rightv= 10.2;
int64_t eRes[5] = {11, 10, 11, 14, 15};
bool eRes[5] = {true, true, true, true, true};
SSDataBlock *src = NULL;
SScalarParam res = {0};
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
flttMakeColRefNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_DOUBLE, &rightv);
flttMakeOpNode(&opNode, OP_TYPE_BIT_OR, TSDB_DATA_TYPE_BIGINT, pLeft, pRight);
flttMakeOpNode(&opNode, OP_TYPE_IS_TRUE, TSDB_DATA_TYPE_BOOL, opNode, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BIGINT);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, true);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((int64_t *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
TEST(opTest, binary_column_is_true) {
SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0};
@ -561,15 +809,40 @@ TEST(opTest, binary_column_is_true) {
flttMakeOpNode(&opNode, OP_TYPE_IS_TRUE, TSDB_DATA_TYPE_BOOL, pLeft, NULL);
int32_t code = scalarCalculate(opNode, src, &res);
SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode, &filter, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.num, rowNum);
ASSERT_EQ(res.type, TSDB_DATA_TYPE_BOOL);
ASSERT_EQ(res.bytes, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
SColumnDataAgg stat = {0};
SFilterColumnParam param = {.numOfCols= src->info.numOfCols, .pDataBlock = src->pDataBlock};
code = filterSetDataFromSlotId(filter, &param);
ASSERT_EQ(code, 0);
stat.max = 5;
stat.min = 1;
stat.numOfNull = 0;
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, src, &rowRes, &stat, src->info.numOfCols);
ASSERT_EQ(keep, false);
for (int32_t i = 0; i < rowNum; ++i) {
ASSERT_EQ(*((bool *)res.data + i), eRes[i]);
ASSERT_EQ(*((int8_t *)rowRes + i), eRes[i]);
}
}
#if 0
TEST(logicTest, and_or_and) {
}
TEST(logicTest, or_and_or) {
}
#endif
int main(int argc, char** argv) {