diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7c4aab2491..ae84299c1c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -65,6 +65,7 @@ void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); void streamStateDestroy(SStreamState* pState); +int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); typedef struct { rocksdb_iterator_t* iter; diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 4f2618f8dc..25267be354 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -54,7 +54,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pS int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); -int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId); +int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0ad2423ca3..86f78989d1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2151,7 +2151,7 @@ FETCH_NEXT_BLOCK: // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - // printDataBlock(pInfo->pRes, "stream scan"); + printDataBlock(pInfo->pRes, "stream scan"); qDebug("scan rows: %" PRId64, pBlockInfo->rows); if (pBlockInfo->rows > 0) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index eb7c605d69..811194031b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1454,10 +1454,6 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { return w; } -static void deleteIntervalDiscBuf(SStreamState* pState, TSKEY mark) { - //todo(liuyao) delete expired check point -} - static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) { int32_t size = taosArrayGetSize(pChildren); for (int32_t i = 0; i < size; i++) { @@ -2404,6 +2400,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p STimeWindow nextWin = {0}; if (IS_FINAL_OP(pInfo)) { nextWin = getFinalTimeWindow(ts, &pInfo->interval); + qDebug("===stream===final ts:%" PRId64 ", getFinalTimeWindow:%" PRId64,ts, nextWin.skey); } else { nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); } @@ -2448,14 +2445,17 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } if (ignore) { - startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + // startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + int32_t prevEndPos = startPos; + startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); if (startPos < 0) { break; } continue; } } - + + qDebug("===stream===final setIntervalOutputBuf:%" PRId64, nextWin.skey); int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); pResult = (SResultRow*)pResPos->pRowBuff; @@ -2474,6 +2474,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .groupId = groupId, }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { + qDebug("===stream===is final%d saveWinResult:%" PRId64, IS_FINAL_OP(pInfo), key.ts); saveWinResult(&key, pResPos, pUpdatedMap); } @@ -2575,9 +2576,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); } else { - deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); + streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } qDebug("===stream===interval final close"); @@ -4813,10 +4814,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; } - deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); setOperatorCompleted(pOperator); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); + streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index cee72002da..85b176c684 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1072,6 +1072,14 @@ void streamStateDestroy(SStreamState* pState) { taosMemoryFreeClear(pState); } +int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { +#ifdef USE_ROCKSDB + return deleteExpiredCheckPoint(pState->pFileState, mark); +#else + return 0; +#endif +} + #if 0 char* streamStateSessionDump(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index eab95d1214..88fdfb6b3c 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -372,17 +372,20 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, return code; } + int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { const char* taskKey = "streamFileState"; char keyBuf[128] = {0}; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } + int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { const char* taskKey = "streamFileState"; return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list); } -int32_t recoverSnapshot(SStreamFileState* pFileState) { + +int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t code = TSDB_CODE_SUCCESS; const char* taskKey = "streamFileState"; int64_t maxCheckPointId = 0; @@ -410,13 +413,18 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { TSKEY ts; sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); - if (ts < pFileState->deleteMark) { + if (ts < mark) { forceRemoveCheckpoint(pFileState, i); break; } else { } } + return code; +} +int32_t recoverSnapshot(SStreamFileState* pFileState) { + int32_t code = TSDB_CODE_SUCCESS; + deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); void* pStVal = NULL; int32_t len = 0;