From 1d9b65226e22af9a8686fddf0714dc5600135fb5 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 07:02:55 +0000 Subject: [PATCH] add compress --- source/libs/stream/src/tstreamFileState.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 6014350f58..976006dbbd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -28,8 +28,8 @@ #define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_RECOVER_ROW_BUFF 128 -#define TASK_KEY "streamFileState" -#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" +#define TASK_KEY "streamFileState" +#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" struct SStreamFileState { SList* usedBuffs; @@ -560,7 +560,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); - int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; + int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64; char* buf = taosMemoryCalloc(1, len); void* batch = streamStateCreateBatch(); @@ -612,7 +612,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { - char keyBuf[128] = {0}; + char keyBuf[128] = {0}; sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } @@ -622,8 +622,8 @@ int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { } int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t maxCheckPointId = 0; + int32_t code = TSDB_CODE_SUCCESS; + int64_t maxCheckPointId = 0; { char buf[128] = {0}; void* val = NULL;