diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 39854d1824..b34b3420fe 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -65,11 +65,11 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur); int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState); SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); // func cf @@ -84,10 +84,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); +SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState); +int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur); + int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); + int32_t streamStateSessionClear_rocksdb(SStreamState* pState); int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index fb3551778a..6cef3cca75 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1970,7 +1970,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke memset(*pVal, 0, size); return 0; } -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { +int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) { qDebug("streamStateCurPrev_rocksdb"); if (!pCur) return -1; @@ -2052,7 +2052,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { qDebug("streamStateGetCur_rocksdb"); int32_t code = 0; @@ -2062,9 +2062,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK return NULL; } - char buf[128] = {0}; - int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); - { char tbuf[256] = {0}; stateKeyToString((void*)&maxStateKey, tbuf); @@ -2079,6 +2076,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_prev(pCur->iter); while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { @@ -2183,6 +2182,52 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey); return code; } + +SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { + qDebug("streamStateSessionSeekToLast_rocksdb"); + + int32_t code = 0; + + SSessionKey maxSessionKey = {.groupId = UINT64_MAX, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}}; + SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = INT64_MAX}; + + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0); + if (code != 0) { + return NULL; + } + + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = createStreamStateCursor(); + pCur->number = pState->number; + pCur->db = wrapper->rocksdb; + pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); + + char buf[128] = {0}; + int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf); + rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); + rocksdb_iter_prev(pCur->iter); + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_prev(pCur->iter); + } + + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + pCur = NULL; + } + + STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey); + return pCur; +} + +int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) { + qDebug("streamStateCurPrev_rocksdb"); + if (!pCur) return -1; + + rocksdb_iter_prev(pCur->iter); + return 0; +} + SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); @@ -2301,6 +2346,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con } return pCur; } + int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { qDebug("streamStateSessionGetKVByCur_rocksdb"); if (!pCur) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 7c5fcba10c..60b93a5590 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -670,7 +670,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { #ifdef USE_ROCKSDB - return streamStateCurPrev_rocksdb(pState, pCur); + return streamStateCurPrev_rocksdb(pCur); #else if (!pCur) { return -1; @@ -714,7 +714,7 @@ void streamStateFreeVal(void* val) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { #ifdef USE_ROCKSDB - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SRowBuffPos* pos = (SRowBuffPos*)value; if (pos->needFree) { if (isFlushedState(pState->pFileState, key->win.ekey, 0)) { @@ -725,7 +725,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void streamStateReleaseBuf(pState, pos, true); putFreeBuff(pState->pFileState, pos); qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, - key->win.ekey, key->groupId, code); + key->win.ekey, key->groupId, code); } else { code = putSessionWinResultBuff(pState->pFileState, value); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 8f4c13c12d..ac404893f0 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -58,7 +58,7 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { +int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) { SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen); if (pos) { (*pos)->beFlushed = true; @@ -67,7 +67,7 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { } int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { - size_t keyLen = pFileState->keyLen; + size_t keyLen = pFileState->keyLen; SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen); if (ppPos) { if ((*ppPos) == pPos) { @@ -77,13 +77,9 @@ int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pP return TSDB_CODE_SUCCESS; } -void stateHashBuffClearFn(void* pBuff) { - tSimpleHashClear(pBuff); -} +void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); } -void stateHashBuffCleanupFn(void* pBuff) { - tSimpleHashCleanup(pBuff); -} +void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); } int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { return streamStateDel_rocksdb(pFileState->pFileStore, pKey); @@ -95,7 +91,7 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) { SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey)); - SWinKey* pWinKey = pPos->pKey; + SWinKey* pWinKey = pPos->pKey; pStateKey->key = *pWinKey; pStateKey->opNum = num; return pStateKey; @@ -111,15 +107,15 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) { SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey)); - SSessionKey* pWinKey = pPos->pKey; + SSessionKey* pWinKey = pPos->pKey; pStateKey->key = *pWinKey; pStateKey->opNum = num; return pStateKey; } SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, - GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, - int64_t checkpointId, int8_t type) { + GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, + int8_t type) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -178,7 +174,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->maxTs = INT64_MIN; pFileState->id = taosStrdup(taskId); - //todo(liuyao) optimize + // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { recoverSnapshot(pFileState, checkpointId); } @@ -290,9 +286,7 @@ void streamFileStateClear(SStreamFileState* pFileState) { bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; } -void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { - pPos->beUsed = used; -} +void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; } void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; @@ -357,8 +351,8 @@ int32_t clearRowBuff(SStreamFileState* pFileState) { } void* getFreeBuff(SStreamFileState* pFileState) { - SList* lists = pFileState->freeBuffs; - int32_t buffSize = pFileState->rowSize; + SList* lists = pFileState->freeBuffs; + int32_t buffSize = pFileState->rowSize; SListNode* pNode = tdListPopHead(lists); if (!pNode) { return NULL; @@ -406,7 +400,7 @@ _end: } SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { - SRowBuffPos* newPos = getNewRowPos(pFileState); + SRowBuffPos* newPos = getNewRowPos(pFileState); newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; @@ -647,6 +641,26 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return code; } +int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { + int code = TSDB_CODE_SUCCESS; + SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore); + if (pCur == NULL) { + return -1; + } + while (code == TSDB_CODE_SUCCESS) { + void* pVal = NULL; + int32_t vlen = 0; + SSessionKey key = {0}; + code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen); + if (code != 0) { + break; + } + taosMemoryFree(pVal); + code = streamStateSessionCurPrev_rocksdb(pCur); + } + streamStateFreeCur(pCur); + return code; +} int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; if (pFileState->maxTs != INT64_MIN) { @@ -656,8 +670,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { deleteExpiredCheckPoint(pFileState, mark); } - SWinKey key = {.groupId = 0, .ts = 0}; - SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); + SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore); if (pCur == NULL) { return -1; } @@ -667,9 +680,9 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { break; } void* pVal = NULL; - int32_t pVLen = 0; + int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen); + code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -677,8 +690,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { taosMemoryFreeClear(pVal); break; } - ASSERT(pVLen == pFileState->rowSize); - memcpy(pNewPos->pRowBuff, pVal, pVLen); + ASSERT(vlen == pFileState->rowSize); + memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); @@ -686,7 +699,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { destroyRowBuffPos(pNewPos); break; } - code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur); + code = streamStateCurPrev_rocksdb(pCur); } streamStateFreeCur(pCur); @@ -700,22 +713,14 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->maxTs = TMAX(pFileState->maxTs, ts); } -void* getRowStateBuff(SStreamFileState* pFileState) { - return pFileState->rowStateBuff; -} +void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; } -void* getStateFileStore(SStreamFileState* pFileState) { - return pFileState->pFileStore; -} +void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; } bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); } -bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { - return ts <= (pFileState->flushMark + gap); -} +bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } -int32_t getRowStateRowSize(SStreamFileState* pFileState) { - return pFileState->rowSize; -} +int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }