add compress

This commit is contained in:
Yihao Deng 2024-04-17 07:02:55 +00:00
parent 40390ad918
commit 1d9b65226e
1 changed files with 6 additions and 6 deletions

View File

@ -28,8 +28,8 @@
#define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_ROW_BUFF 10240
#define MIN_NUM_OF_RECOVER_ROW_BUFF 128 #define MIN_NUM_OF_RECOVER_ROW_BUFF 128
#define TASK_KEY "streamFileState" #define TASK_KEY "streamFileState"
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" #define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
struct SStreamFileState { struct SStreamFileState {
SList* usedBuffs; SList* usedBuffs;
@ -560,7 +560,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64;
char* buf = taosMemoryCalloc(1, len); char* buf = taosMemoryCalloc(1, len);
void* batch = streamStateCreateBatch(); void* batch = streamStateCreateBatch();
@ -612,7 +612,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
char keyBuf[128] = {0}; char keyBuf[128] = {0};
sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId); sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId);
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
} }
@ -622,8 +622,8 @@ int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
} }
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int64_t maxCheckPointId = 0; int64_t maxCheckPointId = 0;
{ {
char buf[128] = {0}; char buf[128] = {0};
void* val = NULL; void* val = NULL;