From 2533b5c5c2cbeabbb45fd716b8545e15458d1f0d Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 26 Apr 2023 17:00:33 +0800 Subject: [PATCH] op stream --- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamStateRocksdb.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a3cdeb170a..e738b19d05 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -308,7 +308,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated + if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 9c0f81894d..940f194300 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -433,6 +433,7 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); // rocksdb_writeoptions_ // rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1); + rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); pState->pTdbState->readOpts = rocksdb_readoptions_create(); pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; pState->pTdbState->pCompare = pCompare; @@ -1543,4 +1544,4 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi void streamStateDestroy_rocksdb(SStreamState* pState) { // only close db streamCleanBackend(pState); -} \ No newline at end of file +}