From eee4c0853d6df2bff2a997305f709a104eafdca8 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 17 Aug 2022 10:34:53 +0800 Subject: [PATCH 1/7] refactor(sync): add syncNodeAppendEntriesOnePeer --- include/libs/sync/sync.h | 11 +-- source/libs/sync/inc/syncReplication.h | 2 + source/libs/sync/src/syncReplication.c | 115 +++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 5 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 6d8895eb96..7cd2ebdede 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,11 +26,12 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 -#define SYNC_SPEED_UP_HB_TIMER 400 -#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) -#define SYNC_SLOW_DOWN_RANGE 100 -#define SYNC_MAX_READ_RANGE 10 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 10 +#define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 21821be6c7..edce124ee5 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -55,6 +55,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode); +int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex); + int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 24f75de5d3..886f7ad199 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -116,6 +116,120 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { return ret; } +int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex) { + int32_t ret = 0; + + // pre index, pre term + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); + if (preLogTerm == SYNC_TERM_INVALID) { + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; + // SyncIndex newNextIndex = nextIndex + 1; + + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); + syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); + sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 + ", match-index:%d, raftid:%" PRId64, + pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); + return -1; + } + + // entry pointer array + SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE]; + memset(entryPArr, 0, sizeof(entryPArr)); + + // get entry batch + int32_t getCount = 0; + SyncIndex getEntryIndex = nextIndex; + for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { + SSyncRaftEntry* pEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry); + if (code == 0) { + ASSERT(pEntry != NULL); + entryPArr[i] = pEntry; + getCount++; + getEntryIndex++; + + } else { + break; + } + } + + // event log + do { + char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + snprintf(logBuf, sizeof(logBuf), "build batch:%d for %s:%d", getCount, host, port); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + + // build msg + SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId); + ASSERT(pMsg != NULL); + + // free entries + for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { + SSyncRaftEntry* pEntry = entryPArr[i]; + if (pEntry != NULL) { + syncEntryDestory(pEntry); + entryPArr[i] = NULL; + } + } + + // prepare msg + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->privateTerm = 0; + pMsg->dataCount = getCount; + + // send msg + syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg); + + // speed up + if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) { + ret = 1; + +#if 0 + do { + char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); +#endif + } + + syncAppendEntriesBatchDestroy(pMsg); + + return ret; +} + +int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + return -1; + } + + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId* pDestId = &(pSyncNode->peersId[i]); + + // next index + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + ret = syncNodeAppendEntriesOnePeer(pSyncNode, pDestId, nextIndex); + } + + return ret; +} + +#if 0 int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { return -1; @@ -221,6 +335,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { return ret; } +#endif int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); From 96ab366beca54c8873cd08ad4a7898333d4613e2 Mon Sep 17 00:00:00 2001 From: Huo Linhe Date: Wed, 17 Aug 2022 13:55:04 +0800 Subject: [PATCH 2/7] fix: remove rust-bindings, use official connector instead Closes [TD-18455](https://jira.taosdata.com:18080/browse/TD-18455) --- cmake/rust-bindings_CMakeLists.txt.in | 12 --- contrib/CMakeLists.txt | 5 -- examples/rust/.gitignore | 2 + examples/rust/Cargo.toml | 18 +++++ examples/rust/examples/bind-tags.rs | 80 +++++++++++++++++++ examples/rust/examples/bind.rs | 74 ++++++++++++++++++ examples/rust/examples/query.rs | 106 ++++++++++++++++++++++++++ examples/rust/examples/subscribe.rs | 103 +++++++++++++++++++++++++ examples/rust/src/main.rs | 3 + 9 files changed, 386 insertions(+), 17 deletions(-) delete mode 100644 cmake/rust-bindings_CMakeLists.txt.in create mode 100644 examples/rust/.gitignore create mode 100644 examples/rust/Cargo.toml create mode 100644 examples/rust/examples/bind-tags.rs create mode 100644 examples/rust/examples/bind.rs create mode 100644 examples/rust/examples/query.rs create mode 100644 examples/rust/examples/subscribe.rs create mode 100644 examples/rust/src/main.rs diff --git a/cmake/rust-bindings_CMakeLists.txt.in b/cmake/rust-bindings_CMakeLists.txt.in deleted file mode 100644 index d16e86139b..0000000000 --- a/cmake/rust-bindings_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# rust-bindings -ExternalProject_Add(rust-bindings - GIT_REPOSITORY https://github.com/songtianyi/tdengine-rust-bindings.git - GIT_TAG 7ed7a97 - SOURCE_DIR "${TD_SOURCE_DIR}/examples/rust" - BINARY_DIR "${TD_SOURCE_DIR}/examples/rust" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" - ) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 294b59fe95..a1eec81ee0 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -105,11 +105,6 @@ if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_SQLITE}) -# rust-bindings -if(${RUST_BINDINGS}) - cat("${TD_SUPPORT_DIR}/rust-bindings_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${RUST_BINDINGS}) - # lucene if(${BUILD_WITH_LUCENE}) cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) diff --git a/examples/rust/.gitignore b/examples/rust/.gitignore new file mode 100644 index 0000000000..96ef6c0b94 --- /dev/null +++ b/examples/rust/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml new file mode 100644 index 0000000000..1ed73e2fde --- /dev/null +++ b/examples/rust/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +taos = "*" + +[dev-dependencies] +chrono = "0.4" +itertools = "0.10.3" +pretty_env_logger = "0.4.0" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +anyhow = "1" diff --git a/examples/rust/examples/bind-tags.rs b/examples/rust/examples/bind-tags.rs new file mode 100644 index 0000000000..a1f7286625 --- /dev/null +++ b/examples/rust/examples/bind-tags.rs @@ -0,0 +1,80 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test", + "create database test keep 36500", + "use test", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t1 varchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare( + "insert into ? using tb1 tags(?) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )?; + stmt.set_tbname("d0")?; + stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; + + let params = vec![ + ColumnView::from_millis_timestamp(vec![164000000000]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + t1: serde_json::Value, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/bind.rs b/examples/rust/examples/bind.rs new file mode 100644 index 0000000000..194938a319 --- /dev/null +++ b/examples/rust/examples/bind.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test_bindable", + "create database test_bindable keep 36500", + "use test_bindable", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?; + let params = vec![ + ColumnView::from_millis_timestamp(vec![0]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/query.rs b/examples/rust/examples/query.rs new file mode 100644 index 0000000000..016b291abc --- /dev/null +++ b/examples/rust/examples/query.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://"; + + let opts = PoolBuilder::new() + .max_size(5000) // max connections + .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection + .min_idle(Some(1000)) // minimal idle connections + .connection_timeout(Duration::from_secs(2)); + + let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?; + + let taos = pool.get()?; + + let db = "query"; + + // prepare database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + let inserted = taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + + assert_eq!(inserted, 6); + loop { + let count: usize = taos + .query_one("select count(*) from `meters`") + .await? + .unwrap_or_default(); + + if count >= 6 { + break; + } else { + println!("waiting for data"); + } + } + + let mut result = taos.query("select tbname, * from `meters`").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); + } + + // Query option 1, use rows stream. + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + + // Query options 2, use deserialization with serde. + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + tbname: String, + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } + + let records: Vec = taos + .query("select tbname, * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; + + dbg!(result.summary()); + assert_eq!(records.len(), 6); + dbg!(records); + Ok(()) +} diff --git a/examples/rust/examples/subscribe.rs b/examples/rust/examples/subscribe.rs new file mode 100644 index 0000000000..9e2e890405 --- /dev/null +++ b/examples/rust/examples/subscribe.rs @@ -0,0 +1,103 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // std::env::set_var("RUST_LOG", "debug"); + pretty_env_logger::init(); + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + "DROP TOPIC IF EXISTS tmq_meters".to_string(), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))".to_string(), + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build()?; + consumer.subscribe(["tmq_meters"]).await?; + + { + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } + } + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs new file mode 100644 index 0000000000..e7a11a969c --- /dev/null +++ b/examples/rust/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} From e7377e410caa4a55a37ccc7efd985cfc43c8be16 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 17 Aug 2022 15:10:15 +0800 Subject: [PATCH 3/7] refactor(mnode): check drop and alter stb for stream and topic --- source/dnode/mnode/impl/src/mndStb.c | 167 +++++++++++++++++++++++++-- 1 file changed, 160 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3e747b66c8..dd2b595c29 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1145,7 +1145,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p return 0; } -int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) { +static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; while (1) { @@ -1154,7 +1154,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t if (pIter == NULL) break; mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", - pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql); + pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql); if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { sdbRelease(pSdb, pTopic); continue; @@ -1192,20 +1192,66 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); } + return 0; +} +static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SStreamObj *pStream = NULL; + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + SNode *pAst = NULL; + if (nodesStringToNode(pStream->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + + if (pCol->tableId != suid) { + mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + goto NEXT; + } + if (pCol->colId > 0 && pCol->colId == colId) { + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; + mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId); + return -1; + } + mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + } + + NEXT: + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + } + return 0; +} + +static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SSmaObj *pSma = NULL; pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); if (pIter == NULL) break; - mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname, - suid, colId, pSma->sql); + mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, + stbFullName, suid, colId, pSma->sql); SNode *pAst = NULL; if (nodesStringToNode(pSma->ast, &pAst) != 0) { terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err", - pSma->name, stbname, suid, colId); + pSma->name, stbFullName, suid, colId); return -1; } @@ -1218,7 +1264,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t if ((pCol->tableId != suid) && (pSma->stbUid != suid)) { mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); - goto NEXT2; + goto NEXT; } if ((pCol->colId) > 0 && (pCol->colId == colId)) { sdbRelease(pSdb, pSma); @@ -1230,11 +1276,24 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); } - NEXT2: + NEXT: sdbRelease(pSdb, pSma); nodesDestroyNode(pAst); } + return 0; +} +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { + if (mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId) < 0) { + return -1; + } + if (mndCheckAlterColForStream(pMnode, stbFullName, suid, colId) < 0) { + return -1; + } + + if (mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId) < 0) { + return -1; + } return 0; } @@ -1930,6 +1989,90 @@ _OVER: return code; } +static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SMqTopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) { + if (pTopic->stbUid == suid) { + sdbRelease(pSdb, pTopic); + return -1; + } + } + + if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { + sdbRelease(pSdb, pTopic); + continue; + } + + SNode *pAst = NULL; + if (nodesStringToNode(pTopic->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + + if (pCol->tableId != suid) { + mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + sdbRelease(pSdb, pTopic); + nodesDestroyNode(pAst); + return -1; + } else { + goto NEXT; + } + } + NEXT: + sdbRelease(pSdb, pTopic); + nodesDestroyNode(pAst); + } + return 0; +} + +static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SStreamObj *pStream = NULL; + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + SNode *pAst = NULL; + if (nodesStringToNode(pStream->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + + if (pCol->tableId != suid) { + mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + return -1; + } else { + goto NEXT; + } + } + NEXT: + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + } + return 0; +} + static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; @@ -1971,6 +2114,16 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { goto _OVER; } + if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) { + terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED; + goto _OVER; + } + + if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) { + terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; + goto _OVER; + } + code = mndDropStb(pMnode, pReq, pDb, pStb); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; From 2ca5bdc708ccbeb497912b01dd4b05b09f69d4dc Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 17 Aug 2022 15:39:38 +0800 Subject: [PATCH 4/7] refactor(sync): add syncNodeDynamicQuorum --- include/libs/sync/sync.h | 14 ++-- include/libs/sync/syncTools.h | 1 + source/libs/sync/inc/syncIndexMgr.h | 17 +++-- source/libs/sync/inc/syncInt.h | 2 + source/libs/sync/src/syncAppendEntries.c | 9 +++ source/libs/sync/src/syncAppendEntriesReply.c | 12 ++++ source/libs/sync/src/syncCommit.c | 58 ++++++++++++++++ source/libs/sync/src/syncIndexMgr.c | 66 ++++++++++++++++++- source/libs/sync/src/syncMain.c | 11 ++-- source/libs/sync/src/syncMessage.c | 2 + .../sync/test/syncAppendEntriesReplyTest.cpp | 3 + 11 files changed, 178 insertions(+), 17 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7cd2ebdede..952066df46 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,12 +26,14 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 -#define SYNC_SPEED_UP_HB_TIMER 400 -#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) -#define SYNC_SLOW_DOWN_RANGE 100 -#define SYNC_MAX_READ_RANGE 10 -#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 2 +#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) +#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index cd2c2d4a4f..6c95c3c6d7 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply { SyncTerm privateTerm; bool success; SyncIndex matchIndex; + int64_t startTime; } SyncAppendEntriesReply; SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 1f60a9d57e..fb85b89419 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -29,8 +29,12 @@ extern "C" { // SIndexMgr ----------------------------- typedef struct SSyncIndexMgr { SRaftId (*replicas)[TSDB_MAX_REPLICA]; - SyncIndex index[TSDB_MAX_REPLICA]; - SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function + SyncIndex index[TSDB_MAX_REPLICA]; + SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function + + int64_t startTimeArr[TSDB_MAX_REPLICA]; + int64_t recvTimeArr[TSDB_MAX_REPLICA]; + int32_t replicaNum; SSyncNode *pSyncNode; } SSyncIndexMgr; @@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); +char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); + +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); // void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); // SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3e247e5d79..de43c81654 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); +int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); + // trace log void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4f93d8197d..e000ba8bf8 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->term = ths->pRaftStore->currentTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->matchIndex = pMsg->prevLogIndex; } + pReply->startTime = ths->startTime; + // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = matchIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; pReply->matchIndex = ths->commitIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -729,6 +734,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = matchIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 4928c54bd7..9253ed0129 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3a94ed9713..3829ea0730 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { return false; } +static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { + ASSERT(a >= 0); + ASSERT(b >= 0); + + int64_t c = a > b ? a - b : b - a; + return c; +} + +int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { + int32_t quorum = 1; // self + + int64_t timeNow = taosGetTimestampMs(); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + + int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); + int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); + + int32_t addQuorum = 0; + + if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { + addQuorum = 1; + } else { + addQuorum = 0; + } + + if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { + addQuorum = 0; + } + + quorum += addQuorum; + } + + ASSERT(quorum <= pSyncNode->replicaNum); + + if (quorum < pSyncNode->quorum) { + quorum = pSyncNode->quorum; + } + + return quorum; +} + +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { + int agreeCount = 0; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) { + ++agreeCount; + } + if (agreeCount >= syncNodeDynamicQuorum(pSyncNode)) { + return true; + } + } + return false; +} + +/* bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { int agreeCount = 0; for (int i = 0; i < pSyncNode->replicaNum; ++i) { @@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } return false; } +*/ diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 8c820fcd9c..07c4fa8429 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) { memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index)); memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm)); + + // int64_t timeNow = taosGetMonotonicMs(); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + pSyncIndexMgr->startTimeArr[i] = 0; + pSyncIndexMgr->recvTimeArr[i] = 0; + } + /* for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { pSyncIndexMgr->index[i] = 0; @@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, char host[128]; uint16_t port; syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); - sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, index); + sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, + index); } SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { @@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + (pSyncIndexMgr->startTimeArr)[i] = startTime; + return; + } + } + + // maybe config change + // ASSERT(0); + char host[128]; + uint16_t port; + syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); + sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, + startTime); +} + +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + int64_t startTime = (pSyncIndexMgr->startTimeArr)[i]; + return startTime; + } + } + ASSERT(0); +} + +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + (pSyncIndexMgr->recvTimeArr)[i] = recvTime; + return; + } + } + + // maybe config change + // ASSERT(0); + char host[128]; + uint16_t port; + syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); + sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, + recvTime); +} + +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + int64_t recvTime = (pSyncIndexMgr->recvTimeArr)[i]; + return recvTime; + } + } + ASSERT(0); +} + // for debug ------------------- void syncIndexMgrPrint(SSyncIndexMgr *pObj) { char *serialized = syncIndexMgr2Str(pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1991560d42..a00b59d292 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, - printStr); + pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, + pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, printStr); + pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, + pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(s, len, "%s", str); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 13adaf055c..b42aba560f 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pRoot, "success", pMsg->success); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp index d41e99a3cd..72d3fd5ef3 100644 --- a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp @@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() { pMsg->matchIndex = 77; pMsg->term = 33; pMsg->privateTerm = 44; + pMsg->startTime = taosGetTimestampMs(); return pMsg; } @@ -89,6 +90,8 @@ void test5() { } int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest(); From 6453e24c846dd5da55660fb54b19e6455d22a48c Mon Sep 17 00:00:00 2001 From: Huo Linhe Date: Wed, 17 Aug 2022 17:20:54 +0800 Subject: [PATCH 5/7] chore: add lost source code for tmq example of C --- docs/examples/c/tmq_example.c | 275 ++++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 docs/examples/c/tmq_example.c diff --git a/docs/examples/c/tmq_example.c b/docs/examples/c/tmq_example.c new file mode 100644 index 0000000000..19adaad116 --- /dev/null +++ b/docs/examples/c/tmq_example.c @@ -0,0 +1,275 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" + +static int running = 1; +static char dbName[64] = "tmqdb"; +static char stbName[64] = "stb"; +static char topicName[64] = "topicname"; + +static int32_t msg_process(TAOS_RES* msg) { + char buf[1024]; + int32_t rows = 0; + + const char* topicName = tmq_get_topic_name(msg); + const char* dbName = tmq_get_db_name(msg); + int32_t vgroupId = tmq_get_vgroup_id(msg); + + printf("topic: %s\n", topicName); + printf("db: %s\n", dbName); + printf("vgroup id: %d\n", vgroupId); + + while (1) { + TAOS_ROW row = taos_fetch_row(msg); + if (row == NULL) break; + + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + int32_t* length = taos_fetch_lengths(msg); + int32_t precision = taos_result_precision(msg); + rows++; + taos_print_row(buf, row, fields, numOfFields); + printf("row content: %s\n", buf); + } + + return rows; +} + +static int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes; + // drop database if exists + printf("create database\n"); + pRes = taos_query(pConn, "drop database if exists tmqdb"); + if (taos_errno(pRes) != 0) { + printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + // create database + pRes = taos_query(pConn, "create database tmqdb"); + if (taos_errno(pRes) != 0) { + printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + // create super table + printf("create super table\n"); + pRes = taos_query( + pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + // create sub tables + printf("create sub tables\n"); + pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + // insert data + printf("insert data into sub tables\n"); + pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +int32_t create_topic() { + printf("create topic\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use tmqdb"); + if (taos_errno(pRes) != 0) { + printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_res_t code; + tmq_conf_t* conf = tmq_conf_new(); + code = tmq_conf_set(conf, "enable.auto.commit", "true"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "group.id", "cgrpName"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "client.id", "user defined name"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "td.connect.user", "root"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); + if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "experimental.snapshot.enable", "false"); + if (TMQ_CONF_OK != code) return NULL; + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topicList = tmq_list_new(); + int32_t code = tmq_list_append(topicList, "topicname"); + if (code) { + return NULL; + } + return topicList; +} + +void basic_consume_loop(tmq_t* tmq) { + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 5000; + while (running) { + TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); + if (tmqmsg) { + msgCnt++; + totalRows += msg_process(tmqmsg); + taos_free_result(tmqmsg); + } else { + break; + } + } + + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + +int main(int argc, char* argv[]) { + int32_t code; + + if (init_env() < 0) { + return -1; + } + + if (create_topic() < 0) { + return -1; + } + + tmq_t* tmq = build_consumer(); + if (NULL == tmq) { + fprintf(stderr, "%% build_consumer() fail!\n"); + return -1; + } + + tmq_list_t* topic_list = build_topic_list(); + if (NULL == topic_list) { + return -1; + } + + if ((code = tmq_subscribe(tmq, topic_list))) { + fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); + } + tmq_list_destroy(topic_list); + + basic_consume_loop(tmq); + + code = tmq_consumer_close(tmq); + if (code) { + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + } else { + fprintf(stderr, "%% Consumer closed\n"); + } + + return 0; +} From 0bc12a8320157fe71872b61e468b31719d9a4f50 Mon Sep 17 00:00:00 2001 From: Huo Linhe Date: Wed, 17 Aug 2022 17:27:45 +0800 Subject: [PATCH 6/7] chore: fix source code file name typo --- docs/zh/07-develop/_sub_c.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/07-develop/_sub_c.mdx b/docs/zh/07-develop/_sub_c.mdx index b8f73a8ff1..b0667268e9 100644 --- a/docs/zh/07-develop/_sub_c.mdx +++ b/docs/zh/07-develop/_sub_c.mdx @@ -1,3 +1,3 @@ ```c -{{#include docs/examples/c/tmq-example.c}} +{{#include docs/examples/c/tmq_example.c}} ``` From ebc79f906e182dd3a4b798dbc48153d648017c8b Mon Sep 17 00:00:00 2001 From: ShuaiQChang Date: Wed, 17 Aug 2022 17:39:40 +0800 Subject: [PATCH 7/7] fix: docs error --- docs/zh/07-develop/07-tmq.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 25f37121e8..da8bf5e20e 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -724,7 +724,6 @@ consumer.close(); - ```go @@ -769,6 +768,7 @@ consumer.Unsubscribe(); // 关闭消费 consumer.Close(); ``` + @@ -809,7 +809,7 @@ SHOW SUBSCRIPTIONS; - +