add backend
This commit is contained in:
parent
0d11430be1
commit
0e4445b3b5
|
@ -296,12 +296,12 @@ void destroyFunc(void* stata) { return; }
|
||||||
|
|
||||||
int streamInitBackend(SStreamState* pState, char* path) {
|
int streamInitBackend(SStreamState* pState, char* path) {
|
||||||
rocksdb_options_t* opts = rocksdb_options_create();
|
rocksdb_options_t* opts = rocksdb_options_create();
|
||||||
rocksdb_options_increase_parallelism(opts, 4);
|
rocksdb_options_increase_parallelism(opts, 8);
|
||||||
// rocksdb_options_optimize_level_style_compaction(opts, 0);
|
// rocksdb_options_optimize_level_style_compaction(opts, 0);
|
||||||
// create the DB if it's not already present
|
// create the DB if it's not already present
|
||||||
rocksdb_options_set_create_if_missing(opts, 1);
|
rocksdb_options_set_create_if_missing(opts, 1);
|
||||||
rocksdb_options_set_create_missing_column_families(opts, 1);
|
rocksdb_options_set_create_missing_column_families(opts, 1);
|
||||||
rocksdb_options_set_write_buffer_size(opts, 64 << 20);
|
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
|
||||||
|
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
|
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
|
||||||
|
@ -397,6 +397,7 @@ int streamGetInit(const char* funcName) {
|
||||||
}
|
}
|
||||||
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
|
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
|
||||||
bool valid = false;
|
bool valid = false;
|
||||||
|
// opt later
|
||||||
rocksdb_iter_seek(iter, buf, len);
|
rocksdb_iter_seek(iter, buf, len);
|
||||||
if (!rocksdb_iter_valid(iter)) {
|
if (!rocksdb_iter_valid(iter)) {
|
||||||
rocksdb_iter_seek_for_prev(iter, buf, len);
|
rocksdb_iter_seek_for_prev(iter, buf, len);
|
||||||
|
@ -591,20 +592,6 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
||||||
qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd);
|
qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
// batch clear later
|
|
||||||
// streamStatePut_rocksdb(pState, &key, NULL, 0);
|
|
||||||
// while (1) {
|
|
||||||
// SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &key);
|
|
||||||
// SWinKey delKey = {0};
|
|
||||||
// int32_t code = streamStateGetKVByCur_rocksdb(pCur, &delKey, NULL, 0);
|
|
||||||
// streamStateFreeCur(pCur);
|
|
||||||
// if (code == 0) {
|
|
||||||
// streamStateDel_rocksdb(pState, &delKey);
|
|
||||||
// } else {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue