feat(query): add tail function
This commit is contained in:
parent
6e7e2b26f7
commit
3ef067ff03
|
@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
|
||||||
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t tailFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
|
|
|
@ -386,6 +386,32 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"The input parameter of TAIL function can only be column");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||||
|
if (!IS_INTEGER_TYPE(paraType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
uint8_t colType = pCol->resType.type;
|
||||||
|
if (IS_VAR_DATA_TYPE(colType)) {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
|
||||||
|
} else {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
// todo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -850,6 +876,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = sampleFunction,
|
.processFunc = sampleFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "tail",
|
||||||
|
.type = FUNCTION_TYPE_TAIL,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateTail,
|
||||||
|
.getEnvFunc = getTailFuncEnv,
|
||||||
|
.initFunc = tailFunctionSetup,
|
||||||
|
.processFunc = tailFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
|
|
@ -18,12 +18,15 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
|
||||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||||
#define MAVG_MAX_POINTS_NUM 1000
|
#define MAVG_MAX_POINTS_NUM 1000
|
||||||
#define SAMPLE_MAX_POINTS_NUM 1000
|
#define SAMPLE_MAX_POINTS_NUM 1000
|
||||||
|
#define TAIL_MAX_POINTS_NUM 100
|
||||||
|
#define TAIL_MAX_OFFSET 100
|
||||||
|
|
||||||
typedef struct SSumRes {
|
typedef struct SSumRes {
|
||||||
union {
|
union {
|
||||||
|
@ -161,6 +164,20 @@ typedef struct SSampleInfo {
|
||||||
int64_t *timestamp;
|
int64_t *timestamp;
|
||||||
} SSampleInfo;
|
} SSampleInfo;
|
||||||
|
|
||||||
|
typedef struct STailUnit {
|
||||||
|
int64_t timestamp;
|
||||||
|
char data[];
|
||||||
|
} STailUnit;
|
||||||
|
|
||||||
|
typedef struct STailInfo {
|
||||||
|
int32_t numOfPoints;
|
||||||
|
int32_t numAdded;
|
||||||
|
int32_t offset;
|
||||||
|
uint8_t colType;
|
||||||
|
int16_t colBytes;
|
||||||
|
STailUnit **pRes;
|
||||||
|
} STailInfo;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
if ((numOfElem) <= 0) { \
|
if ((numOfElem) <= 0) { \
|
||||||
|
@ -3141,3 +3158,86 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
//
|
//
|
||||||
// return pResInfo->numOfRes;
|
// return pResInfo->numOfRes;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
|
||||||
|
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
int32_t numOfPoints = pVal->datum.i;
|
||||||
|
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailUnit) + pCol->node.resType.bytes);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
pInfo->numAdded = 0;
|
||||||
|
pInfo->numOfPoints = pCtx->param[1].param.i;
|
||||||
|
pInfo->offset = pCtx->param[2].param.i;
|
||||||
|
pInfo->colType = pCtx->resDataInfo.type;
|
||||||
|
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||||
|
if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) ||
|
||||||
|
(pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pRes = (STailUnit **)((char *)pInfo + sizeof(STailInfo));
|
||||||
|
char *pUnit = (char *)pInfo->pRes + pInfo->numOfPoints * POINTER_BYTES;
|
||||||
|
|
||||||
|
size_t unitSize = sizeof(STailUnit) + pInfo->colBytes;
|
||||||
|
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||||
|
pInfo->pRes[i] = (STailUnit *)(pUnit + i * unitSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tailAssignResult(STailUnit* pUnit, char *data, int32_t colBytes, TSKEY ts) {
|
||||||
|
pUnit->timestamp = ts;
|
||||||
|
memcpy(pUnit->data, data, colBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tailCompFn(const void *p1, const void *p2, const void *param) {
|
||||||
|
STailUnit *d1 = *(STailUnit **)p1;
|
||||||
|
STailUnit *d2 = *(STailUnit **)p2;
|
||||||
|
return compareInt64Val(&d1->timestamp, &d2->timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts) {
|
||||||
|
STailUnit **pList = pInfo->pRes;
|
||||||
|
if (pInfo->numAdded < pInfo->numOfPoints) {
|
||||||
|
tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts);
|
||||||
|
taosheapsort((void *)pList, sizeof(STailUnit **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
|
||||||
|
} else if (pList[0]->timestamp < ts) {
|
||||||
|
tailAssignResult(pList[0], data, pInfo->colBytes, ts);
|
||||||
|
taosheapadjust((void *)pList, sizeof(STailUnit **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tailFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
int32_t startOffset = pCtx->offset;
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
//colDataAppendNULL(pOutput, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* data = colDataGetData(pInputCol, i);
|
||||||
|
doTailAdd(pInfo, data, tsList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pInfo->numOfPoints;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue