fix issue
This commit is contained in:
parent
952f5f3f6b
commit
a0a1414af5
|
@ -1152,22 +1152,23 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE
|
||||||
pRowVal->key = ts;
|
pRowVal->key = ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, int8_t trigger, SWinKey* pKey,
|
static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, SWinKey* pKey,
|
||||||
SSHashObj* pUpdatedMap, bool needDel, SSHashObj* pDeletedMap) {
|
SSHashObj* pUpdatedMap, bool needDel, SSHashObj* pDeletedMap) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
if (trigger == STREAM_TRIGGER_AT_ONCE) {
|
if (pTwAggSup->calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
code = saveTimeSliceWinResult(pKey, pUpdatedMap);
|
code = saveTimeSliceWinResult(pKey, pUpdatedMap);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
if (needDel) {
|
if (needDel) {
|
||||||
code = saveTimeSliceWinResult(pKey, pDeletedMap);
|
code = saveTimeSliceWinResult(pKey, pDeletedMap);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
} else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
} else if (pTwAggSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||||
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, pKey->groupId, NULL, 0);
|
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, pKey->groupId, NULL, 0);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
pTwAggSup->maxTs = TMAX(pTwAggSup->maxTs, pKey->ts);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1244,7 +1245,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
||||||
if (right) {
|
if (right) {
|
||||||
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
|
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
|
||||||
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
|
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
|
||||||
code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
|
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
|
||||||
pInfo->pDeletedMap);
|
pInfo->pDeletedMap);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
@ -1260,7 +1261,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
||||||
if (left) {
|
if (left) {
|
||||||
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
|
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
|
||||||
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
|
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
|
||||||
code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap,
|
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap,
|
||||||
needDel, pInfo->pDeletedMap);
|
needDel, pInfo->pDeletedMap);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
@ -1282,7 +1283,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
||||||
if (right) {
|
if (right) {
|
||||||
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
|
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
|
||||||
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
|
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
|
||||||
code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
|
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
|
||||||
pInfo->pDeletedMap);
|
pInfo->pDeletedMap);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue