fix issue
This commit is contained in:
parent
4fe68034cf
commit
6f2a78a4bb
|
@ -924,6 +924,60 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
|
||||||
|
TSKEY prevWKey = INT64_MIN;
|
||||||
|
TSKEY nextWKey = INT64_MIN;
|
||||||
|
if (hasPrevWindow(pFillSup)) {
|
||||||
|
prevWKey = pFillSup->prev.key;
|
||||||
|
}
|
||||||
|
if (hasNextWindow(pFillSup)) {
|
||||||
|
nextWKey = pFillSup->next.key;
|
||||||
|
}
|
||||||
|
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
|
||||||
|
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
|
||||||
|
|
||||||
|
pFillInfo->needFill = true;
|
||||||
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
|
switch (pFillInfo->type) {
|
||||||
|
case TSDB_FILL_NULL:
|
||||||
|
case TSDB_FILL_NULL_F:
|
||||||
|
case TSDB_FILL_SET_VALUE:
|
||||||
|
case TSDB_FILL_SET_VALUE_F: {
|
||||||
|
if (ts != pFillSup->cur.key) {
|
||||||
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
|
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
|
||||||
|
} else {
|
||||||
|
pFillInfo->needFill = false;
|
||||||
|
pFillInfo->pos = FILL_POS_START;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
copyNonFillValueInfo(pFillSup, pFillInfo);
|
||||||
|
} break;
|
||||||
|
case TSDB_FILL_PREV: {
|
||||||
|
if (ts != pFillSup->cur.key) {
|
||||||
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
|
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
|
||||||
|
} else if (hasPrevWindow(pFillSup)) {
|
||||||
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
|
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
|
||||||
|
} else {
|
||||||
|
pFillInfo->needFill = false;
|
||||||
|
pFillInfo->pos = FILL_POS_START;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
pFillInfo->pResRow = &pFillSup->prev;
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (ts != pFillSup->cur.key) {
|
||||||
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
|
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
|
||||||
if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) {
|
if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) {
|
||||||
pFillInfo->needFill = false;
|
pFillInfo->needFill = false;
|
||||||
|
@ -957,7 +1011,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
|
||||||
pFillInfo->pos = FILL_POS_START;
|
pFillInfo->pos = FILL_POS_START;
|
||||||
}
|
}
|
||||||
copyNonFillValueInfo(pFillSup, pFillInfo);
|
copyNonFillValueInfo(pFillSup, pFillInfo);
|
||||||
pFillInfo->pResRow->key = ts;
|
|
||||||
} break;
|
} break;
|
||||||
case TSDB_FILL_PREV: {
|
case TSDB_FILL_PREV: {
|
||||||
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey != pFillInfo->prePointKey &&
|
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey != pFillInfo->prePointKey &&
|
||||||
|
@ -1263,7 +1316,7 @@ void getPrevResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKE
|
||||||
*pNextKey = INT64_MIN;
|
*pNextKey = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup,
|
void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwSup, SStreamFillSupporter* pFillSup,
|
||||||
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
|
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1304,8 +1357,12 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
|
||||||
pFillInfo->nextPointKey = nextPoint.key.ts;
|
pFillInfo->nextPointKey = nextPoint.key.ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
|
if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||||
|
setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts);
|
||||||
|
} else {
|
||||||
|
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
|
||||||
|
}
|
||||||
doStreamFillRange(pFillSup, pFillInfo, pBlock);
|
doStreamFillRange(pFillSup, pFillInfo, pBlock);
|
||||||
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
|
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
|
||||||
releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
|
releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
|
||||||
|
@ -1330,7 +1387,7 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
uint16_t opType = pOperator->operatorType;
|
uint16_t opType = pOperator->operatorType;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
|
doBuildTimeSlicePointResult(pAggSup, &pInfo->twAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->pRes->info.rows != 0) {
|
if (pInfo->pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||||
(*ppRes) = pInfo->pRes;
|
(*ppRes) = pInfo->pRes;
|
||||||
|
|
Loading…
Reference in New Issue