diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md
index d99c5bdae4..43c49c03cd 100644
--- a/docs/en/12-taos-sql/14-stream.md
+++ b/docs/en/12-taos-sql/14-stream.md
@@ -147,7 +147,7 @@ In both of these methods, configuring the watermark is essential for obtaining a
## Supported functions
-All [scalar functions](../function/#scalar-functions) are available in stream processing. All [System information functions](../function/#system-information-functions) are not allowed in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings:
+All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings:
- [leastsquares](../function/#leastsquares)
- [percentile](../function/#percentile)
- [top](../function/#top)
diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
index 7551ad46b1..d54bb60e93 100644
--- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
+++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
- format!("CREATE DATABASE `{db}`"),
+ format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index d90db3cab0..634f50356d 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -227,7 +227,7 @@ T = 最新事件时间 - DELETE_MARK
## 流式计算支持的函数
1. 所有的 [单行函数](../function/#单行函数) 均可用于流计算。
-2. 以下 19 个聚合/选择函数 不能 应用在创建流计算的 SQL 语句,[系统信息函数](../function/#系统信息函数) 也不能用于流计算中。此外的其他类型的函数均可用于流计算。
+2. 以下 19 个聚合/选择函数 不能 应用在创建流计算的 SQL 语句。此外的其他类型的函数均可用于流计算。
- [leastsquares](../function/#leastsquares)
- [percentile](../function/#percentile)
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
index a7c4b2e70e..ef4a2b52bb 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
@@ -60,7 +60,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (IsReq(pMsg)) {
if (code != 0) {
if (terrno != 0) code = terrno;
- dGError("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(code), TMSG_INFO(pMsg->msgType));
+ dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
}
vmSendRsp(pMsg, code);
}
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index f547ce025d..fbdfc4b360 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -118,7 +118,7 @@ typedef enum {
} ETrnPolicy;
typedef enum {
- TRN_EXEC_PRARLLEL = 0,
+ TRN_EXEC_PARALLEL = 0,
TRN_EXEC_SERIAL = 1,
} ETrnExec;
@@ -177,6 +177,7 @@ typedef struct {
SArray* pRpcArray;
SRWLatch lockRpcArray;
int64_t mTraceId;
+ TdThreadMutex mutex;
} STrans;
typedef struct {
diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h
index 057e3efbbc..03434573c4 100644
--- a/source/dnode/mnode/impl/inc/mndTrans.h
+++ b/source/dnode/mnode/impl/inc/mndTrans.h
@@ -76,6 +76,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetSerial(STrans *pTrans);
+void mndTransSetParallel(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans);
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 106eea0313..cfb5bef9d0 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -546,6 +546,7 @@ static void mndTransDropData(STrans *pTrans) {
pTrans->param = NULL;
pTrans->paramLen = 0;
}
+ (void)taosThreadMutexDestroy(&pTrans->mutex);
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
@@ -643,7 +644,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy;
pTrans->conflict = conflict;
- pTrans->exec = TRN_EXEC_PRARLLEL;
+ pTrans->exec = TRN_EXEC_PARALLEL;
pTrans->createdTime = taosGetTimestampMs();
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
@@ -651,6 +652,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0;
taosInitRWLatch(&pTrans->lockRpcArray);
+ taosThreadMutexInit(&pTrans->mutex, NULL);
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
pTrans->pRpcArray == NULL) {
@@ -793,6 +795,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
+void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
+
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
@@ -1307,7 +1311,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code;
- if (pTrans->redoActionPos >= numOfActions) return code;
+
+ taosThreadMutexLock(&pTrans->mutex);
+
+ if (pTrans->redoActionPos >= numOfActions) {
+ taosThreadMutexUnlock(&pTrans->mutex);
+ return code;
+ }
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos);
@@ -1377,6 +1387,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
}
}
+ taosThreadMutexUnlock(&pTrans->mutex);
+
return code;
}
diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c
index 2ac1eb3114..72a7ed77a4 100644
--- a/source/dnode/mnode/impl/src/mndVgroup.c
+++ b/source/dnode/mnode/impl/src/mndVgroup.c
@@ -2117,6 +2117,18 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0;
}
+static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) {
+ SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
+ if (pRaw == NULL) goto _err;
+ if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err;
+ (void)sdbSetRawStatus(pRaw, vgStatus);
+ pRaw = NULL;
+ return 0;
+_err:
+ sdbFreeRaw(pRaw);
+ return -1;
+}
+
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1;
STrans *pTrans = NULL;
@@ -2195,28 +2207,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
} else {
- pRaw = mndVgroupActionEncode(&newVg1);
- if (pRaw == NULL) goto _OVER;
- if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
- (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
- pRaw = NULL;
+ if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else {
- pRaw = mndVgroupActionEncode(&newVg2);
- if (pRaw == NULL) goto _OVER;
- if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
- (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
- pRaw = NULL;
+ if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER;
}
- pRaw = mndVgroupActionEncode(pVgroup);
- if (pRaw == NULL) goto _OVER;
- if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
- (void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
- pRaw = NULL;
+ if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER;
memcpy(&dbObj, pDb, sizeof(SDbObj));
if (dbObj.cfg.pRetensions != NULL) {
@@ -2243,42 +2243,13 @@ _OVER:
return code;
}
-static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
- SMnode *pMnode = pReq->info.node;
- int32_t code = -1;
- SVgObj *pVgroup = NULL;
- SDbObj *pDb = NULL;
+extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);
- SSplitVgroupReq req = {0};
- if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
- terrno = TSDB_CODE_INVALID_MSG;
- goto _OVER;
- }
+static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); }
- mInfo("vgId:%d, start to split", req.vgId);
- if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) {
- goto _OVER;
- }
-
- pVgroup = mndAcquireVgroup(pMnode, req.vgId);
- if (pVgroup == NULL) goto _OVER;
-
- pDb = mndAcquireDb(pMnode, pVgroup->dbName);
- if (pDb == NULL) goto _OVER;
-
- code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
- if (code != 0) {
- mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name);
- goto _OVER;
- }
-
- mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name);
-
-_OVER:
- mndReleaseVgroup(pMnode, pVgroup);
- mndReleaseDb(pMnode, pDb);
- return code;
-}
+#ifndef TD_ENTERPRISE
+int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; }
+#endif
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SDnodeObj *pSrc, SDnodeObj *pDst) {
diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c
index 6544898be9..d1011cbf3a 100644
--- a/source/libs/planner/src/planLogicCreater.c
+++ b/source/libs/planner/src/planLogicCreater.c
@@ -1070,7 +1070,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP(pProject->node.pLimit, pSelect->pLimit);
TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
- pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList);
+ pProject->ignoreGroupId = (pSelect->isSubquery && NULL == pProject->node.pLimit && NULL == pProject->node.pSlimit) ? true : (NULL == pSelect->pPartitionByList);
pProject->node.groupAction =
(!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 4f8b57de5f..5cb3591984 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -2351,6 +2351,17 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
+static int32_t mergeProjectionsLogicNode(SLogicNode* pDstNode, SLogicNode* pSrcNode) {
+ SProjectLogicNode *pDstPro = (SProjectLogicNode*)pDstNode;
+ SProjectLogicNode *pSrcPro = (SProjectLogicNode*)pSrcNode;
+
+ if (!pSrcPro->ignoreGroupId) {
+ pDstPro->ignoreGroupId = pSrcPro->ignoreGroupId;
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
@@ -2360,8 +2371,11 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
if (TSDB_CODE_SUCCESS == code) {
if (1 == LIST_LENGTH(pChild->pChildren)) {
- SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
- code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
+ code = mergeProjectionsLogicNode(pSelfNode, pChild);
+ if (TSDB_CODE_SUCCESS == code) {
+ SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
+ code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
+ }
} else { // no grand child
NODES_CLEAR_LIST(pSelfNode->pChildren);
}