diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a69fd54f9a..b34b3420fe 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -65,7 +65,7 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur); int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); @@ -85,11 +85,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState); +int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur); int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); + int32_t streamStateSessionClear_rocksdb(SStreamState* pState); int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ccf71e71ff..675deed233 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1962,7 +1962,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke memset(*pVal, 0, size); return 0; } -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { +int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) { qDebug("streamStateCurPrev_rocksdb"); if (!pCur) return -1; @@ -2190,10 +2190,6 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SStreamStateCur* pCur = createStreamStateCursor(); - if (pCur == NULL) { - return NULL; - } - pCur->number = pState->number; pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, @@ -2215,6 +2211,15 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey); return pCur; } + +int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) { + qDebug("streamStateCurPrev_rocksdb"); + if (!pCur) return -1; + + rocksdb_iter_prev(pCur->iter); + return 0; +} + SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 7c5fcba10c..60b93a5590 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -670,7 +670,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { #ifdef USE_ROCKSDB - return streamStateCurPrev_rocksdb(pState, pCur); + return streamStateCurPrev_rocksdb(pCur); #else if (!pCur) { return -1; @@ -714,7 +714,7 @@ void streamStateFreeVal(void* val) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { #ifdef USE_ROCKSDB - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SRowBuffPos* pos = (SRowBuffPos*)value; if (pos->needFree) { if (isFlushedState(pState->pFileState, key->win.ekey, 0)) { @@ -725,7 +725,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void streamStateReleaseBuf(pState, pos, true); putFreeBuff(pState->pFileState, pos); qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, - key->win.ekey, key->groupId, code); + key->win.ekey, key->groupId, code); } else { code = putSessionWinResultBuff(pState->pFileState, value); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index d5258964f2..ac404893f0 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -641,6 +641,26 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return code; } +int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { + int code = TSDB_CODE_SUCCESS; + SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore); + if (pCur == NULL) { + return -1; + } + while (code == TSDB_CODE_SUCCESS) { + void* pVal = NULL; + int32_t vlen = 0; + SSessionKey key = {0}; + code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen); + if (code != 0) { + break; + } + taosMemoryFree(pVal); + code = streamStateSessionCurPrev_rocksdb(pCur); + } + streamStateFreeCur(pCur); + return code; +} int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; if (pFileState->maxTs != INT64_MIN) { @@ -660,9 +680,9 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { break; } void* pVal = NULL; - int32_t pVLen = 0; + int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen); + code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -670,8 +690,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { taosMemoryFreeClear(pVal); break; } - ASSERT(pVLen == pFileState->rowSize); - memcpy(pNewPos->pRowBuff, pVal, pVLen); + ASSERT(vlen == pFileState->rowSize); + memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); @@ -679,7 +699,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { destroyRowBuffPos(pNewPos); break; } - code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur); + code = streamStateCurPrev_rocksdb(pCur); } streamStateFreeCur(pCur);