diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index eea3fd92d2..0d0bdf29c0 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1220,20 +1220,22 @@ void clearExpiredState(SStreamFileState* pFileState) { SSHashObj* pSearchBuff = pFileState->searchBuff; void* pIte = NULL; int32_t iter = 0; - while ((pIte = tSimpleHashIterate(pFileState->searchBuff, pIte, &iter)) != NULL) { + while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) { SArray* pWinStates = *((void**)pIte); int32_t size = taosArrayGetSize(pWinStates); for (int32_t i = 0; i < size - 1; i++) { SWinKey* pKey = taosArrayGet(pWinStates, i); int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey)); - qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff); + qTrace("clear expired buff, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_buff); - int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); - qTrace("%s at line %d res:%d", __func__, __LINE__, code_file); + if (isFlushedState(pFileState, pKey->ts, 0)) { + int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); + qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file); + } - if (pFileState->hasFillCatch == false || isFlushedState(pFileState, pKey->ts, 0)) { - code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); - qTrace("state fill delete.%s at line %d res %d", __func__, __LINE__, code_file); + if (pFileState->hasFillCatch == false) { + int32_t code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); + qTrace("force clear expired file, ts:%" PRId64 ". %s at line %d res %d", pKey->ts, __func__, __LINE__, code_file); } } taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);