[TD-4597]<fix> fix stddev and percentile func mixed with session and state_window
This commit is contained in:
parent
a87f83bff0
commit
934265ffac
|
@ -464,6 +464,7 @@ typedef struct SSWindowOperatorInfo {
|
|||
TSKEY prevTs; // previous timestamp
|
||||
int32_t numOfRows; // number of rows
|
||||
int32_t start; // start row index
|
||||
bool reptScan; // next round scan
|
||||
} SSWindowOperatorInfo;
|
||||
|
||||
typedef struct SStateWindowOperatorInfo {
|
||||
|
@ -473,7 +474,7 @@ typedef struct SStateWindowOperatorInfo {
|
|||
int32_t colIndex; // start row index
|
||||
int32_t start;
|
||||
char* prevData; // previous data
|
||||
|
||||
bool reptScan;
|
||||
} SStateWindowOperatorInfo ;
|
||||
|
||||
typedef struct SDistinctOperatorInfo {
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
|
||||
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
|
||||
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
|
||||
#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN)
|
||||
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
|
||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||
|
||||
|
@ -1336,6 +1337,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|||
|
||||
int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap;
|
||||
pInfo->numOfRows = 0;
|
||||
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) {
|
||||
pInfo->reptScan = true;
|
||||
pInfo->prevTs = INT64_MIN;
|
||||
}
|
||||
|
||||
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
|
||||
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
|
||||
|
@ -1345,7 +1350,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|||
pInfo->prevTs = tsList[j];
|
||||
pInfo->numOfRows = 1;
|
||||
pInfo->start = j;
|
||||
} else if (tsList[j] - pInfo->prevTs <= gap) {
|
||||
} else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) {
|
||||
pInfo->curWindow.ekey = tsList[j];
|
||||
pInfo->prevTs = tsList[j];
|
||||
pInfo->numOfRows += 1;
|
||||
|
@ -5175,6 +5180,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
|
||||
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
|
||||
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) {
|
||||
pInfo->reptScan = true;
|
||||
tfree(pInfo->prevData);
|
||||
}
|
||||
|
||||
pInfo->numOfRows = 0;
|
||||
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
|
||||
|
@ -5761,6 +5770,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
|
|||
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
|
||||
pInfo->colIndex = -1;
|
||||
pInfo->reptScan = false;
|
||||
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
||||
|
@ -5788,7 +5798,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
|
|||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
||||
|
||||
pInfo->prevTs = INT64_MIN;
|
||||
pInfo->prevTs = INT64_MIN;
|
||||
pInfo->reptScan = false;
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pOperator->name = "SessionWindowAggOperator";
|
||||
|
|
Loading…
Reference in New Issue