Merge pull request #15051 from taosdata/feat/agg_client_api
feat(query): add histogram function scalar version
This commit is contained in:
commit
afc48c3d55
|
@ -116,6 +116,7 @@ int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
|||
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -2331,6 +2331,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getHistogramFuncEnv,
|
||||
.initFunc = histogramFunctionSetup,
|
||||
.processFunc = histogramFunction,
|
||||
.sprocessFunc = histogramScalarFunction,
|
||||
.finalizeFunc = histogramFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = histogramCombine,
|
||||
|
|
|
@ -2601,3 +2601,218 @@ int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SSca
|
|||
pOutput->numOfRows = pInput->numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef enum { UNKNOWN_BIN = 0, USER_INPUT_BIN, LINEAR_BIN, LOG_BIN } EHistoBinType;
|
||||
|
||||
static int8_t getHistogramBinType(char* binTypeStr) {
|
||||
int8_t binType;
|
||||
if (strcasecmp(binTypeStr, "user_input") == 0) {
|
||||
binType = USER_INPUT_BIN;
|
||||
} else if (strcasecmp(binTypeStr, "linear_bin") == 0) {
|
||||
binType = LINEAR_BIN;
|
||||
} else if (strcasecmp(binTypeStr, "log_bin") == 0) {
|
||||
binType = LOG_BIN;
|
||||
} else {
|
||||
binType = UNKNOWN_BIN;
|
||||
}
|
||||
|
||||
return binType;
|
||||
}
|
||||
|
||||
typedef struct SHistoFuncBin {
|
||||
double lower;
|
||||
double upper;
|
||||
int64_t count;
|
||||
double percentage;
|
||||
} SHistoFuncBin;
|
||||
|
||||
static bool getHistogramBinDesc(SHistoFuncBin** bins, int32_t* binNum, char* binDescStr, int8_t binType, bool normalized) {
|
||||
cJSON* binDesc = cJSON_Parse(binDescStr);
|
||||
int32_t numOfBins;
|
||||
double* intervals;
|
||||
if (cJSON_IsObject(binDesc)) { /* linaer/log bins */
|
||||
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
||||
int32_t startIndex;
|
||||
if (numOfParams != 4) {
|
||||
return false;
|
||||
}
|
||||
|
||||
cJSON* start = cJSON_GetObjectItem(binDesc, "start");
|
||||
cJSON* factor = cJSON_GetObjectItem(binDesc, "factor");
|
||||
cJSON* width = cJSON_GetObjectItem(binDesc, "width");
|
||||
cJSON* count = cJSON_GetObjectItem(binDesc, "count");
|
||||
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
||||
|
||||
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
||||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t counter = (int32_t)count->valueint;
|
||||
if (infinity->valueint == false) {
|
||||
startIndex = 0;
|
||||
numOfBins = counter + 1;
|
||||
} else {
|
||||
startIndex = 1;
|
||||
numOfBins = counter + 3;
|
||||
}
|
||||
|
||||
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
|
||||
if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) {
|
||||
// linear bin process
|
||||
if (width->valuedouble == 0) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < counter + 1; ++i) {
|
||||
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
||||
if (isinf(intervals[startIndex])) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
startIndex++;
|
||||
}
|
||||
} else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) {
|
||||
// log bin process
|
||||
if (start->valuedouble == 0) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < counter + 1; ++i) {
|
||||
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
||||
if (isinf(intervals[startIndex])) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
startIndex++;
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (infinity->valueint == true) {
|
||||
intervals[0] = -INFINITY;
|
||||
intervals[numOfBins - 1] = INFINITY;
|
||||
// in case of desc bin orders, -inf/inf should be swapped
|
||||
ASSERT(numOfBins >= 4);
|
||||
if (intervals[1] > intervals[numOfBins - 2]) {
|
||||
TSWAP(intervals[0], intervals[numOfBins - 1]);
|
||||
}
|
||||
}
|
||||
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
||||
if (binType != USER_INPUT_BIN) {
|
||||
return false;
|
||||
}
|
||||
numOfBins = cJSON_GetArraySize(binDesc);
|
||||
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
|
||||
cJSON* bin = binDesc->child;
|
||||
if (bin == NULL) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
int i = 0;
|
||||
while (bin) {
|
||||
intervals[i] = bin->valuedouble;
|
||||
if (!cJSON_IsNumber(bin)) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
bin = bin->next;
|
||||
i++;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
*binNum = numOfBins - 1;
|
||||
*bins = taosMemoryCalloc(numOfBins, sizeof(SHistoFuncBin));
|
||||
for (int32_t i = 0; i < *binNum; ++i) {
|
||||
(*bins)[i].lower = intervals[i] < intervals[i + 1] ? intervals[i] : intervals[i + 1];
|
||||
(*bins)[i].upper = intervals[i + 1] > intervals[i] ? intervals[i + 1] : intervals[i];
|
||||
(*bins)[i].count = 0;
|
||||
}
|
||||
|
||||
taosMemoryFree(intervals);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
SHistoFuncBin *bins;
|
||||
int32_t numOfBins = 0;
|
||||
int32_t totalCount = 0;
|
||||
|
||||
int8_t binType = getHistogramBinType(varDataVal(pInput[1].columnData->pData));
|
||||
char* binDesc = varDataVal(pInput[2].columnData->pData);
|
||||
int64_t normalized = *(int64_t *)(pInput[3].columnData->pData);
|
||||
|
||||
int32_t type = GET_PARAM_TYPE(pInput);
|
||||
if (!getHistogramBinDesc(&bins, &numOfBins, binDesc, binType, (bool)normalized)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||
if (colDataIsNull_s(pInputData, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputData, i);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
|
||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||
if (v > bins[k].lower && v <= bins[k].upper) {
|
||||
bins[k].count++;
|
||||
totalCount++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (normalized) {
|
||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||
if (totalCount != 0) {
|
||||
bins[k].percentage = bins[k].count / (double)totalCount;
|
||||
} else {
|
||||
bins[k].percentage = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||
int32_t len;
|
||||
char buf[512] = {0};
|
||||
if (!normalized) {
|
||||
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
|
||||
bins[k].lower, bins[k].upper, bins[k].count);
|
||||
} else {
|
||||
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
|
||||
bins[k].lower, bins[k].upper, bins[k].percentage);
|
||||
}
|
||||
varDataSetLen(buf, len);
|
||||
colDataAppend(pOutputData, k, buf, false);
|
||||
}
|
||||
|
||||
taosMemoryFree(bins);
|
||||
pOutput->numOfRows = numOfBins;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue