diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index f206ac5166..7e5028b969 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1599,6 +1599,11 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->twAggSup = *pTwSup; } +static TSKEY sesionTs(void* pKey) { + SSessionKey* pWinKey = (SSessionKey*)pKey; + return pWinKey->win.skey; +} + int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, @@ -1621,7 +1626,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->stateStore.streamStateSetNumber(pSup->pState, -1); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( - tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, compareTs, pSup->pState, + tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index fbf4b2693c..cc96778762 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -264,7 +264,6 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP if (index >= 0) { SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index); if (pItemPos == pPos) { - pItemPos->beFlushed = true; taosArrayRemove(pWinStates, index); } } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 4b6ea07047..a14fd186f8 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -67,7 +67,7 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { } int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { - size_t keyLen = pFileState->rowSize; + size_t keyLen = pFileState->keyLen; SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen); if (ppPos) { if ((*ppPos) == pPos) { @@ -539,7 +539,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } pPos->beFlushed = true; - qDebug("===stream===flushed start:%" PRId64 ", end:%" PRId64 , ((SSessionKey*)pPos->pKey)->win.skey, ((SSessionKey*)pPos->pKey)->win.ekey); + qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey)); if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStateClearBatch(batch);