Merge pull request #12191 from taosdata/feature/3.0_glzhao
feat(query): add histogram param parsing
This commit is contained in:
commit
2139ccceb0
|
@ -241,7 +241,7 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE };
|
||||
pFunc->node.resType = (SDataType) { .bytes = 512, .type = TSDB_DATA_TYPE_BINARY };
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,13 +14,14 @@
|
|||
*/
|
||||
|
||||
#include "builtinsimpl.h"
|
||||
#include "cJSON.h"
|
||||
#include "function.h"
|
||||
#include "querynodes.h"
|
||||
#include "taggfunction.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tpercentile.h"
|
||||
|
||||
#define HISTOGRAM_MAX_BINS_NUM 100
|
||||
#define HISTOGRAM_MAX_BINS_NUM 100
|
||||
|
||||
typedef struct SSumRes {
|
||||
union {
|
||||
|
@ -106,6 +107,13 @@ typedef struct SHistoFuncInfo {
|
|||
SHistoFuncBin bins[];
|
||||
} SHistoFuncInfo;
|
||||
|
||||
typedef enum {
|
||||
UNKNOWN_BIN = 0,
|
||||
USER_INPUT_BIN,
|
||||
LINEAR_BIN,
|
||||
LOG_BIN
|
||||
} EHistoBinType;
|
||||
|
||||
|
||||
#define SET_VAL(_info, numOfElem, res) \
|
||||
do { \
|
||||
|
@ -1801,28 +1809,225 @@ bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
static int8_t getHistogramBinType(char *binTypeStr) {
|
||||
int8_t binType;
|
||||
if (strcasecmp(binTypeStr, "user_input") == 0) {
|
||||
binType = USER_INPUT_BIN;
|
||||
} else if (strcasecmp(binTypeStr, "linear_bin") == 0) {
|
||||
binType = LINEAR_BIN;
|
||||
} else if (strcasecmp(binTypeStr, "log_bin") == 0) {
|
||||
binType = LOG_BIN;
|
||||
} else {
|
||||
binType = UNKNOWN_BIN;
|
||||
}
|
||||
|
||||
return binType;
|
||||
}
|
||||
|
||||
static bool getHistogramBinDesc(SHistoFuncInfo *pInfo, char *binDescStr, int8_t binType, bool normalized) {
|
||||
cJSON* binDesc = cJSON_Parse(binDescStr);
|
||||
int32_t numOfBins;
|
||||
double* intervals;
|
||||
if (cJSON_IsObject(binDesc)) { /* linaer/log bins */
|
||||
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
||||
int32_t startIndex;
|
||||
if (numOfParams != 4) {
|
||||
return false;
|
||||
}
|
||||
|
||||
cJSON* start = cJSON_GetObjectItem(binDesc, "start");
|
||||
cJSON* factor = cJSON_GetObjectItem(binDesc, "factor");
|
||||
cJSON* width = cJSON_GetObjectItem(binDesc, "width");
|
||||
cJSON* count = cJSON_GetObjectItem(binDesc, "count");
|
||||
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
||||
|
||||
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
||||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t counter = (int32_t)count->valueint;
|
||||
if (infinity->valueint == false) {
|
||||
startIndex = 0;
|
||||
numOfBins = counter + 1;
|
||||
} else {
|
||||
startIndex = 1;
|
||||
numOfBins = counter + 3;
|
||||
}
|
||||
|
||||
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
|
||||
if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) {
|
||||
// linear bin process
|
||||
if (width->valuedouble == 0) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < counter + 1; ++i) {
|
||||
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
||||
if (isinf(intervals[startIndex])) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
startIndex++;
|
||||
}
|
||||
} else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) {
|
||||
// log bin process
|
||||
if (start->valuedouble == 0) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < counter + 1; ++i) {
|
||||
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
||||
if (isinf(intervals[startIndex])) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
startIndex++;
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (infinity->valueint == true) {
|
||||
intervals[0] = -INFINITY;
|
||||
intervals[numOfBins - 1] = INFINITY;
|
||||
// in case of desc bin orders, -inf/inf should be swapped
|
||||
ASSERT(numOfBins >= 4);
|
||||
if (intervals[1] > intervals[numOfBins - 2]) {
|
||||
TSWAP(intervals[0], intervals[numOfBins - 1]);
|
||||
}
|
||||
}
|
||||
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
||||
if (binType != USER_INPUT_BIN) {
|
||||
return false;
|
||||
}
|
||||
numOfBins = cJSON_GetArraySize(binDesc);
|
||||
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
|
||||
cJSON* bin = binDesc->child;
|
||||
if (bin == NULL) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
int i = 0;
|
||||
while (bin) {
|
||||
intervals[i] = bin->valuedouble;
|
||||
if (!cJSON_IsNumber(bin)) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
bin = bin->next;
|
||||
i++;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
pInfo->numOfBins = numOfBins - 1;
|
||||
pInfo->normalized = normalized;
|
||||
for (int32_t i = 0; i < pInfo->numOfBins; ++i) {
|
||||
pInfo->bins[i].lower = intervals[i] < intervals[i + 1] ? intervals[i] : intervals[i + 1];
|
||||
pInfo->bins[i].upper = intervals[i + 1] > intervals[i] ? intervals[i + 1] : intervals[i];
|
||||
pInfo->bins[i].count = 0;
|
||||
}
|
||||
|
||||
taosMemoryFree(intervals);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
char* binType = pCtx->param[1].param.pz;
|
||||
char* binDesc = pCtx->param[2].param.pz;
|
||||
int64_t nornalized = pCtx->param[3].param.i;
|
||||
SHistoFuncInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
|
||||
int8_t binType = getHistogramBinType(varDataVal(pCtx->param[1].param.pz));
|
||||
if (binType == UNKNOWN_BIN) {
|
||||
return false;
|
||||
}
|
||||
char* binDesc = varDataVal(pCtx->param[2].param.pz);
|
||||
int64_t normalized = pCtx->param[3].param.i;
|
||||
if (!getHistogramBinDesc(pInfo, binDesc, binType, (bool)normalized)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t histogramFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
for (int32_t i = start; i < numOfRows + start; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pCol, i);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
|
||||
for (int32_t k = 0; k < pInfo->numOfBins; ++k) {
|
||||
if (v > pInfo->bins[k].lower && v <= pInfo->bins[k].upper) {
|
||||
pInfo->bins[k].count++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, pInfo->numOfBins);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
//if (pInfo->hasResult == true) {
|
||||
// SET_DOUBLE_VAL(&pInfo->result, pInfo->max - pInfo->min);
|
||||
//}
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
|
||||
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
|
||||
int32_t len;
|
||||
char buf[512] = {0};
|
||||
if (!pInfo->normalized) {
|
||||
len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}",
|
||||
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].count);
|
||||
} else {
|
||||
len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
|
||||
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].percentage);
|
||||
}
|
||||
varDataSetLen(buf, len);
|
||||
colDataAppend(pCol, currentRow, buf, false);
|
||||
currentRow++;
|
||||
}
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue