[TD-2129]<fix>: fix bugs in twa query.
This commit is contained in:
parent
1b8256013c
commit
55a455a854
|
@ -3624,7 +3624,7 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes;
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
pInfo->lastKey = INT64_MIN;
|
||||
|
@ -3633,43 +3633,116 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void setTWALastVal(SQLFunctionCtx *pCtx, const char *data, int32_t i, STwaInfo *pInfo) {
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
pInfo->iLastValue = GET_INT32_VAL(data + pCtx->inputBytes * i);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
pInfo->iLastValue = GET_INT8_VAL(data + pCtx->inputBytes * i);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
pInfo->iLastValue = GET_INT16_VAL(data + pCtx->inputBytes * i);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
pInfo->iLastValue = GET_INT64_VAL(data + pCtx->inputBytes * i);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
pInfo->dLastValue = GET_FLOAT_VAL(data + pCtx->inputBytes * i);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
pInfo->dLastValue = GET_DOUBLE_VAL(data + pCtx->inputBytes * i);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) {
|
||||
int32_t notNullElems = 0;
|
||||
TSKEY *primaryKey = pCtx->ptsList;
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
if (pInfo->lastKey == INT64_MIN) {
|
||||
pInfo->lastKey = pCtx->nStartQueryTimestamp;
|
||||
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
|
||||
pInfo->hasResult = DATA_SET_FLAG;
|
||||
notNullElems++;
|
||||
}
|
||||
|
||||
int32_t i = index;
|
||||
|
||||
// calculate the value of
|
||||
switch(pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
for (++i; i < size; i++) {
|
||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
||||
pInfo->lastValue = val[i];
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
for (++i; i < size; i++) {
|
||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
||||
pInfo->lastValue = val[i];
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
for (++i; i < size; i++) {
|
||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
||||
pInfo->lastValue = val[i];
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
for (++i; i < size; i++) {
|
||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
||||
pInfo->lastValue = val[i];
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
for (++i; i < size; i++) {
|
||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
||||
pInfo->lastValue = val[i];
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
for (++i; i < size; i++) {
|
||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
||||
pInfo->lastValue = val[i];
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: assert(0);
|
||||
}
|
||||
|
||||
return notNullElems;
|
||||
}
|
||||
|
||||
static void twa_function(SQLFunctionCtx *pCtx) {
|
||||
void * data = GET_INPUT_CHAR(pCtx);
|
||||
TSKEY *primaryKey = pCtx->ptsList;
|
||||
|
||||
int32_t notNullElems = 0;
|
||||
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
int32_t i = 0;
|
||||
|
||||
// skip null value
|
||||
int32_t i = 0;
|
||||
while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
|
||||
i++;
|
||||
}
|
||||
|
@ -3678,40 +3751,7 @@ static void twa_function(SQLFunctionCtx *pCtx) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (pInfo->lastKey == INT64_MIN) {
|
||||
pInfo->lastKey = pCtx->nStartQueryTimestamp;
|
||||
setTWALastVal(pCtx, data, i, pInfo);
|
||||
|
||||
pInfo->hasResult = DATA_SET_FLAG;
|
||||
}
|
||||
|
||||
notNullElems++;
|
||||
|
||||
if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
|
||||
pInfo->dOutput += pInfo->dLastValue * (primaryKey[i] - pInfo->lastKey);
|
||||
} else {
|
||||
pInfo->iOutput += pInfo->iLastValue * (primaryKey[i] - pInfo->lastKey);
|
||||
}
|
||||
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
setTWALastVal(pCtx, data, i, pInfo);
|
||||
|
||||
for (++i; i < pCtx->size; i++) {
|
||||
if (pCtx->hasNull && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
notNullElems++;
|
||||
if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
|
||||
pInfo->dOutput += pInfo->dLastValue * (primaryKey[i] - pInfo->lastKey);
|
||||
} else {
|
||||
pInfo->iOutput += pInfo->iLastValue * (primaryKey[i] - pInfo->lastKey);
|
||||
}
|
||||
|
||||
pInfo->lastKey = primaryKey[i];
|
||||
setTWALastVal(pCtx, data, i, pInfo);
|
||||
}
|
||||
|
||||
int32_t notNullElems = twa_function_impl(pCtx, 0, pCtx->size);
|
||||
SET_VAL(pCtx, notNullElems, 1);
|
||||
|
||||
if (notNullElems > 0) {
|
||||
|
@ -3721,8 +3761,6 @@ static void twa_function(SQLFunctionCtx *pCtx) {
|
|||
if (pCtx->stableQuery) {
|
||||
memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo));
|
||||
}
|
||||
|
||||
// pCtx->numOfIteratedElems += notNullElems;
|
||||
}
|
||||
|
||||
static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
|
@ -3730,35 +3768,12 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SET_VAL(pCtx, 1, 1);
|
||||
|
||||
TSKEY *primaryKey = pCtx->ptsList;
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pInfo->lastKey == INT64_MIN) {
|
||||
pInfo->lastKey = pCtx->nStartQueryTimestamp;
|
||||
setTWALastVal(pCtx, pData, 0, pInfo);
|
||||
|
||||
pInfo->hasResult = DATA_SET_FLAG;
|
||||
}
|
||||
|
||||
if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
|
||||
pInfo->dOutput += pInfo->dLastValue * (primaryKey[index] - pInfo->lastKey);
|
||||
} else {
|
||||
pInfo->iOutput += pInfo->iLastValue * (primaryKey[index] - pInfo->lastKey);
|
||||
}
|
||||
|
||||
// record the last key/value
|
||||
pInfo->lastKey = primaryKey[index];
|
||||
setTWALastVal(pCtx, pData, 0, pInfo);
|
||||
|
||||
// pCtx->numOfIteratedElems += 1;
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
|
||||
|
||||
int32_t notNullElems = twa_function_impl(pCtx, index, 1);
|
||||
SET_VAL(pCtx, notNullElems, 1);
|
||||
|
||||
if (pCtx->stableQuery) {
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo));
|
||||
}
|
||||
}
|
||||
|
@ -3778,16 +3793,11 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
numOfNotNull++;
|
||||
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
|
||||
pBuf->iOutput += pInput->iOutput;
|
||||
} else {
|
||||
pBuf->dOutput += pInput->dOutput;
|
||||
}
|
||||
|
||||
pBuf->dOutput += pInput->dOutput;
|
||||
|
||||
pBuf->SKey = pInput->SKey;
|
||||
pBuf->EKey = pInput->EKey;
|
||||
pBuf->lastKey = pInput->lastKey;
|
||||
pBuf->iLastValue = pInput->iLastValue;
|
||||
}
|
||||
|
||||
SET_VAL(pCtx, numOfNotNull, 1);
|
||||
|
@ -3822,12 +3832,8 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
if (pInfo->SKey == pInfo->EKey) {
|
||||
*(double *)pCtx->aOutputBuf = 0;
|
||||
} else if (pInfo->type >= TSDB_DATA_TYPE_TINYINT && pInfo->type <= TSDB_DATA_TYPE_BIGINT) {
|
||||
pInfo->iOutput += pInfo->iLastValue * (pInfo->EKey - pInfo->lastKey);
|
||||
*(double *)pCtx->aOutputBuf = pInfo->iOutput / (double)(pInfo->EKey - pInfo->SKey);
|
||||
*(double *)pCtx->aOutputBuf = pInfo->lastValue;
|
||||
} else {
|
||||
pInfo->dOutput += pInfo->dLastValue * (pInfo->EKey - pInfo->lastKey);
|
||||
*(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->EKey - pInfo->SKey);
|
||||
}
|
||||
|
||||
|
|
|
@ -248,16 +248,8 @@ typedef struct STwaInfo {
|
|||
int16_t type; // source data type
|
||||
TSKEY SKey;
|
||||
TSKEY EKey;
|
||||
|
||||
union {
|
||||
double dOutput;
|
||||
int64_t iOutput;
|
||||
};
|
||||
|
||||
union {
|
||||
double dLastValue;
|
||||
int64_t iLastValue;
|
||||
};
|
||||
double dOutput;
|
||||
double lastValue;
|
||||
} STwaInfo;
|
||||
|
||||
/* global sql function array */
|
||||
|
@ -276,8 +268,6 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
|
|||
(_r)->initialized = false; \
|
||||
} while (0)
|
||||
|
||||
//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf);
|
||||
|
||||
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) {
|
||||
pResInfo->initialized = true; // the this struct has been initialized flag
|
||||
|
||||
|
|
Loading…
Reference in New Issue