From 36eef2fdefe2461c5f291bcba16c9058177d145d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Apr 2023 09:57:04 +0000 Subject: [PATCH 1/4] refactor code --- source/libs/stream/inc/streamBackendRocksdb.h | 10 +- source/libs/stream/src/streamStateRocksdb.c | 105 +++++++++++++----- 2 files changed, 86 insertions(+), 29 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a33a0a577b..01934ecf77 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -89,5 +89,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); -int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); + +void* streamDefaultIterCreate_rocksdb(SStreamState* pState); +int32_t streamDefaultIterValid_rocksdb(void* iter); +void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); +int32_t streamDefaultIter_rocksdb(void* iter); +char** streamDefaultIterKey_rocksdb(void* iter); +char* streamDefaultIterVal_rocksdb(void* iter); + +// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 9977732a9a..0fa7b78ce2 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -646,37 +646,86 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { return code; } +void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {} +int32_t streamDefaultIterValid_rocksdb(void* iter); +void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); +int32_t streamDefaultIter_rocksdb(void* iter); +char** streamDefaultIterKey_rocksdb(void* iter); +char* streamDefaultIterVal_rocksdb(void* iter); +// typedef struct { +// char* start; +// char* end; +// void* result; +// } StreamFilterArg; + +// typedef int (*streamfilterFunc)(StreamFilterArg* arg); + +// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) { +// int code = 0; +// char* err = NULL; + +// rocksdb_snapshot_t* snapshot = NULL; +// rocksdb_readoptions_t* readopts = NULL; +// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); +// if (pIter == NULL) { +// return -1; +// } +// char* start = arg->start; +// char* end = arg->end; +// SArray* result = arg->result; + +// rocksdb_iter_seek(pIter, start, strlen(start)); +// while (rocksdb_iter_valid(pIter)) { +// const char* key = rocksdb_iter_key(pIter, NULL); +// if (end != NULL && strcmp(key, end) > 0) { +// break; +// } +// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { +// int64_t checkPoint = 0; +// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { +// // taosArrayPush(result, &checkPoint); +// // } +// } else { +// break; +// } +// rocksdb_iter_next(pIter); +// } +// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); +// rocksdb_readoptions_destroy(readopts); +// rocksdb_iter_destroy(pIter); +// return code; +// } int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { - int code = 0; - char* err = NULL; + // int code = 0; + // char* err = NULL; - rocksdb_snapshot_t* snapshot = NULL; - rocksdb_readoptions_t* readopts = NULL; - rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); - if (pIter == NULL) { - return -1; - } + // rocksdb_snapshot_t* snapshot = NULL; + // rocksdb_readoptions_t* readopts = NULL; + // rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + // if (pIter == NULL) { + // return -1; + // } - rocksdb_iter_seek(pIter, start, strlen(start)); - while (rocksdb_iter_valid(pIter)) { - const char* key = rocksdb_iter_key(pIter, NULL); - if (end != NULL && strcmp(key, end) > 0) { - break; - } - if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { - int64_t checkPoint = 0; - if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - taosArrayPush(result, &checkPoint); - } - } else { - break; - } - rocksdb_iter_next(pIter); - } - rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); - rocksdb_readoptions_destroy(readopts); - rocksdb_iter_destroy(pIter); - return code; + // rocksdb_iter_seek(pIter, start, strlen(start)); + // while (rocksdb_iter_valid(pIter)) { + // const char* key = rocksdb_iter_key(pIter, NULL); + // if (end != NULL && strcmp(key, end) > 0) { + // break; + // } + // if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + // int64_t checkPoint = 0; + // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + // taosArrayPush(result, &checkPoint); + // } + // } else { + // break; + // } + // rocksdb_iter_next(pIter); + // } + // rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + // rocksdb_readoptions_destroy(readopts); + // rocksdb_iter_destroy(pIter); + // return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { From c0ebdb92d32562c08d1fe9f94608bae42b286170 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Apr 2023 02:10:33 +0000 Subject: [PATCH 2/4] startGroupTableMe --- source/libs/stream/inc/streamBackendRocksdb.h | 8 ++--- source/libs/stream/src/streamStateRocksdb.c | 35 +++++++++++++++---- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 01934ecf77..6e5fc53f3e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -92,10 +92,10 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); int32_t streamDefaultIterValid_rocksdb(void* iter); -void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); -int32_t streamDefaultIter_rocksdb(void* iter); -char** streamDefaultIterKey_rocksdb(void* iter); -char* streamDefaultIterVal_rocksdb(void* iter); +void streamDefaultIterSeek_rocksdb(void* iter, const char* key); +void streamDefaultIterNext_rocksdb(void* iter); +char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); +char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 0fa7b78ce2..79776a0424 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -646,12 +646,35 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { return code; } -void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {} -int32_t streamDefaultIterValid_rocksdb(void* iter); -void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); -int32_t streamDefaultIter_rocksdb(void* iter); -char** streamDefaultIterKey_rocksdb(void* iter); -char* streamDefaultIterVal_rocksdb(void* iter); +void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + return pCur; +} +int32_t streamDefaultIterValid_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + bool val = rocksdb_iter_valid(pCur->iter); + + return val ? 0 : -1; +} +void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { + SStreamStateCur* pCur = iter; + rocksdb_iter_seek(pCur->iter, key, strlen(key)); +} +void streamDefaultIterNext_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + rocksdb_iter_next(pCur->iter); +} +char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len); +} +char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len); +} // typedef struct { // char* start; // char* end; From 4c70cfdfad057f772a0eeeaa1a49c0ccea8cad41 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Apr 2023 09:36:08 +0000 Subject: [PATCH 3/4] startGroupTableMe --- source/libs/stream/inc/streamBackendRocksdb.h | 8 +- source/libs/stream/src/streamStateRocksdb.c | 92 +++++++++++++------ source/libs/stream/src/tstreamFileState.c | 23 +++-- 3 files changed, 82 insertions(+), 41 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 6e5fc53f3e..51ef3704cb 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -57,9 +57,10 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); -int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); -int32_t streamStateSessionClear_rocksdb(SStreamState* pState); -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); +int32_t streamStateSessionClear_rocksdb(SStreamState* pState); +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); @@ -90,6 +91,7 @@ int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pV int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); +int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); int32_t streamDefaultIterValid_rocksdb(void* iter); void streamDefaultIterSeek_rocksdb(void* iter, const char* key); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 79776a0424..e0d0ae6362 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -718,37 +718,37 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { // rocksdb_iter_destroy(pIter); // return code; // } -int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { - // int code = 0; - // char* err = NULL; +int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { + int code = 0; + char* err = NULL; - // rocksdb_snapshot_t* snapshot = NULL; - // rocksdb_readoptions_t* readopts = NULL; - // rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); - // if (pIter == NULL) { - // return -1; - // } + rocksdb_snapshot_t* snapshot = NULL; + rocksdb_readoptions_t* readopts = NULL; + rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + if (pIter == NULL) { + return -1; + } - // rocksdb_iter_seek(pIter, start, strlen(start)); - // while (rocksdb_iter_valid(pIter)) { - // const char* key = rocksdb_iter_key(pIter, NULL); - // if (end != NULL && strcmp(key, end) > 0) { - // break; - // } - // if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { - // int64_t checkPoint = 0; - // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - // taosArrayPush(result, &checkPoint); - // } - // } else { - // break; - // } - // rocksdb_iter_next(pIter); - // } - // rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); - // rocksdb_readoptions_destroy(readopts); - // rocksdb_iter_destroy(pIter); - // return code; + rocksdb_iter_seek(pIter, start, strlen(start)); + while (rocksdb_iter_valid(pIter)) { + const char* key = rocksdb_iter_key(pIter, NULL); + if (end != NULL && strcmp(key, end) > 0) { + break; + } + if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + int64_t checkPoint = 0; + if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + taosArrayPush(result, &checkPoint); + } + } else { + break; + } + rocksdb_iter_next(pIter); + } + rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + rocksdb_readoptions_destroy(readopts); + rocksdb_iter_destroy(pIter); + return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -968,6 +968,40 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke memset(*pVal, 0, size); return 0; } +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateGetCur_rocksdb"); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + + if (pCur == NULL) return NULL; + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + + SStateKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + int len = stateKeyEncode((void*)&sKey, buf); + rocksdb_iter_seek(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + rocksdb_iter_seek_to_last(pCur->iter); + } else { + rocksdb_iter_seek_to_last(pCur->iter); + } + return pCur; + + // rocksdb_iter_seek(pCur->iter, buf, len); + // if (rocksdb_iter_valid(pCur->iter)) { + // SStateKey curKey; + // size_t kLen = 0; + // char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + // stateKeyDecode((void*)&curKey, keyStr); + + // if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { + // pCur->number = pState->number; + // return pCur; + // } + // } + // streamStateFreeCur(pCur); + return pCur; +} SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 88fdfb6b3c..4a50326260 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -382,7 +382,7 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { const char* taskKey = "streamFileState"; - return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list); + return streamDefaultIterGet_rocksdb(pFileState->pFileStore, taskKey, NULL, list); } int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { @@ -428,15 +428,20 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { void* pStVal = NULL; int32_t len = 0; - SWinKey key = {.groupId = 0, .ts = 0}; - SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); - if (!pCur) { - return TSDB_CODE_FAILED; - } - code = streamStateSeekLast(pFileState->pFileStore, pCur); - if (code != TSDB_CODE_SUCCESS) { - return code; + SWinKey key = {.groupId = 0, .ts = 0}; + // SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); + // if (!pCur) { + // return TSDB_CODE_FAILED; + // } + // code = streamStateSeekLast(pFileState->pFileStore, pCur); + // if (code != TSDB_CODE_SUCCESS) { + // return code; + // } + SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pState, &key); + if (pCur == NULL) { + return -1; } + while (code == TSDB_CODE_SUCCESS) { if (pFileState->curRowCount == pFileState->maxRowCount) { break; From 80f348de1db301f7977c77be20c2c12603b17141 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Apr 2023 14:05:59 +0000 Subject: [PATCH 4/4] fix recover --- source/libs/stream/src/streamStateRocksdb.c | 43 +++++++-------------- source/libs/stream/src/tstreamFileState.c | 15 ++----- 2 files changed, 19 insertions(+), 39 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index e0d0ae6362..4f51a5909d 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -489,7 +489,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ + code = -1; \ + break; \ } \ char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ @@ -515,7 +516,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ + code = -1; \ + break; \ } \ char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ @@ -550,7 +552,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ - return -1; \ + code = -1; \ + break; \ } \ char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ @@ -970,36 +973,20 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke } SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + int32_t code = 0; + const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; + STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); + rocksdb_iter_prev(pCur->iter); - SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; - int len = stateKeyEncode((void*)&sKey, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_to_last(pCur->iter); - } else { - rocksdb_iter_seek_to_last(pCur->iter); - } - return pCur; - - // rocksdb_iter_seek(pCur->iter, buf, len); - // if (rocksdb_iter_valid(pCur->iter)) { - // SStateKey curKey; - // size_t kLen = 0; - // char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - // stateKeyDecode((void*)&curKey, keyStr); - - // if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { - // pCur->number = pState->number; - // return pCur; - // } - // } - // streamStateFreeCur(pCur); + STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 4a50326260..731bdc4458 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -327,7 +327,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, SListIter iter = {0}; tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD); - const int32_t BATCH_LIMIT = 128; + const int32_t BATCH_LIMIT = 256; SListNode* pNode = NULL; void* batch = streamStateCreateBatch(); @@ -414,6 +414,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); if (ts < mark) { + // statekey winkey.ts < mark forceRemoveCheckpoint(pFileState, i); break; } else { @@ -428,16 +429,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { void* pStVal = NULL; int32_t len = 0; - SWinKey key = {.groupId = 0, .ts = 0}; - // SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); - // if (!pCur) { - // return TSDB_CODE_FAILED; - // } - // code = streamStateSeekLast(pFileState->pFileStore, pCur); - // if (code != TSDB_CODE_SUCCESS) { - // return code; - // } - SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pState, &key); + SWinKey key = {.groupId = 0, .ts = 0}; + SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); if (pCur == NULL) { return -1; }