From 38f507d941cd89423bcc229c28830988f7aa855e Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 4 May 2023 05:49:36 +0000 Subject: [PATCH 1/8] transaction multithread --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndTrans.c | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f547ce025d..fac574f9e9 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -177,6 +177,7 @@ typedef struct { SArray* pRpcArray; SRWLatch lockRpcArray; int64_t mTraceId; + TdThreadMutex mutex; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 106eea0313..53dc0476cd 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -546,6 +546,7 @@ static void mndTransDropData(STrans *pTrans) { pTrans->param = NULL; pTrans->paramLen = 0; } + (void)taosThreadMutexDestroy(&pTrans->mutex); } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { @@ -651,6 +652,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; taosInitRWLatch(&pTrans->lockRpcArray); + taosThreadMutexInit(&pTrans->mutex, NULL); if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || pTrans->pRpcArray == NULL) { @@ -1307,7 +1309,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); if (numOfActions == 0) return code; - if (pTrans->redoActionPos >= numOfActions) return code; + + taosThreadMutexLock(&pTrans->mutex); + + if (pTrans->redoActionPos >= numOfActions) { + taosThreadMutexUnlock(&pTrans->mutex); + return code; + } mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); @@ -1377,6 +1385,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } } + taosThreadMutexUnlock(&pTrans->mutex); + return code; } From 908fd4ff97bd2c1b00b85ef5b84fca1f264d4f4b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 5 May 2023 17:48:21 +0800 Subject: [PATCH 2/8] feat: process split vgroup msg imp --- source/dnode/mnode/impl/src/mndVgroup.c | 41 ++++--------------------- 1 file changed, 6 insertions(+), 35 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index d4be6bed73..e580c78383 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2103,7 +2103,7 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, return 0; } -static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { +int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = -1; STrans *pTrans = NULL; SSdbRaw *pRaw = NULL; @@ -2229,42 +2229,13 @@ _OVER: return code; } -static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SVgObj *pVgroup = NULL; - SDbObj *pDb = NULL; +extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq); - SSplitVgroupReq req = {0}; - if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } +static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); } - mInfo("vgId:%d, start to split", req.vgId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) { - goto _OVER; - } - - pVgroup = mndAcquireVgroup(pMnode, req.vgId); - if (pVgroup == NULL) goto _OVER; - - pDb = mndAcquireDb(pMnode, pVgroup->dbName); - if (pDb == NULL) goto _OVER; - - code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup); - if (code != 0) { - mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name); - goto _OVER; - } - - mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name); - -_OVER: - mndReleaseVgroup(pMnode, pVgroup); - mndReleaseDb(pMnode, pDb); - return code; -} +#ifndef TD_ENTERPRISE +int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; } +#endif static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pSrc, SDnodeObj *pDst) { From 76a20b2221b22e246ac4528e8e5f305f38e31cba Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 5 May 2023 18:57:15 +0800 Subject: [PATCH 3/8] enh: adjust enum ETrnExec --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 1 + source/dnode/mnode/impl/src/mndTrans.c | 4 +++- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index a7c4b2e70e..ef4a2b52bb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -60,7 +60,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (IsReq(pMsg)) { if (code != 0) { if (terrno != 0) code = terrno; - dGError("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(code), TMSG_INFO(pMsg->msgType)); + dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType)); } vmSendRsp(pMsg, code); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f547ce025d..e227848e9f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -118,7 +118,7 @@ typedef enum { } ETrnPolicy; typedef enum { - TRN_EXEC_PRARLLEL = 0, + TRN_EXEC_PARALLEL = 0, TRN_EXEC_SERIAL = 1, } ETrnExec; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 057e3efbbc..03434573c4 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -76,6 +76,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransSetSerial(STrans *pTrans); +void mndTransSetParallel(STrans *pTrans); void mndTransSetOper(STrans *pTrans, EOperType oper); int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 106eea0313..ccb0882bb0 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -643,7 +643,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; pTrans->conflict = conflict; - pTrans->exec = TRN_EXEC_PRARLLEL; + pTrans->exec = TRN_EXEC_PARALLEL; pTrans->createdTime = taosGetTimestampMs(); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); @@ -793,6 +793,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } +void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; } + void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; } static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { From fced6270215aa2ba29a936ef1faaed722e25ab5d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 5 May 2023 19:14:34 +0800 Subject: [PATCH 4/8] enh: add mndTransCommitVgStatus --- source/dnode/mnode/impl/src/mndVgroup.c | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e580c78383..83989a22bb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2103,6 +2103,18 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, return 0; } +static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) { + SSdbRaw *pRaw = mndVgroupActionEncode(pVg); + if (pRaw == NULL) goto _err; + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err; + (void)sdbSetRawStatus(pRaw, vgStatus); + pRaw = NULL; + return 0; +_err: + sdbFreeRaw(pRaw); + return -1; +} + int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = -1; STrans *pTrans = NULL; @@ -2181,28 +2193,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro if (pDb->cfg.replications != newVg1.replica) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER; } else { - pRaw = mndVgroupActionEncode(&newVg1); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - pRaw = NULL; + if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER; } if (pDb->cfg.replications != newVg2.replica) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER; } else { - pRaw = mndVgroupActionEncode(&newVg2); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - pRaw = NULL; + if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER; } - pRaw = mndVgroupActionEncode(pVgroup); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - pRaw = NULL; + if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER; memcpy(&dbObj, pDb, sizeof(SDbObj)); if (dbObj.cfg.pRetensions != NULL) { From 028bfd34bf4abd69885af45224c55e839ace47dd Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Sat, 6 May 2023 14:59:27 +0800 Subject: [PATCH 5/8] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index d90db3cab0..634f50356d 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -227,7 +227,7 @@ T = 最新事件时间 - DELETE_MARK ## 流式计算支持的函数 1. 所有的 [单行函数](../function/#单行函数) 均可用于流计算。 -2. 以下 19 个聚合/选择函数 不能 应用在创建流计算的 SQL 语句,[系统信息函数](../function/#系统信息函数) 也不能用于流计算中。此外的其他类型的函数均可用于流计算。 +2. 以下 19 个聚合/选择函数 不能 应用在创建流计算的 SQL 语句。此外的其他类型的函数均可用于流计算。 - [leastsquares](../function/#leastsquares) - [percentile](../function/#percentile) From e48fdceb7c1b64e6f6e0976a6168041409ae88a3 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Sat, 6 May 2023 16:03:12 +0800 Subject: [PATCH 6/8] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index d99c5bdae4..43c49c03cd 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -147,7 +147,7 @@ In both of these methods, configuring the watermark is essential for obtaining a ## Supported functions -All [scalar functions](../function/#scalar-functions) are available in stream processing. All [System information functions](../function/#system-information-functions) are not allowed in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings: +All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings: - [leastsquares](../function/#leastsquares) - [percentile](../function/#percentile) - [top](../function/#top) From 1f25cc57cdbfebbc755adb35ed4c6d41ffb5af9c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 6 May 2023 16:39:40 +0800 Subject: [PATCH 7/8] fix: projection merge issue --- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/planner/src/planOptimizer.c | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 6544898be9..d1011cbf3a 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1070,7 +1070,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); - pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList); + pProject->ignoreGroupId = (pSelect->isSubquery && NULL == pProject->node.pLimit && NULL == pProject->node.pSlimit) ? true : (NULL == pSelect->pPartitionByList); pProject->node.groupAction = (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4f8b57de5f..5cb3591984 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2351,6 +2351,17 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } +static int32_t mergeProjectionsLogicNode(SLogicNode* pDstNode, SLogicNode* pSrcNode) { + SProjectLogicNode *pDstPro = (SProjectLogicNode*)pDstNode; + SProjectLogicNode *pSrcPro = (SProjectLogicNode*)pSrcNode; + + if (!pSrcPro->ignoreGroupId) { + pDstPro->ignoreGroupId = pSrcPro->ignoreGroupId; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); @@ -2360,8 +2371,11 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* if (TSDB_CODE_SUCCESS == code) { if (1 == LIST_LENGTH(pChild->pChildren)) { - SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0); - code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild); + code = mergeProjectionsLogicNode(pSelfNode, pChild); + if (TSDB_CODE_SUCCESS == code) { + SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0); + code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild); + } } else { // no grand child NODES_CLEAR_LIST(pSelfNode->pChildren); } From 758bcb901a1209f1638c641b451f88e19c61df63 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Sat, 6 May 2023 22:55:44 +0800 Subject: [PATCH 8/8] docs: fix example tmp (#21197) --- docs/examples/rust/nativeexample/examples/subscribe_demo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs index 7551ad46b1..d54bb60e93 100644 --- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> { taos.exec_many([ format!("DROP TOPIC IF EXISTS tmq_meters"), format!("DROP DATABASE IF EXISTS `{db}`"), - format!("CREATE DATABASE `{db}`"), + format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"), format!("USE `{db}`"), // create super table format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),