add trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-09 10:46:03 +00:00
parent 9828ca9d7f
commit a0c2897124
3 changed files with 29 additions and 5 deletions

View File

@ -200,7 +200,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 120;
int32_t tsStreamCheckpointTickInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 3600;
int32_t tsGrantHBInterval = 60;

View File

@ -945,19 +945,40 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
// 2. reset tick
atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info
pStream->checkpointFreq = taosGetTimestampMs();
taosRUnLockLatch(&pStream->lock);
// code condtion
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr());
mndTransDrop(pTrans);
mndReleaseStream(pMnode, pStream);
return -1;
goto _ERR;
}
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return 0;
_ERR:
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {

View File

@ -738,6 +738,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
@ -812,6 +813,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt);
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
@ -878,6 +880,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
// refactor later
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);