[TD-2129]<fix>: fix bug in twa calculation.

This commit is contained in:
Haojun Liao 2020-11-27 18:07:33 +08:00
parent 36b47a79a4
commit 0e98a4eadb
5 changed files with 343 additions and 68 deletions

View File

@ -64,13 +64,13 @@
} \
} while (0);
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do {\
for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \
} \
} while(0);
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \
} \
} while (0);
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
@ -3625,11 +3625,10 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) {
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->lastKey = INT64_MIN;
pInfo->type = pCtx->inputType;
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->lastKey = INT64_MIN;
pInfo->win = TSWINDOW_INITIALIZER;
return true;
}
@ -3640,10 +3639,24 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->lastKey == INT64_MIN) {
pInfo->lastKey = pCtx->nStartQueryTimestamp;
if (pCtx->start.key != INT64_MIN) {
assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN);
pInfo->lastKey = primaryKey[index];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (primaryKey[index] - pCtx->start.key);
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pCtx->start.key;
notNullElems++;
} else if (pInfo->lastKey == INT64_MIN) {
pInfo->lastKey = primaryKey[index];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pInfo->lastKey;
notNullElems++;
}
@ -3732,6 +3745,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
default: assert(0);
}
// the last interpolated time window value
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey);
pInfo->lastValue = pCtx->end.val;
pInfo->lastKey = pCtx->end.key;
}
pInfo->win.ekey = pInfo->lastKey;
return notNullElems;
}
@ -3751,7 +3772,7 @@ static void twa_function(SQLFunctionCtx *pCtx) {
return;
}
int32_t notNullElems = twa_function_impl(pCtx, 0, pCtx->size);
int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size);
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
@ -3795,8 +3816,7 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) {
numOfNotNull++;
pBuf->dOutput += pInput->dOutput;
pBuf->SKey = pInput->SKey;
pBuf->EKey = pInput->EKey;
pBuf->win = pInput->win;
pBuf->lastKey = pInput->lastKey;
}
@ -3824,17 +3844,17 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo);
assert(pInfo->EKey >= pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult);
assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult);
if (pInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return;
}
if (pInfo->SKey == pInfo->EKey) {
if (pInfo->win.ekey == pInfo->win.skey) {
*(double *)pCtx->aOutputBuf = pInfo->lastValue;
} else {
*(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->EKey - pInfo->SKey);
*(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey);
}
GET_RES_INFO(pCtx)->numOfRes = 1;

View File

@ -63,9 +63,11 @@ typedef struct SSqlGroupbyExpr {
typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:15;
bool closed:1; // this result status: closed or opened
uint16_t numOfRows; // number of rows of current time window
int32_t rowId:29; // row index in buffer page
bool startInterp; // the time window start timestamp has done the interpolation already.
bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow;
@ -187,6 +189,7 @@ typedef struct SQueryRuntimeEnv {
bool topBotQuery; // false
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
@ -195,6 +198,8 @@ typedef struct SQueryRuntimeEnv {
SResultRowPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
char** nextRow;
} SQueryRuntimeEnv;
enum {

View File

@ -152,6 +152,11 @@ typedef struct SResultRowCellInfo {
uint32_t numOfRes; // num of output result in current buffer
} SResultRowCellInfo;
typedef struct SPoint1 {
int64_t key;
double val;
} SPoint1;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
struct SQLFunctionCtx;
@ -194,6 +199,8 @@ typedef struct SQLFunctionCtx {
SResultRowCellInfo *resultInfo;
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
} SQLFunctionCtx;
typedef struct SQLAggFuncElem {
@ -243,13 +250,11 @@ enum {
};
typedef struct STwaInfo {
TSKEY lastKey;
int8_t hasResult; // flag to denote has value
int16_t type; // source data type
TSKEY SKey;
TSKEY EKey;
double dOutput;
double lastValue;
TSKEY lastKey;
int8_t hasResult; // flag to denote has value
double dOutput;
double lastValue;
STimeWindow win;
} STwaInfo;
/* global sql function array */

View File

@ -27,6 +27,7 @@
#include "query.h"
#include "queryLog.h"
#include "tlosertree.h"
#include "ttype.h"
#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1)
@ -194,6 +195,7 @@ static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *
static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo);
static int32_t checkForQueryBuf(size_t numOfTables);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
@ -400,6 +402,17 @@ static bool isTopBottomQuery(SQuery *pQuery) {
return false;
}
static bool timeWindowInterpoRequired(SQuery *pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TWA) {
return true;
}
}
return false;
}
static bool hasTagValOutput(SQuery* pQuery) {
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
@ -596,7 +609,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
}
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo,
STimeWindow *win, bool masterscan, bool* newWind) {
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
@ -604,6 +617,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
if (pResultRow == NULL) {
*newWind = false;
// no master scan, no result generated means error occurs
return masterscan? -1:0;
}
@ -619,15 +633,40 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow);
return TSDB_CODE_SUCCESS;
}
static bool getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
static bool getResultRowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]->closed;
}
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) {
pResult->startInterp = true;
} else {
pResult->endInterp = true;
}
}
static bool isResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) {
return pResult->startInterp == true;
} else {
return pResult->endInterp == true;
}
}
static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
int16_t order, int64_t *pData) {
int32_t forwardStep = 0;
@ -991,6 +1030,113 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
return dataBlock;
}
// window start key interpolation
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY start = tsCols[pos];
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
TSKEY prevTs = (pos == 0)? lastTs : tsCols[pos - 1];
// if lastTs == INT64_MIN, it is the first block, no need to do the start time interpolation
if (((lastTs != INT64_MIN && pos >= 0) || (lastTs == INT64_MIN && pos > 0)) && win->skey > lastTs &&
win->skey < start) {
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
if (k == 0 && pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
continue;
}
double v1 = 0, v2 = 0, v = 0;
char *prevVal = pos == 0 ? pRuntimeEnv->prevRow[k] : pColInfo->pData + (pos - 1) * pColInfo->info.bytes;
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal);
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = start, .val = &v2};
SPoint point = (SPoint){.key = win->skey, .val = &v};
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
pRuntimeEnv->pCtx[k].start.key = point.key;
pRuntimeEnv->pCtx[k].start.val = v;
}
return true;
} else {
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
}
return false;
}
}
static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, TSKEY ekey, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY trueEndKey = tsCols[pos];
if (win->ekey < ekey && win->ekey != trueEndKey) {
int32_t nextIndex = pos + 1;
TSKEY next = tsCols[nextIndex];
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
if (k == 0 && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
continue;
}
double v1 = 0, v2 = 0, v = 0;
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes);
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + nextIndex * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = trueEndKey, .val = &v1};
SPoint point2 = (SPoint){.key = next, .val = &v2};
SPoint point = (SPoint){.key = win->ekey, .val = &v};
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
pRuntimeEnv->pCtx[k].end.key = point.key;
pRuntimeEnv->pCtx[k].end.val = v;
}
return true;
} else { // current time window does not ended in current data block, do nothing
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
pRuntimeEnv->pCtx[k].end.key = INT64_MIN;
}
return false;
}
}
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
memcpy(pRuntimeEnv->prevRow[k], pColInfo->pData + (pColInfo->info.bytes * (pDataBlockInfo->rows - 1)),
pColInfo->info.bytes);
}
}
static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY* tsCols, int32_t step) {
TSKEY ts = TSKEY_INITIAL_VAL;
if (tsCols == NULL) {
ts = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.skey : pDataBlockInfo->window.ekey;
} else {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
ts = tsCols[offset];
}
return ts;
}
/**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv
@ -1004,12 +1150,12 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
TSKEY *tsCols = NULL;
if (pDataBlock != NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, 0);
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
tsCols = (TSKEY *)(pColInfo->pData);
}
@ -1018,7 +1164,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
@ -1026,18 +1172,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
TSKEY ts = TSKEY_INITIAL_VAL;
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
if (tsCols == NULL) {
ts = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.skey:pDataBlockInfo->window.ekey;
} else {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
ts = tsCols[offset];
}
bool hasTimeWindow = false;
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) {
tfree(sasArray);
return;
}
@ -1050,7 +1191,27 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
bool pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
// window start key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pQuery->pos, pDataBlock, tsCols, &win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
}
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols,
pDataBlockInfo->window.ekey, &win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
}
}
bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
}
@ -1066,7 +1227,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) !=
TSDB_CODE_SUCCESS) {
break;
}
@ -1077,7 +1239,26 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
bool closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
// window start(end) key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
}
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
}
}
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
}
@ -1097,7 +1278,11 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
}
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pRuntimeEnv->timeWindowInterpo) {
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock);
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
@ -1320,20 +1505,20 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order);
}
int32_t j = 0;
int32_t offset = -1;
// TSKEY prev = -1;
for (j = 0; j < pDataBlockInfo->rows; ++j) {
for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) {
offset = GET_COL_DATA_POS(pQuery, j, step);
if (pRuntimeEnv->pTSBuf != NULL) {
int32_t r = doTSJoinFilter(pRuntimeEnv, offset);
if (r == TS_JOIN_TAG_NOT_EQUALS) {
int32_t ret = doTSJoinFilter(pRuntimeEnv, offset);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (r == TS_JOIN_TS_NOT_EQUALS) {
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
continue;
} else {
assert(r == TS_JOIN_TS_EQUAL);
assert(ret == TS_JOIN_TS_EQUAL);
}
}
@ -1346,8 +1531,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow);
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
@ -1355,8 +1541,28 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if (!hasTimeWindow) {
continue;
}
/*
// window start key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pos, pDataBlock, tsCols, &win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
}
bool closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols,
pDataBlockInfo->window.ekey, &win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
}
}
*/
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset);
STimeWindow nextWin = win;
@ -1375,12 +1581,32 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) {
break;
}
if (hasTimeWindow) {
closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
/*
// window start(end) key interpolation
if (pRuntimeEnv->timeWindowInterpo) {
bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
}
alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
if (!alreadyInterp) {
bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
}
}
*/
closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset);
}
}
@ -1405,6 +1631,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
// prev = tsCols[offset];
if (pRuntimeEnv->pTSBuf != NULL) {
// if timestamp filter list is empty, quit current query
if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) {
@ -1530,10 +1758,10 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
* top/bottom values emerge, so does diff function
*/
if (functionId == TSDB_FUNC_TWA) {
SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx);
STwaInfo *pTWAInfo = (STwaInfo*) GET_ROWCELL_INTERBUF(pInfo);
pTWAInfo->SKey = pQuery->window.skey;
pTWAInfo->EKey = pQuery->window.ekey;
pCtx->param[1].i64Key = pQuery->window.skey;
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[2].i64Key = pQuery->window.ekey;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
}
} else if (functionId == TSDB_FUNC_ARITHM) {
@ -1679,6 +1907,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pCtx->functionId = pSqlFuncMsg->functionId;
pCtx->stableQuery = pRuntimeEnv->stableQuery;
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pSqlFuncMsg->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
@ -1713,6 +1943,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
// if it is group by normal column, do not set output buffer, the output buffer is pResult
// fixed output query/multi-output query for normal table
if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
@ -1783,6 +2015,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->offset);
tfree(pRuntimeEnv->keyBuf);
tfree(pRuntimeEnv->rowCellInfoOffset);
tfree(pRuntimeEnv->prevRow);
taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
pRuntimeEnv->pResultRowHashTable = NULL;
@ -2281,12 +2514,14 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow) !=
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow, &pResult) !=
TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
@ -3268,7 +3503,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowR
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
bool closed = getTimeWindowResStatus(pWindowResInfo, i);
bool closed = getResultRowStatus(pWindowResInfo, i);
if (!closed) {
continue;
}
@ -4619,6 +4854,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
pRuntimeEnv->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
setScanLimitationByResultBuffer(pQuery);
@ -6449,9 +6685,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
goto _cleanup;
}
int32_t srcSize = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->colList[i] = pQueryMsg->colList[i];
pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters);
srcSize += pQuery->colList[i].bytes;
}
// calculate the result row size
@ -6520,6 +6758,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv));
pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize);
char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow;
pQInfo->runtimeEnv.prevRow[0] = start;
for(int32_t i = 1; i < pQuery->numOfCols; ++i) {
pQInfo->runtimeEnv.prevRow[i] = pQInfo->runtimeEnv.prevRow[i - 1] + pQuery->colList[i-1].bytes;
}
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pQInfo->pBuf == NULL) {

View File

@ -389,8 +389,7 @@ uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if ((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
pRuntimeEnv->groupbyNormalCol) {
if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) {
return 0;
}