Merge pull request #24535 from taosdata/fix/TD-28342
ignore invalid state
This commit is contained in:
commit
9d82fd8c78
|
@ -90,6 +90,7 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
|
|||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||
pNewPos->needFree = true;
|
||||
pNewPos->beFlushed = true;
|
||||
memcpy(pNewPos->pRowBuff, p, *pVLen);
|
||||
taosMemoryFree(p);
|
||||
return pNewPos;
|
||||
|
@ -217,6 +218,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
|
|||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||
pNewPos->needFree = true;
|
||||
pNewPos->beFlushed = true;
|
||||
void* pBuff = NULL;
|
||||
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -307,6 +309,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
|
|||
}
|
||||
pNewPos = getNewRowPosForWrite(pFileState);
|
||||
pNewPos->needFree = true;
|
||||
pNewPos->beFlushed = true;
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -482,6 +485,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
|
|||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
|
||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||
pNewPos->needFree = true;
|
||||
pNewPos->beFlushed = true;
|
||||
memcpy(pNewPos->pRowBuff, pData, *pVLen);
|
||||
(*pVal) = pNewPos;
|
||||
}
|
||||
|
|
|
@ -698,6 +698,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
|
|||
stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
|
||||
key->win.ekey, key->groupId, code);
|
||||
} else {
|
||||
pos->beFlushed = false;
|
||||
code = putSessionWinResultBuff(pState->pFileState, value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@ sql create table ts3 using st tags(3,2,2);
|
|||
sql create table ts4 using st tags(4,2,2);
|
||||
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by tbname interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts2 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
|
|
Loading…
Reference in New Issue