diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 39854d1824..a69fd54f9a 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -69,7 +69,7 @@ int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState); SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); // func cf @@ -84,6 +84,8 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); +SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState); + int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0cb9ba4aee..ccf71e71ff 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2044,7 +2044,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { qDebug("streamStateGetCur_rocksdb"); int32_t code = 0; @@ -2175,7 +2175,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k return code; } -SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, const SSessionKey* key) { +SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { qDebug("streamStateSessionSeekToLast_rocksdb"); int32_t code = 0; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 8f4c13c12d..d5258964f2 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -58,7 +58,7 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { +int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) { SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen); if (pos) { (*pos)->beFlushed = true; @@ -67,7 +67,7 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { } int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { - size_t keyLen = pFileState->keyLen; + size_t keyLen = pFileState->keyLen; SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen); if (ppPos) { if ((*ppPos) == pPos) { @@ -77,13 +77,9 @@ int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pP return TSDB_CODE_SUCCESS; } -void stateHashBuffClearFn(void* pBuff) { - tSimpleHashClear(pBuff); -} +void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); } -void stateHashBuffCleanupFn(void* pBuff) { - tSimpleHashCleanup(pBuff); -} +void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); } int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { return streamStateDel_rocksdb(pFileState->pFileStore, pKey); @@ -95,7 +91,7 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) { SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey)); - SWinKey* pWinKey = pPos->pKey; + SWinKey* pWinKey = pPos->pKey; pStateKey->key = *pWinKey; pStateKey->opNum = num; return pStateKey; @@ -111,15 +107,15 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) { SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey)); - SSessionKey* pWinKey = pPos->pKey; + SSessionKey* pWinKey = pPos->pKey; pStateKey->key = *pWinKey; pStateKey->opNum = num; return pStateKey; } SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, - GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, - int64_t checkpointId, int8_t type) { + GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, + int8_t type) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -178,7 +174,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->maxTs = INT64_MIN; pFileState->id = taosStrdup(taskId); - //todo(liuyao) optimize + // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { recoverSnapshot(pFileState, checkpointId); } @@ -290,9 +286,7 @@ void streamFileStateClear(SStreamFileState* pFileState) { bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; } -void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { - pPos->beUsed = used; -} +void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; } void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; @@ -357,8 +351,8 @@ int32_t clearRowBuff(SStreamFileState* pFileState) { } void* getFreeBuff(SStreamFileState* pFileState) { - SList* lists = pFileState->freeBuffs; - int32_t buffSize = pFileState->rowSize; + SList* lists = pFileState->freeBuffs; + int32_t buffSize = pFileState->rowSize; SListNode* pNode = tdListPopHead(lists); if (!pNode) { return NULL; @@ -406,7 +400,7 @@ _end: } SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { - SRowBuffPos* newPos = getNewRowPos(pFileState); + SRowBuffPos* newPos = getNewRowPos(pFileState); newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; @@ -656,8 +650,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { deleteExpiredCheckPoint(pFileState, mark); } - SWinKey key = {.groupId = 0, .ts = 0}; - SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); + SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore); if (pCur == NULL) { return -1; } @@ -700,22 +693,14 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->maxTs = TMAX(pFileState->maxTs, ts); } -void* getRowStateBuff(SStreamFileState* pFileState) { - return pFileState->rowStateBuff; -} +void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; } -void* getStateFileStore(SStreamFileState* pFileState) { - return pFileState->pFileStore; -} +void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; } bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); } -bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { - return ts <= (pFileState->flushMark + gap); -} +bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } -int32_t getRowStateRowSize(SStreamFileState* pFileState) { - return pFileState->rowSize; -} +int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }