From f83ca89ea22746528b26d0caf502b6140a7ebab1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 6 Aug 2022 19:40:10 +0800 Subject: [PATCH 01/12] refactor(sync): make leader life longer --- source/common/src/tglobal.c | 3 +- source/libs/sync/inc/syncRaftLog.h | 2 + source/libs/sync/src/syncAppendEntries.c | 112 ++++++++++++++--------- source/libs/sync/src/syncMain.c | 3 + source/libs/sync/src/syncRaftLog.c | 18 +++- 5 files changed, 91 insertions(+), 47 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f836cd76ac..a0f02d96f9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; - tsNumOfVnodeSyncThreads = tsNumOfCores; + // tsNumOfVnodeSyncThreads = tsNumOfCores; + tsNumOfVnodeSyncThreads = 32; tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 65ec77e38f..ff59189a9d 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore); SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); +SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore); + // for debug void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint2(char* s, SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index f31f3dd1ae..85d1fa6d31 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd); - syncNodeEventLog(ths, eventLog); - logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); - return code; } +// if FromIndex > walCommitVer, return 0 +// else return num of pass entries static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { - int32_t code; + int32_t code = 0; + int32_t pass = 0; SyncIndex delBegin = FromIndex; SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); @@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { } } + // update delete begin + SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore); + + if (delBegin <= walCommitVer) { + delBegin = walCommitVer + 1; + pass = walCommitVer - delBegin + 1; + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "update delete begin to %ld", delBegin); + syncNodeEventLog(ths, logBuf); + } while (0); + } + // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd); - syncNodeEventLog(ths, eventLog); - logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "make log same from:%ld, delbegin:%ld, pass:%d", FromIndex, delBegin, pass); + syncNodeEventLog(ths, logBuf); + } while (0); - return code; + return pass; } int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { @@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { + int32_t pass = 0; + SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; + // make log same - do { - SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; - - if (hasExtraEntries) { - // make log same, rollback deleted entries - code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); - ASSERT(code == 0); - } - - } while (0); + if (hasExtraEntries) { + // make log same, rollback deleted entries + pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); + ASSERT(pass >= 0); + } // append entry batch - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; + if (pass == 0) { + // assert! no batch + ASSERT(pMsg->dataCount == 1); + + for (int32_t i = 0; i < pMsg->dataCount; ++i) { + SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + return -1; + } + + code = syncNodePreCommit(ths, pAppendEntry, 0); + ASSERT(code == 0); + + // syncEntryDestory(pAppendEntry); } - - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); - - // syncEntryDestory(pAppendEntry); } // fsync once @@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc syncLogRecvAppendEntriesBatch(ths, pMsg, "really match"); + int32_t pass = 0; + if (hasExtraEntries) { // make log same, rollback deleted entries - code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); - ASSERT(code == 0); + pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); + ASSERT(pass >= 0); } if (hasAppendEntries) { // append entry batch - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; + if (pass == 0) { + // assert! no batch + ASSERT(pMsg->dataCount == 1); + + // append entry batch + for (int32_t i = 0; i < pMsg->dataCount; ++i) { + SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + return -1; + } + + code = syncNodePreCommit(ths, pAppendEntry, 0); + ASSERT(code == 0); + + // syncEntryDestory(pAppendEntry); } - - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); - - // syncEntryDestory(pAppendEntry); } // fsync once diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index daa65add3c..389eda0f8b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; + + syncNodeEventLog(pSyncNode, "eq hb timer"); + if (pSyncNode->replicaNum > 1) { if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index b575e40d86..0649e064e4 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, return code; } +// truncate semantic static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - int32_t code = walRollback(pWal, fromIndex); + + // need not truncate + SyncIndex wallastVer = walGetLastVer(pWal); + if (fromIndex > wallastVer) { + return 0; + } + + int32_t code = walRollback(pWal, fromIndex); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn // event log do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%" PRId64, fromIndex); + snprintf(logBuf, sizeof(logBuf), "log truncate, from-index:%" PRId64, fromIndex); syncNodeEventLog(pData->pSyncNode, logBuf); } while (0); @@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { return walGetFirstVer(pWal); } +SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + return walGetCommittedVer(pWal); +} + // for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore); From f8b0c98a71eaca40d3841565d3c8554648b6f42c Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 6 Aug 2022 20:44:14 +0800 Subject: [PATCH 02/12] refactor(sync): make leader life longer --- source/libs/sync/src/syncCommit.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a603cfff27..3a94ed9713 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -92,6 +92,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } + // advance commit index as large as possible + SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore); + if (walCommitVer > newCommitIndex) { + newCommitIndex = walCommitVer; + } + // maybe execute fsm if (newCommitIndex > pSyncNode->commitIndex) { SyncIndex beginIndex = pSyncNode->commitIndex + 1; From 2f1bf1eba911a1d79ebe174e631f9b0d69a4a585 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 6 Aug 2022 21:41:03 +0800 Subject: [PATCH 03/12] refactor(sync): make leader life longer --- source/libs/sync/src/syncAppendEntries.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 85d1fa6d31..4f93d8197d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -570,7 +570,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // append entry batch if (pass == 0) { // assert! no batch - ASSERT(pMsg->dataCount == 1); + ASSERT(pMsg->dataCount <= 1); for (int32_t i = 0; i < pMsg->dataCount; ++i) { SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); @@ -698,7 +698,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // append entry batch if (pass == 0) { // assert! no batch - ASSERT(pMsg->dataCount == 1); + ASSERT(pMsg->dataCount <= 1); // append entry batch for (int32_t i = 0; i < pMsg->dataCount; ++i) { From bfb6bce0073e2461a7be08e0b7242d069cc46872 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 7 Aug 2022 19:39:24 +0800 Subject: [PATCH 04/12] fix read error --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dc7ffa9b13..0b8829a0c0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -482,6 +482,7 @@ void cliReadTimeoutCb(uv_timer_t* handle) { // set up timeout cb SCliConn* conn = handle->data; tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + uv_read_stop(conn->stream); cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); } From 43026ac31dec1ad19083e047db29c9287aed34d3 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 8 Aug 2022 11:09:39 +0800 Subject: [PATCH 05/12] refactor(stream) --- docs/zh/07-develop/07-tmq.md | 6 +- include/common/tcommon.h | 1 + include/common/tmsg.h | 2 +- include/libs/stream/tstream.h | 29 +++-- source/client/src/tmq.c | 3 +- source/dnode/mnode/impl/inc/mndDef.h | 3 +- source/dnode/mnode/impl/src/mndDef.c | 4 +- source/dnode/mnode/impl/src/mndScheduler.c | 21 ++-- source/dnode/mnode/impl/src/mndStream.c | 8 +- source/libs/stream/CMakeLists.txt | 3 +- source/libs/stream/src/streamMeta.c | 16 +-- source/libs/stream/src/streamRecover.c | 121 +++++++++++++-------- source/libs/stream/src/streamTask.c | 4 +- 13 files changed, 123 insertions(+), 98 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index 7faccdcec1..459b88085f 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -6,11 +6,11 @@ title: 数据订阅 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 -与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量。 +与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 -消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。 +消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。 -为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。 +为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 7d78c2dc2f..be18ef1fc0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -104,6 +104,7 @@ typedef struct SDataBlockInfo { uint32_t capacity; // TODO: optimize and remove following int64_t version; // used for stream, and need serialization + int64_t ts; // used for stream, and need serialization int32_t childId; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3c3071c8df..9eabeb4f58 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3075,7 +3075,7 @@ typedef struct { void* msg; } SBatchRsp; -static FORCE_INLINE void tFreeSBatchRsp(void *p) { +static FORCE_INLINE void tFreeSBatchRsp(void* p) { if (NULL == p) { return; } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 103ca6a4f0..239fcdad8d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -17,6 +17,7 @@ #include "os.h" #include "query.h" #include "tdatablock.h" +#include "tdbInt.h" #include "tmsg.h" #include "tmsgcb.h" #include "tqueue.h" @@ -85,6 +86,12 @@ enum { TASK_OUTPUT__FETCH, }; +enum { + STREAM_QUEUE__SUCESS = 1, + STREAM_QUEUE__FAILED, + STREAM_QUEUE__PROCESSING, +}; + typedef struct { int8_t type; } SStreamQueueItem; @@ -123,12 +130,6 @@ typedef struct { SSDataBlock* pBlock; } SStreamTrigger; -enum { - STREAM_QUEUE__SUCESS = 1, - STREAM_QUEUE__FAILED, - STREAM_QUEUE__PROCESSING, -}; - typedef struct { STaosQueue* queue; STaosQall* qall; @@ -233,6 +234,7 @@ typedef struct { typedef struct SStreamTask { int64_t streamId; int32_t taskId; + int32_t totalLevel; int8_t taskLevel; int8_t outputType; int16_t dispatchMsgType; @@ -458,9 +460,20 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -typedef struct SStreamMeta SStreamMeta; +typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask); -SStreamMeta* streamMetaOpen(); +typedef struct SStreamMeta { + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pStateDb; + SHashObj* pTasks; + void* ahandle; + TXN txn; + FTaskExpand* expandFunc; +} SStreamMeta; + +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f7d45dc6ff..073f772ee4 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1699,7 +1699,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tmq_list_destroy(lst); - return rsp; + /*return rsp;*/ + return 0; } // TODO: free resources return 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 06c64dcea6..c4da9b5c3d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -604,11 +604,11 @@ typedef struct { int64_t createTime; int64_t updateTime; int32_t version; + int32_t totalLevel; int64_t smaId; // 0 for unused // info int64_t uid; int8_t status; - int8_t isDistributed; // config int8_t igExpired; int8_t trigger; @@ -647,7 +647,6 @@ typedef struct { typedef struct { int64_t uid; int64_t streamId; - int8_t isDistributed; int8_t status; int8_t stage; } SStreamRecoverObj; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index abac0573da..08ce161409 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -23,11 +23,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->isDistributed) < 0) return -1; if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; @@ -69,11 +69,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->isDistributed) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 218f82df18..a24b7ef459 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -307,10 +307,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); - ASSERT(totLevel <= 2); - pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); - pStream->isDistributed = totLevel == 2; + int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); + ASSERT(planTotLevel <= 2); + pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; @@ -320,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { bool multiTarget = pDbObj->cfg.numOfVgroups > 1; - if (totLevel == 2 || externalTargetDB || multiTarget) { + if (planTotLevel == 2 || externalTargetDB || multiTarget) { /*if (true) {*/ SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); @@ -338,8 +337,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } } + pStream->totalLevel = planTotLevel + hasExtraSink; - if (totLevel > 1) { + if (planTotLevel > 1) { SStreamTask* pInnerTask; // inner level { @@ -371,13 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } -#if 0 - SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb); - ASSERT(pDbObj != NULL); - sdbRelease(pSdb, pSourceDb); - pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups; -#endif - if (tsSchedStreamToSnode) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { @@ -464,7 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } - if (totLevel == 1) { + if (planTotLevel == 1) { SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 902879701c..ba9bb2982f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); -static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); +/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -55,7 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/ mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); @@ -540,6 +540,7 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) { return 0; } +#if 0 int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { if (pStream->isDistributed) { int32_t lv = taosArrayGetSize(pStream->tasks); @@ -573,6 +574,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } return 0; } +#endif int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t lv = taosArrayGetSize(pStream->tasks); @@ -755,6 +757,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +#if 0 static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -817,6 +820,7 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +#endif int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index 33e864158a..ceddf4f215 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -8,7 +8,8 @@ target_include_directories( target_link_libraries( stream - PRIVATE os util transport qcom executor tdb + PUBLIC tdb + PRIVATE os util transport qcom executor ) if(${BUILD_TEST}) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7dfeefb261..085a0e4ce7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,22 +14,8 @@ */ #include "executor.h" -#include "tdbInt.h" #include "tstream.h" -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask); - -typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pStateDb; - SHashObj* pTasks; - void* ahandle; - TXN txn; - FTaskExpand* expandFunc; -} SStreamMeta; - SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -150,7 +136,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { return 0; } -int32_t streamRestoreTask(SStreamMeta* pMeta) { +int32_t streamLoadTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { ASSERT(0); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3530c05688..2a77ce8a91 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -87,53 +87,80 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp return 0; } -int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { -#if 0 - if (pTask->taskStatus != TASK_STATUS__FAIL) { - return 0; - } +typedef struct { + int32_t vgId; + int32_t childId; + int64_t ver; +} SStreamVgVerCheckpoint; - if (pTask->isStreamDistributed) { - if (pTask->taskType == TASK_TYPE__SOURCE) { - pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; - } else if (pTask->taskType != TASK_TYPE__SINK) { - pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; - bool hasCheckpoint = false; - int32_t childSz = taosArrayGetSize(pTask->childEpInfo); - for (int32_t i = 0; i < childSz; i++) { - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); - if (pEpInfo->checkpointVer == -1) { - hasCheckpoint = true; - break; - } - } - if (hasCheckpoint) { - // load from checkpoint - } else { - // recover child - } - } - } else { - if (pTask->taskType == TASK_TYPE__SOURCE) { - if (pTask->checkpointVer != -1) { - // load from checkpoint - } else { - // reset stream query task info - // TODO get snapshot ver - pTask->recoverSnapVer = -1; - qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); - pTask->taskStatus = TASK_STATUS__RECOVERING; - } - } - } - - if (pTask->taskStatus == TASK_STATUS__RECOVERING) { - if (streamPipelineExec(pTask, 100) < 0) { - // set fail - return -1; - } - } - -#endif +int32_t tEncodeSStreamVgVerCheckpoint(SEncoder* pEncoder, const SStreamVgVerCheckpoint* pCheckpoint) { + if (tEncodeI32(pEncoder, pCheckpoint->vgId) < 0) return -1; + if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pCheckpoint->ver) < 0) return -1; + return 0; +} + +int32_t tDecodeSStreamVgVerCheckpoint(SDecoder* pDecoder, SStreamVgVerCheckpoint* pCheckpoint) { + if (tDecodeI32(pDecoder, &pCheckpoint->vgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pCheckpoint->ver) < 0) return -1; + return 0; +} + +typedef struct { + int64_t streamId; + int64_t checkTs; + int64_t checkpointId; + int32_t taskId; + SArray* checkpointVer; // SArray +} SStreamAggVerCheckpoint; + +int32_t tEncodeSStreamAggVerCheckpoint(SEncoder* pEncoder, const SStreamAggVerCheckpoint* pCheckpoint) { + if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1; + if (tEncodeI64(pEncoder, pCheckpoint->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1; + int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer); + if (tEncodeI32(pEncoder, sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SStreamVgVerCheckpoint* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i); + if (tEncodeSStreamVgVerCheckpoint(pEncoder, pOneVgCkpoint) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSStreamAggVerCheckpoint(SDecoder* pDecoder, SStreamAggVerCheckpoint* pCheckpoint) { + if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1; + if (tDecodeI64(pDecoder, &pCheckpoint->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1; + int32_t sz; + if (tDecodeI32(pDecoder, &sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SStreamVgVerCheckpoint oneVgCheckpoint; + if (tDecodeSStreamVgVerCheckpoint(pDecoder, &oneVgCheckpoint) < 0) return -1; + taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint); + } + return 0; +} + +int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) { + ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); + // load status + void* pVal = NULL; + int32_t vLen = 0; + if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, pVal, vLen); + SStreamAggVerCheckpoint aggCheckpoint; + tDecodeSStreamAggVerCheckpoint(&decoder, &aggCheckpoint); + /*pTask->*/ + return 0; +} + +int32_t streamRecoverTask(SStreamTask* pTask) { + // return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 3a54981989..8b5bd849f6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -52,6 +52,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; @@ -62,7 +63,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; - /*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/ int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; @@ -101,6 +101,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; @@ -111,7 +112,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; - /*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/ int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; From e8ead35bd81dda99df2c7163c880c9ff6dda4688 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 8 Aug 2022 11:34:29 +0800 Subject: [PATCH 06/12] fix multi process problem --- source/libs/transport/src/transCli.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0b8829a0c0..e1df181329 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -994,6 +994,8 @@ static void cliAsyncCb(uv_async_t* handle) { if (count >= 2) { tTrace("cli process batch size:%d", count); } + // if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb); + if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } static void cliPrepareCb(uv_prepare_t* handle) { @@ -1089,7 +1091,7 @@ static SCliThrd* createThrdObj() { pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); pThrd->prepare->data = pThrd; - uv_prepare_start(pThrd->prepare, cliPrepareCb); + // uv_prepare_start(pThrd->prepare, cliPrepareCb); int32_t timerSize = 512; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); @@ -1126,7 +1128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(timer); } taosArrayDestroy(pThrd->timerList); - taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); From 819a48a4646f8136065de66a9ab2ea179a818db4 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 8 Aug 2022 11:46:31 +0800 Subject: [PATCH 07/12] fix: skip 0 row blocks to dismiss crashing --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a9f07cbf24..c40fb98d62 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -115,12 +115,19 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow); int64_t version = TSDBROW_VERSION(&row); + tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")", + TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever); + if (version < pReader->sver || version > pReader->ever) continue; code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL); if (code) goto _err; } + if (pReader->nBlockData.nRow <= 0) { + continue; + } + // org data // compress data (todo) int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData); @@ -808,7 +815,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { if (code) goto _err; _exit: - tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); + tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path); return code; _err: From cdd40e2863159650e4edd77bb3dd323fb11960be Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 8 Aug 2022 12:57:09 +0800 Subject: [PATCH 08/12] fix multi process problem --- source/libs/transport/src/.transSvr.c.swo | Bin 53248 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 source/libs/transport/src/.transSvr.c.swo diff --git a/source/libs/transport/src/.transSvr.c.swo b/source/libs/transport/src/.transSvr.c.swo deleted file mode 100644 index c9486c50867994926ea395577110ee4d15de5632..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 53248 zcmeI53wT^tb?-%a7AT~=pY4fLyOJWyj|7L=0wbx#I@3q!md!Mh)P)+$?C_2p){vSqP4*V@u< zmTIj=z1iNJmtv`m@8|cE|nugzLw{-#a?)|8&Rw zrBI=m|9v~|f34&G{lfjl@b~cD{Y!lQ{^9y_!{3i|+z&td&I$LQ6c`qt50}^ftaD(U z1M3`E=fFA#);X}wfpre7b6}kV>l|3;z&Z#1OE^%P@9Wz@$vb2Zv;H6L|NnSkU*8A8 zGB_9f!vp&Iz6M?n_JLmnzX1N~XZreH0$vPW1Wtmdg9?}j=Ym1-K=1(Y9~jbK0q+5C z0j~tV2~L9Rz>~ox;9M{W9tysLasDOn2jJ!4Bv=3yFbM|10JuN+E{6W6!Kc9c!Mng~ zK?Pg_?gxI1v*1VI@4+X)AA`4mmw=nVb3qkc0nP_M2kr~Lj}zjH;5P6!@O$8O;8(zn zpazz}72qQ9i{K&P!Qek|aC{B?Irts$Yv5Nw3(SK3;6m_y93}q*z6L%IJ_k;K$AVAd zWO)O)4r~K=;A40*Xn{+?eZalJJ;0}NNPHaJ0-g%0;5=|2aBuJ#+WVD2ev(CS0gwzl z4y@tRYM0v8a&oL*ollmJ9jLdf3n%tfYu8=ctY25D&D2L4js9tZH%9BV+F(-V|AAc) zT6k)@U23+cE4BHlO3PF4Zygy-mRd(hTTD9HtTal^iW2WBwM&+DOgsbuvvbP}v+cp; z_*~bcg;K_A?Mky&TFf$Nd2t=(UDm8rM(6s;_-Ls%zgSV;{;ezjL#4&*dM6z+_6#Pa zrYh7?!XctyMi!uErqsGl1t}~>#WX9|4<*%FI}rz#TixZ6Zcn|(Q>07=w}7NwU8-z` z=dIL&pVftY zC+*b)mbcpVjO5*SGMS#PG>=u96LU|cTO@P!`XZuF)6CNo6Ujp=myMjr?n;s^Tau?P zBQkW8>zAwTWT9DK^8ZRE+OgA+34Q6j?T8v5MXjlz{IzPst{j_fAR@HVV4^RDYkHDv z`>HUiW|U29V3;}DoL6%GBE+E%8WZBDw!Ad4pp=#76e&(9)2{pebgT28dVZxjK7WI5 zP}|%{KpqSAz@No>y&=)k-#~M_%oRvz8EbWMX5I_oV%uol)z?1JsLWRu7`~dz<#w(e zk{g-1leB(%+?vJMtY4Z;RB1&4j?-&(?fT`)a>eEk<4!DZ|8l!>gJz2TZL5YSg9`g_ zUC&P2pfX9*4|3VsTR(J*_K)@BR<4Tm*6hN(H|YYRVktxOkucwY3@jRils0~4=++~V zNIkGJUt&84wlG&Rp&8l60DjEC$wchb)nW~;oava*%88XuCtY1_s7v)|6Duq$*;7%G zqot-ED_>VRad>8W&!w}IBh%A|CZ_hx?i)LBt@n;{wQ;o4)WkxnzU-k^rQEEvdp(v* zw2bU{l7k1vbD8RTGQKBSs5g6RFQ!oa+n#E(zICVeihc4N-;oWKwsO2P&5 zklHg)iq+hs z{BlW;(*vxEx~{|;Fz9@ZnX-)`GqY1;m(A`O8{IcDHP%1SJyy8Fmj7>=>(VOw}NDgh4$|`sCtdLumu`mn5L` z#_l~)E7LtSSnaRRFE3WcRqDmbEgvDt%HGVZ$iR$8pnxsF%cN9hXHR?=wJk2Fh5!^!x9NMeSqRFaF1 zw%d)JTeciOeth^yZF#ufJVFq4xl(IYT3dDxOAWlLzMQm<)|VIO(Or*KXzX&OdJG+~ z#E8{6k<=Hgt6hHJU^1c!y`EfNsa2Y##bk1Mj=Yk6o*#KByUWIB4vbAtCznl3B_qk? z$kfdE=s_AXnLIc(IWawEWsV%UDmgSVH8pZz=Biy*2b7hha;#FbvZ_ms#VQmWFEwFY z+i2K7HZ{7J^dpy!?;D@FiiT5ZGr5{4Fkzl4vRR#D+?Nh(nH{Zbd^}oeCoukab#XD7 zt0W903(Jc`$ZQYt8%8dl+_zz1h&dr?sH&}_Ls;f%_Ik~#M?w514~X7$L?cG6!SNVad?c40CzGBKUVRvJpiYvtkO zqNg5hm+F^*7WF9&m+MQrrTEMKe+72)ZtQH?|K|kb?=#r^?*K0V3!nn_fo)(bcp$g~ z`~L^v1K^$DRp6DN2JQ#Gg1!Gva0_@7cqMoW7zGak_XPh&xqk}e3wRZfFW?p6GB5^2 z*TrD{v(ABa4y@xaKp+6?{~xs^5yC6ybF^5m=+uzhD| zSyq#&%3`I|s*K&>6!*vKO;r~5A8GevJ(|bKDEP?{w{54~v4Im?t+w(woDOkDxVb{L z;LtCp^8DuAsXKHi*|fBI_w@x%Zfu!PQ>?`4(KKgx4^CpAE-ucM%3`UNvad8(S)_#M z!G%e%dh>2?87so2An)eIXA)@29yD(gWXVCpecV9SimE?XX@4%ZgCj`(w*9tnsEgRQ z`zuTJ=7~{gtC#z?`GQ1!Aujn1KnB^2OkZ_ibarfNYW9g!V>8YbCono@-TyvMrd9`| z(H>R7!}3TDn*@*`_#W}CS=FJ&sN>F{5t)`Zx_4~!Nwau6_Us$$C)>Ob(TB{DnW(+- zW%4u+Z{D4iWPY$Hkb@$tkjA;ZE9;DrZP3$Swso+SK3`_V(I)$!CGo!RX(++|kMaF~ zg?;~4@Juia{uO)vU%)58$HB|Mlfln}zr&8d9y|m*7H#7 z8@>Ua0UiY20s%XL*5c>D9juqX5d0jtl{N9pz?WG6u7RHduVC$a5c~=2+*9Co*0Y}m z9su6N`nA@v|AN5#rvTOKyQ*9IfxmoC`k$ZZ|JU@z?3XG_txCIp6Rl9&yqm6sTX-nh zn#yMWIszIWJB`0tS^a}qKia&zywDiP^+?7|+*&o9h?^u5DKW!{8ol+v!F~HqU3^wn z&))3$W<=bX>ZCQ#%8J9hve(Ge^Z|M(x@mn?Dk(J@3`a_I>WZYL_?K8#bpbzGo;h+T zYmPdDpIFw(GnnT}S;eyAL~@rJWTisF^;nj8c(zRnwjeOM+&UV?KP-PB{I(7me6w4D`7)Xm1xMt~U;-5#v4Ns9dobF?@?R!qOP|gr~ONaN*;&UvP3)63a?w zyf`DD(O&G;k7b<}Gwx*KSeA7yX~de`?Z+tk@MQ!m>}46Rzu#7{29EOm&dnzxuTa(~ zg5LCH!`P9->fvdRv%RVw%kpv87hP#qDn$&-8SZ1TEXjt+Rgfym6T?tD{rZ{c)v>Hz zbw1PAVp-iYWmxzNQzlaxX7;B+(Yk^Y)={o0}!2zYS2mQYO#Yf=nMu-H9bF|2lX zrj~VNY12TSU?-t-*`m3kWe${fvKZLf zKWHPj8Rwq-@U;WcP&E?`w@GG?QlWW-JGn8-ywy--oj-d8;cD5hFlr9Pum!O(O&X~> zMOj5^V>w;ny4^&s^q}?HJ5{JEcghE2UFDAI7?@TMMz-di&m;r>V4+!g5|lK8Idlr+ zW#=;;sR=<)&|z8AX0m0_{~k5(SBOm++(I7-Ip)ZNaDquCo-X-Ek@tD;P35DN@^#7a zqZPsrnn`I<@_j-zok!;-w4&0kBn|vAm3GY6C=C^xMvu%`7y_TwK?ptR`@A-)WB~zK3t&GvHm|CEzMB4JN=j;79lzJ_LRX zycs+Wd<_3X30w)jkN@EX;341(_#kcr9{}$MzYP>?zX{wMh~J+R&wnBxoT3_9(G6X> z?$VPRdri@ap=Es*H+9~yZ0TgFdV@lA6o1rE$Vy95RE)6Ohm*Quo?5sO6NU9Ckc44> zvFhH!Fe#hgwT%1{6sxG)r6qz_5a~q)3zE&Y;dHES5Diqe=$aU)r1CBQCVK)XE=(T& zWFWd$*9gQoxNS(DJh_T3LIRh;ME;1cp66jPOj>1LSjUEr+VQjLR%M$5@frvvro67hU!$OVy?g1i;C3mHcn zh+QL+2YS_}kko!Fq&B*>6-AwS^+*|t%&sY|cSrvps`pgpmXGvru<-z=U8zh^J<)86 zwu>5@IqWR2v=Ob#u25O1h$Y9$nwdD_S&IgOKE)9qiHKtIB$A zp{MymfX3K@T^t_LjEt#Fbs)y-qD~q-9w8(B60Pa&kaLfsa*0%cqD<3F*w92YYd)Va z%8|&j)&mTB@(e&kWD14JQqqIbHysJd`=#SGEt*6zAeWSAt6(qV#TXF=T4w_zze0j7 zTG%!__4oH@y6r&VT#SJp*t}a-#!#Y=Vkx!Kf_>Xl{FUyWGTM~AQM-cFgyPjQv#mC^ zj+B2a0pBnw^T?yjzo;7!Dtaq_>Gi^^lFB*k&aXt5av$eqVPA_Gg;XIS3Uq z6MH85=Q;iOMD=K?KCp95yS3cpku=d-<;IEF+Z8sSPG|muIxSXev7L3zi_YYihKJ&# z?Rt&1x>Ty8IHZzg?zuv$bb3skYI+*`Pk+d=zj%EBAu|EVZ;H{HmJ`sRhtSH}_u`uf zG``q}B;vT;pCdZG5786*xj>jQC zDh-o6gCLqjiT!2As_hf0k3>YIdn@Q;#zRx*T}FBrKN7N0v^@Ds=t2a2x}gwi5DPRH zbgiV-sFbS5}Kh`y#hZ+&fxYhwSaVl<40MjPx`kG@Jqi>{}cUCN9u4ZtmqR$u!c{3GJ4Aj zUW3%Q(jG8&03NB;i3Ue7H{jj!CpC-6B4_cOGziri%e-^mrOa2c5}($7}kZv3?f0sur5gZ;&33TbIMC z*_vehRE)XC*8eft`#v7*|7x&Dwf6s9a5cCJd|Bj9H6dhl#84}J;Ef^qOL@HsMj zJva=m1{=WH;CAf)Uj;7$F9er@3&A76_ptqM1HTVm1l7zY4q<41rDHQQ%?V zr@$T9{NDrL1+NCr2Q_dx=m-B3oBtN@WRQSIfCqseVC#Pud>gz9{0h+i`zL`Xg0sOR zz#Z89p9Y@W^0TicjKDZnB1N;NW z!FAwLa1Pi2ZpSb1QE)T(4e)GmHJAi@!KL6F@Ccyx`pZuLfttHL)@;_BnZ}*PLfJ0k zV6<;+g^5kt)NG~MoUP%r?RSOR6^4QZXR6gf_EhKW4^0*}(a;O~T+-s-?24|hB*_e? z;d!Mhl&1^DOxBu>>;6(9&c4{=8O+Y{4AGxVw@BS!16DCxq9$}LE^K)hYoQ54UtfF_ z-lkffR^AkcmZLxDpO)7o3T6oQ2zeC}W^7X4Q2~!r^C^Nua`Yq^IWtlMv|aYj#d;KS zbNx*Y{g^3gmXnoT^7%uAJFES^?|*dD$?v%~5u;t?q98N=w`H1qCi!~fuCiTp+g42A zoAPWEANg5GoR_Eh*s}Mf`7CNo=C79#?L{|^gTa`m*@-+I>B!2?MLUcwHQFaS1t2R2 zb8 zG;&~P${*(@Q(h}pNf~3!tw3#b)$+sDU{OVAxC-!8yLRc(R-dV`cXW#Iy?*1s zbk8*%mXNCT9V4K0X89{2+V%e;4R#Htx^F%z^JeJP8|{pOpu+b)c_EgK?!9^U9C4#+ zrF69Dibd2?*#H~&EIB5SHFW&JqubM(o7O!_!(4jT4_$yrjXXa3xSBgRYydpKPs#A!c_Qk{-!Au`Np%x-4gK`qxY z&L~4<#5RyyUtTspke+6K#gM)z`sUh)O3hlqLu>b$t?NxZvB>jkL>Fgv%UTmVqeLTsW zDV|W(;EttAy&dGKyUJ~`y7B;dSI(M=<0n&N>LQ^RHBrfem&XqrkWl+-cZX*MMou-# zL`$>RKqEDS>$a_YT!%wZT4PkKwLepBv|rUd3SKxsR`nb;48G}vqZsdr-XhH<|9~>EIrW@g%zdZ`h*%UmzO^_t_8FH=!qjF z*a=9V^e?meV?A*iZpNY!lK7QOHf76BS`xyo9R^5KYuF&)aB0!aY_y4dEe6iB+XG@H z(828(*Y;T4Q5HCzz!#hi?GIGwLbcdAgA zH1yH(2y@NarBv(xn7V!EVpD4U|JlL*{|9XTKL@t~odfuM&;+}H_W%Dow*EhZkAhpl zyMWFBXo5+w704g(O>F@;7ZodpA(&_C9Pck*Of8xq9kLvSIZ0ke##x;pw zoOr6eoTt*i>A3Gv3NGWb{nq&O?AZRvnXBw}7%`-G`+|ub6LqkLgZ#SwO&tEEy+iFA zY<%;_t@Lo`dg{fth)_&NCi{A1!FV@UYL|(UXx7V>R!fRb$qW|0sLUfRL%}e+uDW3 zB}S7DBBL{tiejTvEPDWA@%jfx-cc8R8}WDgf( zrGO|ewHm6PJqHOg0OIkOAsvRC>XTdZ45gu9Om+zxJ=$R%->gY(UxTu6>mcdG%J4-J1_bNQ+cJOTz3;0@GCN#I?3~VQ<;d0*ZPFk&dDmh# zWZ2D4KIY}TgL~=vNtlZUi!y(F&WtcJ3@g&QFz1dO2savo-o{T~!S==R=)Ot9M`Cqm zg2F_KF)L=BMKxultUc~)HJd>pTX{vftO1Qd4(ADnZ{e-9J;1BQgF&mse!2McaHLp+ zsT>)r(uDINNBh&Au1J+`s!wEAZDzmHKtU|BYt}f}&UP3lbVL~@(|sQKqQSDiVTDjL z#lsk1Jz_mw6!1MW#@c*)hq#>(3K231fr6nO()|dbTW2n4->2{dwkhnCvzl7@K;{24 zsiy%LYDg&W->QMzX}2A6%Drs5`V`Rk2YVK~o|+Lgy$VN!0SUTHSz zOUXW3YK=9fD;9EPEpH}oysA0|+Je|hJhOB;Sq{l^fw;28wc&dp_dqTl*M*7=mbqsLZ zHDTqZnSNAKsh; z{glz@a4j-rV$G*Y!xD`oKAQCE&#H!z9hc*xc4ErOE?fUkFz~ddEc-vcUr*=%eF(e{ zyaqf2=={Hnz!q>{@YCSS*!bETumCOscL#roU4JWh1JId&$HArGEO1|NFK`d=aqRoI zgO`FB8!*fFtAXAD5YPU*Ki_p`z@LCu0-aa*a&R>m1NQ`X10TcQe>YIv!0&_Sfd!y_ z1K+{cza4xHd=>mIcp11BTm^Q5v%!zB^*;vQ4Q>U$51tORpWtdBpTNVx!@%dT`TrYu zH+U9!BKSJC{D;7+!HdE3z)e6l{{Z*~cKp}DZ-cjh5ukX1Phzj%23`kV1%4Ae2@HWp zf*)b8e+>K?xEVCTG#CcjZ=iGi?*SA;@FDPe@GS65Fb7ItH&8o29GvRoBf#18DLekS zp@Uv(M`p`9Jhfk!f=fImKzr0x4Jz5C0nA&ZGiSt_fkdLvCGB=ccc#C)=4E0V+<)oC z!&|S_{OGH|{KzNr&=eQ)%ORoCRu?aFO~>W zy$Qi2ELkdXW-Wefj({D?rWj_Ijq~iXaSArOYe8NFBQd2bmeO%qU9IyHis3-AF=Swe z#JeMmv!;zuSVQfwy7-^x!%r<@#vS9{XE>X|!-(@;2zOP}XHp)89hwX?`_qG=MO5TJ zR|c{^)EyV?Z%PPjl5q}8W9%&R6?vdAVs&0<7&?;F!gJ&);gi_FR zmpbK^`r5YU7+yho9jS3;xsY22$diKPe)lD$BIk)0~3%~3HC zWL`PJ&gw{KmJXpY2{!^dN7SOuv)NX6(gDM_0C&EpB9?%jj5ylMP)?x%#(t>bN!DwO zVF?ae8k4q5^+BVP!PO4gURbV`?K~?I(O0d*ily+9BrJ<@+%iI9$?OST?ici#}dd>rRS=PVjz((*WEgRT9IIN8^i^eXqJ+dL*+^Ejy(6>6=nQnbaTs@UC3+c&C{l)uC+K#c&Qq4+ymQq@6SxX~O z{UHO?(+-po`e*o#-RfZ0-3`MtKu<1p5(yDHeB&-cvNpV-IJ2M@+7tFPIzw4Sm& zqmj&UZrkEfd2zam^qBEx(KPW+>YJG+la5H#J4Fkk6hZh)iSdiN3|q zEGUimYZ&6I+Lj%AOg9-((^;;ChOC!ukoI?CnJh_Z*ZTisFa{qYn=ohpe+AqAv*1(U zli-!$M({K+4W_^Vco6toY<|51;Qipu;1=*Ap!Wox1MUUx3BG{M|9PPO0eV;9m0%WdFY%`(A7RQ(zC+4YdCMEo}R5g0BGC{4WK+3SIP#{0Rx4~Dy{{lCI7lLPir-2>dtJwB`3-oTkYry$n1JHSdj|Jbs&Q~13d%?}% z_24<+N^lTd4$cLGK=A;I0T=>$hu@>Yqrf+@`M(T42>uk@3XX#v;1@sweja=s+h6Pa zPX*&(FX#t9#MXZUSOSN^As`>XXJ|*AHSluqQlLEq@&OzMJAvBy!C%{tnH=Hsumq^?xD4{ix#uj?kkAr&P$TwExzjokY($GA4x znRjtwZzdX@*fTaeH90yvIWaXeON79lsqq86NhPEWR&X|v^Og@nn7e0jGIbAu`e8r6 zD1ZsZlXvVA-DJ~gwBsJPZM(0gMfDYzNYvI=sYMrMbJZH5gPq#N3UoZ8GdO-gG~T?&ND)2B(aalYN^->`Mq{^b=08N%^d8|ypr)@zm01U7uS zu8~t4Qe%-jY8Hxg9m32OOYT-n*OTIFE~DVF?MkM>JYB@?3bIa%YkLFRSQ?H4=EW*T zik%iAH?Q{aMfHid?UeNv8IRZSIk<;7b~U7`70mvS)oC?rCs*<0=#TlMLaiYywu96`fi1k%X`bIajO&Jgla<~JhblVOE@mEkw#Vbc`yR&otGo}oN4U73dT*2;SAn`A}t zDyGxv!YI{fSN!DPs9x~)lx&r?tES7q8$B-vcpAGtar^Kpd zlqfePIhzPNFP=dbCYYaq%=YkU)BKrxB*<>=Xe^6}w^s%t&(;!o-l}E9RaKd{&&7Ep zb&+(+xOf@LkQJ2{S2X=qLg>OCQO0jEgH64;;kR>VFNwN`6rxY!8#M=4RRBun_w=!_N?f(+OITx#h_?A1tJ~5 z@^3u#@x}w)y??|W68Wrl)aO)HAVruz)O1D1&=J;~wLHP$NyJbyDVvFCBI805yPlK* z5^K7*?VFYk=k`n)gH0Z~4rL(Rx4cJ)^k$A%z&9Zip}!Jt?Z4@*`KWY_kIQsplXt3q=E35OTM*fThu8#=$c0hFYoehQ37f3V#ak%e zE107vn`ZQEm@@QSj~PQLT;QzMxm?0{AIi(rw61jkOOa}5DnmYNMI1S|Y!dWx;^{Et zU6=yXV(~3DhkKmZ0te-!)w@8MhYOQlaes-`Vhe$^MuvK(J*O8l#u%I|?kJKW2E+}s zHfeT8skyYM7GAnDJuGg9#Z}q<48=2}lSTzJM{!YPP?|^sxJ}@3O;*81z9icG%H-~yoc0DceK{=4AA z;LpHo!E3+~Z~$m8z!vaB?0mfw@GroxgKL1!3D^eo?tp*B&e!<>Hv*j*cmvQm0Ye}G zj{%PcKfvz49o!0@1D*)v6SxG(FYo~HkJ$RRf!_r>NAMPK99#nK3qFgT|61@oa0KWK z!GFQF{~Pev;8Q?*1LOnfT>szBwQb;EvF~pK9|Eri*MP&|=YZk>be3Q|JK!yRe-?Nq zm;-x&;sJgZoCW>{TmOCFW#A>?*T5vW5Ii2-fzAKN;00hOxDeb6d&YQuV`}9%_vaa1_fn_R#`nLre`NTL*_i}Y2-IA14nX}^l zB=cZLZEjI-74b!l!irns87}age`|Svk?Lv23)%;-FCLqk=|?}-GK#tb7lXFEq8YQP z+`eHLO%iK_axGL~wmhemBkZ2?ocG-ZRNGY^ft4MEg*mD? zLAent()O7HrgH-8B05&nhBzCpbDF9yvNrFvjN>5cZ;UQCD&VOcw7 zlNQd_p&FUjUVgW5WI~GUM z;6kixChx4^UT1$)ZMe`2$+a&x(JK1ZUHrKy_uMq6j7K_6EK6{yEbwM6?M+cFBf_z#Sc&5Z73GY*1Ck+PUugQ&XOvYF}&d zF<6|leMJpKd6KA}slB`z-1gLO38GSINV3&D^ABW-lHcptxm4^-K{`gIMCAv*jwQrk z@49Qu35&}_EJua^ajsd-BV&^+6%r7m3BiD;1c! zJBxaBn$U+vCMVyC#)eikPP!3aGm$On5ujA99V;zzD7cqOY5$2*6k1jti+Lvc38yOl znl3eQICDFAa9o^XYwCoYcv4IVY&P08UeO0l(9$Cs=v=k3thtAo)Si2 z_rzJf>FFFha${`j(4pKr8;3DX`&^+QlC7l|ufQ|gtt94oL*|b|PZhM#Rn2^}9y#OT zg$$RWD`@3e^_GJqwYaT2hYi~Et8LJxQw8Tq_|s$b>#=<)HT8Or8$E~R=sTqbejHOa zF8|jIF~LAiv<4r;3K&3AUZ*2Qe39Imh8AQZ7~5)g9F_TVhSh}OZW?8tuHVH|u0n!l z*E`S(H#CezF*Vgd7DIX#M3`z*4>~Db$Cx}gy*G8UOKx!ZgnV-fN5S1L5!DfnzR5Hl z1h4NdwL{U!<}ND%PGlmV3$p8B6dh}-&``zPWEy7&JMlmDerM(6oCt34_^Scv&}Qze z;OGsK=_+KGt5@s)nDBjH4(I|z+YpZzZbk2+yp9M9y}I&1Ka%TKx_GL1g`;?f?@DO zZ1it{_X64N$3O$<9RTNn`-Aslm%jtN6`TZha2nXHW&c|;1k&Ee+b?UPJ*X^dxEcEtG^4p3A`4Z0EfUu;L+f(X*adqmpab^q#YAJ zQ3H*GDE-b}rI~vnYWf<_M7vmu6OA5Ar%KnF`9F7rTOonZrAauvt#Hc&i93#Mvq{@J z+KT6x#d$T;B*W~Nd$!lR<)AY7w6hsM4p}{pe~WC&De9|ns;Iqps^kQbtL`>e=Xx)f z7$l5+He#d^^Ls7A3?;)_Cfnvq(%kwH3n(btnmfglj@_+k1Xz!XP;C$JH095B1@>>> zvVGu`mB>i7s-RS+>Lc2TJ85Iof2>1`RXIQGVWs!iIQ>DfDHv$p);($v<9I_T6i)XL zjTSC!hSF2kCT)_iAheQPgdjeJvAF~n#U{V!ytwGo(In&Xz*dMe{zY}F_>{=#+A$vr zc8hXIGLtUQbg0W|tyZc|&^}!4I~{2>vv6&VnW+xuF^CoR_6#jO63#y`A(p0)uFGkd zTBEx|l{|$5gCWl>Wb=2(#@F#`!5!m5s`$E7*QyUx$WJC#|C;9t;uzYx|RbOtFDrxNGf@N2SS?JgAnieC$wnw}PSSqdUL%;z-KexHvF9v47;rdZCEC-Fff1Q_+#Bu{akE zsoyatefP%S)prhFuj+y|C*!F;YC4?0)0M@AN%%rzkI#oOpjB%$k>>?lQdJHqOVTWd z5?-6Mv8CU`riTs_l0Cd}>*JriId^JYYo@Z)*i$9wU}n$kp7E*au}S2ff~QLhl__+- z{sHY8Q^=C7EvT%+lOr>GXW@Zl?Z&OVFYp|_M~P%>BkLMIPq^k9QFhHWIno+=!ipp$ z@L|-q?@anZRw*lrr0qEWTF9v`@XpHQ(D;Ep6Njdut5=h#2jv5dx)*_nMKuGTd0!pY z*X2zb^wY*RmkowfIH9pRn^YTIsW#Hzs;uOiy!4fx*`sTA$!glF*<&taUT)eEm2=t` zPA})|v0Tobi+tp7E=()dhh6rf7Ov*I7&r1iQznRQA~iC!Fgz7~#c2{zCHwyuu@$GW zCAI$l++hFzAME}A0d(e{_WbVvj|Gne4+sB>4e;yWTCfjX0Cs@C#b@w`K<5LT z0Clh(+zosme}T>lkT2m4;Ps#Zo&+|6yMzCW-{4!|GvJTFd%!EeN$_-VIncR)8^GD% zAMq#L3Z4(H1WyE)fV034Xn(!0@IBz|;3Uww0SAF(V+ySCqd7b1!hXIdgQM;IVjaSF zy2$+X!%Cvp`>2_~#W1+kT*b8WRh004m+d)i+e@&KE*NgBti_l=*9Mm}P13d;N!FcD zn<9I_qK&7StzwK)8orDplhX@5ZZtipNyeXe*2}`*<`#a=`)d`K^SWF(e97M0WS0gu zVY$g@;Z#@1(KQ>SDcBD*O0uJUbd;hmqVh+hP-gWr40UNnIlaQY17ZB{c$tm)DxFU0 zaty-r>^{~XGPSDM5BCjIaPc%*YhI&;O4<=M@ulyBMr1ZyxntjgY?a=r7~MNI`lQ*t zBM0{E8`F-qa1yMMgF>LQdJ;ZyQGhH1ud;HZ>_wk?mc|8qSD!RxYihOf6mL!>G_Vo^ z1k)pb%ykkyJ%z4x%Kn0J7@Q(u3FRlydG0z9|GC}8M80s2dfH(EuZqs2;4#nE*s)rQ z{Ro-KIz^nAp@MHsE>6G!5@TkeLbb9y{jCra_eAz^l6!O6f*<%z?{s z)KKlA*Uv%SGd9{?prv%eGAFl+_T2IT6XwRU(W4_czGr1)H@%a@jvqKv#7^bk8q}Lq zOp^U}GLJbB(S_kkpPOmL>nv)c>nnMOtxF#dq(g#rr`7c?P4q$YiVic23iRa=^T`ByWxwU94LkgU@}%~& zfiJf{!qAXOOV6d^{qp?>4|WeJ$ZSm4lcLs;=R3@a=EbiWxh!M=nH&bZMiO zbY^Pgz%=gzoSvDQxN6oXhRKPEeY3ouy6Xu)(V5C{{*g!dY3axE=@7s$`=%n;A$;L> zm-zJ>K`m|902<0qJJL!l%w#_|1Vy5wGn*!^EV1V=3!hor>$}$aK8mhbrr<#}&S~E4 X8I7VLU*SJlFp83@@A;kVNZtPjn%Mi$ From 0f63837e69b3f620196be6ebdfe583d02a37bc7e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 8 Aug 2022 13:25:02 +0800 Subject: [PATCH 09/12] refactor(sync): make leader life longer --- source/dnode/vnode/src/vnd/vnodeSync.c | 2 ++ source/libs/sync/src/syncMain.c | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a269f81ddd..50ca81e58b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -722,10 +722,12 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } bool vnodeIsLeader(SVnode *pVnode) { if (!syncIsReady(pVnode->sync)) { + vDebug("vgId:%d, vnode not ready", pVnode->config.vgId); return false; } if (!pVnode->restored) { + vDebug("vgId:%d, vnode not restored", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 389eda0f8b..3d9b34c34c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2668,6 +2668,12 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p syncNodeEventLog(ths, "I am not follower, can not do leader transfer"); return 0; } + + if (!ths->restoreFinish) { + syncNodeEventLog(ths, "restore not finish, can not do leader transfer"); + return 0; + } + syncNodeEventLog(ths, "do leader transfer"); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); From 020a656063452290abf5caeeb0587715d418e694 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 8 Aug 2022 13:32:20 +0800 Subject: [PATCH 10/12] fix: fix memory double free issue --- source/client/src/clientHb.c | 1 + source/client/src/tmq.c | 2 +- source/libs/catalog/src/ctgRemote.c | 4 +++- source/libs/catalog/src/ctgUtil.c | 10 +++++++++- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 06bd3f3887..7031a1ebca 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -286,6 +286,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { if (pInst == NULL || NULL == *pInst) { taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); + taosMemoryFree(pMsg->pData); tFreeClientHbBatchRsp(&pRsp); return -1; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f7d45dc6ff..f7d04e67ee 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1007,7 +1007,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pParam); if (code != 0) { tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code); - if (pMsg->pData) taosMemoryFree(pMsg->pData); + if (pMsg->pData) taosMemoryFreeClear(pMsg->pData); if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index a9f2d426bc..45f97865ce 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -467,6 +467,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (NULL == taosArrayPush(newBatch.pMsgs, &req)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + msg = NULL; if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -517,6 +518,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + msg = NULL; if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -545,7 +547,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } - tNameGetFullDbName(pName, newBatch.dbFName); + tNameGetFullDbName(pName, pBatch->dbFName); } ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 8e5fb90f1a..1ca60c89cd 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -438,6 +438,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { } } +void ctgFreeTbMetasMsgCtx(SCtgMsgCtx* pCtx) { + ctgFreeMsgCtx(pCtx); + if (pCtx->lastOut) { + ctgFreeSTableMetaOutput((STableMetaOutput*)pCtx->lastOut); + pCtx->lastOut = NULL; + } +} + void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) { if (NULL == pOutput) { return; @@ -641,7 +649,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosArrayDestroy(taskCtx->pFetchs); // NO NEED TO FREE pNames - taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx); + taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeTbMetasMsgCtx); if (pTask->msgCtx.lastOut) { ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut); From b23fe53327ae95879d99cc888b88abede1aa594b Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Mon, 8 Aug 2022 13:56:30 +0800 Subject: [PATCH 11/12] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 6dbf74f8bc..1cdfe20c8d 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem"; ::: -TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。 +TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。也支持通过 `apt-get` 工具从线上进行安装。 ## 安装 @@ -293,4 +293,4 @@ taos> select avg(current), max(voltage), min(phase) from test.meters where group ```sql taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); -``` \ No newline at end of file +``` From d9b8417d7b0791080096f684bfe381b9cc3c97d3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 8 Aug 2022 14:01:31 +0800 Subject: [PATCH 12/12] refactor(sync): make leader life longer --- source/libs/sync/src/syncMain.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3d9b34c34c..cdd729999f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2674,7 +2674,16 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p return 0; } - syncNodeEventLog(ths, "do leader transfer"); + if (ths->vgId > 1) { + syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); + return 0; + } + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index); + syncNodeEventLog(ths, logBuf); + } while (0); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&