feat(query): add irate function
This commit is contained in:
parent
55c524e811
commit
d2059e8079
|
@ -101,6 +101,11 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
int32_t derivativeFunction(SqlFunctionCtx *pCtx);
|
int32_t derivativeFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
|
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
|
int32_t irateFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
|
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
|
||||||
|
|
|
@ -978,6 +978,21 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
|
||||||
|
if (!IS_NUMERIC_TYPE(colType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// first(col_list) will be rewritten as first(col)
|
// first(col_list) will be rewritten as first(col)
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
@ -1796,6 +1811,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = derivativeFunction,
|
.processFunc = derivativeFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "irate",
|
||||||
|
.type = FUNCTION_TYPE_IRATE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateIrate,
|
||||||
|
.getEnvFunc = getIrateFuncEnv,
|
||||||
|
.initFunc = irateFuncSetup,
|
||||||
|
.processFunc = irateFunction,
|
||||||
|
.finalizeFunc = irateFinalize
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "last_row",
|
.name = "last_row",
|
||||||
.type = FUNCTION_TYPE_LAST_ROW,
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
|
|
|
@ -59,6 +59,12 @@ typedef struct STuplePos {
|
||||||
int32_t offset;
|
int32_t offset;
|
||||||
} STuplePos;
|
} STuplePos;
|
||||||
|
|
||||||
|
typedef struct SMinmaxResInfo {
|
||||||
|
bool assign; // assign the first value or not
|
||||||
|
int64_t v;
|
||||||
|
STuplePos tuplePos;
|
||||||
|
} SMinmaxResInfo;
|
||||||
|
|
||||||
typedef struct STopBotResItem {
|
typedef struct STopBotResItem {
|
||||||
SVariant v;
|
SVariant v;
|
||||||
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
||||||
|
@ -148,6 +154,12 @@ typedef struct SElapsedInfo {
|
||||||
int64_t timeUnit;
|
int64_t timeUnit;
|
||||||
} SElapsedInfo;
|
} SElapsedInfo;
|
||||||
|
|
||||||
|
typedef struct STwaInfo {
|
||||||
|
double dOutput;
|
||||||
|
SPoint1 p;
|
||||||
|
STimeWindow win;
|
||||||
|
} STwaInfo;
|
||||||
|
|
||||||
typedef struct SHistoFuncBin {
|
typedef struct SHistoFuncBin {
|
||||||
double lower;
|
double lower;
|
||||||
double upper;
|
double upper;
|
||||||
|
@ -234,6 +246,22 @@ typedef struct SUniqueInfo {
|
||||||
char pItems[];
|
char pItems[];
|
||||||
} SUniqueInfo;
|
} SUniqueInfo;
|
||||||
|
|
||||||
|
typedef struct SDerivInfo {
|
||||||
|
double prevValue; // previous value
|
||||||
|
TSKEY prevTs; // previous timestamp
|
||||||
|
bool ignoreNegative; // ignore the negative value
|
||||||
|
int64_t tsWindow; // time window for derivative
|
||||||
|
bool valueSet; // the value has been set already
|
||||||
|
} SDerivInfo;
|
||||||
|
|
||||||
|
typedef struct SRateInfo {
|
||||||
|
double firstValue;
|
||||||
|
TSKEY firstKey;
|
||||||
|
double lastValue;
|
||||||
|
TSKEY lastKey;
|
||||||
|
int8_t hasResult; // flag to denote has value
|
||||||
|
} SRateInfo;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
if ((numOfElem) <= 0) { \
|
if ((numOfElem) <= 0) { \
|
||||||
|
@ -927,12 +955,6 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
|
||||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMinmaxResInfo {
|
|
||||||
bool assign; // assign the first value or not
|
|
||||||
int64_t v;
|
|
||||||
STuplePos tuplePos;
|
|
||||||
} SMinmaxResInfo;
|
|
||||||
|
|
||||||
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
return false; // not initialized since it has been initialized
|
return false; // not initialized since it has been initialized
|
||||||
|
@ -4667,12 +4689,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct STwaInfo {
|
|
||||||
double dOutput;
|
|
||||||
SPoint1 p;
|
|
||||||
STimeWindow win;
|
|
||||||
} STwaInfo;
|
|
||||||
|
|
||||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(STwaInfo);
|
pEnv->calcMemSize = sizeof(STwaInfo);
|
||||||
return true;
|
return true;
|
||||||
|
@ -5121,14 +5137,6 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SDerivInfo {
|
|
||||||
double prevValue; // previous value
|
|
||||||
TSKEY prevTs; // previous timestamp
|
|
||||||
bool ignoreNegative; // ignore the negative value
|
|
||||||
int64_t tsWindow; // time window for derivative
|
|
||||||
bool valueSet; // the value has been set already
|
|
||||||
} SDerivInfo;
|
|
||||||
|
|
||||||
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SDerivInfo);
|
pEnv->calcMemSize = sizeof(SDerivInfo);
|
||||||
return true;
|
return true;
|
||||||
|
@ -5223,6 +5231,118 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(SRateInfo);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResInfo)) {
|
||||||
|
return false; // not initialized since it has been initialized
|
||||||
|
}
|
||||||
|
|
||||||
|
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
pInfo->firstKey = INT64_MIN;
|
||||||
|
pInfo->lastKey = INT64_MIN;
|
||||||
|
pInfo->firstValue = (double)INT64_MIN;
|
||||||
|
pInfo->lastValue = (double)INT64_MIN;
|
||||||
|
|
||||||
|
pInfo->hasResult = 0;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
int32_t type = pInputCol->info.type;
|
||||||
|
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems++;
|
||||||
|
|
||||||
|
char* data = colDataGetData(pInputCol, i);
|
||||||
|
double v = 0;
|
||||||
|
GET_TYPED_DATA(v, double, type, data);
|
||||||
|
|
||||||
|
if (INT64_MIN == pRateInfo->lastKey) {
|
||||||
|
pRateInfo->lastValue = v;
|
||||||
|
pRateInfo->lastKey = tsList[i];
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsList[i] > pRateInfo->lastKey) {
|
||||||
|
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
|
||||||
|
pRateInfo->firstValue = pRateInfo->lastValue;
|
||||||
|
pRateInfo->firstKey = pRateInfo->lastKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRateInfo->lastValue = v;
|
||||||
|
pRateInfo->lastKey = tsList[i];
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) {
|
||||||
|
pRateInfo->firstValue = v;
|
||||||
|
pRateInfo->firstKey = tsList[i];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
|
||||||
|
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) ||
|
||||||
|
(pRateInfo->firstKey >= pRateInfo->lastKey)) {
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
double diff = 0;
|
||||||
|
// If the previous value of the last is greater than the last value, only keep the last point instead of the delta
|
||||||
|
// value between two values.
|
||||||
|
diff = pRateInfo->lastValue;
|
||||||
|
if (diff >= pRateInfo->firstValue) {
|
||||||
|
diff -= pRateInfo->firstValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
|
||||||
|
if (duration == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (duration > 0)? ((double)diff) / (duration/tickPerSec):0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||||
|
|
||||||
|
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
double result = doCalcRate(pInfo, 1000);
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes);
|
||||||
|
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t interpFunction(SqlFunctionCtx* pCtx) {
|
int32_t interpFunction(SqlFunctionCtx* pCtx) {
|
||||||
#if 0
|
#if 0
|
||||||
int32_t fillType = (int32_t) pCtx->param[2].i64;
|
int32_t fillType = (int32_t) pCtx->param[2].i64;
|
||||||
|
|
Loading…
Reference in New Issue