Merge remote-tracking branch 'origin/3.0' into feature/shm
This commit is contained in:
commit
9eb8b4eb5c
|
@ -197,6 +197,11 @@ typedef struct SGroupbyExpr {
|
|||
bool groupbyTag; // group by tag or column
|
||||
} SGroupbyExpr;
|
||||
|
||||
enum {
|
||||
FUNC_PARAM_TYPE_VALUE = 0,
|
||||
FUNC_PARAM_TYPE_COLUMN,
|
||||
};
|
||||
|
||||
typedef struct SFunctParam {
|
||||
int32_t type;
|
||||
SColumn* pCol;
|
||||
|
|
|
@ -241,7 +241,6 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
|
|||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
}
|
||||
}
|
||||
|
||||
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
|
||||
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
|
||||
|
||||
|
@ -1200,8 +1199,19 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
|
|||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].currentStage = MAIN_SCAN;
|
||||
|
||||
SExprInfo expr = pOperator->pExpr[i];
|
||||
for (int32_t j = 0; j < expr.base.numOfParams; ++j) {
|
||||
SFunctParam *pFuncParam = &expr.base.pParam[j];
|
||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||
int32_t slotId = pFuncParam->pCol->slotId;
|
||||
pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
pCtx[i].input.totalRows = pBlock->info.rows;
|
||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
pCtx[i].input.startRowIndex = 0;
|
||||
ASSERT(pCtx[i].input.pData[j] != NULL);
|
||||
}
|
||||
}
|
||||
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
|
||||
int32_t slotId = pOperator->pExpr[i].base.pParam[0].pCol->slotId;
|
||||
|
||||
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
|
||||
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
|
||||
|
@ -1219,12 +1229,11 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
|
|||
// }
|
||||
|
||||
// in case of the block distribution query, the inputBytes is not a constant value.
|
||||
pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
pCtx[i].input.totalRows = pBlock->info.rows;
|
||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
pCtx[i].input.startRowIndex = 0;
|
||||
//pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
//pCtx[i].input.totalRows = pBlock->info.rows;
|
||||
//pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
//pCtx[i].input.startRowIndex = 0;
|
||||
|
||||
ASSERT(pCtx[i].input.pData[0] != NULL);
|
||||
|
||||
// uint32_t status = aAggs[pCtx[i].functionId].status;
|
||||
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
|
||||
|
@ -1282,15 +1291,17 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
|
|||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||
ASSERT(!fmIsAggFunc(pCtx->functionId));
|
||||
|
||||
SScalarParam p = {.numOfRows = pSrcBlock->info.rows};
|
||||
int32_t slotId = pExpr[k].base.pParam[0].pCol->slotId;
|
||||
p.columnData = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||
taosArrayPush(pBlockList, &pSrcBlock);
|
||||
|
||||
SScalarParam dest = {0};
|
||||
dest.columnData = taosArrayGet(pResult->pDataBlock, k);
|
||||
pCtx[k].sfp.process(&p, 1, &dest);
|
||||
|
||||
scalarCalculate((SNode *)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||
pResult->info.rows = dest.numOfRows;
|
||||
|
||||
taosArrayDestroy(pBlockList);
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -8568,24 +8579,24 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
pExp->pExpr->_function.num = 1;
|
||||
pExp->pExpr->_function.functionId = -1;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
|
||||
// it is a project query, or group by column
|
||||
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
|
||||
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
|
||||
SDataType* pType = &pColNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||
pType->precision, pColNode->colName);
|
||||
pCol->slotId = pColNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
|
||||
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
pCol->slotId = pColNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
||||
|
@ -8602,31 +8613,51 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
// TODO: value parameter needs to be handled
|
||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = numOfParam;
|
||||
|
||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
|
||||
if (p1->type == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
|
||||
|
||||
pCol->slotId = pcn->slotId;
|
||||
pCol->bytes = pcn->node.resType.bytes;
|
||||
pCol->type = pcn->node.resType.type;
|
||||
pCol->scale = pcn->node.resType.scale;
|
||||
pCol->precision = pcn->node.resType.precision;
|
||||
pCol->dataBlockId = pcn->dataBlockId;
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
SColumn* pCol = pExp->base.pParam[j].pCol;
|
||||
|
||||
pCol->slotId = pcn->slotId;
|
||||
pCol->bytes = pcn->node.resType.bytes;
|
||||
pCol->type = pcn->node.resType.type;
|
||||
pCol->scale = pcn->node.resType.scale;
|
||||
pCol->precision = pcn->node.resType.precision;
|
||||
pCol->dataBlockId = pcn->dataBlockId;
|
||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||
SValueNode* pvn = (SValueNode*)p1;
|
||||
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||
}
|
||||
}
|
||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
|
||||
SDataType* pType = &pNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||
pType->precision, pNode->node.aliasName);
|
||||
|
||||
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
||||
|
||||
pCol->slotId = pTargetNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
pCol->slotId = pTargetNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -173,7 +173,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "power",
|
||||
.name = "pow",
|
||||
.type = FUNCTION_TYPE_POW,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
|
@ -409,13 +409,29 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
|||
// todo
|
||||
break;
|
||||
|
||||
case FUNCTION_TYPE_ABS: {
|
||||
case FUNCTION_TYPE_ABS:
|
||||
case FUNCTION_TYPE_CEIL:
|
||||
case FUNCTION_TYPE_FLOOR:
|
||||
case FUNCTION_TYPE_ROUND: {
|
||||
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
int32_t paraType = pParam->node.resType.type;
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType };
|
||||
break;
|
||||
}
|
||||
|
||||
case FUNCTION_TYPE_SIN:
|
||||
case FUNCTION_TYPE_COS:
|
||||
case FUNCTION_TYPE_TAN:
|
||||
case FUNCTION_TYPE_ASIN:
|
||||
case FUNCTION_TYPE_ACOS:
|
||||
case FUNCTION_TYPE_ATAN:
|
||||
case FUNCTION_TYPE_SQRT:
|
||||
case FUNCTION_TYPE_LOG:
|
||||
case FUNCTION_TYPE_POW: {
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE };
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
ASSERT(0); // to found the fault ASAP.
|
||||
}
|
||||
|
|
|
@ -107,94 +107,13 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
#if 0
|
||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
char **input = NULL, *output = NULL;
|
||||
bool hasNullInput = false;
|
||||
input = taosMemoryCalloc(inputNum, sizeof(char *));
|
||||
for (int32_t i = 0; i < pOutput->num; ++i) {
|
||||
for (int32_t j = 0; j < inputNum; ++j) {
|
||||
if (pInput[j].num == 1) {
|
||||
input[j] = pInput[j].data;
|
||||
} else {
|
||||
input[j] = pInput[j].data + i * pInput[j].bytes;
|
||||
}
|
||||
if (isNull(input[j], pInput[j].type)) {
|
||||
hasNullInput = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
output = pOutput->data + i * pOutput->bytes;
|
||||
|
||||
if (hasNullInput) {
|
||||
setNull(output, pOutput->type, pOutput->bytes);
|
||||
continue;
|
||||
}
|
||||
|
||||
double base;
|
||||
GET_TYPED_DATA(base, double, pInput[1].type, input[1]);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, pInput[0].type, input[0]);
|
||||
double result = log(v) / log(base);
|
||||
SET_TYPED_DATA(output, pOutput->type, result);
|
||||
}
|
||||
|
||||
taosMemoryFree(input);
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
#if 0
|
||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pOutput->type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOutput->bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
|
||||
char **input = NULL, *output = NULL;
|
||||
bool hasNullInput = false;
|
||||
input = taosMemoryCalloc(inputNum, sizeof(char *));
|
||||
for (int32_t i = 0; i < pOutput->num; ++i) {
|
||||
for (int32_t j = 0; j < inputNum; ++j) {
|
||||
if (pInput[j].num == 1) {
|
||||
input[j] = pInput[j].data;
|
||||
} else {
|
||||
input[j] = pInput[j].data + i * pInput[j].bytes;
|
||||
}
|
||||
if (isNull(input[j], pInput[j].type)) {
|
||||
hasNullInput = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
output = pOutput->data + i * pOutput->bytes;
|
||||
|
||||
if (hasNullInput) {
|
||||
setNull(output, pOutput->type, pOutput->bytes);
|
||||
continue;
|
||||
}
|
||||
|
||||
double base;
|
||||
GET_TYPED_DATA(base, double, pInput[1].type, input[1]);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, pInput[0].type, input[0]);
|
||||
double result = pow(v, base);
|
||||
SET_TYPED_DATA(output, pOutput->type, result);
|
||||
}
|
||||
|
||||
taosMemoryFree(input);
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef float (*_float_fn)(float);
|
||||
typedef double (*_double_fn)(double);
|
||||
typedef double (*_double_fn_2)(double, double);
|
||||
|
||||
double tlog(double v, double base) {
|
||||
return log(v) / log(base);
|
||||
}
|
||||
|
||||
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
|
||||
int32_t type = GET_PARAM_TYPE(pInput);
|
||||
|
@ -221,6 +140,35 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
|
||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SColumnInfoData *pInputData[2];
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
_getDoubleValue_fn_t getValueFn[2];
|
||||
|
||||
for (int32_t i = 0; i < inputNum; ++i) {
|
||||
pInputData[i] = pInput[i].columnData;
|
||||
getValueFn[i]= getVectorDoubleValueFn(GET_PARAM_TYPE(&pInput[i]));
|
||||
}
|
||||
|
||||
double *out = (double *)pOutputData->pData;
|
||||
|
||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||
if (colDataIsNull_f(pInputData[0]->nullbitmap, i) ||
|
||||
colDataIsNull_f(pInputData[1]->nullbitmap, 0)) {
|
||||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0));
|
||||
}
|
||||
|
||||
pOutput->numOfRows = pInput->numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
|
||||
int32_t type = GET_PARAM_TYPE(pInput);
|
||||
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
||||
|
@ -292,6 +240,14 @@ int32_t acosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
|||
return doScalarFunctionUnique(pInput, inputNum, pOutput, acos);
|
||||
}
|
||||
|
||||
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return doScalarFunctionUnique2(pInput, inputNum, pOutput, pow);
|
||||
}
|
||||
|
||||
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return doScalarFunctionUnique2(pInput, inputNum, pOutput, tlog);
|
||||
}
|
||||
|
||||
int32_t sqrtFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return doScalarFunctionUnique(pInput, inputNum, pOutput, sqrt);
|
||||
}
|
||||
|
|
|
@ -2833,11 +2833,11 @@ TEST(ScalarFunctionTest, logFunction_column) {
|
|||
int32_t rowNum = 3;
|
||||
int32_t type;
|
||||
int32_t otype = TSDB_DATA_TYPE_DOUBLE;
|
||||
double result[] = {2.0, 4.0, 3.0};
|
||||
double result[] = {2.0, 3.0, 4.0};
|
||||
pInput = (SScalarParam *)taosMemoryCalloc(2, sizeof(SScalarParam));
|
||||
|
||||
//TINYINT
|
||||
int8_t val_tinyint[2][3] = {{25, 81, 64}, {5, 3, 4}};
|
||||
int8_t val_tinyint[2][3] = {{9, 27, 81}, {3, 3, 3}};
|
||||
type = TSDB_DATA_TYPE_TINYINT;
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
scltMakeDataBlock(&input[i], type, 0, rowNum, false);
|
||||
|
@ -2863,7 +2863,7 @@ TEST(ScalarFunctionTest, logFunction_column) {
|
|||
scltDestroyDataBlock(pOutput);
|
||||
|
||||
//FLOAT
|
||||
float val_float[2][3] = {{25.0, 81.0, 64.0}, {5.0, 3.0, 4.0}};
|
||||
float val_float[2][3] = {{9.0, 27.0, 81.0}, {3.0, 3.0, 3.0}};
|
||||
type = TSDB_DATA_TYPE_FLOAT;
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
scltMakeDataBlock(&input[i], type, 0, rowNum, false);
|
||||
|
@ -2888,8 +2888,8 @@ TEST(ScalarFunctionTest, logFunction_column) {
|
|||
scltDestroyDataBlock(pOutput);
|
||||
|
||||
//TINYINT AND FLOAT
|
||||
int8_t param0[] = {25, 81, 64};
|
||||
float param1[] = {5.0, 3.0, 4.0};
|
||||
int8_t param0[] = {9, 27, 81};
|
||||
float param1[] = {3.0, 3.0, 3.0};
|
||||
scltMakeDataBlock(&input[0], TSDB_DATA_TYPE_TINYINT, 0, rowNum, false);
|
||||
pInput[0] = *input[0];
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
|
@ -3003,17 +3003,17 @@ TEST(ScalarFunctionTest, powFunction_column) {
|
|||
int32_t rowNum = 3;
|
||||
int32_t type;
|
||||
int32_t otype = TSDB_DATA_TYPE_DOUBLE;
|
||||
double result[] = {32.0, 27.0, 16.0};
|
||||
double result[] = {8.0, 27.0, 64.0};
|
||||
pInput = (SScalarParam *)taosMemoryCalloc(2, sizeof(SScalarParam));
|
||||
|
||||
//TINYINT
|
||||
int8_t val_tinyint[2][3] = {{2, 3, 4}, {5, 3, 2}};
|
||||
int8_t val_tinyint[2][3] = {{2, 3, 4}, {3, 3, 3}};
|
||||
type = TSDB_DATA_TYPE_TINYINT;
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
scltMakeDataBlock(&input[i], type, 0, rowNum, false);
|
||||
pInput[i] = *input[i];
|
||||
for (int32_t j = 0; j < rowNum; ++j) {
|
||||
colDataAppend(pInput[i].columnData, i, (const char*) &val_tinyint[i][j], false);
|
||||
colDataAppend(pInput[i].columnData, j, (const char*) &val_tinyint[i][j], false);
|
||||
|
||||
}
|
||||
PRINTF("tiny_int before POW:%d,%d,%d\n", *((int8_t *)pInput[i].data + 0),
|
||||
|
@ -3034,7 +3034,7 @@ TEST(ScalarFunctionTest, powFunction_column) {
|
|||
scltDestroyDataBlock(pOutput);
|
||||
|
||||
//FLOAT
|
||||
float val_float[2][3] = {{2.0, 3.0, 4.0}, {5.0, 3.0, 2.0}};
|
||||
float val_float[2][3] = {{2.0, 3.0, 4.0}, {3.0, 3.0, 3.0}};
|
||||
type = TSDB_DATA_TYPE_FLOAT;
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
scltMakeDataBlock(&input[i], type, 0, rowNum, false);
|
||||
|
@ -3060,7 +3060,7 @@ TEST(ScalarFunctionTest, powFunction_column) {
|
|||
|
||||
//TINYINT AND FLOAT
|
||||
int8_t param0[] = {2, 3, 4};
|
||||
float param1[] = {5.0, 3.0, 2.0};
|
||||
float param1[] = {3.0, 3.0, 2.0};
|
||||
scltMakeDataBlock(&input[0], TSDB_DATA_TYPE_TINYINT, 0, rowNum, false);
|
||||
pInput[0] = *input[0];
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
|
|
|
@ -37,6 +37,6 @@
|
|||
|
||||
# ---- tmq
|
||||
./test.sh -f tsim/tmq/basic.sim
|
||||
#./test.sh -f tsim/tmq/basic1.sim
|
||||
./test.sh -f tsim/tmq/basic1.sim
|
||||
|
||||
#======================b1-end===============
|
||||
|
|
|
@ -107,80 +107,54 @@ print rows: $rows
|
|||
print $data00 $data01 $data02 $data03
|
||||
print $data10 $data11 $data12 $data13
|
||||
print $data20 $data21 $data22 $data23
|
||||
if $row != 20 then
|
||||
return -1
|
||||
print $data80 $data81 $data82 $data83
|
||||
print $data90 $data91 $data92 $data93
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#if $data00 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#if $data10 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select first(ts),c1 from group_tb0 where c1<20 group by c1;
|
||||
if $row != 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @70-01-01 08:01:40.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != @70-01-01 08:01:40.009@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#if $data90 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
if $data91 != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select first(ts), ts, c1 from group_tb0 where c1 < 20 group by c1;
|
||||
print $row
|
||||
if $row != 20 then
|
||||
sql select first(ts),c1 from group_tb0 group by c1;
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02 $data03
|
||||
print $data10 $data11 $data12 $data13
|
||||
print $data20 $data21 $data22 $data23
|
||||
print $data80 $data81 $data82 $data83
|
||||
print $data90 $data91 $data92 $data93
|
||||
if $row != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != $data01 then
|
||||
if $data00 != @2022-01-01 00:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data90 != @2022-01-01 00:00:00.009@ then
|
||||
return -1
|
||||
endi
|
||||
if $data91 != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != $data11 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data20 != $data21 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != $data91 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data92 != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1;
|
||||
if $row != 20 then
|
||||
|
|
|
@ -26,4 +26,4 @@ run tsim/show/basic.sim
|
|||
run tsim/table/basic1.sim
|
||||
|
||||
run tsim/tmq/basic.sim
|
||||
#run tsim/tmq/basic1.sim
|
||||
run tsim/tmq/basic1.sim
|
||||
|
|
|
@ -3,9 +3,10 @@
|
|||
# vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
|
@ -135,42 +136,42 @@ print inserted totalMsgCnt: $totalMsgCnt
|
|||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
|
||||
print cmd result----> $system_content
|
||||
if $system_content != @{consume success: 20}@ then
|
||||
if $system_content != @{consume success: 20, 0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
|
||||
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
|
||||
#print cmd result----> $system_content
|
||||
#if $system_content != @{consume success: 20}@ then
|
||||
#if $system_content != @{consume success: 20, 0}@ then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
|
||||
print cmd result----> $system_content
|
||||
if $system_content != @{consume success: 10}@ then
|
||||
if $system_content != @{consume success: 10, 0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
||||
print cmd result----> $system_content
|
||||
if $system_content != @{consume success: 10}@ then
|
||||
if $system_content != @{consume success: 10, 0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
||||
print cmd result----> $system_content
|
||||
if $system_content != @{consume success: 20}@ then
|
||||
if $system_content != @{consume success: 20, 0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
||||
print cmd result----> $system_content
|
||||
if $system_content != @{consume success: 20}@ then
|
||||
if $system_content != @{consume success: 20, 0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -219,33 +219,33 @@ tmq_list_t* build_topic_list() {
|
|||
return topic_list;
|
||||
}
|
||||
|
||||
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
void loop_consume(tmq_t* tmq) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
//int64_t startTime = taosGetTimestampUs();
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
|
||||
if (tmqmessage) {
|
||||
totalMsgs++;
|
||||
skipLogNum += tmqGetSkipLogNum(tmqmessage);
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 1);
|
||||
if (tmqMsg) {
|
||||
totalMsgs++;
|
||||
|
||||
#if 0
|
||||
TAOS_ROW row;
|
||||
while (NULL != (row = tmq_get_row(tmqMsg))) {
|
||||
totalRows++;
|
||||
}
|
||||
tmq_message_destroy(tmqmessage);
|
||||
#endif
|
||||
|
||||
skipLogNum += tmqGetSkipLogNum(tmqMsg);
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
msg_process(tmqMsg);
|
||||
}
|
||||
tmq_message_destroy(tmqMsg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
//int64_t endTime = taosGetTimestampUs();
|
||||
//double consumeTime = (double)(endTime - startTime) / 1000000;
|
||||
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err) {
|
||||
|
@ -253,7 +253,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
exit(-1);
|
||||
}
|
||||
|
||||
printf("{consume success: %d}", totalMsgs);
|
||||
printf("{consume success: %d, %d}", totalMsgs, totalRows);
|
||||
}
|
||||
|
||||
int main(int32_t argc, char *argv[]) {
|
||||
|
@ -266,7 +266,21 @@ int main(int32_t argc, char *argv[]) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
perf_loop(tmq, topic_list);
|
||||
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
|
||||
if (err) {
|
||||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
loop_consume(tmq);
|
||||
|
||||
#if 0
|
||||
err = tmq_unsubscribe(tmq);
|
||||
if (err) {
|
||||
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue