diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f60a823b21..7df467680f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -200,7 +200,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 120; +int32_t tsStreamCheckpointTickInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 3600; int32_t tsGrantHBInterval = 60; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 13e5933215..75dedd49fe 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -945,19 +945,40 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { // 2. reset tick atomic_store_64(&pStream->currentTick, 0); // 3. commit log: stream checkpoint info + pStream->checkpointFreq = taosGetTimestampMs(); + taosRUnLockLatch(&pStream->lock); + // code condtion + + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + goto _ERR; + } + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("failed to prepare trans rebalance since %s", terrstr()); - mndTransDrop(pTrans); - mndReleaseStream(pMnode, pStream); - return -1; + goto _ERR; } mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); - return 0; +_ERR: + mndReleaseStream(pMnode, pStream); + mndTransDrop(pTrans); + return -1; } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bcbdd8fc21..bafc8057f1 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -738,6 +738,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + rocksdb_block_based_options_set_partition_filters(tableOpt, 1); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); rocksdb_block_based_options_set_filter_policy(tableOpt, filter); @@ -812,6 +813,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt); rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + rocksdb_block_based_options_set_partition_filters(tableOpt, 1); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); rocksdb_block_based_options_set_filter_policy(tableOpt, filter); @@ -878,6 +880,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { // refactor later rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + rocksdb_block_based_options_set_partition_filters(tableOpt, 1); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); rocksdb_block_based_options_set_filter_policy(tableOpt, filter);