diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 87256f7496..345a758795 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -52,7 +52,6 @@ int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuf int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); -void releaseRowBuffPos(SRowBuffPos* pBuff); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); @@ -67,7 +66,7 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); void* getRowStateBuff(SStreamFileState* pFileState); void* getStateFileStore(SStreamFileState* pFileState); bool isDeteled(SStreamFileState* pFileState, TSKEY ts); -bool isFlushedState(SStreamFileState* pFileState, TSKEY ts); +bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap); SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState); int32_t getRowStateRowSize(SStreamFileState* pFileState); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index c507fadeca..5d37d001f6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1735,9 +1735,23 @@ void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) { tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0); } -static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) { +int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + pAPI->streamStateReleaseBuf(pState, pPos, false); + return TSDB_CODE_SUCCESS; +} + +int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + pAPI->streamStateReleaseBuf(pState, pPos, true); + return TSDB_CODE_SUCCESS; +} + +static void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) { key.win.ekey = key.win.skey; - tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); + void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey)); + if (pVal) { + releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore); + tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); + } tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); } @@ -1746,7 +1760,7 @@ static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) { pHashKey->win.ekey = pKey->win.skey; } -static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { +static void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) { if (tSimpleHashGetSize(pHashMap) == 0) { return; } @@ -1760,7 +1774,25 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { } } -int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId, +static void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins) { + if (tSimpleHashGetSize(pHashMap) == 0) { + return; + } + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + SSessionKey* pWin = taosArrayGet(pWins, i); + if (!pWin) continue; + SSessionKey key = {0}; + getSessionHashKey(pWin, &key); + void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey)); + if (pVal) { + releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore); + tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); + } + } +} + +int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId, int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted) { for (int32_t i = start; i < rows; ++i) { @@ -1771,7 +1803,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS if (pStDeleted && pWinInfo->isOutput) { saveDeleteRes(pStDeleted, pWinInfo->sessionWin); } - removeSessionResult(pStUpdated, pResultRows, pWinInfo->sessionWin); + removeSessionResult(pAggSup, pStUpdated, pResultRows, pWinInfo->sessionWin); pWinInfo->sessionWin.win.skey = pStartTs[i]; } pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]); @@ -1824,11 +1856,6 @@ static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* return TSDB_CODE_SUCCESS; } -int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { - pAPI->streamStateReleaseBuf(pState, pPos, false); - return TSDB_CODE_SUCCESS; -} - void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin) { SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin); @@ -1879,7 +1906,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* if (winInfo.isOutput && pStDeleted) { saveDeleteRes(pStDeleted, winInfo.sessionWin); } - removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin); + removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, winInfo.sessionWin); doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); winNum++; @@ -1955,7 +1982,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData continue; } setSessionWinOutputInfo(pStUpdated, &winInfo); - winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap, + winRows = updateSessionWindowInfo(pAggSup, &winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap, pAggSup->pResultRows, pStUpdated, pStDeleted); int64_t winDelta = 0; @@ -2009,8 +2036,10 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc } static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) { - SSessionKey* pWin1 = (SSessionKey*)pKey1; - SSessionKey* pWin2 = (SSessionKey*)pKey2; + SResultWindowInfo* pWinInfo1 = (SResultWindowInfo*)pKey1; + SResultWindowInfo* pWinInfo2 = (SResultWindowInfo*)pKey2; + SSessionKey* pWin1 = &pWinInfo1->sessionWin; + SSessionKey* pWin2 = &pWinInfo2->sessionWin; if (pWin1->groupId > pWin2->groupId) { return 1; @@ -2210,27 +2239,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { - SRowBuffPos* pPos = *(SRowBuffPos**) taosArrayGet(pGroupResInfo->pRows, i); + SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); + SRowBuffPos* pPos = pWinInfo->pStatePos; SResultRow* pRow = NULL; - int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); SSessionKey* pKey = (SSessionKey*) pPos->pKey; - if (code == -1) { - // for history - qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "", - pKey->win.skey, pKey->win.ekey, pKey->groupId); - pGroupResInfo->index += 1; - continue; - } - - doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); - // no results, continue to check the next one - if (pRow->numOfRows == 0) { - pGroupResInfo->index += 1; - releaseOutputBuf(pState, pPos, &pAPI->stateStore); - continue; - } - if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; @@ -2245,17 +2258,31 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.id.groupId != pKey->groupId) { - releaseOutputBuf(pState, pPos, &pAPI->stateStore); break; } } + int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { ASSERT(pBlock->info.rows > 0); - releaseOutputBuf(pState, pPos, &pAPI->stateStore); break; } + if (code == -1) { + // for history + qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "", + pKey->win.skey, pKey->win.ekey, pKey->groupId); + pGroupResInfo->index += 1; + continue; + } + + doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); + // no results, continue to check the next one + if (pRow->numOfRows == 0) { + pGroupResInfo->index += 1; + continue; + } + pGroupResInfo->index += 1; for (int32_t j = 0; j < numOfExprs; ++j) { @@ -2283,7 +2310,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa pBlock->info.dataLoad = 1; pBlock->info.rows += pRow->numOfRows; - releaseOutputBuf(pState, pPos, &pAPI->stateStore); } blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; @@ -2332,15 +2358,16 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { if (size == 0) { return; } - SRowBuffPos* pPos = taosArrayGetP(pAllWins, size - 1); - SSessionKey* pSeKey = pPos->pKey; + SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1); + SSessionKey* pSeKey = pWinInfo->pStatePos->pKey; taosArrayPush(pMaxWins, pSeKey); if (pSeKey->groupId == 0) { return; } uint64_t preGpId = pSeKey->groupId; for (int32_t i = size - 2; i >= 0; i--) { - pSeKey = taosArrayGet(pAllWins, i); + pWinInfo = taosArrayGet(pAllWins, i); + pSeKey = pWinInfo->pStatePos->pKey; if (preGpId != pSeKey->groupId) { taosArrayPush(pMaxWins, pSeKey); preGpId = pSeKey->groupId; @@ -2499,7 +2526,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -2517,7 +2544,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); // gap must be 0 doDeleteTimeWindows(pAggSup, pBlock, pWins); - removeSessionResults(pInfo->pStUpdated, pWins); + removeSessionResults(pAggSup, pInfo->pStUpdated, pWins); if (IS_FINAL_SESSION_OP(pOperator)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); @@ -2576,7 +2603,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc); - removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); + removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated); if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } @@ -2857,7 +2884,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -2875,8 +2902,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pBlock->info.type == STREAM_CLEAR) { // gap must be 0 SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); - doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins); - removeSessionResults(pInfo->pStUpdated, pWins); + doDeleteTimeWindows(pAggSup, pBlock, pWins); + removeSessionResults(pAggSup, pInfo->pStUpdated, pWins); copyDeleteWindowInfo(pWins, pInfo->pStDeleted); taosArrayDestroy(pWins); pInfo->clearState = true; @@ -2908,7 +2935,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs; copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc); - removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); + removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated); if(pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); @@ -3099,7 +3126,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pAggSup->stateStore.streamStateFreeCur(pCur); } -int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId, +int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId, SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) { *allEqual = true; @@ -3122,7 +3149,7 @@ int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNex if (pSeDeleted && pWinInfo->winInfo.isOutput) { saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin); } - removeSessionResult(pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin); + removeSessionResult(pAggSup, pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin); pWinInfo->winInfo.sessionWin.win.skey = pTs[i]; } pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]); @@ -3179,7 +3206,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); - winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, + winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, pAggSup->pResultRows, pSeUpdated, pStDeleted); if (!allEqual) { uint64_t uid = 0; @@ -3356,7 +3383,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3373,7 +3400,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins); - removeSessionResults(pInfo->pSeUpdated, pWins); + removeSessionResults(&pInfo->streamAggSup, pInfo->pSeUpdated, pWins); copyDeleteWindowInfo(pWins, pInfo->pSeDeleted); taosArrayDestroy(pWins); continue; @@ -3405,7 +3432,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated); copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc); - removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated); + removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated); if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); @@ -3461,7 +3488,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur pNextWin->sessionWin.groupId); saveDeleteRes(pStDeleted, pNextWin->sessionWin); } - removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); + removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore); } diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 17e7509c4e..a29ae0e990 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -117,6 +117,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (inSessionWindow(pPos->pKey, startTs, gap)) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + pPos->beUsed = true; *pKey = *pDestWinKey; goto _end; } @@ -127,17 +128,19 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + pPos->beUsed = true; *pKey = *pDestWinKey; goto _end; } } if (index + 1 == 0) { - if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { + if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) { void* p = NULL; void* pFileStore = getStateFileStore(pFileState); int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); @@ -193,6 +196,7 @@ _end: int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; void* pBuff = NULL; int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); @@ -387,6 +391,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void code = TSDB_CODE_SUCCESS; } else if (code == TSDB_CODE_SUCCESS && pVal) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState); + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; memcpy(pNewPos->pRowBuff, pData, *pVLen); (*pVal) = pNewPos; @@ -496,11 +501,12 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch } if (index + 1 == 0) { - if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { + if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) { void* p = NULL; void* pFileStore = getStateFileStore(pFileState); int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); pNewPos->needFree = true; qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 042cce831c..7030307edd 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -351,7 +351,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) { int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) { int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal); - releaseRowBuffPos(pos); + streamFileStateReleaseBuff(pState->pFileState, pos, false); return code; } @@ -717,7 +717,10 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void int32_t code = TSDB_CODE_SUCCESS; SRowBuffPos* pos = (SRowBuffPos*)value; if (pos->needFree) { - if (isFlushedState(pState->pFileState, key->win.ekey)) { + if (isFlushedState(pState->pFileState, key->win.ekey, 0)) { + if (!pos->pRowBuff) { + return code; + } code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); streamStateReleaseBuf(pState, pos, true); qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b7a8ed60fb..688088dc22 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -239,6 +239,27 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { } } +void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) { + uint64_t i = 0; + SListIter iter = {0}; + tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL && i < max) { + SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; + if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) { + tdListAppend(pFlushList, &pPos); + pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); + pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); + tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(pNode); + if (pPos->pRowBuff) { + i++; + } + } + } +} + void streamFileStateClear(SStreamFileState* pFileState) { pFileState->flushMark = INT64_MIN; pFileState->maxTs = INT64_MIN; @@ -283,10 +304,13 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); num = TMAX(num, FLUSH_NUM); - popUsedBuffs(pFileState, pFlushList, num, false); - + clearFlushedRowBuff(pFileState, pFlushList, num); if (isListEmpty(pFlushList)) { - popUsedBuffs(pFileState, pFlushList, num, true); + popUsedBuffs(pFileState, pFlushList, num, false); + + if (isListEmpty(pFlushList)) { + popUsedBuffs(pFileState, pFlushList, num, true); + } } flushSnapshot(pFileState, pFlushList, false); @@ -383,7 +407,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi memcpy(pNewPos->pKey, pKey, keyLen); TSKEY ts = pFileState->getTs(pKey); - if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts)) { + if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) { int32_t len = 0; void* p = NULL; int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len); @@ -450,8 +474,6 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { return false; } -void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; } - SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN : pFileState->maxTs - pFileState->deleteMark; @@ -663,8 +685,8 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); } -bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) { - return ts <= pFileState->flushMark; +bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { + return ts <= (pFileState->flushMark + gap); } int32_t getRowStateRowSize(SStreamFileState* pFileState) { diff --git a/tests/script/tsim/stream/basic4.sim b/tests/script/tsim/stream/basic4.sim index 29cbef3109..e7a27976f7 100644 --- a/tests/script/tsim/stream/basic4.sim +++ b/tests/script/tsim/stream/basic4.sim @@ -11,7 +11,7 @@ sql connect sql create database test vgroups 1; sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream streams1 trigger at_once into streamt as select _wstart, count(*) c1 from t1 interval(1s); +sql create stream streams0 trigger at_once ignore expired 0 ignore update 0 into streamt as select _wstart, count(*) c1 from t1 interval(1s); sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791212001,2,2,3,1.1); @@ -77,7 +77,7 @@ sql create database test2 vgroups 10; sql use test2; sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); -sql create stream streams2 trigger at_once ignore expired 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s); +sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s); sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791212001,2,2,3,1.1); @@ -137,4 +137,183 @@ if $rows != 29 then goto loop3 endi +print step2============= + +sql create database test1 vgroups 1; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once ignore expired 0 ignore update 1 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s); + +sql insert into t1 values(1648791211000,1,2,3,1.0); +sql insert into t1 values(1648791213000,1,2,3,1.1); +sql insert into t1 values(1648791215000,1,2,3,1.1); +sql insert into t1 values(1648791217000,1,2,3,1.1); +sql insert into t1 values(1648791219000,1,2,3,1.1); +sql insert into t1 values(1648791221000,1,2,3,1.0); +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791225000,1,2,3,1.0); +sql insert into t1 values(1648791227000,1,2,3,1.0); +sql insert into t1 values(1648791229000,1,2,3,1.0); + +sql insert into t1 values(1648791231000,1,2,3,1.0); +sql insert into t1 values(1648791233000,1,2,3,1.1); +sql insert into t1 values(1648791235000,1,2,3,1.1); +sql insert into t1 values(1648791237000,1,2,3,1.1); +sql insert into t1 values(1648791239000,1,2,3,1.1); +sql insert into t1 values(1648791241000,1,2,3,1.0); +sql insert into t1 values(1648791243000,1,2,3,1.0); +sql insert into t1 values(1648791245000,1,2,3,1.0); +sql insert into t1 values(1648791247000,1,2,3,1.0); +sql insert into t1 values(1648791249000,1,2,3,1.0); + +sql insert into t1 values(1648791251000,1,2,3,1.0); +sql insert into t1 values(1648791253000,1,2,3,1.1); +sql insert into t1 values(1648791255000,1,2,3,1.1); +sql insert into t1 values(1648791257000,1,2,3,1.1); +sql insert into t1 values(1648791259000,1,2,3,1.1); +sql insert into t1 values(1648791261000,1,2,3,1.0); +sql insert into t1 values(1648791263000,1,2,3,1.0); +sql insert into t1 values(1648791265000,1,2,3,1.0); +sql insert into t1 values(1648791267000,1,2,3,1.0); +sql insert into t1 values(1648791269000,1,2,3,1.0); + +$loop_count = 0 + +loop4: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt1; +sql select * from streamt1; + +if $rows != 30 then + print =====rows=$rows + goto loop4 +endi + +sql insert into t1 values(1648791211001,1,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.1); +sql insert into t1 values(1648791215001,1,2,3,1.1); +sql insert into t1 values(1648791217001,1,2,3,1.1); +sql insert into t1 values(1648791219001,1,2,3,1.1); +sql insert into t1 values(1648791221001,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791225001,1,2,3,1.0); +sql insert into t1 values(1648791227001,1,2,3,1.0); +sql insert into t1 values(1648791229001,1,2,3,1.0); + +$loop_count = 0 + +loop5: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt1; +sql select * from streamt1; + +if $rows != 30 then + print =====rows=$rows + goto loop5 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +if $data91 != 2 then + print =====data91=$data91 + goto loop5 +endi + +sql insert into t1 values(1648791231001,1,2,3,1.0); +sql insert into t1 values(1648791233001,1,2,3,1.1); +sql insert into t1 values(1648791235001,1,2,3,1.1); +sql insert into t1 values(1648791237001,1,2,3,1.1); +sql insert into t1 values(1648791239001,1,2,3,1.1); +sql insert into t1 values(1648791241001,1,2,3,1.0); +sql insert into t1 values(1648791243001,1,2,3,1.0); +sql insert into t1 values(1648791245001,1,2,3,1.0); +sql insert into t1 values(1648791247001,1,2,3,1.0); +sql insert into t1 values(1648791249001,1,2,3,1.0); + +$loop_count = 0 + +loop6: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt1; +sql select * from streamt1; + +if $rows != 30 then + print =====rows=$rows + goto loop6 +endi + +if $data[10][1] != 2 then + print =====data[10][1]=$data[10][1] + goto loop6 +endi + +if $data[19][1] != 2 then + print =====data[19][1]=$data[19][1] + goto loop6 +endi + +sql insert into t1 values(1648791251001,1,2,3,1.0); +sql insert into t1 values(1648791253001,1,2,3,1.1); +sql insert into t1 values(1648791255001,1,2,3,1.1); +sql insert into t1 values(1648791257001,1,2,3,1.1); +sql insert into t1 values(1648791259001,1,2,3,1.1); +sql insert into t1 values(1648791261001,1,2,3,1.0); +sql insert into t1 values(1648791263001,1,2,3,1.0); +sql insert into t1 values(1648791265001,1,2,3,1.0); +sql insert into t1 values(1648791267001,1,2,3,1.0); +sql insert into t1 values(1648791269001,1,2,3,1.0); + +$loop_count = 0 + +loop7: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt1; +sql select * from streamt1; + +if $rows != 30 then + print =====rows=$rows + goto loop7 +endi + +if $data[20][1] != 2 then + print =====[20][1]=$[20][1] + goto loop7 +endi + +if $data[29][1] != 2 then + print =====[29][1]=$[29][1] + goto loop7 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file