Merge pull request #25379 from taosdata/feat/addCompress
Feat/add compress
This commit is contained in:
commit
614f1f5a9d
|
@ -83,7 +83,8 @@ int32_t streamStateClearBuff(SStreamState* pState, void* pVal);
|
||||||
void streamStateFreeVal(void* val);
|
void streamStateFreeVal(void* val);
|
||||||
|
|
||||||
// count window
|
// count window
|
||||||
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen);
|
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
|
||||||
|
int32_t* pVLen);
|
||||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
|
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
|
||||||
|
@ -128,6 +129,8 @@ typedef struct SStateSessionKey {
|
||||||
typedef struct SStreamValue {
|
typedef struct SStreamValue {
|
||||||
int64_t unixTimestamp;
|
int64_t unixTimestamp;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
int32_t rawLen;
|
||||||
|
int8_t compress;
|
||||||
char* data;
|
char* data;
|
||||||
} SStreamValue;
|
} SStreamValue;
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
|
#include "lz4.h"
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
@ -1550,48 +1551,107 @@ void destroyCompare(void* arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
|
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
char* dst = NULL;
|
||||||
|
if (vlen > 512) {
|
||||||
|
dst = taosMemoryCalloc(1, vlen + 128);
|
||||||
|
int32_t dstCap = vlen + 128;
|
||||||
|
int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
|
||||||
|
if (compressedSize < vlen) {
|
||||||
|
key.compress = 1;
|
||||||
|
key.len = compressedSize;
|
||||||
|
value = dst;
|
||||||
|
}
|
||||||
|
stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize);
|
||||||
|
}
|
||||||
|
|
||||||
if (*dest == NULL) {
|
if (*dest == NULL) {
|
||||||
char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len);
|
char* p = taosMemoryCalloc(
|
||||||
|
1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len);
|
||||||
char* buf = p;
|
char* buf = p;
|
||||||
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.len);
|
len += taosEncodeFixedI32((void**)&buf, key.len);
|
||||||
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
|
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
||||||
|
len += taosEncodeFixedI8((void**)&buf, key.compress);
|
||||||
|
len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
|
||||||
*dest = p;
|
*dest = p;
|
||||||
} else {
|
} else {
|
||||||
char* buf = *dest;
|
char* buf = *dest;
|
||||||
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.len);
|
len += taosEncodeFixedI32((void**)&buf, key.len);
|
||||||
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
|
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
||||||
|
len += taosEncodeFixedI8((void**)&buf, key.compress);
|
||||||
|
len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(dst);
|
||||||
|
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ret >= 0 : found valid value
|
* ret >= 0 : found valid value
|
||||||
* ret < 0 : error or timeout
|
* ret < 0 : error or timeout
|
||||||
*/
|
*/
|
||||||
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
||||||
|
int32_t code = -1;
|
||||||
SStreamValue key = {0};
|
SStreamValue key = {0};
|
||||||
char* p = value;
|
char* p = value;
|
||||||
|
|
||||||
|
char* pCompressData = NULL;
|
||||||
|
char* pOutput = NULL;
|
||||||
if (streamStateValueIsStale(p)) {
|
if (streamStateValueIsStale(p)) {
|
||||||
goto _EXCEPT;
|
goto _EXCEPT;
|
||||||
}
|
}
|
||||||
|
|
||||||
p = taosDecodeFixedI64(p, &key.unixTimestamp);
|
p = taosDecodeFixedI64(p, &key.unixTimestamp);
|
||||||
p = taosDecodeFixedI32(p, &key.len);
|
p = taosDecodeFixedI32(p, &key.len);
|
||||||
if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) {
|
if (key.len == 0) {
|
||||||
stError("vlen: %d, read len: %d", vlen, key.len);
|
code = 0;
|
||||||
goto _EXCEPT;
|
goto _EXCEPT;
|
||||||
}
|
}
|
||||||
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
|
if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
|
||||||
|
// compatiable with previous data
|
||||||
|
p = taosDecodeBinary(p, (void**)&pOutput, key.len);
|
||||||
|
} else {
|
||||||
|
p = taosDecodeFixedI32(p, &key.rawLen);
|
||||||
|
p = taosDecodeFixedI8(p, &key.compress);
|
||||||
|
if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
|
||||||
|
stError("vlen: %d, read len: %d", vlen, key.len);
|
||||||
|
goto _EXCEPT;
|
||||||
|
}
|
||||||
|
if (key.compress == 1) {
|
||||||
|
p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
|
||||||
|
pOutput = taosMemoryCalloc(1, key.rawLen);
|
||||||
|
int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
|
||||||
|
if (rawLen != key.rawLen) {
|
||||||
|
stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
|
||||||
|
goto _EXCEPT;
|
||||||
|
}
|
||||||
|
key.len = rawLen;
|
||||||
|
} else {
|
||||||
|
p = taosDecodeBinary(p, (void**)&pOutput, key.len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
|
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
if (dest) {
|
||||||
|
*dest = pOutput;
|
||||||
|
pOutput = NULL;
|
||||||
|
}
|
||||||
|
taosMemoryFree(pCompressData);
|
||||||
|
taosMemoryFree(pOutput);
|
||||||
return key.len;
|
return key.len;
|
||||||
|
|
||||||
_EXCEPT:
|
_EXCEPT:
|
||||||
if (dest != NULL) *dest = NULL;
|
if (dest != NULL) *dest = NULL;
|
||||||
if (ttl != NULL) *ttl = 0;
|
if (ttl != NULL) *ttl = 0;
|
||||||
return -1;
|
|
||||||
|
taosMemoryFree(pOutput);
|
||||||
|
taosMemoryFree(pCompressData);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* compareDefaultName(void* arg) {
|
const char* compareDefaultName(void* arg) {
|
||||||
|
|
|
@ -28,8 +28,8 @@
|
||||||
#define MIN_NUM_OF_ROW_BUFF 10240
|
#define MIN_NUM_OF_ROW_BUFF 10240
|
||||||
#define MIN_NUM_OF_RECOVER_ROW_BUFF 128
|
#define MIN_NUM_OF_RECOVER_ROW_BUFF 128
|
||||||
|
|
||||||
#define TASK_KEY "streamFileState"
|
#define TASK_KEY "streamFileState"
|
||||||
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
|
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
|
||||||
|
|
||||||
struct SStreamFileState {
|
struct SStreamFileState {
|
||||||
SList* usedBuffs;
|
SList* usedBuffs;
|
||||||
|
@ -560,7 +560,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
|
|
||||||
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
|
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
|
||||||
|
|
||||||
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
|
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64;
|
||||||
char* buf = taosMemoryCalloc(1, len);
|
char* buf = taosMemoryCalloc(1, len);
|
||||||
|
|
||||||
void* batch = streamStateCreateBatch();
|
void* batch = streamStateCreateBatch();
|
||||||
|
@ -612,7 +612,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
|
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
|
||||||
char keyBuf[128] = {0};
|
char keyBuf[128] = {0};
|
||||||
sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId);
|
sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId);
|
||||||
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
|
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
|
||||||
}
|
}
|
||||||
|
@ -622,8 +622,8 @@ int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
|
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int64_t maxCheckPointId = 0;
|
int64_t maxCheckPointId = 0;
|
||||||
{
|
{
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
void* val = NULL;
|
void* val = NULL;
|
||||||
|
|
Loading…
Reference in New Issue