add ttl to stream state key
This commit is contained in:
parent
f2c36621b5
commit
ae3730c318
|
@ -40,6 +40,7 @@ typedef struct STdbState {
|
||||||
struct SStreamTask* pOwner;
|
struct SStreamTask* pOwner;
|
||||||
void* param;
|
void* param;
|
||||||
void* env;
|
void* env;
|
||||||
|
void* compactFactory;
|
||||||
|
|
||||||
TDB* db;
|
TDB* db;
|
||||||
TTB* pStateDb;
|
TTB* pStateDb;
|
||||||
|
@ -149,11 +150,11 @@ typedef struct SStateSessionKey {
|
||||||
int64_t opNum;
|
int64_t opNum;
|
||||||
} SStateSessionKey;
|
} SStateSessionKey;
|
||||||
|
|
||||||
typedef struct streamValue {
|
typedef struct SStreamValue {
|
||||||
int64_t unixTimestamp;
|
int64_t unixTimestamp;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
char data[0];
|
char* data;
|
||||||
} streamValue;
|
} SStreamValue;
|
||||||
|
|
||||||
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
||||||
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
||||||
|
|
|
@ -289,24 +289,24 @@ int parKeyToString(void* k, char* buf) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
int stremaValueEncode(void* k, char* buf) {
|
int stremaValueEncode(void* k, char* buf) {
|
||||||
int len = 0;
|
int len = 0;
|
||||||
streamValue* key = k;
|
SStreamValue* key = k;
|
||||||
len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp);
|
len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key->len);
|
len += taosEncodeFixedI32((void**)&buf, key->len);
|
||||||
len += taosEncodeBinary((void**)&buf, key->data, key->len);
|
len += taosEncodeBinary((void**)&buf, key->data, key->len);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
int streamValueDecode(void* k, char* buf) {
|
int streamValueDecode(void* k, char* buf) {
|
||||||
streamValue* key = k;
|
SStreamValue* key = k;
|
||||||
char* p = buf;
|
char* p = buf;
|
||||||
p = taosDecodeFixedI64(p, &key->unixTimestamp);
|
p = taosDecodeFixedI64(p, &key->unixTimestamp);
|
||||||
p = taosDecodeFixedI32(p, &key->len);
|
p = taosDecodeFixedI32(p, &key->len);
|
||||||
p = taosDecodeBinary(p, (void**)&key->data, key->len);
|
p = taosDecodeBinary(p, (void**)&key->data, key->len);
|
||||||
return p - buf;
|
return p - buf;
|
||||||
}
|
}
|
||||||
int32_t streamValueToString(void* k, char* buf) {
|
int32_t streamValueToString(void* k, char* buf) {
|
||||||
streamValue* key = k;
|
SStreamValue* key = k;
|
||||||
int n = 0;
|
int n = 0;
|
||||||
n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
|
n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
|
||||||
n += sprintf(buf + n, "len:%d,", key->len);
|
n += sprintf(buf + n, "len:%d,", key->len);
|
||||||
n += sprintf(buf + n, "data:%s]", key->data);
|
n += sprintf(buf + n, "data:%s]", key->data);
|
||||||
|
@ -315,7 +315,7 @@ int32_t streamValueToString(void* k, char* buf) {
|
||||||
/*1: stale, 0: no stale*/
|
/*1: stale, 0: no stale*/
|
||||||
|
|
||||||
int32_t streaValueIsStale(void* k, int64_t ts) {
|
int32_t streaValueIsStale(void* k, int64_t ts) {
|
||||||
streamValue* key = k;
|
SStreamValue* key = k;
|
||||||
if (key->unixTimestamp < ts) {
|
if (key->unixTimestamp < ts) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -325,6 +325,8 @@ int32_t streaValueIsStale(void* k, int64_t ts) {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* tableOpt;
|
void* tableOpt;
|
||||||
void* lru; // global or not
|
void* lru; // global or not
|
||||||
|
void* filteFactory;
|
||||||
|
|
||||||
} rocksdbCfParam;
|
} rocksdbCfParam;
|
||||||
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
||||||
|
|
||||||
|
@ -334,6 +336,8 @@ typedef int (*ToStringFunc)(void* key, char* buf);
|
||||||
typedef const char* (*CompareName)(void* statue);
|
typedef const char* (*CompareName)(void* statue);
|
||||||
typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
|
typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
|
||||||
typedef void (*DestroyFunc)(void* state);
|
typedef void (*DestroyFunc)(void* state);
|
||||||
|
typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest);
|
||||||
|
typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest);
|
||||||
|
|
||||||
const char* compareDefaultName(void* name);
|
const char* compareDefaultName(void* name);
|
||||||
const char* compareStateName(void* name);
|
const char* compareStateName(void* name);
|
||||||
|
@ -346,28 +350,57 @@ const char* comparePartagKeyName(void* name);
|
||||||
void destroyFunc(void* stata) { return; }
|
void destroyFunc(void* stata) { return; }
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
const char* key;
|
const char* key;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int idx;
|
int idx;
|
||||||
BackendCmpFunc cmpFunc;
|
BackendCmpFunc cmpFunc;
|
||||||
EncodeFunc enFunc;
|
EncodeFunc enFunc;
|
||||||
DecodeFunc deFunc;
|
DecodeFunc deFunc;
|
||||||
ToStringFunc toStrFunc;
|
ToStringFunc toStrFunc;
|
||||||
CompareName cmpName;
|
CompareName cmpName;
|
||||||
DestroyFunc detroyFunc;
|
DestroyFunc detroyFunc;
|
||||||
|
EncodeValueFunc enValueFunc;
|
||||||
|
DecodeValueFunc deValueFunc;
|
||||||
|
|
||||||
} SCfInit;
|
} SCfInit;
|
||||||
|
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
|
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
|
||||||
|
|
||||||
|
char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len);
|
||||||
|
char* buf = p;
|
||||||
|
int32_t len = 0;
|
||||||
|
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
||||||
|
len += taosEncodeFixedI32((void**)&buf, key.len);
|
||||||
|
len += taosEncodeBinary((void**)&buf, (char*)value, vlen);
|
||||||
|
*dest = p;
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
|
||||||
|
SStreamValue key = {0};
|
||||||
|
char* p = value;
|
||||||
|
p = taosDecodeFixedI64(p, &key.unixTimestamp);
|
||||||
|
p = taosDecodeFixedI32(p, &key.len);
|
||||||
|
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
|
||||||
|
|
||||||
|
*ttl = key.unixTimestamp;
|
||||||
|
*dest = key.data;
|
||||||
|
return key.len;
|
||||||
|
}
|
||||||
SCfInit ginitDict[] = {
|
SCfInit ginitDict[] = {
|
||||||
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
|
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
|
||||||
destroyFunc},
|
destroyFunc, encodeValueFunc, decodeValueFunc},
|
||||||
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc},
|
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
|
||||||
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc},
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
|
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
|
||||||
compareSessionKeyName, destroyFunc},
|
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
|
||||||
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc},
|
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
|
||||||
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc},
|
encodeValueFunc, decodeValueFunc},
|
||||||
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc},
|
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
|
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
|
||||||
|
encodeValueFunc, decodeValueFunc},
|
||||||
};
|
};
|
||||||
|
|
||||||
const char* compareDefaultName(void* name) { return ginitDict[0].key; }
|
const char* compareDefaultName(void* name) { return ginitDict[0].key; }
|
||||||
|
@ -378,6 +411,41 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; }
|
||||||
const char* compareParKeyName(void* name) { return ginitDict[5].key; }
|
const char* compareParKeyName(void* name) { return ginitDict[5].key; }
|
||||||
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; }
|
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; }
|
||||||
|
|
||||||
|
typedef struct SCompactFilteFactory {
|
||||||
|
void* status;
|
||||||
|
} SCompactFilteFactory;
|
||||||
|
|
||||||
|
void destroyCompactFilteFactory(void* arg) {
|
||||||
|
if (arg == NULL) return;
|
||||||
|
}
|
||||||
|
const char* compactFilteFactoryName(void* arg) {
|
||||||
|
SCompactFilteFactory* state = arg;
|
||||||
|
return "stream_compact_filter";
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyCompactFilte(void* arg) {
|
||||||
|
if (arg == NULL) return;
|
||||||
|
}
|
||||||
|
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||||
|
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||||
|
int64_t unixTime = taosGetTimestampMs();
|
||||||
|
SStreamValue value;
|
||||||
|
memset(&value, 0, sizeof(value));
|
||||||
|
streamValueDecode(&value, (char*)val);
|
||||||
|
taosMemoryFree(value.data);
|
||||||
|
if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
const char* compactFilteName(void* arg) { return "stream_filte"; }
|
||||||
|
|
||||||
|
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
|
||||||
|
SCompactFilteFactory* state = arg;
|
||||||
|
rocksdb_compactionfilter_t* filter =
|
||||||
|
rocksdb_compactionfilter_create(NULL, destroyCompactFilte, compactFilte, compactFilteName);
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
int streamInitBackend(SStreamState* pState, char* path) {
|
int streamInitBackend(SStreamState* pState, char* path) {
|
||||||
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
|
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
|
||||||
rocksdb_env_set_low_priority_background_threads(env, 4);
|
rocksdb_env_set_low_priority_background_threads(env, 4);
|
||||||
|
@ -392,6 +460,10 @@ int streamInitBackend(SStreamState* pState, char* path) {
|
||||||
rocksdb_options_set_create_missing_column_families(opts, 1);
|
rocksdb_options_set_create_missing_column_families(opts, 1);
|
||||||
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
|
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
|
||||||
|
|
||||||
|
rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create(
|
||||||
|
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
|
||||||
|
rocksdb_options_set_compaction_filter_factory(opts, factory);
|
||||||
|
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
|
|
||||||
|
@ -411,8 +483,6 @@ int streamInitBackend(SStreamState* pState, char* path) {
|
||||||
|
|
||||||
param[i].tableOpt = tableOpt;
|
param[i].tableOpt = tableOpt;
|
||||||
param[i].lru = cache;
|
param[i].lru = cache;
|
||||||
// rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8);
|
|
||||||
// rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
|
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
|
||||||
|
@ -437,6 +507,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
|
||||||
pState->pTdbState->dbOpt = opts;
|
pState->pTdbState->dbOpt = opts;
|
||||||
pState->pTdbState->param = param;
|
pState->pTdbState->param = param;
|
||||||
pState->pTdbState->env = env;
|
pState->pTdbState->env = env;
|
||||||
|
pState->pTdbState->compactFactory = factory;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void streamCleanBackend(SStreamState* pState) {
|
void streamCleanBackend(SStreamState* pState) {
|
||||||
|
@ -468,11 +539,13 @@ void streamCleanBackend(SStreamState* pState) {
|
||||||
|
|
||||||
rocksdb_cache_destroy(param[i].lru);
|
rocksdb_cache_destroy(param[i].lru);
|
||||||
rocksdb_block_based_options_destroy(param[i].tableOpt);
|
rocksdb_block_based_options_destroy(param[i].tableOpt);
|
||||||
|
// rocksdb_compactionfilterfactory_destroy(param[i].filteFactory);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pState->pTdbState->cfOpts);
|
taosMemoryFreeClear(pState->pTdbState->cfOpts);
|
||||||
taosMemoryFree(pState->pTdbState->pCompare);
|
taosMemoryFree(pState->pTdbState->pCompare);
|
||||||
taosMemoryFree(pState->pTdbState->param);
|
taosMemoryFree(pState->pTdbState->param);
|
||||||
rocksdb_env_destroy(pState->pTdbState->env);
|
rocksdb_env_destroy(pState->pTdbState->env);
|
||||||
|
rocksdb_compactionfilterfactory_destroy(pState->pTdbState->compactFactory);
|
||||||
|
|
||||||
pState->pTdbState->rocksdb = NULL;
|
pState->pTdbState->rocksdb = NULL;
|
||||||
}
|
}
|
||||||
|
@ -514,31 +587,34 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
|
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
||||||
do { \
|
do { \
|
||||||
code = 0; \
|
code = 0; \
|
||||||
char buf[128] = {0}; \
|
char buf[128] = {0}; \
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamGetInit(funcname); \
|
int i = streamGetInit(funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
||||||
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \
|
char* ttlV = NULL; \
|
||||||
if (err != NULL) { \
|
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
||||||
taosMemoryFree(err); \
|
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
||||||
qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
if (err != NULL) { \
|
||||||
code = -1; \
|
taosMemoryFree(err); \
|
||||||
} else { \
|
qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
||||||
qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
|
code = -1; \
|
||||||
} \
|
} else { \
|
||||||
|
qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
|
||||||
|
} \
|
||||||
|
taosMemoryFree(ttlV); \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||||
|
@ -565,8 +641,15 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
if (err != NULL) taosMemoryFree(err); \
|
if (err != NULL) taosMemoryFree(err); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
} else { \
|
} else { \
|
||||||
if (pVal != NULL) *pVal = val; \
|
char * p = NULL, *end = NULL; \
|
||||||
if (vLen != NULL) *vLen = len; \
|
int64_t ttl; \
|
||||||
|
int32_t vlen = ginitDict[i].deValueFunc(val, len, &ttl, &p); \
|
||||||
|
if (pVal != NULL) { \
|
||||||
|
*pVal = p; \
|
||||||
|
} else { \
|
||||||
|
taosMemoryFree(p); \
|
||||||
|
} \
|
||||||
|
if (vLen != NULL) *vLen = vlen; \
|
||||||
} \
|
} \
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
|
@ -606,7 +689,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
|
|
||||||
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
|
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, value, vLen);
|
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
|
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
|
||||||
|
@ -624,7 +707,7 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, value, vLen);
|
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
|
@ -661,8 +744,11 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
|
||||||
|
|
||||||
|
char* ttlV = NULL;
|
||||||
|
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
|
||||||
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
|
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
|
||||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen);
|
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||||
|
taosMemoryFree(ttlV);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -844,7 +930,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
||||||
sKeyStr, sLen, eKeyStr, eLen, &err);
|
sKeyStr, sLen, eKeyStr, eLen, &err);
|
||||||
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen);
|
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd);
|
qWarn(
|
||||||
|
"failed to delete range cf(state) err: %s, "
|
||||||
|
"start: %s, end:%s",
|
||||||
|
err, toStringStart, toStringEnd);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -923,7 +1012,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
||||||
|
|
||||||
rocksdb_iter_prev(pCur->iter);
|
rocksdb_iter_prev(pCur->iter);
|
||||||
if (!rocksdb_iter_valid(pCur->iter)) {
|
if (!rocksdb_iter_valid(pCur->iter)) {
|
||||||
// qWarn("streamState failed to seek key prev %s", toString);
|
// qWarn("streamState failed to seek key prev
|
||||||
|
// %s", toString);
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1528,7 +1618,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void
|
||||||
|
|
||||||
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
|
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, tbname, TSDB_TABLE_NAME_LEN);
|
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
|
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
|
||||||
|
|
Loading…
Reference in New Issue