[TD-2129]
This commit is contained in:
parent
ab1a8746e1
commit
dcb0bc49e8
|
@ -426,8 +426,7 @@ static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(pCtx, 1, 1);
|
SET_VAL(pCtx, 1, 1);
|
||||||
|
*((int64_t *)pCtx->aOutputBuf) += pCtx->size;
|
||||||
*((int64_t *)pCtx->aOutputBuf) += 1;
|
|
||||||
|
|
||||||
// do not need it actually
|
// do not need it actually
|
||||||
SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx);
|
||||||
|
@ -3632,114 +3631,119 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) {
|
static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t tsIndex, int32_t index, int32_t size) {
|
||||||
int32_t notNullElems = 0;
|
int32_t notNullElems = 0;
|
||||||
TSKEY *primaryKey = pCtx->ptsList;
|
TSKEY *primaryKey = pCtx->ptsList;
|
||||||
|
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
int32_t i = index;
|
int32_t i = index;
|
||||||
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||||
|
|
||||||
if (pCtx->start.key != INT64_MIN) {
|
if (pCtx->start.key != INT64_MIN) {
|
||||||
assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN);
|
assert((pCtx->start.key < primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_ASC) ||
|
||||||
|
(pCtx->start.key > primaryKey[tsIndex + i] && pCtx->order == TSDB_ORDER_DESC));
|
||||||
|
|
||||||
pInfo->lastKey = primaryKey[index];
|
assert(pInfo->lastKey == INT64_MIN);
|
||||||
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
|
|
||||||
|
pInfo->lastKey = primaryKey[tsIndex + i];
|
||||||
|
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
|
||||||
|
|
||||||
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key);
|
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key);
|
||||||
|
|
||||||
pInfo->hasResult = DATA_SET_FLAG;
|
pInfo->hasResult = DATA_SET_FLAG;
|
||||||
pInfo->win.skey = pCtx->start.key;
|
pInfo->win.skey = pCtx->start.key;
|
||||||
notNullElems++;
|
notNullElems++;
|
||||||
i += 1;
|
i += step;
|
||||||
} else if (pInfo->lastKey == INT64_MIN) {
|
} else if (pInfo->lastKey == INT64_MIN) {
|
||||||
pInfo->lastKey = primaryKey[index];
|
pInfo->lastKey = primaryKey[tsIndex + i];
|
||||||
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
|
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
|
||||||
|
|
||||||
pInfo->hasResult = DATA_SET_FLAG;
|
pInfo->hasResult = DATA_SET_FLAG;
|
||||||
pInfo->win.skey = pInfo->lastKey;
|
pInfo->win.skey = pInfo->lastKey;
|
||||||
notNullElems++;
|
notNullElems++;
|
||||||
i += 1;
|
i += step;
|
||||||
}
|
}
|
||||||
|
|
||||||
// calculate the value of
|
// calculate the value of
|
||||||
switch(pCtx->inputType) {
|
switch(pCtx->inputType) {
|
||||||
case TSDB_DATA_TYPE_TINYINT: {
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||||
for (; i < size; i++) {
|
for (; i < size && i >= 0; i += step) {
|
||||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
|
||||||
pInfo->lastValue = val[i];
|
pInfo->lastValue = val[i];
|
||||||
pInfo->lastKey = primaryKey[i];
|
pInfo->lastKey = primaryKey[i + tsIndex];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_SMALLINT: {
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||||
for (; i < size; i++) {
|
for (; i < size && i >= 0; i += step) {
|
||||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
|
||||||
pInfo->lastValue = val[i];
|
pInfo->lastValue = val[i];
|
||||||
pInfo->lastKey = primaryKey[i];
|
pInfo->lastKey = primaryKey[i + tsIndex];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_INT: {
|
case TSDB_DATA_TYPE_INT: {
|
||||||
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||||
for (; i < size; i++) {
|
for (; i < size && i >= 0; i += step) {
|
||||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
|
||||||
pInfo->lastValue = val[i];
|
pInfo->lastValue = val[i];
|
||||||
pInfo->lastKey = primaryKey[i];
|
pInfo->lastKey = primaryKey[i + tsIndex];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_BIGINT: {
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||||
for (; i < size; i++) {
|
for (; i < size && i >= 0; i += step) {
|
||||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
|
||||||
pInfo->lastValue = (double) val[i];
|
pInfo->lastValue = (double) val[i];
|
||||||
pInfo->lastKey = primaryKey[i];
|
pInfo->lastKey = primaryKey[i + tsIndex];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_FLOAT: {
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||||
for (; i < size; i++) {
|
for (; i < size && i >= 0; i += step) {
|
||||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
|
||||||
pInfo->lastValue = val[i];
|
pInfo->lastValue = val[i];
|
||||||
pInfo->lastKey = primaryKey[i];
|
pInfo->lastKey = primaryKey[i + tsIndex];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_DOUBLE: {
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, 0);
|
||||||
for (; i < size; i++) {
|
for (; i < size && i >= 0; i += step) {
|
||||||
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + tsIndex] - pInfo->lastKey);
|
||||||
pInfo->lastValue = val[i];
|
pInfo->lastValue = val[i];
|
||||||
pInfo->lastKey = primaryKey[i];
|
pInfo->lastKey = primaryKey[i + tsIndex];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3764,16 +3768,13 @@ static void twa_function(SQLFunctionCtx *pCtx) {
|
||||||
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
// skip null value
|
// skip null value
|
||||||
int32_t i = 0;
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||||
|
int32_t i = (pCtx->order == TSDB_ORDER_ASC)? 0:(pCtx->size - 1);
|
||||||
while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
|
while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
|
||||||
i++;
|
i += step;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i >= pCtx->size) {
|
int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, i, pCtx->size);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size);
|
|
||||||
SET_VAL(pCtx, notNullElems, 1);
|
SET_VAL(pCtx, notNullElems, 1);
|
||||||
|
|
||||||
if (notNullElems > 0) {
|
if (notNullElems > 0) {
|
||||||
|
@ -3791,11 +3792,136 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t notNullElems = twa_function_impl(pCtx, index, 1);
|
int32_t notNullElems = 0;
|
||||||
|
TSKEY *primaryKey = pCtx->ptsList;
|
||||||
|
|
||||||
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
int32_t i = pCtx->startOffset;
|
||||||
|
int32_t size = pCtx->size;
|
||||||
|
|
||||||
|
if (pCtx->start.key != INT64_MIN) {
|
||||||
|
assert(pInfo->lastKey == INT64_MIN);
|
||||||
|
|
||||||
|
pInfo->lastKey = primaryKey[index];
|
||||||
|
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
|
||||||
|
|
||||||
|
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key);
|
||||||
|
|
||||||
|
pInfo->hasResult = DATA_SET_FLAG;
|
||||||
|
pInfo->win.skey = pCtx->start.key;
|
||||||
|
notNullElems++;
|
||||||
|
i += 1;
|
||||||
|
} else if (pInfo->lastKey == INT64_MIN) {
|
||||||
|
pInfo->lastKey = primaryKey[index];
|
||||||
|
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, index));
|
||||||
|
|
||||||
|
pInfo->hasResult = DATA_SET_FLAG;
|
||||||
|
pInfo->win.skey = pInfo->lastKey;
|
||||||
|
notNullElems++;
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate the value of
|
||||||
|
switch(pCtx->inputType) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
|
for (; i < size; i++) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = val[i];
|
||||||
|
pInfo->lastKey = primaryKey[i + index];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
|
for (; i < size; i++) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = val[i];
|
||||||
|
pInfo->lastKey = primaryKey[i + index];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
|
for (; i < size; i++) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = val[i];
|
||||||
|
pInfo->lastKey = primaryKey[i + index];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
|
for (; i < size; i++) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = (double) val[i];
|
||||||
|
pInfo->lastKey = primaryKey[i + index];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
|
for (; i < size; i++) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = val[i];
|
||||||
|
pInfo->lastKey = primaryKey[i + index];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
|
for (; i < size; i++) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i + index] - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = val[i];
|
||||||
|
pInfo->lastKey = primaryKey[i + index];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// the last interpolated time window value
|
||||||
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
|
pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey);
|
||||||
|
pInfo->lastValue = pCtx->end.val;
|
||||||
|
pInfo->lastKey = pCtx->end.key;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->win.ekey = pInfo->lastKey;
|
||||||
|
|
||||||
SET_VAL(pCtx, notNullElems, 1);
|
SET_VAL(pCtx, notNullElems, 1);
|
||||||
|
|
||||||
|
if (notNullElems > 0) {
|
||||||
|
pResInfo->hasResult = DATA_SET_FLAG;
|
||||||
|
}
|
||||||
|
|
||||||
if (pCtx->stableQuery) {
|
if (pCtx->stableQuery) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
|
||||||
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo));
|
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -196,6 +196,7 @@ static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
|
||||||
static int32_t checkForQueryBuf(size_t numOfTables);
|
static int32_t checkForQueryBuf(size_t numOfTables);
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
|
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
|
||||||
|
static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type);
|
||||||
|
|
||||||
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||||
|
@ -660,7 +661,7 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
|
static bool resultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
|
||||||
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
|
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
|
||||||
if (type == RESULT_ROW_START_INTERP) {
|
if (type == RESULT_ROW_START_INTERP) {
|
||||||
return pResult->startInterp == true;
|
return pResult->startInterp == true;
|
||||||
|
@ -989,7 +990,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
||||||
if (functionId == TSDB_FUNC_ARITHM) {
|
if (functionId == TSDB_FUNC_ARITHM) {
|
||||||
sas->pArithExpr = &pQuery->pExpr1[col];
|
sas->pArithExpr = &pQuery->pExpr1[col];
|
||||||
|
|
||||||
sas->offset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1);
|
sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1);
|
||||||
sas->colList = pQuery->colList;
|
sas->colList = pQuery->colList;
|
||||||
sas->numOfCols = pQuery->numOfCols;
|
sas->numOfCols = pQuery->numOfCols;
|
||||||
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
||||||
|
@ -1032,85 +1033,89 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
||||||
return dataBlock;
|
return dataBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
// window start key interpolation
|
static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
|
||||||
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
|
if (type == RESULT_ROW_START_INTERP) {
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
|
pCtx[k].start.key = INT64_MIN;
|
||||||
TSKEY start = tsCols[pos];
|
|
||||||
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
|
|
||||||
TSKEY prevTs = (pos == 0)? lastTs : tsCols[pos - 1];
|
|
||||||
|
|
||||||
// if lastTs == INT64_MIN, it is the first block, no need to do the start time interpolation
|
|
||||||
if (((lastTs != INT64_MIN && pos >= 0) || (lastTs == INT64_MIN && pos > 0)) && win->skey > lastTs &&
|
|
||||||
win->skey < start) {
|
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
|
|
||||||
if (k == 0 && pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
|
||||||
assert(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
double v1 = 0, v2 = 0, v = 0;
|
|
||||||
|
|
||||||
char *prevVal = pos == 0 ? pRuntimeEnv->prevRow[k] : ((char*)pColInfo->pData) + (pos - 1) * pColInfo->info.bytes;
|
|
||||||
|
|
||||||
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal);
|
|
||||||
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes);
|
|
||||||
|
|
||||||
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
|
|
||||||
SPoint point2 = (SPoint){.key = start, .val = &v2};
|
|
||||||
SPoint point = (SPoint){.key = win->skey, .val = &v};
|
|
||||||
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
|
|
||||||
pRuntimeEnv->pCtx[k].start.key = point.key;
|
|
||||||
pRuntimeEnv->pCtx[k].start.val = v;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
|
pCtx[k].end.key = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, TSKEY ekey, STimeWindow* win) {
|
//static double getTSWindowInterpoVal(SColumnInfoData* pColInfo, int16_t srcColIndex, int16_t rowIndex, TSKEY key, char** prevRow, TSKEY* tsCols, int32_t step) {
|
||||||
|
// TSKEY start = tsCols[rowIndex];
|
||||||
|
// TSKEY prevTs = (rowIndex == 0)? *(TSKEY *) prevRow[0] : tsCols[rowIndex - step];
|
||||||
|
//
|
||||||
|
// double v1 = 0, v2 = 0, v = 0;
|
||||||
|
// char *prevVal = (rowIndex == 0)? prevRow[srcColIndex] : ((char*)pColInfo->pData) + (rowIndex - step) * pColInfo->info.bytes;
|
||||||
|
//
|
||||||
|
// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal);
|
||||||
|
// GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + rowIndex * pColInfo->info.bytes);
|
||||||
|
//
|
||||||
|
// SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
|
||||||
|
// SPoint point2 = (SPoint){.key = start, .val = &v2};
|
||||||
|
// SPoint point = (SPoint){.key = key, .val = &v};
|
||||||
|
// taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
|
||||||
|
//
|
||||||
|
// return v;
|
||||||
|
//}
|
||||||
|
|
||||||
|
// window start key interpolation
|
||||||
|
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
TSKEY trueEndKey = tsCols[pos];
|
|
||||||
|
|
||||||
if (win->ekey < ekey && win->ekey != trueEndKey) {
|
TSKEY curTs = tsCols[pos];
|
||||||
int32_t nextIndex = pos + 1;
|
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
|
||||||
TSKEY next = tsCols[nextIndex];
|
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
|
|
||||||
if (k == 0 && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
|
|
||||||
pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
double v1 = 0, v2 = 0, v = 0;
|
|
||||||
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes);
|
|
||||||
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + nextIndex * pColInfo->info.bytes);
|
|
||||||
|
|
||||||
SPoint point1 = (SPoint){.key = trueEndKey, .val = &v1};
|
|
||||||
SPoint point2 = (SPoint){.key = next, .val = &v2};
|
|
||||||
SPoint point = (SPoint){.key = win->ekey, .val = &v};
|
|
||||||
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
|
|
||||||
pRuntimeEnv->pCtx[k].end.key = point.key;
|
|
||||||
pRuntimeEnv->pCtx[k].end.val = v;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
|
||||||
|
// start exactly from this point, no need to do interpolation
|
||||||
|
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey;
|
||||||
|
if (key == curTs) {
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_START_INTERP);
|
||||||
return true;
|
return true;
|
||||||
} else { // current time window does not ended in current data block, do nothing
|
}
|
||||||
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
|
||||||
pRuntimeEnv->pCtx[k].end.key = INT64_MIN;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) {
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_START_INTERP);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))?
|
||||||
|
lastTs:tsCols[pos - step];
|
||||||
|
|
||||||
|
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
TSKEY actualEndKey = tsCols[endRowIndex];
|
||||||
|
|
||||||
|
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey;
|
||||||
|
|
||||||
|
// not ended in current data block, do not invoke interpolation
|
||||||
|
if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQuery)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_END_INTERP);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// there is actual end point of current time window, no interpolation need
|
||||||
|
if (key == actualEndKey) {
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_END_INTERP);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
int32_t nextRowIndex = endRowIndex + step;
|
||||||
|
assert(nextRowIndex >= 0);
|
||||||
|
|
||||||
|
TSKEY nextKey = tsCols[nextRowIndex];
|
||||||
|
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) {
|
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) {
|
||||||
|
@ -1119,10 +1124,10 @@ static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0;
|
||||||
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
|
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
|
||||||
memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * (pDataBlockInfo->rows - 1)),
|
memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes);
|
||||||
pColInfo->info.bytes);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1174,11 +1179,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
||||||
|
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
|
int32_t prevIndex = curTimeWindowIndex(pWindowResInfo);
|
||||||
|
|
||||||
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
|
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
|
||||||
|
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||||
|
|
||||||
bool hasTimeWindow = false;
|
bool hasTimeWindow = false;
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
tfree(sasArray);
|
tfree(sasArray);
|
||||||
|
@ -1193,20 +1200,47 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
||||||
TSKEY ekey = reviseWindowEkey(pQuery, &win);
|
TSKEY ekey = reviseWindowEkey(pQuery, &win);
|
||||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
|
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
|
||||||
|
|
||||||
|
// prev time window not interpolation yet.
|
||||||
|
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
|
||||||
|
if (prevIndex != -1 && prevIndex < curIndex) {
|
||||||
|
for(int32_t j = prevIndex; j < curIndex; ++j) {
|
||||||
|
SResultRow *pRes = pWindowResInfo->pResult[j];
|
||||||
|
|
||||||
|
STimeWindow w = pRes->win;
|
||||||
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult);
|
||||||
|
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
||||||
|
|
||||||
|
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
|
||||||
|
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
|
||||||
|
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
|
||||||
|
|
||||||
|
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
|
||||||
|
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &w, startPos, 0, tsCols, pDataBlockInfo->rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore current time window
|
||||||
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
|
||||||
|
assert (ret == TSDB_CODE_SUCCESS); // null data, too many state code
|
||||||
|
}
|
||||||
|
|
||||||
// window start key interpolation
|
// window start key interpolation
|
||||||
if (pRuntimeEnv->timeWindowInterpo) {
|
if (pRuntimeEnv->timeWindowInterpo) {
|
||||||
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
|
||||||
if (!alreadyInterp) {
|
if (!done) {
|
||||||
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pQuery->pos, pDataBlock, tsCols, &win);
|
int32_t startRowIndex = pQuery->pos;
|
||||||
|
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &win);
|
||||||
if (interp) {
|
if (interp) {
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
|
||||||
if (!alreadyInterp) {
|
if (!done) {
|
||||||
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols,
|
int32_t endRowIndex = pQuery->pos + (forwardStep - 1) * step;
|
||||||
pDataBlockInfo->window.ekey, &win);
|
|
||||||
|
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
|
||||||
|
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &win);
|
||||||
if (interp) {
|
if (interp) {
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||||
}
|
}
|
||||||
|
@ -1243,17 +1277,20 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
||||||
|
|
||||||
// window start(end) key interpolation
|
// window start(end) key interpolation
|
||||||
if (pRuntimeEnv->timeWindowInterpo) {
|
if (pRuntimeEnv->timeWindowInterpo) {
|
||||||
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
|
||||||
if (!alreadyInterp) {
|
if (!done) {
|
||||||
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin);
|
int32_t startRowIndex = startPos;
|
||||||
|
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &nextWin);
|
||||||
if (interp) {
|
if (interp) {
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
|
||||||
if (!alreadyInterp) {
|
if (!done) {
|
||||||
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin);
|
int32_t endRowIndex = startPos + (forwardStep - 1)*step;
|
||||||
|
TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey;
|
||||||
|
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &nextWin);
|
||||||
if (interp) {
|
if (interp) {
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||||
}
|
}
|
||||||
|
@ -1459,6 +1496,45 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||||
|
int32_t functionId = pQuery->pExpr1[k].base.functionId;
|
||||||
|
if (functionId != TSDB_FUNC_TWA) {
|
||||||
|
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColIndex* pColIndex = &pQuery->pExpr1[k].base.colInfo;
|
||||||
|
int16_t index = pColIndex->colIndex;
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, index);
|
||||||
|
|
||||||
|
assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey);
|
||||||
|
double v1 = 0, v2 = 0, v = 0;
|
||||||
|
|
||||||
|
if (prevRowIndex == -1) {
|
||||||
|
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[k]);
|
||||||
|
} else {
|
||||||
|
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
|
||||||
|
|
||||||
|
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
|
||||||
|
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
|
||||||
|
SPoint point = (SPoint){.key = windowKey, .val = &v};
|
||||||
|
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
|
||||||
|
|
||||||
|
if (type == RESULT_ROW_START_INTERP) {
|
||||||
|
pRuntimeEnv->pCtx[k].start.key = point.key;
|
||||||
|
pRuntimeEnv->pCtx[k].start.val = v;
|
||||||
|
} else {
|
||||||
|
pRuntimeEnv->pCtx[k].end.key = point.key;
|
||||||
|
pRuntimeEnv->pCtx[k].end.val = v;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
|
@ -1489,6 +1565,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||||
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
||||||
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
|
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
|
||||||
|
pCtx[k].size = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the input column data
|
// set the input column data
|
||||||
|
@ -1508,7 +1585,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t offset = -1;
|
int32_t offset = -1;
|
||||||
// TSKEY prev = -1;
|
TSKEY prevTs = *(TSKEY*) pRuntimeEnv->prevRow[0];
|
||||||
|
int32_t prevRowIndex = -1;
|
||||||
|
|
||||||
for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) {
|
for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) {
|
||||||
offset = GET_COL_DATA_POS(pQuery, j, step);
|
offset = GET_COL_DATA_POS(pQuery, j, step);
|
||||||
|
@ -1530,7 +1608,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
|
|
||||||
// interval window query, decide the time window according to the primary timestamp
|
// interval window query, decide the time window according to the primary timestamp
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
int64_t ts = tsCols[offset];
|
int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo);
|
||||||
|
|
||||||
|
int64_t ts = tsCols[offset];
|
||||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||||
|
|
||||||
bool hasTimeWindow = false;
|
bool hasTimeWindow = false;
|
||||||
|
@ -1543,27 +1623,58 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
if (!hasTimeWindow) {
|
if (!hasTimeWindow) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
// window start key interpolation
|
// window start key interpolation
|
||||||
if (pRuntimeEnv->timeWindowInterpo) {
|
if (pRuntimeEnv->timeWindowInterpo) {
|
||||||
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
// check for the time window end time interpolation
|
||||||
if (!alreadyInterp) {
|
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
|
||||||
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pos, pDataBlock, tsCols, &win);
|
if (prevWindowIndex != -1 && prevWindowIndex < curIndex) {
|
||||||
if (interp) {
|
for (int32_t k = prevWindowIndex; k < curIndex; ++k) {
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
SResultRow *pRes = pWindowResInfo->pResult[k];
|
||||||
|
STimeWindow w = pRes->win;
|
||||||
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult);
|
||||||
|
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
||||||
|
|
||||||
|
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? w.ekey:w.skey;
|
||||||
|
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_END_INTERP);
|
||||||
|
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||||
|
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_START_INTERP);
|
||||||
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
|
pRuntimeEnv->pCtx[i].size = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
|
||||||
|
doRowwiseApplyFunctions(pRuntimeEnv, closed, &w, offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore current time window
|
||||||
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow,
|
||||||
|
&pResult);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
|
||||||
if (!alreadyInterp) {
|
if (!done) {
|
||||||
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols,
|
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win.skey:win.ekey;
|
||||||
pDataBlockInfo->window.ekey, &win);
|
if (prevTs != INT64_MIN && ts != key) {
|
||||||
if (interp) {
|
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_START_INTERP);
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
||||||
|
} else {
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_START_INTERP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_END_INTERP);
|
||||||
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||||
|
pRuntimeEnv->pCtx[k].size = 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_START_INTERP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
|
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
|
||||||
doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset);
|
doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset);
|
||||||
|
|
||||||
|
@ -1588,26 +1699,19 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasTimeWindow) {
|
if (hasTimeWindow) {
|
||||||
/*
|
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
|
||||||
// window start(end) key interpolation
|
if (!done) {
|
||||||
if (pRuntimeEnv->timeWindowInterpo) {
|
if (prevTs != INT64_MIN && ((QUERY_IS_ASC_QUERY(pQuery) && (prevTs < nextWin.skey)) || (!QUERY_IS_ASC_QUERY(pQuery) && prevTs > nextWin.ekey))) {
|
||||||
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? nextWin.skey:nextWin.ekey;
|
||||||
if (!alreadyInterp) {
|
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_START_INTERP);
|
||||||
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin);
|
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
||||||
if (interp) {
|
} else {
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_START_INTERP);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfCols, RESULT_ROW_END_INTERP);
|
||||||
if (!alreadyInterp) {
|
|
||||||
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin);
|
|
||||||
if (interp) {
|
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
|
closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
|
||||||
doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset);
|
doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset);
|
||||||
}
|
}
|
||||||
|
@ -1633,7 +1737,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// prev = tsCols[offset];
|
prevTs = tsCols[offset];
|
||||||
|
prevRowIndex = offset;
|
||||||
|
|
||||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||||
// if timestamp filter list is empty, quit current query
|
// if timestamp filter list is empty, quit current query
|
||||||
|
|
Loading…
Reference in New Issue