From 80f348de1db301f7977c77be20c2c12603b17141 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Apr 2023 14:05:59 +0000 Subject: [PATCH] fix recover --- source/libs/stream/src/streamStateRocksdb.c | 43 +++++++-------------- source/libs/stream/src/tstreamFileState.c | 15 ++----- 2 files changed, 19 insertions(+), 39 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index e0d0ae6362..4f51a5909d 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -489,7 +489,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ + code = -1; \ + break; \ } \ char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ @@ -515,7 +516,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ + code = -1; \ + break; \ } \ char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ @@ -550,7 +552,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ + code = -1; \ + break; \ } \ char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ @@ -970,36 +973,20 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke } SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + int32_t code = 0; + const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; + STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); + rocksdb_iter_prev(pCur->iter); - SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; - int len = stateKeyEncode((void*)&sKey, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_to_last(pCur->iter); - } else { - rocksdb_iter_seek_to_last(pCur->iter); - } - return pCur; - - // rocksdb_iter_seek(pCur->iter, buf, len); - // if (rocksdb_iter_valid(pCur->iter)) { - // SStateKey curKey; - // size_t kLen = 0; - // char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - // stateKeyDecode((void*)&curKey, keyStr); - - // if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { - // pCur->number = pState->number; - // return pCur; - // } - // } - // streamStateFreeCur(pCur); + STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 4a50326260..731bdc4458 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -327,7 +327,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, SListIter iter = {0}; tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD); - const int32_t BATCH_LIMIT = 128; + const int32_t BATCH_LIMIT = 256; SListNode* pNode = NULL; void* batch = streamStateCreateBatch(); @@ -414,6 +414,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); if (ts < mark) { + // statekey winkey.ts < mark forceRemoveCheckpoint(pFileState, i); break; } else { @@ -428,16 +429,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { void* pStVal = NULL; int32_t len = 0; - SWinKey key = {.groupId = 0, .ts = 0}; - // SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); - // if (!pCur) { - // return TSDB_CODE_FAILED; - // } - // code = streamStateSeekLast(pFileState->pFileStore, pCur); - // if (code != TSDB_CODE_SUCCESS) { - // return code; - // } - SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pState, &key); + SWinKey key = {.groupId = 0, .ts = 0}; + SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); if (pCur == NULL) { return -1; }