From 43966abbc6f29f5d5fe2572c78c3cd637374a96b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 15 Feb 2023 18:39:05 +0800 Subject: [PATCH] feat:add disable stream option --- include/common/tglobal.h | 2 ++ source/common/src/tglobal.c | 6 ++++++ source/dnode/vnode/src/tq/tqPush.c | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2331f0b23c..5a0c0e0777 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -159,6 +159,8 @@ extern int32_t tsUptimeInterval; extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryInterval; +extern bool tsDisableStream; + // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d4849650e6..46c2a32535 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -189,6 +189,7 @@ int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdLdLibPath[512] = ""; +bool tsDisableStream = true; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -469,6 +470,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; + if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; + GRANT_CFG_ADD; return 0; } @@ -770,6 +773,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } + + tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; + GRANT_CFG_GET; return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f89bc20362..b9df3e5826 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -307,7 +307,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) taosWUnLockLatch(&pTq->pushLock); } - if (vnodeIsRoleLeader(pTq->pVnode)) { + if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (msgType == TDMT_VND_SUBMIT) { void* data = taosMemoryMalloc(msgLen);