op stream
This commit is contained in:
parent
255af84139
commit
2533b5c5c2
|
@ -308,7 +308,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
int64_t ckId = 0;
|
int64_t ckId = 0;
|
||||||
int64_t dataVer = 0;
|
int64_t dataVer = 0;
|
||||||
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
|
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
|
||||||
if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated
|
if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
|
||||||
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
|
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
|
||||||
", checkPoint id:%" PRId64 " -> %" PRId64,
|
", checkPoint id:%" PRId64 " -> %" PRId64,
|
||||||
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
|
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
|
||||||
|
|
|
@ -433,6 +433,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
|
||||||
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
|
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
|
||||||
// rocksdb_writeoptions_
|
// rocksdb_writeoptions_
|
||||||
// rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1);
|
// rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1);
|
||||||
|
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
||||||
pState->pTdbState->readOpts = rocksdb_readoptions_create();
|
pState->pTdbState->readOpts = rocksdb_readoptions_create();
|
||||||
pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt;
|
pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt;
|
||||||
pState->pTdbState->pCompare = pCompare;
|
pState->pTdbState->pCompare = pCompare;
|
||||||
|
@ -1543,4 +1544,4 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
|
||||||
void streamStateDestroy_rocksdb(SStreamState* pState) {
|
void streamStateDestroy_rocksdb(SStreamState* pState) {
|
||||||
// only close db
|
// only close db
|
||||||
streamCleanBackend(pState);
|
streamCleanBackend(pState);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue