diff --git a/deps/arm/dm_static/libdmodule.a b/deps/arm/dm_static/libdmodule.a
index dd2afa9037..fff5943606 100644
Binary files a/deps/arm/dm_static/libdmodule.a and b/deps/arm/dm_static/libdmodule.a differ
diff --git a/deps/darwin/arm/dm_static/libdmodule.a b/deps/darwin/arm/dm_static/libdmodule.a
index d1f37cf4f4..09c501f2ef 100644
Binary files a/deps/darwin/arm/dm_static/libdmodule.a and b/deps/darwin/arm/dm_static/libdmodule.a differ
diff --git a/deps/darwin/x64/dm_static/libdmodule.a b/deps/darwin/x64/dm_static/libdmodule.a
index da082d9224..2230393565 100644
Binary files a/deps/darwin/x64/dm_static/libdmodule.a and b/deps/darwin/x64/dm_static/libdmodule.a differ
diff --git a/deps/x86/dm_static/libdmodule.a b/deps/x86/dm_static/libdmodule.a
index c7286f316d..995713ce7b 100644
Binary files a/deps/x86/dm_static/libdmodule.a and b/deps/x86/dm_static/libdmodule.a differ
diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx
index 02c176ee3d..b699019b44 100644
--- a/docs/en/14-reference/03-connector/07-python.mdx
+++ b/docs/en/14-reference/03-connector/07-python.mdx
@@ -436,11 +436,22 @@ The `TaosConnection` class and the `TaosResult` class already implement all the
 :::note
 The TaosCursor class uses native connections for write and query operations. In a client-side multi-threaded scenario, this cursor instance must remain thread exclusive and cannot be shared across threads for use, otherwise, it will result in errors in the returned results.
+The best practice for TaosCursor is to create a cursor at the beginning of a query and close it immediately after use. Please avoid reusing the same cursor for multiple executions.
 :::
 
+##### Use of the RestClient class
+
+The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
+
+```python title="Use of RestClient"
+{{#include docs/examples/python/rest_client_example.py}}
+```
+
+For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
+
 ##### Use of TaosRestCursor class
 
 The `TaosRestCursor` class is an implementation of the PEP249 Cursor interface.
@@ -452,15 +463,9 @@ The `TaosRestCursor` class is an implementation of the PEP249 Cursor interface.
 - `cursor.rowcount` : For write operations, returns the number of successful rows written. For query operations, returns the number of rows in the result set.
 - `cursor.description` : Returns the description of the field. Please refer to [TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html) for the specific format of the description information.
 
-##### Use of the RestClient class
-
-The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
-
-```python title="Use of RestClient"
-{{#include docs/examples/python/rest_client_example.py}}
-```
-
-For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
+:::note
+The best practice for TaosRestCursor is to create a cursor at the beginning of a query and close it immediately after use. Please avoid reusing the same cursor for multiple executions.
+:::
@@ -554,6 +559,16 @@ The `TaosConnection` class and the `TaosResult` class already implement all the
 
+##### Use of the RestClient class
+
+The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
+
+```python title="Use of RestClient"
+{{#include docs/examples/python/rest_client_with_req_id_example.py}}
+```
+
+For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
+
 ##### Use of TaosRestCursor class
 
 As the way to connect introduced above but add `req_id` argument.
@@ -565,16 +580,6 @@ As the way to connect introduced above but add `req_id` argument.
 - `cursor.rowcount` : For write operations, returns the number of successful rows written. For query operations, returns the number of rows in the result set.
 - `cursor.description` : Returns the description of the field. Please refer to [TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html) for the specific format of the description information.
 
-##### Use of the RestClient class
-
-The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
-
-```python title="Use of RestClient"
-{{#include docs/examples/python/rest_client_with_req_id_example.py}}
-```
-
-For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
-
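The cursor guidance added in the doc hunk above is easiest to see in code. Below is a minimal sketch of the recommended lifecycle — one fresh cursor per query, closed immediately after use; the host and the SQL statement are placeholder assumptions, not part of this change:

```python
import taos  # taospy native connector

conn = taos.connect(host="localhost")  # assumed connection parameters
try:
    # create a fresh cursor for this query ...
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT server_version()")
        print(cursor.fetchall())
    finally:
        # ... and close it right after use instead of reusing it across queries
        cursor.close()
finally:
    conn.close()
```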
diff --git a/docs/examples/python/connect_rest_examples.py b/docs/examples/python/connect_rest_examples.py
index 0f8625ae53..1c432dcc65 100644
--- a/docs/examples/python/connect_rest_examples.py
+++ b/docs/examples/python/connect_rest_examples.py
@@ -33,6 +33,8 @@ data = cursor.fetchall()
 print(column_names)
 for row in data:
     print(row)
+# close cursor
+cursor.close()
 
 # output:
 # inserted row count: 8
diff --git a/docs/examples/python/connect_rest_with_req_id_examples.py b/docs/examples/python/connect_rest_with_req_id_examples.py
index 3feb574fa6..f1b5915ea3 100644
--- a/docs/examples/python/connect_rest_with_req_id_examples.py
+++ b/docs/examples/python/connect_rest_with_req_id_examples.py
@@ -33,6 +33,8 @@ data = cursor.fetchall()
 print(column_names)
 for row in data:
     print(row)
+# close cursor
+cursor.close()
 
 # output:
 # inserted row count: 8
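Both example scripts above now close the REST cursor explicitly. An equivalent pattern, sketched here with assumed REST connection parameters, is to let `contextlib.closing` handle the cleanup so the cursor cannot leak on an exception:

```python
from contextlib import closing

import taosrest

# assumed taosAdapter endpoint and default credentials
conn = taosrest.connect(url="http://localhost:6041", user="root", password="taosdata")
with closing(conn.cursor()) as cursor:  # cursor.close() runs when the block exits
    cursor.execute("SELECT server_version()")
    print(cursor.fetchall())
conn.close()
```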
diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx
index 1526c0da6e..e7160ab094 100644
--- a/docs/zh/08-connector/30-python.mdx
+++ b/docs/zh/08-connector/30-python.mdx
@@ -436,11 +436,23 @@ now 为系统内部函数,默认为客户端所在计算机当前时间。 now
 :::note
 TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
+
+TaosCursor 的最佳实践是,查询开始时创建 cursor,用完之后就关闭,请避免复用同一个 cursor 多次执行。
 :::
 
+##### RestClient 类的使用
+
+`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
+
+```python title="RestClient 的使用"
+{{#include docs/examples/python/rest_client_example.py}}
+```
+
+对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
+
 ##### TaosRestCursor 类的使用
 
 `TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。
@@ -452,15 +464,10 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
 
 - `cursor.rowcount`: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
 - `cursor.description` : 返回字段的描述信息。关于描述信息的具体格式请参考[TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html)。
 
-##### RestClient 类的使用
+:::note
+TaosRestCursor 的最佳实践是,查询开始时创建 cursor,用完之后就关闭,请避免复用同一个 cursor 多次执行。
+:::
 
-`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
-
-```python title="RestClient 的使用"
-{{#include docs/examples/python/rest_client_example.py}}
-```
-
-对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
@@ -557,6 +564,16 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
 类似上文介绍的使用方法,增加 `req_id` 参数。
 
+##### RestClient 类的使用
+
+`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
+
+```python title="RestClient 的使用"
+{{#include docs/examples/python/rest_client_with_req_id_example.py}}
+```
+
+对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
+
 ##### TaosRestCursor 类的使用
 
 `TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。
@@ -568,15 +585,6 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
 
 - `cursor.rowcount`: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
 - `cursor.description` : 返回字段的描述信息。关于描述信息的具体格式请参考[TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html)。
 
-##### RestClient 类的使用
-
-`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
-
-```python title="RestClient 的使用"
-{{#include docs/examples/python/rest_client_with_req_id_example.py}}
-```
-
-对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
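For the `RestClient` sections relocated in the doc hunks above, a minimal usage sketch. The constructor arguments shown are assumptions about taospy's `taosrest` module; see the linked RestClient reference for the authoritative signature:

```python
from taosrest import RestClient

# assumed endpoint and credentials; RestClient wraps the REST API directly
client = RestClient("http://localhost:6041", user="root", password="taosdata")
res = client.sql("SELECT server_version()")  # executes the statement, returns the parsed response
print(res)
```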
diff --git a/include/common/tgrant.h b/include/common/tgrant.h
index f06fca8014..a5f3ab2e3f 100644
--- a/include/common/tgrant.h
+++ b/include/common/tgrant.h
@@ -31,6 +31,8 @@ extern "C" {
 #endif
 
 #define GRANT_HEART_BEAT_MIN 2
+#define GRANT_ACTIVE_CODE    "activeCode"
+#define GRANT_C_ACTIVE_CODE  "cActiveCode"
 
 typedef enum {
   TSDB_GRANT_ALL,
@@ -50,6 +52,11 @@ typedef enum {
   TSDB_GRANT_TABLE,
 } EGrantType;
 
+typedef struct {
+  int64_t grantedTime;
+  int64_t connGrantedTime;
+} SGrantedInfo;
+
 int32_t grantCheck(EGrantType grant);
 int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h
index e8c64b07c4..c3dfd3a611 100644
--- a/include/dnode/snode/snode.h
+++ b/include/dnode/snode/snode.h
@@ -45,6 +45,7 @@ typedef struct {
  */
 SSnode *sndOpen(const char *path, const SSnodeOpt *pOption);
 
+int32_t sndInit(SSnode * pSnode);
 /**
  * @brief Stop Snode in Dnode.
  *
diff --git a/include/dnode/vnode/stream.h b/include/dnode/vnode/stream.h
new file mode 100644
index 0000000000..6d86847542
--- /dev/null
+++ b/include/dnode/vnode/stream.h
@@ -0,0 +1,18 @@
+//
+// Created by mingming wanng on 2023/11/15.
+//
+
+#ifndef TDENGINE_STREAM_H
+#define TDENGINE_STREAM_H
+
+#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
+#define STREAM_EXEC_START_ALL_TASKS_ID     (-2)
+#define STREAM_EXEC_RESTART_ALL_TASKS_ID   (-3)
+
+typedef struct STaskUpdateEntry {
+  int64_t streamId;
+  int32_t taskId;
+  int32_t transId;
+} STaskUpdateEntry;
+
+#endif  // TDENGINE_STREAM_H
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 416c0c5d18..63c9c8d98d 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -606,7 +606,7 @@ typedef struct {
   int32_t downstreamNodeId;
   int32_t downstreamTaskId;
   int32_t childId;
-  int32_t oldStage;
+  int64_t oldStage;
   int8_t  status;
 } SStreamTaskCheckRsp;
 
@@ -671,7 +671,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
 typedef struct STaskStatusEntry {
   STaskId id;
   int32_t status;
-  int32_t stage;
+  int64_t stage;
   int32_t nodeId;
   int64_t verStart;   // start version in WAL, only valid for source task
   int64_t verEnd;     // end version in WAL, only valid for source task
@@ -774,7 +774,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
 
 // recover and fill history
 void streamTaskCheckDownstream(SStreamTask* pTask);
-int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
+int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage);
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
 void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
 bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
@@ -847,7 +847,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
 void streamMetaNotifyClose(SStreamMeta* pMeta);
 int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
 void streamMetaStartHb(SStreamMeta* pMeta);
-void streamMetaInitForSnode(SStreamMeta* pMeta);
 bool streamMetaTaskInTimer(SStreamMeta* pMeta);
 int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
 void streamMetaRLock(SStreamMeta* pMeta);
diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index a19b249bc7..ece1e40585 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -45,8 +45,8 @@ extern "C" {
 #define SYNC_HEARTBEAT_SLOW_MS 1500
 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
 
-#define SYNC_SNAP_RESEND_MS 1000 * 60
-#define SYNC_SNAP_TIMEOUT_MS 1000 * 600
+#define SYNC_SNAP_RESEND_MS 1000 * 300
+#define SYNC_SNAP_TIMEOUT_MS 1000 * 1800
 
 #define SYNC_VND_COMMIT_MIN_MS 3000
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index a3ee294338..ce8db162b6 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -557,6 +557,7 @@ int32_t* taosGetErrno();
 #define TSDB_CODE_GRANT_GEN_IVLD_KEY      TAOS_DEF_ERROR_CODE(0, 0x0812)
 #define TSDB_CODE_GRANT_GEN_APP_LIMIT     TAOS_DEF_ERROR_CODE(0, 0x0813)
 #define TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN TAOS_DEF_ERROR_CODE(0, 0x0814)
+#define TSDB_CODE_GRANT_PAR_IVLD_DIST     TAOS_DEF_ERROR_CODE(0, 0x0815)
 
 // sync
 // #define TSDB_CODE_SYN_INVALID_CONFIG        TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x
diff --git a/include/util/tlog.h b/include/util/tlog.h
index a6d146a79e..6d393bfefb 100644
--- a/include/util/tlog.h
+++ b/include/util/tlog.h
@@ -66,6 +66,7 @@ extern int32_t udfDebugFlag;
 extern int32_t smaDebugFlag;
 extern int32_t idxDebugFlag;
 extern int32_t tdbDebugFlag;
+extern int32_t sndDebugFlag;
 
 int32_t taosInitLog(const char *logName, int32_t maxFiles);
 void taosCloseLog();
diff --git a/source/common/src/systable.c b/source/common/src/systable.c
index a1f8d74571..89995fc326 100644
--- a/source/common/src/systable.c
+++ b/source/common/src/systable.c
@@ -167,7 +167,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
     {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
     {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
-    {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+    {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
    {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 6c5b627211..a0242bef5d 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -431,6 +431,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
   if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
   if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
   if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
+  if (cfgAddInt32(pCfg, "sndDebugFlag", sndDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
   return 0;
 }
 
@@ -952,6 +953,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
   tdbDebugFlag = cfgGetItem(pCfg, "tdbDebugFlag")->i32;
   metaDebugFlag = cfgGetItem(pCfg, "metaDebugFlag")->i32;
   stDebugFlag = cfgGetItem(pCfg, "stDebugFlag")->i32;
+  sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32;
 }
 
 static int32_t taosSetSlowLogScope(char *pScope) {
@@ -1425,7 +1427,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
       {"smaDebugFlag", &smaDebugFlag},   {"idxDebugFlag", &idxDebugFlag},   {"tdbDebugFlag", &tdbDebugFlag},
       {"tmrDebugFlag", &tmrDebugFlag},   {"uDebugFlag", &uDebugFlag},       {"smaDebugFlag", &smaDebugFlag},
       {"rpcDebugFlag", &rpcDebugFlag},   {"qDebugFlag", &qDebugFlag},       {"metaDebugFlag", &metaDebugFlag},
-      {"jniDebugFlag", &jniDebugFlag},   {"stDebugFlag", &stDebugFlag},
+      {"jniDebugFlag", &jniDebugFlag},   {"stDebugFlag", &stDebugFlag},     {"sndDebugFlag", &sndDebugFlag},
   };
 
   static OptionNameAndVar options[] = {
@@ -1733,6 +1735,7 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite) {
   taosSetDebugFlag(&tdbDebugFlag, "tdbDebugFlag", flag, rewrite);
   taosSetDebugFlag(&metaDebugFlag, "metaDebugFlag", flag, rewrite);
   taosSetDebugFlag(&stDebugFlag, "stDebugFlag", flag, rewrite);
+  taosSetDebugFlag(&sndDebugFlag, "sndDebugFlag", flag, rewrite);
   uInfo("all debug flag are set to %d", flag);
 }
 
diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c
index b29c5c1eb4..cd81b9873f 100644
--- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c
+++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c
@@ -73,6 +73,7 @@ SArray *smGetMsgHandles() {
   SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
   if (pArray == NULL) goto _OVER;
 
+  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
@@ -87,7 +88,8 @@ SArray *smGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
-  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
+
   code = 0;
 _OVER:
diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c
index 47c2993014..56744e4654 100644
--- a/source/dnode/mgmt/mgmt_snode/src/smInt.c
+++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c
@@ -76,9 +76,14 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
   return 0;
 }
 
+static int32_t smStartSnodes(SSnodeMgmt *pMgmt) {
+  return sndInit(pMgmt->pSnode);
+}
+
 SMgmtFunc smGetMgmtFunc() {
   SMgmtFunc mgmtFunc = {0};
   mgmtFunc.openFp = smOpen;
+  mgmtFunc.startFp = (NodeStartFp)smStartSnodes;
   mgmtFunc.closeFp = (NodeCloseFp)smClose;
   mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq;
   mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq;
diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h
index e33ffdb372..2b59d9dbf5 100644
--- a/source/dnode/mnode/impl/inc/mndCluster.h
+++ b/source/dnode/mnode/impl/inc/mndCluster.h
@@ -27,6 +27,8 @@ void mndCleanupCluster(SMnode *pMnode);
 int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
 int64_t mndGetClusterId(SMnode *pMnode);
 int64_t mndGetClusterCreateTime(SMnode *pMnode);
+int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo);
+int32_t mndSetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo);
 int64_t mndGetClusterUpTime(SMnode *pMnode);
 
 #ifdef __cplusplus
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 08c0aec46a..032c23bb4c 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -192,6 +192,8 @@ typedef struct {
   int64_t createdTime;
   int64_t updateTime;
   int32_t upTime;
+  int64_t grantedTime;
+  int64_t connGrantedTime;
 } SClusterObj;
 
 typedef struct {
diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c
index 4c799e1e1e..26c678b513 100644
--- a/source/dnode/mnode/impl/src/mndCluster.c
+++ b/source/dnode/mnode/impl/src/mndCluster.c
@@ -19,7 +19,7 @@
 #include "mndTrans.h"
 
 #define CLUSTER_VER_NUMBE    1
-#define CLUSTER_RESERVE_SIZE 60
+#define CLUSTER_RESERVE_SIZE 44
 int64_t tsExpireTime = 0;
 
 static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
@@ -112,6 +112,19 @@ int64_t mndGetClusterCreateTime(SMnode *pMnode) {
   return createTime;
 }
 
+int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) {
+  void        *pIter = NULL;
+  SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
+  if (pCluster != NULL) {
+    pInfo->grantedTime = pCluster->grantedTime;
+    pInfo->connGrantedTime = pCluster->connGrantedTime;
+    mndReleaseCluster(pMnode, pCluster, pIter);
+    return 0;
+  }
+
+  return -1;
+}
+
 static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
 #if 0
   int32_t upTime = taosGetTimestampSec() - pCluster->updateTime / 1000;
@@ -146,6 +159,8 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
   SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
   SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
   SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
+  SDB_SET_INT64(pRaw, dataPos, pCluster->grantedTime, _OVER)
+  SDB_SET_INT64(pRaw, dataPos, pCluster->connGrantedTime, _OVER)
   SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
 
   terrno = 0;
@@ -186,6 +201,8 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
   SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
   SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
   SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
+  SDB_GET_INT64(pRaw, dataPos, &pCluster->grantedTime, _OVER);
+  SDB_GET_INT64(pRaw, dataPos, &pCluster->connGrantedTime, _OVER);
 SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
 
   terrno = 0;
@@ -218,6 +235,8 @@ static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj
   mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld,
          pNew, pOld->upTime, pNew->upTime);
   pOld->upTime = pNew->upTime;
+  pOld->grantedTime = pNew->grantedTime;
+  pOld->connGrantedTime = pNew->connGrantedTime;
   pOld->updateTime = taosGetTimestampMs();
   return 0;
 }
@@ -359,3 +378,44 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
   mndTransDrop(pTrans);
   return 0;
 }
+
+int32_t mndSetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) {
+  SClusterObj  clusterObj = {0};
+  void        *pIter = NULL;
+  SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
+  if (pCluster != NULL) {
+    if (pCluster->grantedTime >= pInfo->grantedTime && pCluster->connGrantedTime >= pInfo->connGrantedTime) {
+      mndReleaseCluster(pMnode, pCluster, pIter);
+      return 0;
+    }
+    memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
+    if (pCluster->grantedTime < pInfo->grantedTime) clusterObj.grantedTime = pInfo->grantedTime;
+    if (pCluster->connGrantedTime < pInfo->connGrantedTime) clusterObj.connGrantedTime = pInfo->connGrantedTime;
+    mndReleaseCluster(pMnode, pCluster, pIter);
+  }
+
+  if (clusterObj.id <= 0) {
+    mError("can't get cluster info while update granted info");
+    return -1;
+  }
+
+  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "granted-info");
+  if (pTrans == NULL) return -1;
+
+  SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
+  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
+    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
+    mndTransDrop(pTrans);
+    return -1;
+  }
+  (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
+
+  if (mndTransPrepare(pMnode, pTrans) != 0) {
+    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
+    mndTransDrop(pTrans);
+    return -1;
+  }
+
+  mndTransDrop(pTrans);
+  return 0;
+}
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index 54f89f9bc7..077c0a9c2a 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -919,7 +919,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
     if (pIter == NULL) break;
 
     if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
-      if (mndBuildRaftAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
+      if (mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
         sdbCancelFetch(pSdb, pIter);
         sdbRelease(pSdb, pVgroup);
         taosArrayDestroy(pArray);
diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c
index e224aceec2..b0bffcc83e 100644
--- a/source/dnode/mnode/impl/src/mndDnode.c
+++ b/source/dnode/mnode/impl/src/mndDnode.c
@@ -790,7 +790,9 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
       if (cfgAll) {  // alter all dnodes:
         if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t));
         if (failRecord) taosArrayPush(failRecord, &pDnode->id);
-        if (0 == cfgAllErr) cfgAllErr = terrno;  // output 1st terrno.
+        if (0 == cfgAllErr || cfgAllErr == TSDB_CODE_GRANT_PAR_IVLD_ACTIVE) {
+          cfgAllErr = terrno;  // output 1st or more specific error
+        }
       }
     } else {
       terrno = 0;  // no action for dup active code
@@ -806,7 +808,9 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
       if (cfgAll) {
         if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t));
         if (failRecord) taosArrayPush(failRecord, &pDnode->id);
-        if (0 == cfgAllErr) cfgAllErr = terrno;
+        if (0 == cfgAllErr || cfgAllErr == TSDB_CODE_GRANT_PAR_IVLD_ACTIVE) {
+          cfgAllErr = terrno;  // output 1st or more specific error
+        }
       }
     } else {
       terrno = 0;
@@ -1283,7 +1287,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
     strcpy(dcfgReq.config, "supportvnodes");
     snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
-  } else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) {
+  } else if (strncasecmp(cfgReq.config, GRANT_ACTIVE_CODE, 10) == 0 ||
+             strncasecmp(cfgReq.config, GRANT_C_ACTIVE_CODE, 11) == 0) {
+    if (cfgReq.dnodeId != -1) {
+      terrno = TSDB_CODE_INVALID_CFG;
+      goto _err_out;
+    }
     int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE;
     int8_t index = opt == DND_ACTIVE_CODE ? 10 : 11;
     if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
@@ -1301,12 +1310,11 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
       goto _err_out;
     }
 
-    strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? "activeCode" : "cActiveCode");
+    strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? GRANT_ACTIVE_CODE : GRANT_C_ACTIVE_CODE);
     snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%s", cfgReq.value);
 
-    if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
+    if ((terrno = mndConfigDnode(pMnode, pReq, &cfgReq, opt)) != 0) {
       mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
-      terrno = TSDB_CODE_INVALID_CFG;
       goto _err_out;
     }
     tFreeSMCfgDnodeReq(&cfgReq);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 3103047aeb..3b0559741e 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -1435,7 +1435,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
   SName name = {0};
   tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
 
-  auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
+  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);
 
   sdbRelease(pMnode->pSdb, pStream);
   mndTransDrop(pTrans);
@@ -2204,7 +2204,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
         epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
         epsetAssign(&updateInfo.newEp, &pCurrent->epset);
         taosArrayPush(info.pUpdateNodeList, &updateInfo);
-
+      }
+      if (pCurrent->nodeId != SNODE_HANDLE) {
         SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
         taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
         mndReleaseVgroup(pMnode, pVgroup);
@@ -2260,6 +2261,24 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
     sdbRelease(pSdb, pVgroup);
   }
 
+  SSnodeObj *pObj = NULL;
+  while (1) {
+    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
+    if (pIter == NULL) {
+      break;
+    }
+
+    SNodeEntry entry = {0};
+    addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
+    entry.nodeId = SNODE_HANDLE;
+
+    char buf[256] = {0};
+    EPSET_TO_STR(&entry.epset, buf);
+    mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
+    taosArrayPush(pVgroupListSnapshot, &entry);
+    sdbRelease(pSdb, pObj);
+  }
+
   return pVgroupListSnapshot;
 }
 
@@ -2436,6 +2455,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
     STaskId          *pId = taosArrayGet(execInfo.pTaskList, i);
     STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
 
+    if (pEntry->nodeId == SNODE_HANDLE) continue;
+
     bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
     if (!existed) {
       taosArrayPush(pRemovedTasks, pId);
@@ -2797,13 +2818,13 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
   return TSDB_CODE_SUCCESS;
 }
 
-static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) {
+static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
   int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
   for (int32_t j = 0; j < numOfNodes; ++j) {
     SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
     if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
-      mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
-            pTaskEntry->stage, stage, pTaskEntry->id.taskId);
+      mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
+            pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);
 
       pNodeEntry->stageUpdated = true;
       pTaskEntry->stage = stage;
@@ -2847,6 +2868,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
     setNodeEpsetExpiredFlag(req.pUpdateNodes);
   }
 
+  bool snodeChanged = false;
   for (int32_t i = 0; i < req.numOfTasks; ++i) {
     STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
     STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
@@ -2857,6 +2879,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
 
     if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
       updateStageInfo(pTaskEntry, p->stage);
+      if (pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true;
     } else {
       streamTaskStatusCopy(pTaskEntry, p);
       if (p->activeCheckpointId != 0) {
@@ -2885,7 +2908,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
     SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
     taosArrayDestroy(p);
 
-    if (allReady) {
+    if (allReady || snodeChanged) {
       // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
       mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
            execInfo.activeCheckpoint);
diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt
index ebfe80ecab..2da1f9adac 100644
--- a/source/dnode/snode/CMakeLists.txt
+++ b/source/dnode/snode/CMakeLists.txt
@@ -3,6 +3,7 @@ add_library(snode STATIC ${SNODE_SRC})
 target_include_directories(
     snode
     PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode"
+    PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
    private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
 )
 target_link_libraries(
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index ff147fa37b..2ab49db3a3 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -13,15 +13,37 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
-#include "rsync.h"
 #include "executor.h"
+#include "rsync.h"
 #include "sndInt.h"
+#include "stream.h"
 #include "tstream.h"
 #include "tuuid.h"
 
+#define sndError(...)                                                     \
+  do {                                                                    \
+    if (sndDebugFlag & DEBUG_ERROR) {                                     \
+      taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \
+    }                                                                     \
+  } while (0)
+
+#define sndInfo(...)                                                    \
+  do {                                                                  \
+    if (sndDebugFlag & DEBUG_INFO) {                                    \
+      taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \
+    }                                                                   \
+  } while (0)
+
+#define sndDebug(...)                                                \
+  do {                                                               \
+    if (sndDebugFlag & DEBUG_DEBUG) {                                \
+      taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);  \
+    }                                                                \
+  } while (0)
+
 void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
-  char *msgStr = pMsg->pCont;
-  char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
+  char   *msgStr = pMsg->pCont;
+  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
   int32_t code = 0;
@@ -40,10 +62,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
   if (pTask) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp);
+    tDeleteStreamDispatchReq(&req);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     rpcFreeCont(pMsg->pCont);
     taosFreeQitem(pMsg);
     return;
+  } else {
+    tDeleteStreamDispatchReq(&req);
+    return;
   }
 
 FAIL:
@@ -64,23 +90,37 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
 
   streamTaskOpenAllUpstreamInput(pTask);
 
-  pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
-  if (pTask->pState == NULL) {
-    qError("s-task:%s failed to open state for task", pTask->id.idStr);
-    return -1;
-  } else {
-    qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
+  SStreamTask *pSateTask = pTask;
+  SStreamTask  task = {0};
+  if (pTask->info.fillHistory) {
+    task.id.streamId = pTask->streamTaskId.streamId;
+    task.id.taskId = pTask->streamTaskId.taskId;
+    task.pMeta = pTask->pMeta;
+    pSateTask = &task;
   }
 
-  int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
-  SReadHandle handle = {.vnode = NULL,
-                        .numOfVgroups = numOfChildEp,
-                        .pStateBackend = pTask->pState,
-                        .fillHistory = pTask->info.fillHistory};
+  pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1);
+  if (pTask->pState == NULL) {
+    sndError("s-task:%s failed to open state for task", pTask->id.idStr);
+    return -1;
+  } else {
+    sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
+  }
+
+  int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
+  SReadHandle handle = {
+      .checkpointId = pTask->chkInfo.checkpointId,
+      .vnode = NULL,
+      .numOfVgroups = numOfVgroups,
+      .pStateBackend = pTask->pState,
+      .fillHistory = pTask->info.fillHistory,
+      .winRange = pTask->dataRange.window,
+  };
   initStreamStateAPI(&handle.api);
 
-  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
+  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
   ASSERT(pTask->exec.pExecutor);
+  qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
 
   streamTaskResetUpstreamStageInfo(pTask);
   streamSetupScheduleTrigger(pTask);
@@ -89,25 +129,161 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
 
   // checkpoint ver is the kept version, handled data should be the next version.
   if (pTask->chkInfo.checkpointId != 0) {
     pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
-    qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
-          pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
+    sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
+            pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
   } else {
     if (pTask->chkInfo.nextProcessVer == -1) {
       pTask->chkInfo.nextProcessVer = 0;
     }
   }
 
-  char* p = NULL;
+  char *p = NULL;
   streamTaskGetStatus(pTask, &p);
 
-  qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
-        " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
-        SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
-        pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam);
+  sndInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
+          " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
+          SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
+          pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam);
 
   return 0;
 }
 
+int32_t sndStartStreamTasks(SSnode *pSnode) {
+  int32_t      code = TSDB_CODE_SUCCESS;
+  int32_t      vgId = SNODE_HANDLE;
+  SStreamMeta *pMeta = pSnode->pMeta;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  sndDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SArray *pTaskList = NULL;
+  streamMetaWLock(pMeta);
+  pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
+  taosHashClear(pMeta->startInfo.pReadyTaskSet);
+  taosHashClear(pMeta->startInfo.pFailedTaskSet);
+  pMeta->startInfo.startTs = taosGetTimestampMs();
+  streamMetaWUnLock(pMeta);
+
+  // broadcast the check downstream tasks msg
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId *pTaskId = taosArrayGet(pTaskList, i);
+    SStreamTask   *pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
+    if (pTask == NULL) {
+      continue;
+    }
+
+    // fill-history task can only be launched by related stream tasks.
+    if (pTask->info.fillHistory == 1) {
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    if (pTask->status.downstreamReady == 1) {
+      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+        sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
+                 pTask->id.idStr);
+        streamLaunchFillHistoryTask(pTask);
+      }
+
+      streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
+    int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
+    if (ret != TSDB_CODE_SUCCESS) {
+      code = ret;
+    }
+
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  taosArrayDestroy(pTaskList);
+  return code;
+}
+
+int32_t sndResetStreamTaskStatus(SSnode *pSnode) {
+  SStreamMeta *pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+
+  sndDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId *pTaskId = taosArrayGet(pMeta->pTaskList, i);
+
+    STaskId       id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
+    SStreamTask **pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+    streamTaskResetStatus(*pTask);
+  }
+
+  return 0;
+}
+
+int32_t sndRestartStreamTasks(SSnode *pSnode) {
+  SStreamMeta *pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+  int32_t      code = 0;
+  int64_t      st = taosGetTimestampMs();
+
+  while (1) {
+    int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
+    if (startVal == 0) {
+      break;
+    }
+
+    sndDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
+    taosMsleep(500);
+  }
+
+  terrno = 0;
+  sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
+          pMeta->updateInfo.transId);
+
+  while (streamMetaTaskInTimer(pMeta)) {
+    sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+    taosMsleep(100);
+  }
+
+  streamMetaWLock(pMeta);
+
+  code = streamMetaReopen(pMeta);
+  if (code != TSDB_CODE_SUCCESS) {
+    sndError("vgId:%d failed to reopen stream meta", vgId);
+    streamMetaWUnLock(pMeta);
+    code = terrno;
+    return code;
+  }
+
+  streamMetaInitBackend(pMeta);
+  int64_t el = taosGetTimestampMs() - st;
+
+  sndInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
+
+  code = streamMetaLoadAllTasks(pMeta);
+  if (code != TSDB_CODE_SUCCESS) {
+    sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
+    streamMetaWUnLock(pMeta);
+    code = terrno;
+    return code;
+  }
+
+  sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
+  sndResetStreamTaskStatus(pSnode);
+
+  streamMetaWUnLock(pMeta);
+  sndStartStreamTasks(pSnode);
+
+  code = terrno;
+  return code;
+}
+
 SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
   SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
   if (pSnode == NULL) {
@@ -121,17 +297,19 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
   }
 
   pSnode->msgCb = pOption->msgCb;
-  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1);
+  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs());
   if (pSnode->pMeta == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     goto FAIL;
   }
 
+  if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
+    goto FAIL;
+  }
+
   stopRsync();
   startRsync();
 
-  // todo fix it: send msg to mnode to rollback to an existed checkpoint
-  streamMetaInitForSnode(pSnode->pMeta);
   return pSnode;
 
 FAIL:
@@ -140,6 +318,12 @@ FAIL:
   return NULL;
 }
 
+int32_t sndInit(SSnode *pSnode) {
+  sndResetStreamTaskStatus(pSnode);
+  sndStartStreamTasks(pSnode);
+  return 0;
+}
+
 void sndClose(SSnode *pSnode) {
   streamMetaNotifyClose(pSnode->pMeta);
   streamMetaCommit(pSnode->pMeta);
@@ -150,6 +334,33 @@ void sndClose(SSnode *pSnode) {
 int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
 
+int32_t sndStartStreamTaskAsync(SSnode *pSnode, bool restart) {
+  SStreamMeta *pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  if (numOfTasks == 0) {
+    sndDebug("vgId:%d no stream tasks existed to run", vgId);
+    return 0;
+  }
+
+  SStreamTaskRunReq *pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    sndError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
+    return -1;
+  }
+
+  sndDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
+  pRunReq->head.vgId = vgId;
+  pRunReq->streamId = 0;
+  pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(&pSnode->msgCb, STREAM_QUEUE, &msg);
+  return 0;
+}
+
 int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   int32_t code;
 
@@ -185,27 +396,26 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
   streamMetaWUnLock(pSnode->pMeta);
 
-  char* p = NULL;
+  char *p = NULL;
   streamTaskGetStatus(pTask, &p);
 
-  qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
-         pTask->id.idStr, p, numOfTasks);
+  sndDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
+           pTask->id.idStr, p, numOfTasks);
 
   EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
   streamTaskHandleEvent(pTask->status.pSM, event);
-  streamTaskCheckDownstream(pTask);
   return 0;
 }
 
 int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
-  qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
+  sndDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
   streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
 
   // commit the update
   streamMetaWLock(pSnode->pMeta);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
-  qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
+  sndDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
 
   if (streamMetaCommit(pSnode->pMeta) < 0) {
     // persist to disk
@@ -217,6 +427,16 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
 int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
   SStreamTaskRunReq *pReq = pMsg->pCont;
 
+  int32_t taskId = pReq->taskId;
+
+  if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
+    sndStartStreamTasks(pSnode);
+    return 0;
+  } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
+    sndRestartStreamTasks(pSnode);
+    return 0;
+  }
+
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
   if (pTask) {
     streamExecTask(pTask);
@@ -228,28 +448,31 @@ int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
 }
 
 int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
-  char *msgStr = pMsg->pCont;
-  char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
+  char   *msgStr = pMsg->pCont;
+  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
 
   SStreamDispatchReq req;
   SDecoder           decoder;
   tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
   tDecodeStreamDispatchReq(&decoder, &req);
+  tDecoderClear(&decoder);
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
   if (pTask) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp);
+    tDeleteStreamDispatchReq(&req);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     return 0;
   } else {
+    tDeleteStreamDispatchReq(&req);
     return -1;
   }
 }
 
 int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
-  char *msgStr = pMsg->pCont;
-  char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
+  char   *msgStr = pMsg->pCont;
+  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
   SStreamRetrieveReq req;
   SDecoder           decoder;
@@ -274,6 +497,9 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
 
   pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
   pRsp->streamId = htobe64(pRsp->streamId);
+  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
+  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
+  pRsp->stage = htobe64(pRsp->stage);
   pRsp->msgId = htonl(pRsp->msgId);
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId);
@@ -291,24 +517,8 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
   return 0;
 }
 
-int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
-  switch (pMsg->msgType) {
-    case TDMT_STREAM_TASK_DEPLOY: {
-      void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
-      int32_t len = pMsg->contLen - sizeof(SMsgHead);
-      return sndProcessTaskDeployReq(pSnode, pReq, len);
-    }
-
-    case TDMT_STREAM_TASK_DROP:
-      return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
-    default:
-      ASSERT(0);
-  }
-  return 0;
-}
-
-int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
-  char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
+  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
 
   // deserialize
@@ -334,14 +544,77 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg)
   return 0;
 }
 
-int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
-  //
+int32_t sndProcessTaskScanHistoryFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
+  char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
+
+  // deserialize
+  SStreamCompleteHistoryMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
+  tDecodeCompleteHistoryDataMsg(&decoder, &req);
+  tDecoderClear(&decoder);
+
+  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.upstreamTaskId);
+  if (pTask == NULL) {
+    sndError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
+             pSnode->pMeta->vgId, req.upstreamTaskId);
+    return -1;
+  }
+
+  int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
+  if (remain > 0) {
+    sndDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
+             pTask->id.idStr, req.downstreamId, remain);
+  } else {
+    sndDebug(
+        "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
+        "completed msg",
+        pTask->id.idStr, req.downstreamId);
+    streamProcessScanHistoryFinishRsp(pTask);
+  }
+
+  streamMetaReleaseTask(pSnode->pMeta, pTask);
   return 0;
 }
 
+// the downstream task has completed the stream task checkpoint procedure, so start handling the rsp by executing the task
+int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg *pMsg) {
+  SStreamMeta *pMeta = pSnode->pMeta;
+  char        *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
+  int32_t      code = 0;
+
+  SStreamCheckpointReadyMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t *)msg, len);
+  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
+    code = TSDB_CODE_MSG_DECODE_ERROR;
+    tDecoderClear(&decoder);
+    return code;
+  }
+  tDecoderClear(&decoder);
+
+  SStreamTask *pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
+  if (pTask == NULL) {
+    sndError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId,
+             req.downstreamTaskId);
+    return code;
+  }
+
+  sndDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId,
+           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
+
+  streamProcessCheckpointReadyMsg(pTask);
+  streamMetaReleaseTask(pMeta, pTask);
+  return code;
+}
+
 int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
-  char *msgStr = pMsg->pCont;
-  char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
+  char   *msgStr = pMsg->pCont;
+  char   *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
 
   SStreamTaskCheckReq req;
@@ -366,17 +639,17 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
 
   if (pTask != NULL) {
-    rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
+    rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
-    char* p = NULL;
+    char *p = NULL;
     streamTaskGetStatus(pTask, &p);
-    qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
-           pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
+    sndDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
+             pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   } else {
     rsp.status = TASK_DOWNSTREAM_NOT_READY;
-    qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
-           ") from task:0x%x (vgId:%d), rsp status %d",
-           taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
+    sndDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
+             ") from task:0x%x (vgId:%d), rsp status %d",
+             taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   }
 
   SEncoder encoder;
@@ -385,7 +658,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
 
   tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
   if (code < 0) {
-    qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
+    sndError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
     return -1;
   }
 
@@ -404,7 +677,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
 }
 
 int32_t sndProcessStreamTaskCheckRsp(SSnode *pSnode, SRpcMsg *pMsg) {
-  char *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  char   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t len = pMsg->contLen - sizeof(SMsgHead);
 
   int32_t code;
@@ -420,13 +693,13 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode *pSnode, SRpcMsg *pMsg) {
   }
   tDecoderClear(&decoder);
 
-  qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
-         rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
+  sndDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
+           rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
   if (pTask == NULL) {
-    qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
-           pSnode->pMeta->vgId);
+    sndError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
+             pSnode->pMeta->vgId);
     return -1;
   }
 
@@ -435,6 +708,183 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode *pSnode, SRpcMsg *pMsg) {
   return code;
 }
 
+int32_t sndProcessTaskUpdateReq(SSnode *pSnode, SRpcMsg *pMsg) {
+  SStreamMeta *pMeta = pSnode->pMeta;
+  int32_t      vgId = SNODE_HANDLE;
+  char        *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
+  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
+
+  SStreamTaskNodeUpdateMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t *)msg, len);
+  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
+    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
+    sndError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
+    tDecoderClear(&decoder);
+    return rsp.code;
+  }
+
+  tDecoderClear(&decoder);
+
+  // update the nodeEpset when it exists
+  streamMetaWLock(pMeta);
+
+  // the task epset may be updated again and again when replaying the WAL; the task may be in stop status.
+  STaskId       id = {.streamId = req.streamId, .taskId = req.taskId};
+  SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+  if (ppTask == NULL || *ppTask == NULL) {
+    sndError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
+             req.taskId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  SStreamTask *pTask = *ppTask;
+
+  if (pMeta->updateInfo.transId != req.transId) {
+    pMeta->updateInfo.transId = req.transId;
+    sndInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
+    // info needs to be kept till the new trans to update the nodeEp arrives.
+    taosHashClear(pMeta->updateInfo.pTasks);
+  } else {
+    sndDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
+  }
+
+  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
+  void            *exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
+  if (exist != NULL) {
+    sndDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
+             req.transId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  streamMetaWUnLock(pMeta);
+
+  // the following two functions should not be executed within the scope of meta lock to avoid deadlock
+  streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
+  streamTaskResetStatus(pTask);
+
+  // continue after lock the meta again
+  streamMetaWLock(pMeta);
+
+  SStreamTask **ppHTask = NULL;
+  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+    ppHTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
+    if (ppHTask == NULL || *ppHTask == NULL) {
+      sndError(
+          "vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
+          pMeta->vgId, req.taskId);
+      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
+    } else {
+      sndDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
+      streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
+    }
+  }
+
+  {
+    streamMetaSaveTask(pMeta, pTask);
+    if (ppHTask != NULL) {
+      streamMetaSaveTask(pMeta, *ppHTask);
+    }
+
+    if (streamMetaCommit(pMeta) < 0) {
+      // persist to disk
+    }
+  }
+
+  streamTaskStop(pTask);
+
+  // keep the already handled info
+  taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
+
+  if (ppHTask != NULL) {
+    streamTaskStop(*ppHTask);
+    sndDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed",
+             pTask->id.idStr);
+    taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
+  } else {
+    sndDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
+  }
+
+  rsp.code = 0;
+
+  // possibly only handle the stream task.
+  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
+  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
+
+  pMeta->startInfo.tasksWillRestart = 1;
+
+  if (updateTasks < numOfTasks) {
+    sndDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
+             updateTasks, (numOfTasks - updateTasks));
+    streamMetaWUnLock(pMeta);
+  } else {
+    sndDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
+#if 1
+    sndStartStreamTaskAsync(pSnode, true);
+    streamMetaWUnLock(pMeta);
+#else
+    streamMetaWUnLock(pMeta);
+
+    // For debug purpose.
+    // the following procedure consumes many CPU resources, resulting in the re-election of the leader
+    // with high probability. So we employ it as a test case for the stream processing framework, with
+    // checkpoint/restart/nodeUpdate etc.
+ while (1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + int32_t code = streamMetaReopen(pMeta); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d start all stream tasks after all being updated", vgId); + tqResetStreamTaskStatus(pTq); + tqStartStreamTaskAsync(pTq, false); + } else { + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + streamMetaWUnLock(pMeta); +#endif + } + + taosArrayDestroy(req.pNodeList); + return rsp.code; +} + int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: @@ -448,13 +898,34 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg); + return sndProcessTaskScanHistoryFinishReq(pSnode, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); + return sndProcessTaskScanHistoryFinishRsp(pSnode, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return sndProcessStreamTaskCheckReq(pSnode, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: return sndProcessStreamTaskCheckRsp(pSnode, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY: + return sndProcessTaskCheckpointReadyMsg(pSnode, pMsg); + default: + ASSERT(0); + } + return 0; +} + +int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { + switch (pMsg->msgType) { + case TDMT_STREAM_TASK_DEPLOY: { + void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + return sndProcessTaskDeployReq(pSnode, pReq, len); + } + + case TDMT_STREAM_TASK_DROP: + return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen); + case TDMT_VND_STREAM_TASK_UPDATE: + sndProcessTaskUpdateReq(pSnode, pMsg); + break; default: ASSERT(0); } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index dc43da7fe7..635c15aa41 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -138,6 +138,11 @@ else() endif() endif() +target_include_directories( + vnode + PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" +) + target_link_libraries( vnode PUBLIC os diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index fdd449bf36..b3f8317add 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -43,9 +43,6 @@ extern "C" { typedef struct STqOffsetStore STqOffsetStore; -#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) -#define STREAM_EXEC_START_ALL_TASKS_ID (-2) -#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) // tqExec diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c 
index 79a6ad0a30..7eafae46c8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,18 +14,13 @@ */ #include "tq.h" +#include "stream.h" #include "vnd.h" typedef struct { int8_t inited; } STqMgmt; -typedef struct STaskUpdateEntry { - int64_t streamId; - int32_t taskId; - int32_t transId; -} STaskUpdateEntry; - static STqMgmt tqMgmt = {0}; // 0: not init @@ -942,12 +937,12 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { } else { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); streamMetaReleaseTask(pMeta, pTask); char* p = NULL; streamTaskGetStatus(pTask, &p); - tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 + tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { @@ -1377,6 +1372,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp); + tDeleteStreamDispatchReq(&req); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { @@ -1614,6 +1610,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp); + tDeleteStreamDispatchReq(&req); streamMetaReleaseTask(pTq->pStreamMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index aa531a0ba8..56dcdb2abc 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -15,6 +15,7 @@ #include "tq.h" #include "vnd.h" +#include "stream.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 #define SCAN_WAL_IDLE_DURATION 100 diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index cf717472b1..e29583d8fc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -293,6 +293,9 @@ typedef struct STableMergeScanInfo { int32_t readIdx; SSDataBlock* pResBlock; SSampleExecInfo sample; // sample execution info + SSHashObj* mTableNumRows; // uid->num of table rows + SHashObj* mSkipTables; + int64_t mergeLimit; SSortExecInfo sortExecInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1c3db48972..28832ffec8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3200,6 +3200,27 @@ _error: return NULL; } +static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { + int64_t nRows = 0; + void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); + if (pNum == NULL) { + nRows = pBlock->info.rows; + tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); + } else { + *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows; + nRows = *(int64_t*)pNum; // use the accumulated count, otherwise only a table's first block is compared against mergeLimit + } + + if (nRows >= pInfo->mergeLimit) { + if (pInfo->mSkipTables == NULL) { + pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - 
pInfo->tableStartIndex + 1, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + } + int bSkip = 1; + taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip)); + } + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -3257,6 +3278,10 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); + if (pInfo->mergeLimit != -1) { + tableMergeScanDoSkipTable(pInfo, pBlock); + } + pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -3316,22 +3341,20 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; - int64_t mergeLimit = -1; - if (hasLimit) { - mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; - } + tSimpleHashClear(pInfo->mTableNumRows); + size_t szRow = blockDataGetRowSize(pInfo->pResBlock); - if (hasLimit) { - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, - NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024); - } else { +// if (pInfo->mergeLimit != -1) { +// pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, +// NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024); +// } else + { pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); - tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); + tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); } @@ -3343,7 +3366,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); param->pOperator = pOperator; STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); - pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, + (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = param; @@ -3385,6 +3409,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortHandle = NULL; resetLimitInfoForNextGroup(&pInfo->limitInfo); + taosHashCleanup(pInfo->mSkipTables); + pInfo->mSkipTables = NULL; return TSDB_CODE_SUCCESS; } @@ -3493,7 +3519,10 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - + tSimpleHashCleanup(pTableScanInfo->mTableNumRows); + 
pTableScanInfo->mTableNumRows = NULL; + taosHashCleanup(pTableScanInfo->mSkipTables); + pTableScanInfo->mSkipTables = NULL; destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); @@ -3583,7 +3612,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); - + pInfo->mTableNumRows = tSimpleHashInit(1024, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + pInfo->mergeLimit = -1; + bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; + if (hasLimit) { + pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; + pInfo->mSkipTables = NULL; + } pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); int32_t rowSize = pInfo->pResBlock->info.rowSize; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 34b4677235..ab7951bb92 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -283,7 +283,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S tmsgSendRsp(pRsp); } - tDeleteStreamDispatchReq(pReq); streamSchedExec(pTask); return 0; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b643e7186c..6ca47c8616 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -528,6 +528,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { if (err != NULL) { stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err); taosMemoryFreeClear(err); + rocksdb_list_column_families_destroy(cfs, nCf); goto _EXIT; } } else { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e6cf38422a..9078eeee2d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -371,7 +371,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.pData = pReqs; } - stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->execInfo.dispatch); + stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch, pTask->pMeta->stage); return code; } @@ -926,8 +926,8 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); info.msg.info.noResp = 1; // refactor later. 
- stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d", - pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index); + stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d", + pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, req.upstreamNodeId); if (pTask->pReadyMsgList == NULL) { pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8dd7e8f5d1..78b4814aa6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -338,6 +338,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; pMeta->stage = stage; + pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -950,7 +951,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; - if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; + if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; @@ -988,7 +989,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; @@ -1104,7 +1105,7 @@ void metaHbToMnode(void* param, void* tmrId) { SStreamHbMsg hbMsg = {0}; SEpSet epset = {0}; bool hasMnodeEpset = false; - int32_t stage = 0; + int64_t stage = 0; streamMetaRLock(pMeta); @@ -1285,11 +1286,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) { metaHbToMnode(pRid, NULL); } -void streamMetaInitForSnode(SStreamMeta* pMeta) { - pMeta->stage = 0; - pMeta->role = NODE_ROLE_LEADER; -} - void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); taosHashClear(pStartInfo->pFailedTaskSet); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 9b27281915..f042687942 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -289,10 +289,11 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); } -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) { +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); + *oldStage = 
pInfo->stage; const char* id = pTask->id.idStr; if (stage == -1) { stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id, @@ -457,9 +458,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError( - "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " + "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", " "not check wait for downstream task nodeUpdate, and all tasks restart", - id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); + id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " @@ -474,7 +475,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, + stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); } @@ -921,7 +922,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->oldStage) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -936,7 +937,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->oldStage) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; tEndDecode(pDecoder); return 0; diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index d4f228a537..911fdc52b7 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -112,7 +112,7 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) { int32_t retId = -1; int64_t avail = 0; for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) { -#if 0 // round-robin +#if 1 // round-robin int32_t diskId = (pTier->nextid + id) % pTier->ndisk; STfsDisk *pDisk = pTier->disks[diskId]; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 21022f2016..f310db53ef 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -444,6 +444,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KLEN, "Invalid klen to decod TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_IVLD_KEY, "Invalid key to gen active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_APP_LIMIT, "Limited app num to gen active code") 
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encode active code") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_DIST, "Invalid dist to parse active code") // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index aa6719f604..46b25dee6a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -110,6 +110,7 @@ int32_t metaDebugFlag = 131; int32_t udfDebugFlag = 131; int32_t smaDebugFlag = 131; int32_t idxDebugFlag = 131; +int32_t sndDebugFlag = 131; int64_t dbgEmptyW = 0; int64_t dbgWN = 0; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 92eaec52b5..ca1cc704ae 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -20,6 +20,9 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py +#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4 +#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4 +,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 2bfe33d0af..544a966960 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -247,7 +247,10 @@ class TDTestCase: tdSql.error('alter all dnodes "activeCode" "' + self.str510 + '"') tdSql.query(f'select * from information_schema.ins_dnodes') tdSql.checkEqual(tdSql.queryResult[0][8],"") - tdSql.execute('alter dnode 1 "activeCode" ""') + tdSql.error('alter dnode 1 "activeCode" ""') + tdSql.error('alter dnode 1 "activeCode"') + tdSql.execute('alter all dnodes "activeCode" ""') + tdSql.execute('alter all dnodes "activeCode"') tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes') tdSql.checkEqual(tdSql.queryResult[0][0],"") tdSql.checkEqual(tdSql.queryResult[0][1],'') @@ -259,6 +262,10 @@ class TDTestCase: tdSql.error('alter all dnodes "cActiveCode" "' + self.str257 + '"') tdSql.error('alter all dnodes "cActiveCode" "' + self.str254 + '"') tdSql.error('alter dnode 1 "cActiveCode" "' + self.str510 + '"') + tdSql.error('alter dnode 1 "cActiveCode" ""') + tdSql.error('alter dnode 1 "cActiveCode"') + tdSql.execute('alter all dnodes "cActiveCode" ""') + tdSql.execute('alter all dnodes "cActiveCode"') tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes') tdSql.checkEqual(tdSql.queryResult[0][0],"") tdSql.checkEqual(tdSql.queryResult[0][1],"") diff --git a/tests/system-test/2-query/db.py b/tests/system-test/2-query/db.py index 6870c59a0d..0246626e40 100644 --- a/tests/system-test/2-query/db.py +++ b/tests/system-test/2-query/db.py @@ -55,7 +55,7 @@ class TDTestCase: tdSql.checkData(0, 2, 0) tdSql.query("show dnode 1 variables like '%debugFlag'") - tdSql.checkRows(22) + tdSql.checkRows(23) tdSql.query("show dnode 1 variables like '____debugFlag'") tdSql.checkRows(2) diff --git a/tests/system-test/7-tmq/tmqVnodeReplicate.py b/tests/system-test/7-tmq/tmqVnodeReplicate.py index fd8ece02e0..0ee11781ed 100644 --- a/tests/system-test/7-tmq/tmqVnodeReplicate.py +++ 
b/tests/system-test/7-tmq/tmqVnodeReplicate.py @@ -105,7 +105,6 @@ class TDTestCase: topicNameList = ['topic1'] # expectRowsList = [] - tmqCom.initConsumerTable("cdb", self.replicaVar) tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -133,14 +132,15 @@ class TDTestCase: tmqCom.getStartConsumeNotifyFromTmqsim() tmqCom.getStartCommitNotifyFromTmqsim() - tdSql.query("select * from information_schema.ins_vnodes") - # tdLog.debug(tdSql.queryResult) - tdDnodes = cluster.dnodes - for result in tdSql.queryResult: - if result[2] == 'dbt' and result[3] == 'leader': - tdLog.debug("leader is %d"%(result[0] - 1)) - tdDnodes[result[0] - 1].stoptaosd() - break + tdSql.query("balance vgroup leader") + # tdSql.query("select * from information_schema.ins_vnodes") + # # tdLog.debug(tdSql.queryResult) + # tdDnodes = cluster.dnodes + # for result in tdSql.queryResult: + # if result[2] == 'dbt' and result[3] == 'leader': + # tdLog.debug("leader is %d"%(result[0] - 1)) + # tdDnodes[result[0] - 1].stoptaosd() + # break pInsertThread.join() expectRows = 1 @@ -159,7 +159,6 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): - tdSql.prepare() self.prepareTestEnv() self.tmqCase1() diff --git a/tests/system-test/8-stream/snode_restart.py b/tests/system-test/8-stream/snode_restart.py new file mode 100644 index 0000000000..3657163ab0 --- /dev/null +++ b/tests/system-test/8-stream/snode_restart.py @@ -0,0 +1,78 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * + +class TDTestCase: + updatecfgDict = {'checkpointInterval': 1100} + print("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to execute {__file__}") + tdSql.init(conn.cursor(), False) + + + def case1(self): + tdLog.debug("========case1 start========") + + os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &") + time.sleep(4) + tdSql.query("use test") + tdSql.query("create snode on dnode 4") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") + tdLog.debug("========create stream using snode and insert data ok========") + time.sleep(4) + + tdDnodes = cluster.dnodes + tdDnodes[3].stoptaosd() + time.sleep(2) + tdDnodes[3].starttaosd() + tdLog.debug("========snode restart ok========") + + time.sleep(30) + os.system("kill -9 `pgrep taosBenchmark`") + tdLog.debug("========stop insert ok========") + time.sleep(2) + + tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") + rowCnt = tdSql.getRows() + results = [] + for i in range(rowCnt): + results.append(tdSql.getData(i,1)) + + tdSql.query("select * from st1 order by groupid,_wstart") + tdSql.checkRows(rowCnt) + for i in range(rowCnt): + data1 = tdSql.getData(i,1) + data2 = results[i] + if data1 != data2: + tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2)) + tdLog.exit("check data error!") + + # tdLog.debug("========sleep 500s========") + # time.sleep(500) + + tdLog.debug("case1 end") + + def run(self): + self.case1() + + def stop(self): + tdSql.close() + 
tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/8-stream/snode_restart_with_checkpoint.py b/tests/system-test/8-stream/snode_restart_with_checkpoint.py new file mode 100644 index 0000000000..9567bbe439 --- /dev/null +++ b/tests/system-test/8-stream/snode_restart_with_checkpoint.py @@ -0,0 +1,78 @@ + +import taos
+import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * + +class TDTestCase: + # updatecfgDict = {'checkpointInterval': 5} + # print("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to execute {__file__}") + tdSql.init(conn.cursor(), False) + + + def case1(self): + tdLog.debug("========case1 start========") + + os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &") + time.sleep(4) + tdSql.query("use test") + tdSql.query("create snode on dnode 4") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") + tdLog.debug("========create stream using snode and insert data ok========") + time.sleep(60) + + tdDnodes = cluster.dnodes + tdDnodes[3].stoptaosd() + time.sleep(2) + tdDnodes[3].starttaosd() + tdLog.debug("========snode restart ok========") + + time.sleep(30) + os.system("kill -9 `pgrep taosBenchmark`") + tdLog.debug("========stop insert ok========") + time.sleep(2) + + tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") + rowCnt = tdSql.getRows() + results = [] + for i in range(rowCnt): + results.append(tdSql.getData(i,1)) + + tdSql.query("select * from st1 order by groupid,_wstart") + tdSql.checkRows(rowCnt) + for i in range(rowCnt): + data1 = tdSql.getData(i,1) + data2 = results[i] + if data1 != data2: + tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2)) + tdLog.exit("check data error!") + + # tdLog.debug("========sleep 500s========") + # time.sleep(500) + + tdLog.debug("case1 end") + + def run(self): + self.case1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/8-stream/vnode_restart.py b/tests/system-test/8-stream/vnode_restart.py new file mode 100644 index 0000000000..a53432b77a --- /dev/null +++ b/tests/system-test/8-stream/vnode_restart.py @@ -0,0 +1,77 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * + +class TDTestCase: + updatecfgDict = {'checkpointInterval': 1100} + print("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to execute {__file__}") + tdSql.init(conn.cursor(), False) + + + def case1(self): + tdLog.debug("========case1 start========") + + os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &") + time.sleep(4) + 
tdSql.query("use test") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") + tdLog.debug("========create stream useing snode and insert data ok========") + time.sleep(4) + + tdDnodes = cluster.dnodes + tdDnodes[2].stoptaosd() + time.sleep(2) + tdDnodes[2].starttaosd() + tdLog.debug("========vnode restart ok========") + + time.sleep(30) + os.system("kill -9 `pgrep taosBenchmark`") + tdLog.debug("========stop insert ok========") + time.sleep(2) + + tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") + rowCnt = tdSql.getRows() + results = [] + for i in range(rowCnt): + results.append(tdSql.getData(i,1)) + + tdSql.query("select * from st1 order by groupid,_wstart") + tdSql.checkRows(rowCnt) + for i in range(rowCnt): + data1 = tdSql.getData(i,1) + data2 = results[i] + if data1 != data2: + tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2)) + tdLog.exit("check data error!") + + # tdLog.debug("========sleep 500s========") + # time.sleep(500) + + tdLog.debug("case1 end") + + def run(self): + self.case1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/output.txt b/tests/system-test/output.txt deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 81f98fea22..795132b14e 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -582,7 +582,7 @@ if __name__ == "__main__": tdDnodes.setAsan(asan) tdDnodes.stopAll() for dnode in tdDnodes.dnodes: - tdDnodes.deploy(dnode.index,{}) + tdDnodes.deploy(dnode.index,updateCfgDict) for dnode in tdDnodes.dnodes: tdDnodes.starttaosd(dnode.index) tdCases.logSql(logSql)