feat(query): add sample function
This commit is contained in:
parent
1e503061ed
commit
d24254fb84
|
@ -101,6 +101,11 @@ bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool mavgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool mavgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t mavgFunction(SqlFunctionCtx* pCtx);
|
int32_t mavgFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
|
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -360,6 +360,32 @@ static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"The input parameter of SAMPLE function can only be column");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||||
|
if (!IS_INTEGER_TYPE(paraType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
uint8_t colType = pCol->resType.type;
|
||||||
|
if (IS_VAR_DATA_TYPE(colType)) {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
|
||||||
|
} else {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
// todo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -814,6 +840,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = mavgFunction,
|
.processFunc = mavgFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "sample",
|
||||||
|
.type = FUNCTION_TYPE_SAMPLE,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateSample,
|
||||||
|
.getEnvFunc = getSampleFuncEnv,
|
||||||
|
.initFunc = sampleFunctionSetup,
|
||||||
|
.processFunc = sampleFunction,
|
||||||
|
.finalizeFunc = sampleFinalize
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
|
|
||||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||||
#define MAVG_MAX_POINTS_NUM 1000
|
#define MAVG_MAX_POINTS_NUM 1000
|
||||||
|
#define SAMPLE_MAX_POINTS_NUM 1000
|
||||||
|
|
||||||
typedef struct SSumRes {
|
typedef struct SSumRes {
|
||||||
union {
|
union {
|
||||||
|
@ -150,6 +151,15 @@ typedef struct SMavgInfo {
|
||||||
double points[];
|
double points[];
|
||||||
} SMavgInfo;
|
} SMavgInfo;
|
||||||
|
|
||||||
|
typedef struct SSampleInfo {
|
||||||
|
int32_t samples;
|
||||||
|
int32_t totalPoints;
|
||||||
|
int32_t numSampled;
|
||||||
|
int16_t colBytes;
|
||||||
|
char *data;
|
||||||
|
int64_t *timestamp;
|
||||||
|
} SSampleInfo;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
if ((numOfElem) <= 0) { \
|
if ((numOfElem) <= 0) { \
|
||||||
|
@ -3033,3 +3043,95 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
int32_t numOfSamples = pVal->datum.i;
|
||||||
|
pEnv->calcMemSize = sizeof(SSampleInfo) + numOfSamples * (pCol->node.resType.bytes + sizeof(int64_t));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosSeedRand(taosSafeRand());
|
||||||
|
|
||||||
|
SSampleInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
pInfo->samples = pCtx->param[1].param.i;
|
||||||
|
pInfo->totalPoints = 0;
|
||||||
|
pInfo->numSampled = 0;
|
||||||
|
pInfo->colBytes = ((SColumnInfoData*)pCtx->pOutput)->info.bytes;
|
||||||
|
if (pInfo->samples < 1 || pInfo->samples > SAMPLE_MAX_POINTS_NUM) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pInfo->data = (char *)pInfo + sizeof(SSampleInfo);
|
||||||
|
pInfo->timestamp = (int64_t *)((char *)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void sampleAssignResult(SColumnInfoData *pOutput, SSampleInfo* pInfo,
|
||||||
|
char *data, TSKEY ts, int32_t index) {
|
||||||
|
assignVal(pInfo->data + index * pInfo->colBytes, data, pOutput->info.bytes, pOutput->info.type);
|
||||||
|
*(pInfo->timestamp + index) = ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doReservoirSample(SColumnInfoData *pOutput, SSampleInfo* pInfo,
|
||||||
|
char *data, TSKEY ts, int32_t index) {
|
||||||
|
pInfo->totalPoints++;
|
||||||
|
if (pInfo->numSampled < pInfo->samples) {
|
||||||
|
sampleAssignResult(pOutput, pInfo, data, ts, pInfo->numSampled);
|
||||||
|
pInfo->numSampled++;
|
||||||
|
} else {
|
||||||
|
int32_t j = taosRand() % (pInfo->totalPoints);
|
||||||
|
if (j < pInfo->samples) {
|
||||||
|
sampleAssignResult(pOutput, pInfo, data, ts, j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
int32_t type = pInputCol->info.type;
|
||||||
|
int32_t startOffset = pCtx->offset;
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
//colDataAppendNULL(pOutput, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* data = colDataGetData(pInputCol, i);
|
||||||
|
doReservoirSample(pOutput, pInfo, data, tsList[i], i);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pInfo->numSampled;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
//int32_t currentRow = pBlock->info.rows;
|
||||||
|
pResInfo->numOfRes = pInfo->numSampled;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInfo->numSampled; ++i) {
|
||||||
|
colDataAppend(pCol, i, pInfo->data + i * pInfo->colBytes, false);
|
||||||
|
//TODO: handle ts output
|
||||||
|
}
|
||||||
|
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue