remove state commit
This commit is contained in:
parent
65e7dbda0c
commit
c79b722372
|
@ -119,8 +119,8 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL
|
||||||
pRowSup->groupId = groupId;
|
pRowSup->groupId = groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey,
|
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
|
||||||
int32_t pos, int32_t order, int64_t* pData) {
|
int32_t order, int64_t* pData) {
|
||||||
int32_t forwardRows = 0;
|
int32_t forwardRows = 0;
|
||||||
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
|
@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
||||||
if (NULL == pr) {
|
if (NULL == pr) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
|
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
|
||||||
|
|
||||||
if (pr->closed) {
|
if (pr->closed) {
|
||||||
|
@ -1318,11 +1318,11 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
|
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
|
||||||
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
|
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
|
||||||
if (NULL == pResult) {
|
if (NULL == pResult) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SqlFunctionCtx* pCtx = pSup->pCtx;
|
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
|
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
|
||||||
|
@ -2534,7 +2534,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
|
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
|
||||||
&pInfo->interval, &pInfo->delKey);
|
&pInfo->interval, &pInfo->delKey);
|
||||||
streamStateCommit(pTaskInfo->streamInfo.pState);
|
// streamStateCommit(pTaskInfo->streamInfo.pState);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4734,7 +4734,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
|
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
|
||||||
&pInfo->delKey);
|
&pInfo->delKey);
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
streamStateCommit(pTaskInfo->streamInfo.pState);
|
// streamStateCommit(pTaskInfo->streamInfo.pState);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue