enh: adjust udf example codes
This commit is contained in:
parent
8e81abe224
commit
7be4d5b592
|
@ -27,8 +27,8 @@ DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) {
|
|||
|
||||
for (int32_t i = 0; i < block->numOfRows; ++i) {
|
||||
if (udfColDataIsNull(block->udfCols[0], i)) {
|
||||
udfTrace("block:%p, row:%d result is null since col:0 is null", block, i);
|
||||
udfColDataSetNull(resultCol, i);
|
||||
udfTrace("block:%p, row:%d result is null since col:0 is null", block, i);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -38,8 +38,8 @@ DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) {
|
|||
int32_t j = 1;
|
||||
for (; j < block->numOfCols; ++j) {
|
||||
if (udfColDataIsNull(block->udfCols[j], i)) {
|
||||
udfTrace("block:%p, row:%d result is null since col:%d is null", block, i, j);
|
||||
udfColDataSetNull(resultCol, i);
|
||||
udfTrace("block:%p, row:%d result is null since col:%d is null", block, i, j);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -9,38 +9,55 @@ DLL_EXPORT int32_t l2norm_init() { return 0; }
|
|||
DLL_EXPORT int32_t l2norm_destroy() { return 0; }
|
||||
|
||||
DLL_EXPORT int32_t l2norm_start(SUdfInterBuf* buf) {
|
||||
int32_t bufLen = sizeof(double);
|
||||
if (buf->bufLen < bufLen) {
|
||||
udfError("failed to execute udf since input buflen:%d < %d", buf->bufLen, bufLen);
|
||||
return TSDB_CODE_UDF_INVALID_BUFSIZE;
|
||||
}
|
||||
|
||||
udfTrace("start aggregation, buflen:%d used:%d", buf->bufLen, bufLen);
|
||||
*(int64_t*)(buf->buf) = 0;
|
||||
buf->bufLen = sizeof(double);
|
||||
buf->numOfResult = 1;
|
||||
buf->bufLen = bufLen;
|
||||
buf->numOfResult = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
|
||||
double sumSquares = *(double*)interBuf->buf;
|
||||
int8_t numNotNull = 0;
|
||||
udfTrace("block:%p, processing begins, cols:%d rows:%d", block, block->numOfCols, block->numOfRows);
|
||||
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
|
||||
if (col->colMeta.type != TSDB_DATA_TYPE_INT && col->colMeta.type != TSDB_DATA_TYPE_DOUBLE) {
|
||||
udfError("block:%p, col:%d type:%d should be int(%d) or double(%d)", block, i, col->colMeta.type,
|
||||
TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_DOUBLE);
|
||||
return TSDB_CODE_UDF_INVALID_INPUT;
|
||||
}
|
||||
}
|
||||
|
||||
double sumSquares = *(double*)interBuf->buf;
|
||||
int8_t numNotNull = 0;
|
||||
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
for (int32_t j = 0; j < block->numOfRows; ++j) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
if (udfColDataIsNull(col, j)) {
|
||||
udfTrace("block:%p, col:%d row:%d is null", block, i, j);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (col->colMeta.type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
char* cell = udfColDataGetData(col, j);
|
||||
int32_t num = *(int32_t*)cell;
|
||||
sumSquares += (double)num * num;
|
||||
udfTrace("block:%p, col:%d row:%d data:%d", block, i, j, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
char* cell = udfColDataGetData(col, j);
|
||||
double num = *(double*)cell;
|
||||
sumSquares += num * num;
|
||||
udfTrace("block:%p, col:%d row:%d data:%f", block, i, j, num);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -48,11 +65,14 @@ DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInte
|
|||
}
|
||||
++numNotNull;
|
||||
}
|
||||
udfTrace("block:%p, col:%d result is %f", block, i, sumSquares);
|
||||
}
|
||||
|
||||
*(double*)(newInterBuf->buf) = sumSquares;
|
||||
newInterBuf->bufLen = sizeof(double);
|
||||
newInterBuf->numOfResult = 1;
|
||||
|
||||
udfTrace("block:%p, result is %f", block, sumSquares);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -61,5 +81,7 @@ DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) {
|
|||
*(double*)(resultData->buf) = sqrt(sumSquares);
|
||||
resultData->bufLen = sizeof(double);
|
||||
resultData->numOfResult = 1;
|
||||
|
||||
udfTrace("end aggregation, result is %f", *(double*)(resultData->buf));
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -6,27 +6,33 @@
|
|||
|
||||
#define STR_MAX_LEN 256 // inter buffer length
|
||||
|
||||
// init
|
||||
DLL_EXPORT int32_t max_vol_init() { return 0; }
|
||||
|
||||
// destory
|
||||
DLL_EXPORT int32_t max_vol_destroy() { return 0; }
|
||||
|
||||
// start
|
||||
DLL_EXPORT int32_t max_vol_start(SUdfInterBuf *buf) {
|
||||
int32_t bufLen = sizeof(float) + STR_MAX_LEN;
|
||||
if (buf->bufLen < bufLen) {
|
||||
udfError("failed to execute udf since input buflen:%d < %d", buf->bufLen, bufLen);
|
||||
return TSDB_CODE_UDF_INVALID_BUFSIZE;
|
||||
}
|
||||
|
||||
udfTrace("start aggregation, buflen:%d used:%d", buf->bufLen, bufLen);
|
||||
memset(buf->buf, 0, sizeof(float) + STR_MAX_LEN);
|
||||
// set init value
|
||||
*((float *)buf->buf) = -10000000;
|
||||
buf->bufLen = sizeof(float) + STR_MAX_LEN;
|
||||
*((float *)buf->buf) = INT32_MIN;
|
||||
buf->bufLen = bufLen;
|
||||
buf->numOfResult = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
||||
udfTrace("block:%p, processing begins, cols:%d rows:%d", block, block->numOfCols, block->numOfRows);
|
||||
|
||||
float maxValue = *(float *)interBuf->buf;
|
||||
char strBuff[STR_MAX_LEN] = "inter1buf";
|
||||
|
||||
if (block->numOfCols < 2) {
|
||||
udfError("block:%p, cols:%d needs to be greater than 2", block, block->numOfCols);
|
||||
return TSDB_CODE_UDF_INVALID_INPUT;
|
||||
}
|
||||
|
||||
|
@ -36,10 +42,12 @@ DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInt
|
|||
if (i == block->numOfCols - 1) {
|
||||
// last column is device id , must varchar
|
||||
if (col->colMeta.type != TSDB_DATA_TYPE_VARCHAR) {
|
||||
udfError("block:%p, col:%d type:%d should be varchar(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_VARCHAR);
|
||||
return TSDB_CODE_UDF_INVALID_INPUT;
|
||||
}
|
||||
} else {
|
||||
if (col->colMeta.type != TSDB_DATA_TYPE_FLOAT) {
|
||||
udfError("block:%p, col:%d type:%d should be float(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_FLOAT);
|
||||
return TSDB_CODE_UDF_INVALID_INPUT;
|
||||
}
|
||||
}
|
||||
|
@ -47,28 +55,41 @@ DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInt
|
|||
|
||||
// calc max voltage
|
||||
SUdfColumn *lastCol = block->udfCols[block->numOfCols - 1];
|
||||
for (int32_t i = 0; i < (block->numOfCols - 1); ++i) {
|
||||
for (int32_t i = 0; i < block->numOfCols - 1; ++i) {
|
||||
for (int32_t j = 0; j < block->numOfRows; ++j) {
|
||||
SUdfColumn *col = block->udfCols[i];
|
||||
if (udfColDataIsNull(col, j)) {
|
||||
udfTrace("block:%p, col:%d row:%d is null", block, i, j);
|
||||
continue;
|
||||
}
|
||||
|
||||
char *data = udfColDataGetData(col, j);
|
||||
float voltage = *(float *)data;
|
||||
if (voltage > maxValue) {
|
||||
|
||||
if (voltage <= maxValue) {
|
||||
udfTrace("block:%p, col:%d row:%d data:%f", block, i, j, voltage);
|
||||
} else {
|
||||
maxValue = voltage;
|
||||
char *valData = udfColDataGetData(lastCol, j);
|
||||
char *valData = udfColDataGetData(lastCol, j);
|
||||
int32_t valDataLen = udfColDataGetDataLen(lastCol, j);
|
||||
|
||||
// get device id
|
||||
char *deviceId = valData + sizeof(uint16_t);
|
||||
sprintf(strBuff, "%s_(%d,%d)_%f", deviceId, j, i, maxValue);
|
||||
char *deviceId = valData + sizeof(uint16_t);
|
||||
int32_t deviceIdLen = valDataLen < (STR_MAX_LEN - 1) ? valDataLen : (STR_MAX_LEN - 1);
|
||||
|
||||
strncpy(strBuff, deviceId, deviceIdLen);
|
||||
snprintf(strBuff + deviceIdLen, STR_MAX_LEN - deviceIdLen, "_(%d,%d)_%f", j, i, maxValue);
|
||||
udfTrace("block:%p, col:%d row:%d data:%f, as max_val:%s", block, i, j, voltage, strBuff);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*(float *)newInterBuf->buf = maxValue;
|
||||
strcpy(newInterBuf->buf + sizeof(float), strBuff);
|
||||
strncpy(newInterBuf->buf + sizeof(float), strBuff, STR_MAX_LEN);
|
||||
newInterBuf->bufLen = sizeof(float) + strlen(strBuff) + 1;
|
||||
newInterBuf->numOfResult = 1;
|
||||
|
||||
udfTrace("block:%p, result is %s", block, strBuff);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -86,5 +107,7 @@ DLL_EXPORT int32_t max_vol_finish(SUdfInterBuf *buf, SUdfInterBuf *resultData) {
|
|||
resultData->bufLen = len + sizeof(uint16_t);
|
||||
// set row count
|
||||
resultData->numOfResult = 1;
|
||||
|
||||
udfTrace("end aggregation, result is %s", str);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue