From 6cff4dcfd83be7d9f5181be72ebfafa6fb80b48a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 15 Aug 2022 14:42:40 +0800 Subject: [PATCH 01/43] refactor(sync): add syncIsReadyForRead --- include/libs/sync/sync.h | 4 ++ source/dnode/vnode/src/inc/vnd.h | 3 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/dnode/vnode/src/vnd/vnodeSync.c | 14 +++++++ source/libs/sync/src/syncMain.c | 56 ++++++++++++++++++++++++++ 5 files changed, 78 insertions(+), 3 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index aa563343f8..6d8895eb96 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -30,6 +30,7 @@ extern bool gRaftDetailLog; #define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_AFTER_MS (1000 * 20) #define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 10 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 @@ -210,9 +211,12 @@ void syncStop(int64_t rid); int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); +bool syncIsReadyForRead(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); bool syncRestoreFinish(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); +SyncIndex syncGetLastIndex(int64_t rid); +SyncIndex syncGetCommitIndex(int64_t rid); SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index dd1facb462..5608efa7af 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); -int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); @@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); bool vnodeIsLeader(SVnode* pVnode); +bool vnodeIsReadyForRead(SVnode* pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d5c5e18668..e17d4a0b6d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -281,7 +281,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { + if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } @@ -305,7 +305,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && - !vnodeIsLeader(pVnode)) { + !vnodeIsReadyForRead(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 50d32f5f5e..2779ab724f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -771,3 +771,17 @@ bool vnodeIsLeader(SVnode *pVnode) { return true; } + +bool vnodeIsReadyForRead(SVnode *pVnode) { + if (syncIsReady(pVnode->sync)) { + return true; + } + + if (syncIsReadyForRead(pVnode->sync)) { + return true; + } + + vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, + syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); + return false; +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c7784cd62e..5656f19890 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) { return b; } +bool syncIsReadyForRead(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return false; + } + ASSERT(rid == pSyncNode->rid); + + // TODO: last not noop? + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + + // if false, set error code + if (false == b) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_SYN_NOT_LEADER; + } else { + terrno = TSDB_CODE_APP_NOT_READY; + } + } + return b; +} + bool syncIsRestoreFinish(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) { return term; } +SyncIndex syncGetLastIndex(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return SYNC_INDEX_INVALID; + } + ASSERT(rid == pSyncNode->rid); + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return lastIndex; +} + +SyncIndex syncGetCommitIndex(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return SYNC_INDEX_INVALID; + } + ASSERT(rid == pSyncNode->rid); + SyncIndex cmtIndex = pSyncNode->commitIndex; + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return cmtIndex; +} + SyncGroupId syncGetVgId(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { pSyncNode->changing = true; } + // not restored + if (!pSyncNode->restoreFinish) { + ret = -1; + terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId, + TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); + goto _END; + } + SRespStub stub; stub.createTime = taosGetTimestampMs(); stub.rpcMsg = *pMsg; From 947d90e196a69b176814c0110ee31052f0b30072 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 15 Aug 2022 17:36:46 +0800 Subject: [PATCH 02/43] refactor(sync): add syncIsReadyForRead --- source/dnode/vnode/src/inc/vnd.h | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 5608efa7af..5164e22474 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -99,6 +99,7 @@ void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsReadyForRead(SVnode* pVnode); +bool vnodeIsRoleLeader(SVnode* pVnode); #ifdef __cplusplus } From 4164d1463754e4f086f1695f4794299ad551eec1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 15 Aug 2022 18:50:14 +0800 Subject: [PATCH 03/43] refactor(sync): add syncIsReadyForRead --- source/libs/sync/src/syncMain.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5656f19890..1991560d42 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -875,8 +875,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { pSyncNode->changing = true; } - // not restored - if (!pSyncNode->restoreFinish) { + // not restored, vnode enable + if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) { ret = -1; terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId, From 39c3cde285b69b220d1a90047d743455e9bcbe33 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 16 Aug 2022 10:37:17 +0800 Subject: [PATCH 04/43] fix: select constant error --- source/libs/parser/src/parTranslater.c | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ef985a3894..1c7446ad6f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1399,7 +1399,7 @@ static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFu "%s function must be used in select statements", pFunc->functionName); } SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; - if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && + if (NULL != pSelect->pFromTable && QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && !isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, "%s function requires valid time series input", pFunc->functionName); @@ -2037,16 +2037,13 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, code = getDBVgInfoImpl(pCxt, pName, &vgroupList); } - if (TSDB_CODE_SUCCESS == code && - 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && - 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && - isSelectStmt(pCxt->pCurrStmt) && + if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && + 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && isSelectStmt(pCxt->pCurrStmt) && 0 == taosArrayGetSize(vgroupList)) { ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true; } - if (TSDB_CODE_SUCCESS == code && - 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && + if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) { code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList); } From 9a9afc0666548585123a7ed73058e0ff8eaad59c Mon Sep 17 00:00:00 2001 From: zhaoyanggh Date: Tue, 16 Aug 2022 12:06:35 +0800 Subject: [PATCH 05/43] docs: refine python tmq doc --- docs/examples/python/tmq_example.py | 63 +---------- docs/zh/07-develop/07-tmq.mdx | 157 ++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 58 deletions(-) diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py index 1f6da3d1b6..cee036454e 100644 --- a/docs/examples/python/tmq_example.py +++ b/docs/examples/python/tmq_example.py @@ -1,59 +1,6 @@ import taos -from taos.tmq import * - -conn = taos.connect() - -# create database -conn.execute("drop database if exists py_tmq") -conn.execute("create database if not exists py_tmq vgroups 2") - -# create table and stables -conn.select_db("py_tmq") -conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") -conn.execute("create table if not exists tb1 using stb1 tags(1)") -conn.execute("create table if not exists tb2 using stb1 tags(2)") -conn.execute("create table if not exists tb3 using stb1 tags(3)") - -# create topic -conn.execute("drop topic if exists topic_ctb_column") -conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1") - -# set consumer configure options -conf = TaosTmqConf() -conf.set("group.id", "tg2") -conf.set("td.connect.user", "root") -conf.set("td.connect.pass", "taosdata") -conf.set("enable.auto.commit", "true") -conf.set("msg.with.table.name", "true") - -def tmq_commit_cb_print(tmq, resp, offset, param=None): - print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") - -conf.set_auto_commit_cb(tmq_commit_cb_print, None) - -# build consumer -tmq = conf.new_consumer() - -# build topic list -topic_list = TaosTmqList() -topic_list.append("topic_ctb_column") - -# subscribe consumer -tmq.subscribe(topic_list) - -# check subscriptions -sub_list = tmq.subscription() -print("subscribed topics: ",sub_list) - -# start subscribe -while 1: - res = tmq.poll(1000) - if res: - topic = res.get_topic_name() - vg = res.get_vgroup_id() - db = res.get_db_name() - print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") - for row in res: - print(row) - tb = res.get_table_name() - print(f"from table: {tb}") +from taos.tmq import TaosConsumer +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +for msg in consumer: + for row in msg: + print(row) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..23574e7478 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -88,6 +88,110 @@ void close() throws SQLException; ``` + + + +```python +class TaosConsumer(): + DEFAULT_CONFIG = { + 'group.id', + 'client.id', + 'enable.auto.commit', + 'auto.commit.interval.ms', + 'auto.offset.reset', + 'msg.with.table.name', + 'experimental.snapshot.enable', + 'enable.heartbeat.background', + 'experimental.snapshot.batch.size', + 'td.connect.ip', + 'td.connect.user', + 'td.connect.pass', + 'td.connect.port', + 'td.connect.db', + 'timeout' + } + + def __init__(self, *topics, **configs): + self._closed = True + self._conf = None + self._list = None + self._tmq = None + + keys = list(configs.keys()) + for k in keys: + configs.update({k.replace('_','.'): configs.pop(k)}) + + extra_configs = set(configs).difference(self.DEFAULT_CONFIG) + if extra_configs: + raise TmqError("Unrecognized configs: %s" % (extra_configs,)) + + self._conf = tmq_conf_new() + self._list = tmq_list_new() + + # set poll timeout + if 'timeout' in configs: + self._timeout = configs['timeout'] + del configs['timeout'] + else: + self._timeout = 0 + + # check if group id is set + + if 'group.id' not in configs: + raise TmqError("missing group.id in consumer config setting") + + for key, value in configs.items(): + tmq_conf_set(self._conf, key, value) + + self._tmq = tmq_consumer_new(self._conf) + + if not topics: + raise TmqError("Unset topic for Consumer") + + for topic in topics: + tmq_list_append(self._list, topic) + + tmq_subscribe(self._tmq, self._list) + + + def __iter__(self): + return self + + def __next__(self): + if not self._tmq: + raise StopIteration('TaosConsumer closed') + return next(self.sync_next()) + + def sync_next(self): + while 1: + res = tmq_consumer_poll(self._tmq, self._timeout) + if res: + break + yield TaosResult(res) + + def subscription(self): + if self._tmq is None: + return None + return tmq_subscription(self._tmq) + + def unsubscribe(self): + tmq_unsubscribe(self._tmq) + + def close(self): + if self._tmq: + tmq_consumer_close(self._tmq) + self._tmq = None + + def __del__(self): + if self._conf: + tmq_conf_destroy(self._conf) + if self._list: + tmq_list_destroy(self._list) + if self._tmq: + tmq_consumer_close(self._tmq) +``` + + ## 写入数据 @@ -230,6 +334,27 @@ public class MetersDeserializer extends ReferenceDeserializer { ``` + + + + +| 参数名称 | 类型 | 参数说明 | 备注 | +| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | +| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | +| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | +| `client_id` | string | 客户端 ID | 最大长度:192。 | +| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | +| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | +| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | +| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | +| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | +| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | + + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -262,6 +387,14 @@ consumer.subscribe(topics); + + +```python +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +``` + + + ## 消费 @@ -294,6 +427,17 @@ while(running){ ``` + + + +```python +for msg in consumer: + for row in msg: + print(row) +``` + + + ## 结束消费 @@ -322,6 +466,19 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + + +```python +/* 取消订阅 */ +consumer.unsubscribe(); + +/* 关闭消费 */ +consumer.close(); +``` + From 2ced6c280cae7faca7aa0ee2bb2efbf890c73577 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 16 Aug 2022 12:11:56 +0800 Subject: [PATCH 06/43] enh(stream): reduce table scan --- include/libs/stream/tstreamUpdate.h | 45 +++++------ source/client/src/tmq.c | 20 ++--- source/libs/executor/src/scanoperator.c | 47 ++++++------ source/libs/stream/src/streamUpdate.c | 99 ++++++++++++++----------- 4 files changed, 113 insertions(+), 98 deletions(-) diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 78543118da..c186430f3f 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -25,33 +25,34 @@ extern "C" { #endif typedef struct SUpdateInfo { - SArray *pTsBuckets; - uint64_t numBuckets; - SArray *pTsSBFs; - uint64_t numSBFs; - int64_t interval; - int64_t watermark; - TSKEY minTS; - SScalableBf* pCloseWinSBF; - SHashObj* pMap; - STimeWindow scanWindow; - uint64_t scanGroupId; - uint64_t maxVersion; + SArray *pTsBuckets; + uint64_t numBuckets; + SArray *pTsSBFs; + uint64_t numSBFs; + int64_t interval; + int64_t watermark; + TSKEY minTS; + SScalableBf *pCloseWinSBF; + SHashObj *pMap; + STimeWindow scanWindow; + uint64_t scanGroupId; + uint64_t maxVersion; } SUpdateInfo; -SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); +SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); -bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); -void updateInfoDestroy(SUpdateInfo *pInfo); -void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); -void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); -int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); -int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); +bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); +bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); +void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); +bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); +void updateInfoDestroy(SUpdateInfo *pInfo); +void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); +void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); +int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); +int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); #ifdef __cplusplus } #endif -#endif /* ifndef _TSTREAMUPDATE_H_ */ \ No newline at end of file +#endif /* ifndef _TSTREAMUPDATE_H_ */ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 086cf653e9..7637ffbc80 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -56,8 +56,8 @@ struct tmq_conf_t { int8_t autoCommit; int8_t resetOffset; int8_t withTbName; - int8_t ssEnable; - int32_t ssBatchSize; + int8_t snapEnable; + int32_t snapBatchSize; bool hbBgEnable; @@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcmp(key, "experimental.snapshot.enable") == 0) { if (strcmp(value, "true") == 0) { - conf->ssEnable = true; + conf->snapEnable = true; return TMQ_CONF_OK; } else if (strcmp(value, "false") == 0) { - conf->ssEnable = false; + conf->snapEnable = false; return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } } + if (strcmp(key, "experimental.snapshot.batch.size") == 0) { + conf->snapBatchSize = atoi(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "enable.heartbeat.background") == 0) { if (strcmp(value, "true") == 0) { conf->hbBgEnable = true; @@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } - if (strcmp(key, "experimental.snapshot.batch.size") == 0) { - conf->ssBatchSize = atoi(value); - return TMQ_CONF_OK; - } - if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; @@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); pTmq->withTbName = conf->withTbName; - pTmq->useSnapshot = conf->ssEnable; + pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commitCb = conf->commitCb; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d8de8df163..1d3ceece3a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include "os.h" #include "executorimpl.h" #include "filter.h" #include "function.h" #include "functionMgt.h" +#include "os.h" #include "querynodes.h" #include "systable.h" #include "tname.h" @@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, + GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); + bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); filterFreeInfo(filter); return keep; @@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - loadSMA = true; // mark the operation of load sma; + loadSMA = true; // mark the operation of load sma; bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); - if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead + if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); return TSDB_CODE_SUCCESS; @@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); - } else { // todo opt for json tag + } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, data, false); } @@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug("%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks due to query func required", GET_TASKID(pTaskInfo)); + qDebug( + "%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks " + "due to query func required", + GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - bool isClosed = false; + bool isClosed = false; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); isClosed = isCloseWindow(&win, &pInfo->twAggSup); } + bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - if ((update || (isSignleIntervalWindow(pInfo) && isClosed && - isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) { + bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) && + isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup); + if ((update || closedWin) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); } } @@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version); + uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); + updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); return pSDB; @@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { setBlockIntoRes(pInfo, &block); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) { + if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, + pInfo->pRes->info.version)) { printDataBlock(pInfo->pRes, "stream scan ignore"); blockDataCleanup(pInfo->pRes); continue; @@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // build message and send to mnode to fetch the content of system tables. SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSysTableScanInfo* pInfo = pOperator->info; - char dbName[TSDB_DB_NAME_LEN] = {0}; + char dbName[TSDB_DB_NAME_LEN] = {0}; const char* name = tNameGetTableName(&pInfo->name); if (pInfo->showRewrite) { @@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return sysTableScanUserTables(pOperator); } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTags(pOperator); - } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && - pInfo->showRewrite && IS_SYS_DBNAME(dbName)) { + } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite && + IS_SYS_DBNAME(dbName)) { return sysTableScanUserSTables(pOperator); } else { // load the meta from mnode of the given epset if (pOperator->status == OP_EXEC_DONE) { @@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->pColMatchInfo); - + taosMemoryFreeClear(param); } @@ -2597,7 +2603,6 @@ _error: int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { - int64_t st = taosGetTimestampUs(); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); @@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } int64_t st1 = taosGetTimestampUs(); - qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1-st)/1000.0, idStr); + qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { qDebug("no table qualified for query, %s" PRIx64, idStr); @@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } int64_t st2 = taosGetTimestampUs(); - qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2-st1)/1000.0, idStr); + qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 0b1ce27b77..d053662bd3 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -13,33 +13,31 @@ * along with this program. If not, see . */ -#include "tstreamUpdate.h" -#include "tencode.h" -#include "ttime.h" #include "query.h" +#include "tencode.h" +#include "tstreamUpdate.h" +#include "ttime.h" -#define DEFAULT_FALSE_POSITIVE 0.01 -#define DEFAULT_BUCKET_SIZE 1310720 -#define DEFAULT_MAP_CAPACITY 1310720 -#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) -#define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 100000 -#define MIN_NUM_SCALABLE_BF 10 -#define DEFAULT_PREADD_BUCKET 1 -#define MAX_INTERVAL MILLISECOND_PER_MINUTE -#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) -#define DEFAULT_EXPECTED_ENTRIES 10000 +#define DEFAULT_FALSE_POSITIVE 0.01 +#define DEFAULT_BUCKET_SIZE 1310720 +#define DEFAULT_MAP_CAPACITY 1310720 +#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) +#define ROWS_PER_MILLISECOND 1 +#define MAX_NUM_SCALABLE_BF 100000 +#define MIN_NUM_SCALABLE_BF 10 +#define DEFAULT_PREADD_BUCKET 1 +#define MAX_INTERVAL MILLISECOND_PER_MINUTE +#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) +#define DEFAULT_EXPECTED_ENTRIES 10000 -static int64_t adjustExpEntries(int64_t entries) { - return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); -} +static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { if (pInfo->numSBFs < count) { count = pInfo->numSBFs; } for (uint64_t i = 0; i < count; ++i) { - int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); + int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); taosArrayPush(pInfo->pTsSBFs, &tsSBF); } @@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { if (watermark <= adjInterval) { - watermark = TMAX(originInt/adjInterval, 1) * adjInterval; + watermark = TMAX(originInt / adjInterval, 1) * adjInterval; } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) { @@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { return res; } +bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { + void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); + if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true; + return false; +} + bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { - int32_t res = TSDB_CODE_FAILED; - TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); - uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; - TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); + int32_t res = TSDB_CODE_FAILED; + TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); + uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; + TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { @@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { } int32_t size = taosHashGetSize(pInfo->pMap); - if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { + if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); return false; } - if ( !pMapMaxTs && maxTs < ts ) { + if (!pMapMaxTs && maxTs < ts) { taosArraySet(pInfo->pTsBuckets, index, &ts); return false; } if (ts < pInfo->minTS) { - qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); + qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + maxTs, *pMapMaxTs, ts); return true; } else if (res == TSDB_CODE_SUCCESS) { return false; } - qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); + qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + maxTs, *pMapMaxTs, ts); // check from tsdb api return true; } -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { - qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); +void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { + qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, + pWin->skey, pWin->ekey, version); pInfo->scanWindow = *pWin; pInfo->scanGroupId = groupId; pInfo->maxVersion = version; } -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { +bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { if (!pInfo) { return false; } - qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); - if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && - pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) { - qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); + qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, + pWin->skey, pWin->ekey, version); + if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey && + version <= pInfo->maxVersion) { + qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, + pWin->skey, pWin->ekey, version); return true; } return false; @@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) int32_t size = taosArrayGetSize(pInfo->pTsBuckets); if (tEncodeI32(&encoder, size) < 0) return -1; for (int32_t i = 0; i < size; i++) { - TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i); + TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i); if (tEncodeI64(&encoder, *pTs) < 0) return -1; } @@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs); if (tEncodeI32(&encoder, sBfSize) < 0) return -1; for (int32_t i = 0; i < sBfSize; i++) { - SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i); + SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i); if (tScalableBfEncode(pSBf, &encoder) < 0) return -1; } @@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1; if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1; if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1; - + if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1; int32_t mapSize = taosHashGetSize(pInfo->pMap); if (tEncodeI32(&encoder, mapSize) < 0) return -1; - void* pIte = NULL; + void *pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); - if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1; - if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1; + void *key = taosHashGetKey(pIte, &keyLen); + if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1; + if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; } if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; @@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { int32_t size = 0; if (tDecodeI32(&decoder, &size) < 0) return -1; - pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); + pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); TSKEY ts = INT64_MIN; for (int32_t i = 0; i < size; i++) { if (tDecodeI64(&decoder, &ts) < 0) return -1; @@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *)); for (int32_t i = 0; i < sBfSize; i++) { - SScalableBf* pSBf = tScalableBfDecode(&decoder); + SScalableBf *pSBf = tScalableBfDecode(&decoder); if (!pSBf) return -1; taosArrayPush(pInfo->pTsSBFs, &pSBf); } @@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { int32_t mapSize = 0; if (tDecodeI32(&decoder, &mapSize) < 0) return -1; - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); uint64_t uid = 0; ts = INT64_MIN; - for(int32_t i = 0; i < mapSize; i++) { + for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeI64(&decoder, &ts) < 0) return -1; taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); From 316584a0b1f299bd4e9b828d407dc47493660dd4 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Aug 2022 12:42:28 +0800 Subject: [PATCH 07/43] Update 01-docker.md --- docs/zh/05-get-started/01-docker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 949d54f77b..5ede3c7d31 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -6,7 +6,7 @@ title: 通过 Docker 快速体验 TDengine 如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ::: -本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。 +本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉Docker, 请使用[安装包的方式快速体验](../../get-started/package/)。 ## 启动 TDengine From 97d4ebe85cc3332b11cefbbb51fc2c79d9f295ed Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:03:44 +0800 Subject: [PATCH 08/43] fix: syntax error --- docs/zh/12-taos-sql/29-changes.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index 68f3b5cc29..f6a78d3277 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -8,7 +8,7 @@ description: "TDengine 3.0 版本的语法变更说明" | # | **元素** | **差异性** | **说明** | | - | :------- | :--------: | :------- | -| 1 | VARCHAR |
新增
| BINARY类型的别名。 +| 1 | VARCHAR | 新增 | BINARY类型的别名。 | 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。 | 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。 | 4 | INFORMATION_SCHEMA | 新增 | 包含各种SCHEMA定义的系统数据库。 @@ -24,7 +24,7 @@ description: "TDengine 3.0 版本的语法变更说明" | # | **语句** | **差异性** | **说明** | | - | :------- | :--------: | :------- | -| 1 | ALTER ACCOUNT |
废除
| 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。 +| 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。 | 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。 | 3 | ALTER DATABASE | 调整 | 废除
  • QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。
  • BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。
  • UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。
  • CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。
  • COMP:3.0版本暂不支持修改。
    新增
  • CACHEMODEL:表示是否在内存中缓存子表的最近数据。
  • CACHESIZE:表示缓存子表最近数据的内存大小。
  • WAL_FSYNC_PERIOD:代替原FSYNC参数。
  • WAL_LEVEL:代替原WAL参数。
    调整
  • REPLICA:3.0.0版本暂不支持修改。
  • KEEP:3.0版本新增支持带单位的设置方式。
| 4 | ALTER STABLE | 调整 | 废除
  • CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。
    新增
  • RENAME TAG:代替原CHANGE TAG子句。
  • COMMENT:修改超级表的注释。
@@ -82,7 +82,7 @@ description: "TDengine 3.0 版本的语法变更说明" | # | **函数** | **差异性** | **说明** | | - | :------- | :--------: | :------- | -| 1 | TWA |
增强
| 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 +| 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 3 | LEASTSQUARES | 增强 | 可以用于超级表了。 | 4 | ELAPSED | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 From 291df97c8e19eb5373975ac9bee56483d041f0d7 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Aug 2022 13:04:53 +0800 Subject: [PATCH 09/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 0c178e5962..f158cf2418 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -6,12 +6,7 @@ title: 使用安装包立即开始 import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; -:::info -如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. - -::: - -在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。 +在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/package/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ## 安装 From 552a63236e03c4d3661f81eeb510a213c1ce851f Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:10:57 +0800 Subject: [PATCH 10/43] style: format --- docs/zh/05-get-started/01-docker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 5ede3c7d31..04ecf56541 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -6,7 +6,7 @@ title: 通过 Docker 快速体验 TDengine 如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ::: -本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉Docker, 请使用[安装包的方式快速体验](../../get-started/package/)。 +本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。 ## 启动 TDengine From a3ddbd656cb9d8d6e83986a6db7151ee11a305af Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Aug 2022 13:16:35 +0800 Subject: [PATCH 11/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index f158cf2418..846cd9e9cd 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -6,7 +6,7 @@ title: 使用安装包立即开始 import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; -在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/package/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. +在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ## 安装 From 8b85af754c3d64d53701f92bba3b5169367b7f6c Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Aug 2022 13:17:37 +0800 Subject: [PATCH 12/43] Update 01-docker.md --- docs/zh/05-get-started/01-docker.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 04ecf56541..4895653d8d 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -1,12 +1,8 @@ --- sidebar_label: Docker title: 通过 Docker 快速体验 TDengine ---- -:::info -如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. -::: -本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。 +本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ## 启动 TDengine From f1775126b4d6b2c8df7b5d444bf2056f25100835 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:31:47 +0800 Subject: [PATCH 13/43] Update 01-docker.md --- docs/zh/05-get-started/01-docker.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 4895653d8d..34ddd90e2b 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -1,6 +1,7 @@ --- sidebar_label: Docker title: 通过 Docker 快速体验 TDengine +--- 本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. From 2112887fa4c499ef288d91f8e664541e2009eb38 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:35:03 +0800 Subject: [PATCH 14/43] Update 01-docker.md --- docs/zh/05-get-started/01-docker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 34ddd90e2b..f0f09d4c7e 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -3,7 +3,7 @@ sidebar_label: Docker title: 通过 Docker 快速体验 TDengine --- -本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. +本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ## 启动 TDengine From 9c7077737747f5923a89fbaba4ef9fd6a6f0890f Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:35:03 +0800 Subject: [PATCH 15/43] Update 01-docker.md --- docs/zh/05-get-started/01-docker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index 34ddd90e2b..f0f09d4c7e 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -3,7 +3,7 @@ sidebar_label: Docker title: 通过 Docker 快速体验 TDengine --- -本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. +本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. ## 启动 TDengine From 6853f519580201974fdc6fed15bc678624d6f39c Mon Sep 17 00:00:00 2001 From: huolibo Date: Tue, 16 Aug 2022 13:50:39 +0800 Subject: [PATCH 16/43] docs(driver): modify 3.0 version --- docs/zh/14-reference/03-connector/java.mdx | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/docs/zh/14-reference/03-connector/java.mdx b/docs/zh/14-reference/03-connector/java.mdx index c0b83e7d4a..6a78902b1e 100644 --- a/docs/zh/14-reference/03-connector/java.mdx +++ b/docs/zh/14-reference/03-connector/java.mdx @@ -93,12 +93,12 @@ Maven 项目中,在 pom.xml 中添加以下依赖: 可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector ```shell -git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0 +git clone https://github.com/taosdata/taos-connector-jdbc.git cd taos-connector-jdbc mvn clean install -Dmaven.test.skip=true ``` -编译后,在 target 目录下会产生 taos-jdbcdriver-2.0.XX-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。 +编译后,在 target 目录下会产生 taos-jdbcdriver-3.0.*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。 @@ -198,7 +198,7 @@ url 中的配置参数如下: - user:登录 TDengine 用户名,默认值 'root'。 - password:用户登录密码,默认值 'taosdata'。 -- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始,JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。 +- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。 - charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。 - batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。 - httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。 @@ -216,7 +216,7 @@ url 中的配置参数如下: INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6); ``` -- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6); +- 如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6); ::: @@ -230,7 +230,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra **注意**: - 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。 -- 以下示例代码基于 taos-jdbcdriver-2.0.36。 +- 以下示例代码基于 taos-jdbcdriver-3.0.0。 ```java public Connection getConn() throws Exception{ @@ -367,7 +367,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据 **注意**: - JDBC REST 连接目前不支持参数绑定 -- 以下示例代码基于 taos-jdbcdriver-2.0.36 +- 以下示例代码基于 taos-jdbcdriver-3.0.0 - binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法 - setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽 @@ -635,7 +635,7 @@ TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协 **注意**: - JDBC REST 连接目前不支持无模式写入 -- 以下示例代码基于 taos-jdbcdriver-2.0.36 +- 以下示例代码基于 taos-jdbcdriver-3.0.0 ```java public class SchemalessInsertTest { @@ -666,7 +666,7 @@ public class SchemalessInsertTest { } ``` -### 订阅 +### 数据订阅 TDengine Java 连接器支持订阅功能,应用 API 如下: @@ -717,9 +717,14 @@ while(true) { #### 关闭订阅 ```java +// 取消订阅 +consumer.unsubscribe(); +// 关闭消费 consumer.close() ``` +详情请参考:[数据订阅](../../../develop/tmq) + ### 使用示例如下: ```java @@ -734,7 +739,7 @@ public abstract class ConsumerLoop { config.setProperty("msg.with.table.name", "true"); config.setProperty("enable.auto.commit", "true"); config.setProperty("group.id", "group1"); - config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer"); + config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer"); this.consumer = new TaosConsumer<>(config); this.topics = Collections.singletonList("topic_speed"); @@ -754,8 +759,9 @@ public abstract class ConsumerLoop { process(record); } } + consumer.unsubscribe(); } finally { - consumer.close(); + consumer.close(); shutdownLatch.countDown(); } } @@ -875,6 +881,7 @@ public static void main(String[] args) throws Exception { | taos-jdbcdriver 版本 | 主要变化 | | :------------------: | :----------------------------: | +| 3.0.0 | 支持 TDengine 3.0 | | 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 | | 2.0.38 | JDBC REST 连接增加批量拉取功能 | | 2.0.37 | 增加对 json tag 支持 | From 8d651d37fcca03922ba5c1375709ac3fd0a1ab1a Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:56:54 +0800 Subject: [PATCH 17/43] style: format docs --- docs/zh/12-taos-sql/29-changes.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index f6a78d3277..896f1ad5de 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -6,8 +6,8 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 基本元素变更 -| # | **元素** | **差异性** | **说明** | -| - | :------- | :--------: | :------- | +| # | **元素** | ****
差异性
**** | **说明** | +| - | :------- | :-------- | :------- | | 1 | VARCHAR | 新增 | BINARY类型的别名。 | 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。 | 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。 @@ -22,8 +22,8 @@ description: "TDengine 3.0 版本的语法变更说明" 在 TDengine 中,普通表的数据模型中可使用以下数据类型。 -| # | **语句** | **差异性** | **说明** | -| - | :------- | :--------: | :------- | +| # | **语句** | **
差异性
** | **说明** | +| - | :------- | :-------- | :------- | | 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。 | 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。 | 3 | ALTER DATABASE | 调整 | 废除
  • QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。
  • BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。
  • UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。
  • CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。
  • COMP:3.0版本暂不支持修改。
    新增
  • CACHEMODEL:表示是否在内存中缓存子表的最近数据。
  • CACHESIZE:表示缓存子表最近数据的内存大小。
  • WAL_FSYNC_PERIOD:代替原FSYNC参数。
  • WAL_LEVEL:代替原WAL参数。
    调整
  • REPLICA:3.0.0版本暂不支持修改。
  • KEEP:3.0版本新增支持带单位的设置方式。
@@ -80,8 +80,8 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 函数变更 -| # | **函数** | **差异性** | **说明** | -| - | :------- | :--------: | :------- | +| # | **函数** | **
差异性
** | **说明** | +| - | :------- | :-------- | :------- | | 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 3 | LEASTSQUARES | 增强 | 可以用于超级表了。 From 65c08530d568d91b6a279f689bf4356003f7ccff Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 13:57:37 +0800 Subject: [PATCH 18/43] Update 29-changes.md --- docs/zh/12-taos-sql/29-changes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index 896f1ad5de..145af5cc73 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -80,7 +80,7 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 函数变更 -| # | **函数** | **
差异性
** | **说明** | +| # | **函数** | **
差异性
** | **说明** | | - | :------- | :-------- | :------- | | 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 From 2a8114c1726db1bee043b91240dc93c928e21a9e Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 14:01:50 +0800 Subject: [PATCH 19/43] Update 29-changes.md --- docs/zh/12-taos-sql/29-changes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index 145af5cc73..63a998b4cd 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -6,7 +6,7 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 基本元素变更 -| # | **元素** | ****
差异性
**** | **说明** | +| # | **元素** | **
差异性
** | **说明** | | - | :------- | :-------- | :------- | | 1 | VARCHAR | 新增 | BINARY类型的别名。 | 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。 From 393ee81e327c93edb7f831dc08357afb6e9b880e Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 14:02:45 +0800 Subject: [PATCH 20/43] Update 29-changes.md --- docs/zh/12-taos-sql/29-changes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index 63a998b4cd..77544c214d 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -6,7 +6,7 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 基本元素变更 -| # | **元素** | **
差异性
** | **说明** | +| # | **元素** |
**差异性**
| **说明** | | - | :------- | :-------- | :------- | | 1 | VARCHAR | 新增 | BINARY类型的别名。 | 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。 From 7f8b6fbdf623c6b9f639cf11e7ffdaefd6dfeffe Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 14:07:16 +0800 Subject: [PATCH 21/43] Update 29-changes.md --- docs/zh/12-taos-sql/29-changes.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index 77544c214d..d653c59a5c 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -6,7 +6,7 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 基本元素变更 -| # | **元素** |
**差异性**
| **说明** | +| # | **元素** | **
差异性
** | **说明** | | - | :------- | :-------- | :------- | | 1 | VARCHAR | 新增 | BINARY类型的别名。 | 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。 @@ -22,7 +22,7 @@ description: "TDengine 3.0 版本的语法变更说明" 在 TDengine 中,普通表的数据模型中可使用以下数据类型。 -| # | **语句** | **
差异性
** | **说明** | +| # | **语句** | **
差异性
** | **说明** | | - | :------- | :-------- | :------- | | 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。 | 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。 @@ -80,7 +80,7 @@ description: "TDengine 3.0 版本的语法变更说明" ## SQL 函数变更 -| # | **函数** | **
差异性
** | **说明** | +| # | **函数** | **
差异性
** | **说明** | | - | :------- | :-------- | :------- | | 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 From 645a2cce97e989e20e410cbb948c781e04606dcc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 16 Aug 2022 13:52:02 +0800 Subject: [PATCH 22/43] fix(stream): window delete --- source/dnode/vnode/src/tq/tqSink.c | 12 +++++----- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 6 ++--- tests/script/jenkins/basic.txt | 4 ++-- tests/script/tsim/stream/session0.sim | 26 +++++++++++----------- 4 files changed, 23 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 8a249eb105..55630511bf 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,7 +17,7 @@ #include "tmsg.h" #include "tq.h" -int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, +int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq) { ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT); int32_t totRow = pDataBlock->info.rows; @@ -68,9 +68,10 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { int32_t padding1 = 0; - void* padding2 = taosMemoryMalloc(1); + void* padding2 = NULL; taosArrayPush(schemaReqSz, &padding1); taosArrayPush(schemaReqs, &padding2); + continue; } STagVal tagVal = { @@ -138,8 +139,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem continue; } int32_t rows = pDataBlock->info.rows; - // TODO min - int32_t rowSize = pDataBlock->info.rowSize; + /*int32_t rowSize = pDataBlock->info.rowSize;*/ int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); int32_t schemaLen = 0; @@ -150,7 +150,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem } // assign data - // TODO ret = rpcMallocCont(cap); ret->header.vgId = pVnode->config.vgId; ret->length = sizeof(SSubmitReq); @@ -161,13 +160,12 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; - tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); continue; } blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->sversion = htonl(pTSchema->version); - // TODO blkHead->suid = htobe64(suid); // uid is assigned by vnode blkHead->uid = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 8ae0e824cf..6fc6636623 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -196,9 +196,9 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey); } - tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 + " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); return code; _err: diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index fda5e5cb6e..97295d75e0 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -237,8 +237,8 @@ ./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ./test.sh -f tsim/stream/distributeSession0.sim -#./test.sh -f tsim/stream/session0.sim -#./test.sh -f tsim/stream/session1.sim +./test.sh -f tsim/stream/session0.sim +./test.sh -f tsim/stream/session1.sim ./test.sh -f tsim/stream/state0.sim ./test.sh -f tsim/stream/triggerInterval0.sim ./test.sh -f tsim/stream/triggerSession0.sim diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index fee8c98cce..3e0af354d8 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -83,22 +83,22 @@ if $data11 != 3 then goto loop0 endi -if $data12 != NULL then +if $data12 != 10 then print ======data12=$data12 goto loop0 endi -if $data13 != NULL then +if $data13 != 10 then print ======data13=$data13 goto loop0 endi -if $data14 != NULL then +if $data14 != 1.100000000 then print ======data14=$data14 return -1 endi -if $data15 != NULL then +if $data15 != 0.000000000 then print ======data15=$data15 return -1 endi @@ -141,38 +141,38 @@ if $data01 != 7 then goto loop1 endi -if $data02 != NULL then +if $data02 != 18 then print =====data02=$data02 goto loop1 endi -if $data03 != NULL then +if $data03 != 4 then print =====data03=$data03 goto loop1 endi -if $data04 != NULL then - print ======$data04 +if $data04 != 1.000000000 then + print ======data04=$data04 return -1 endi -if $data05 != NULL then - print ======$data05 +if $data05 != 1.154700538 then + print ======data05=$data05 return -1 endi if $data06 != 4 then - print ======$data06 + print ======data06=$data06 return -1 endi if $data07 != 1.000000000 then - print ======$data07 + print ======data07=$data07 return -1 endi if $data08 != 13 then - print ======$data08 + print ======data08=$data08 return -1 endi From 4b85d81d373ad91d497127b65292049175d79b10 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Tue, 16 Aug 2022 14:31:42 +0800 Subject: [PATCH 23/43] docs(TMQ): improve tmq document --- docs/zh/07-develop/07-tmq.mdx | 105 ++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..aa599c2173 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -87,6 +87,25 @@ void commitSync() throws SQLException; void close() throws SQLException; ``` + + + +```go +func NewConsumer(conf *Config) (*Consumer, error) + +func (c *Consumer) Close() error + +func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error + +func (c *Consumer) FreeMessage(message unsafe.Pointer) + +func (c *Consumer) Poll(timeout time.Duration) (*Result, error) + +func (c *Consumer) Subscribe(topics []string) error + +func (c *Consumer) Unsubscribe() error +``` + @@ -229,6 +248,56 @@ public class MetersDeserializer extends ReferenceDeserializer { } ``` + + + +```go +config := tmq.NewConfig() +defer config.Destroy() +err = config.SetGroupID("test") +if err != nil { + panic(err) +} +err = config.SetAutoOffsetReset("earliest") +if err != nil { + panic(err) +} +err = config.SetConnectIP("127.0.0.1") +if err != nil { + panic(err) +} +err = config.SetConnectUser("root") +if err != nil { + panic(err) +} +err = config.SetConnectPass("taosdata") +if err != nil { + panic(err) +} +err = config.SetConnectPort("6030") +if err != nil { + panic(err) +} +err = config.SetMsgWithTableName(true) +if err != nil { + panic(err) +} +err = config.EnableHeartBeat() +if err != nil { + panic(err) +} +err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { + if result.ErrCode != 0 { + errStr := wrapper.TMQErr2Str(result.ErrCode) + err := errors.NewError(int(result.ErrCode), errStr) + panic(err) + } +}) +if err != nil { + panic(err) +} +``` + @@ -260,6 +329,20 @@ topics.add("tmq_topic"); consumer.subscribe(topics); ``` + + + +```go +consumer, err := tmq.NewConsumer(config) +if err != nil { + panic(err) +} +err = consumer.Subscribe([]string{"example_tmq_topic"}) +if err != nil { + panic(err) +} +``` + @@ -293,6 +376,21 @@ while(running){ } ``` + + + +```go +for { + result, err := consumer.Poll(time.Second) + if err != nil { + panic(err) + } + fmt.Println(result) + consumer.Commit(context.Background(), result.Message) + consumer.FreeMessage(result.Message) +} +``` + @@ -322,6 +420,13 @@ consumer.unsubscribe(); consumer.close(); ``` + + + +```go +consumer.Close() +``` + From a325c5e7f2b5cf373cc5c265e2ca3d332b4fe568 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 14:58:37 +0800 Subject: [PATCH 24/43] feat: add download --- docs/zh/05-get-started/03-package.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 846cd9e9cd..4a5329323d 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -5,6 +5,7 @@ title: 使用安装包立即开始 import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; +import PkgListV3 from "/components/PkgListV3"; 在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. @@ -64,6 +65,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm + + 1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: @@ -85,6 +88,8 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 + + 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From e5194f5f53cc44d45ad75c07b8627a8db427a990 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:03:17 +0800 Subject: [PATCH 25/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 4a5329323d..59991cc0c5 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -89,7 +89,7 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 - + 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From e4f9a41f4d4f0d0bd0d55d61be1bf6ed031334fa Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 16 Aug 2022 15:03:20 +0800 Subject: [PATCH 26/43] fix(query): fix sample with partition by invalid pageId cause crash issue TD-17499 --- source/libs/function/src/builtinsimpl.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index de72c32fa1..cbf81f1d0d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) { return numOfElems; } +static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + pInfo->data = (char*)pInfo + sizeof(SSampleInfo); + pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes); + + return pInfo; +} + bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); @@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da int32_t sampleFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SSampleInfo* pInfo = getSampleOutputInfo(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + SSampleInfo* pInfo = getSampleOutputInfo(pCtx); pEntryInfo->complete = true; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; From f7a2d8f63c234de88285a3d1fc562a11ed471e6d Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:05:16 +0800 Subject: [PATCH 27/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 59991cc0c5..08b23b5025 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -89,7 +89,7 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 - + 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From 1b86bd111692052dec19e4c57c0af058ba4fa8d5 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:10:07 +0800 Subject: [PATCH 28/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 08b23b5025..69424a2d18 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -65,9 +65,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm +1. 从列表中下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; - -1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: ```bash @@ -81,6 +80,9 @@ sudo ./install.sh ``` :::info + +下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) + install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 ::: From 5f04c27cc2761c8510d845b668d1a9434b1a2380 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:12:10 +0800 Subject: [PATCH 29/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 69424a2d18..bce7529989 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -82,7 +82,6 @@ sudo ./install.sh :::info 下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) - install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 ::: From 400ff894174a0e9e8d8b6bc4caa79b7bea59ee46 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:15:23 +0800 Subject: [PATCH 30/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 65 ++++++++++++++-------------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index bce7529989..b35872e7ae 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -11,36 +11,11 @@ import PkgListV3 from "/components/PkgListV3"; ## 安装 +:::info +下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) +::: + - -可以使用 apt-get 工具从官方仓库安装。 - -**安装包仓库** - -```bash -wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - -echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list -``` - -如果安装 Beta 版需要安装包仓库 - -```bash -wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - -echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list -``` - -**使用 apt-get 命令安装** - -```bash -sudo apt-get update -apt-cache policy tdengine -sudo apt-get install tdengine -``` - -:::tip -apt-get 方式只适用于 Debian 或 Ubuntu 系统 -:::: - 1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; @@ -80,10 +55,7 @@ sudo ./install.sh ``` :::info - -下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 - ::: @@ -95,6 +67,35 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 + + +可以使用 apt-get 工具从官方仓库安装。 + +**安装包仓库** + +```bash +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list +``` + +如果安装 Beta 版需要安装包仓库 + +```bash +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list +``` + +**使用 apt-get 命令安装** + +```bash +sudo apt-get update +apt-cache policy tdengine +sudo apt-get install tdengine +``` + +:::tip +apt-get 方式只适用于 Debian 或 Ubuntu 系统 +:::: From 29f78522f8a123e40614f4ce88bbdb5f531e18ad Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:18:32 +0800 Subject: [PATCH 31/43] Update _windows_install.mdx --- docs/zh/14-reference/03-connector/_windows_install.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/14-reference/03-connector/_windows_install.mdx index 755f96b2d7..10cf37a7b4 100644 --- a/docs/zh/14-reference/03-connector/_windows_install.mdx +++ b/docs/zh/14-reference/03-connector/_windows_install.mdx @@ -1,8 +1,8 @@ -import PkgList from "/components/PkgList"; +import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 - + [所有下载](https://www.taosdata.com/cn/all-downloads/) From 397349156f97bd2765a67c78f50d8978e213b579 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:19:17 +0800 Subject: [PATCH 32/43] Update _linux_install.mdx --- docs/zh/14-reference/03-connector/_linux_install.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/14-reference/03-connector/_linux_install.mdx index eb7f683288..a4667caec9 100644 --- a/docs/zh/14-reference/03-connector/_linux_install.mdx +++ b/docs/zh/14-reference/03-connector/_linux_install.mdx @@ -1,4 +1,4 @@ -import PkgList from "/components/PkgList"; +import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 From 5b3f85ed3ba569a3cf867603d6747f991b5f5636 Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:22:09 +0800 Subject: [PATCH 33/43] docs(driver):update C# 3.0 tmq doc in develop/tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 90 +++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..957a8bbdda 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -87,6 +87,31 @@ void commitSync() throws SQLException; void close() throws SQLException; ``` + + + +```C# +ConsumerBuilder(IEnumerable> config) + +virtual IConsumer Build() + +Consumer(ConsumerBuilder builder) + +void Subscribe(IEnumerable topics) + +void Subscribe(string topic) + +ConsumeResult Consume(int millisecondsTimeout) + +List Subscription() + +void Unsubscribe() + +void Commit(ConsumeResult consumerResult) + +void Close() +``` + @@ -229,6 +254,30 @@ public class MetersDeserializer extends ReferenceDeserializer { } ``` + + + +```C# +using TDengineTMQ; + +// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 +// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); +``` + @@ -262,6 +311,18 @@ consumer.subscribe(topics); + + +```C# +// 创建订阅 topics 列表 +List topics = new List(); +topics.add("tmq_topic"); +// 启动订阅 +consumer.Subscribe(topics); +``` + + + ## 消费 @@ -296,6 +357,23 @@ while(running){ + + +```C# +// 消费数据 +while (true) +{ + var consumerRes = consumer.Consume(100); + // process ConsumeResult + ProcessMsg(consumerRes); + consumer.Commit(consumerRes); +} +``` + + + + + ## 结束消费 消费结束后,应当取消订阅。 @@ -322,6 +400,18 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + +```C# +// 取消订阅 +consumer.Unsubscribe(); + +// 关闭消费 +consumer.Close(); +``` + From 0db3fe1e1e617e03435238e327a31e98c803b72d Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:39:22 +0800 Subject: [PATCH 34/43] docs(driver):update markdown syntax mistake caused by merge --- docs/zh/07-develop/07-tmq.mdx | 105 ++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 985b6a6ff2..fb308c5c40 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -89,6 +89,26 @@ void close() throws SQLException; + + +```go +func NewConsumer(conf *Config) (*Consumer, error) + +func (c *Consumer) Close() error + +func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error + +func (c *Consumer) FreeMessage(message unsafe.Pointer) + +func (c *Consumer) Poll(timeout time.Duration) (*Result, error) + +func (c *Consumer) Subscribe(topics []string) error + +func (c *Consumer) Unsubscribe() error + +``` + + ```C# @@ -111,27 +131,10 @@ void Unsubscribe() void Commit(ConsumeResult consumerResult) void Close() - - - - -```go -func NewConsumer(conf *Config) (*Consumer, error) - -func (c *Consumer) Close() error - -func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error - -func (c *Consumer) FreeMessage(message unsafe.Pointer) - -func (c *Consumer) Poll(timeout time.Duration) (*Result, error) - -func (c *Consumer) Subscribe(topics []string) error - -func (c *Consumer) Unsubscribe() error ``` + ## 写入数据 @@ -275,29 +278,6 @@ public class MetersDeserializer extends ReferenceDeserializer { - - -```C# -using TDengineTMQ; - -// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 -// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 -var cfg = new ConsumerConfig - { - EnableAutoCommit = "true" - AutoCommitIntervalMs = "1000" - GourpId = "TDengine-TMQ-C#", - TDConnectUser = "root", - TDConnectPasswd = "taosdata", - AutoOffsetReset = "earliest" - MsgWithTableName = "true", - TDConnectIp = "127.0.0.1", - TDConnectPort = "6030" - }; - -var consumer = new ConsumerBuilder(cfg).Build(); - - ```go @@ -348,6 +328,32 @@ if err != nil { ``` + + + +```C# +using TDengineTMQ; + +// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 +// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); + +``` + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -500,6 +506,15 @@ consumer.close(); + + +```go +consumer.Close() + +``` + + + ```C# @@ -510,14 +525,6 @@ consumer.Unsubscribe(); consumer.Close(); - - -```go -consumer.Close() - -``` - - ## 删除 *topic* From e60359336d0d7ed32c5198dd9215e61708343145 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:42:01 +0800 Subject: [PATCH 35/43] Update _linux_install.mdx --- docs/zh/14-reference/03-connector/_linux_install.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/14-reference/03-connector/_linux_install.mdx index a4667caec9..c3ddff53cd 100644 --- a/docs/zh/14-reference/03-connector/_linux_install.mdx +++ b/docs/zh/14-reference/03-connector/_linux_install.mdx @@ -2,9 +2,9 @@ import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 - + - [所有下载](https://www.taosdata.com/cn/all-downloads/) + [所有下载](../../releases) 2. 解压缩软件包 From 85c4994e7b0c6336e70065cb1f581b0a1866bc05 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:42:32 +0800 Subject: [PATCH 36/43] Update _windows_install.mdx --- docs/zh/14-reference/03-connector/_windows_install.mdx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/14-reference/03-connector/_windows_install.mdx index 10cf37a7b4..9fdefa04c0 100644 --- a/docs/zh/14-reference/03-connector/_windows_install.mdx +++ b/docs/zh/14-reference/03-connector/_windows_install.mdx @@ -4,8 +4,7 @@ import PkgListV3 from "/components/PkgListV3"; - [所有下载](https://www.taosdata.com/cn/all-downloads/) - + [所有下载](../../releases) 2. 执行安装程序,按提示选择默认值,完成安装 3. 安装路径 From a0525216c0c3a6d49696b76b7f03f36f2b0e5bf5 Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:44:51 +0800 Subject: [PATCH 37/43] docs(driver):update C# tmq document --- docs/zh/07-develop/07-tmq.mdx | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index fb308c5c40..c37a3c4737 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -88,7 +88,6 @@ void close() throws SQLException; ``` - ```go @@ -105,7 +104,6 @@ func (c *Consumer) Poll(timeout time.Duration) (*Result, error) func (c *Consumer) Subscribe(topics []string) error func (c *Consumer) Unsubscribe() error - ``` @@ -277,7 +275,6 @@ public class MetersDeserializer extends ReferenceDeserializer { ``` - ```go @@ -505,7 +502,6 @@ consumer.close(); ``` - ```go @@ -514,7 +510,6 @@ consumer.Close() ``` - ```C# From 093ff2bd54b50f41895151773872b9c1ee73069c Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:46:07 +0800 Subject: [PATCH 38/43] docs(driver):update TMQ markdown doc --- docs/zh/07-develop/07-tmq.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index c37a3c4737..0945f69fcf 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -506,7 +506,6 @@ consumer.close(); ```go consumer.Close() - ``` From dbf93120ab9a8e48dcef5286afc16fcc68c4ed4f Mon Sep 17 00:00:00 2001 From: Yang Zhao Date: Tue, 16 Aug 2022 15:46:51 +0800 Subject: [PATCH 39/43] Update 07-tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 100 ++++------------------------------ 1 file changed, 11 insertions(+), 89 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index d8966f7798..1ec1922c01 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -92,102 +92,21 @@ void close() throws SQLException; ```python class TaosConsumer(): - DEFAULT_CONFIG = { - 'group.id', - 'client.id', - 'enable.auto.commit', - 'auto.commit.interval.ms', - 'auto.offset.reset', - 'msg.with.table.name', - 'experimental.snapshot.enable', - 'enable.heartbeat.background', - 'experimental.snapshot.batch.size', - 'td.connect.ip', - 'td.connect.user', - 'td.connect.pass', - 'td.connect.port', - 'td.connect.db', - 'timeout' - } + def __init__(self, *topics, **configs) - def __init__(self, *topics, **configs): - self._closed = True - self._conf = None - self._list = None - self._tmq = None + def __iter__(self) - keys = list(configs.keys()) - for k in keys: - configs.update({k.replace('_','.'): configs.pop(k)}) + def __next__(self) - extra_configs = set(configs).difference(self.DEFAULT_CONFIG) - if extra_configs: - raise TmqError("Unrecognized configs: %s" % (extra_configs,)) - - self._conf = tmq_conf_new() - self._list = tmq_list_new() - - # set poll timeout - if 'timeout' in configs: - self._timeout = configs['timeout'] - del configs['timeout'] - else: - self._timeout = 0 - - # check if group id is set - - if 'group.id' not in configs: - raise TmqError("missing group.id in consumer config setting") - - for key, value in configs.items(): - tmq_conf_set(self._conf, key, value) - - self._tmq = tmq_consumer_new(self._conf) - - if not topics: - raise TmqError("Unset topic for Consumer") - - for topic in topics: - tmq_list_append(self._list, topic) - - tmq_subscribe(self._tmq, self._list) - - - def __iter__(self): - return self - - def __next__(self): - if not self._tmq: - raise StopIteration('TaosConsumer closed') - return next(self.sync_next()) - - def sync_next(self): - while 1: - res = tmq_consumer_poll(self._tmq, self._timeout) - if res: - break - yield TaosResult(res) + def sync_next(self) - def subscription(self): - if self._tmq is None: - return None - return tmq_subscription(self._tmq) + def subscription(self) - def unsubscribe(self): - tmq_unsubscribe(self._tmq) + def unsubscribe(self) - def close(self): - if self._tmq: - tmq_consumer_close(self._tmq) - self._tmq = None + def close(self) - def __del__(self): - if self._conf: - tmq_conf_destroy(self._conf) - if self._list: - tmq_list_destroy(self._list) - if self._tmq: - tmq_consumer_close(self._tmq) + def __del__(self) ``` @@ -354,6 +273,8 @@ public class MetersDeserializer extends ReferenceDeserializer { +Python 使用以下配置项创建一个 Consumer 实例。 + | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | @@ -368,6 +289,7 @@ public class MetersDeserializer extends ReferenceDeserializer { | `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | | `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | | `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | +| `timeout` | int | 消费者拉去的超时时间 | | From ae954d2eda5c534cc1de94474d35888f3ea472d5 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 16:11:46 +0800 Subject: [PATCH 40/43] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index b35872e7ae..7e694a704f 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -18,7 +18,8 @@ import PkgListV3 from "/components/PkgListV3"; -1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; +1. 从列表中下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; + 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令: ```bash @@ -29,7 +30,8 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb -1. 从 [发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm; +1. 从列表中下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm; + 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令: ```bash @@ -60,11 +62,10 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 - - - + -1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; +1. 从列表中下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; + 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From bd4fdca940d1a2c2548c8be98a5df3fb1b177cc2 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 16:32:22 +0800 Subject: [PATCH 41/43] Update 07-tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 200c1258a9..a3e6f035a0 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -520,7 +520,6 @@ for { ``` - From 90347b156951d5458d4ba007698cb803092cbe44 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 16:42:01 +0800 Subject: [PATCH 42/43] Update 07-tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index a3e6f035a0..1fa6469b62 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -574,6 +574,9 @@ consumer.unsubscribe(); /* 关闭消费 */ consumer.close(); + + + ```go From debc78805c9c8dce0b09df6321972d6eece98221 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 16:44:09 +0800 Subject: [PATCH 43/43] Update 07-tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 1fa6469b62..f22c3419cc 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -966,3 +966,5 @@ for msg in consumer: + +