From 38f507d941cd89423bcc229c28830988f7aa855e Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 4 May 2023 05:49:36 +0000 Subject: [PATCH 01/11] transaction multithread --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndTrans.c | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f547ce025d..fac574f9e9 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -177,6 +177,7 @@ typedef struct { SArray* pRpcArray; SRWLatch lockRpcArray; int64_t mTraceId; + TdThreadMutex mutex; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 106eea0313..53dc0476cd 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -546,6 +546,7 @@ static void mndTransDropData(STrans *pTrans) { pTrans->param = NULL; pTrans->paramLen = 0; } + (void)taosThreadMutexDestroy(&pTrans->mutex); } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { @@ -651,6 +652,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; taosInitRWLatch(&pTrans->lockRpcArray); + taosThreadMutexInit(&pTrans->mutex, NULL); if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || pTrans->pRpcArray == NULL) { @@ -1307,7 +1309,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); if (numOfActions == 0) return code; - if (pTrans->redoActionPos >= numOfActions) return code; + + taosThreadMutexLock(&pTrans->mutex); + + if (pTrans->redoActionPos >= numOfActions) { + taosThreadMutexUnlock(&pTrans->mutex); + return code; + } mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); @@ -1377,6 +1385,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } } + taosThreadMutexUnlock(&pTrans->mutex); + return code; } From 9f3a0d9a5e463681295598821a7be439cc7de86b Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 5 May 2023 02:17:06 +0000 Subject: [PATCH 02/11] balance leader skew --- source/dnode/mnode/impl/src/mndVgroup.c | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index add2b1568c..7aa3afa627 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1917,7 +1917,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra int32_t vgid = pVgroup->vgId; int8_t replica = pVgroup->replica; - if(pVgroup->replica <= 1) { + if(pVgroup->replica <= 1) { mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica); return -1; } @@ -1951,6 +1951,19 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra return -1; } + SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); + if (pDb == NULL) { + mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId); + return -1; + } + + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) { + mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId); + return -1; + } + + mndReleaseDb(pMnode, pDb); + SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup); if (pRaw == NULL) { mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId); @@ -1965,7 +1978,8 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra } else { - mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist, online); + mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", + pTrans->id, vgid, dnodeId, exist, online); } return 0; From 997be0a3edef6ac78923eed97ab2607658f1c034 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 5 May 2023 17:04:36 +0800 Subject: [PATCH 03/11] fix: having clause issues --- source/libs/parser/src/parTranslater.c | 29 ++++++++++++++++++-------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 25e92a55ec..a2748c7e6d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2289,6 +2289,23 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) return TSDB_CODE_SUCCESS; } +static int32_t checkPartitionGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = TSDB_CODE_SUCCESS; + if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) { + return code; + } + if (NULL != pSelect->pHaving) { + code = checkExprForGroupBy(pCxt, &pSelect->pHaving); + } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pProjectionList) { + code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); + } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pOrderByList) { + code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList); + } + return code; +} + static int32_t checkWindowFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; @@ -2901,9 +2918,6 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_ORDER_BY; code = translateExprList(pCxt, pSelect->pOrderByList); } - if (TSDB_CODE_SUCCESS == code) { - code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList); - } return code; } @@ -3022,9 +3036,6 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = translateProjectionList(pCxt, pSelect); } - if (TSDB_CODE_SUCCESS == code) { - code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); - } if (TSDB_CODE_SUCCESS == code) { code = translateFillValues(pCxt, pSelect); } @@ -3041,9 +3052,6 @@ static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) { } pCxt->currClause = SQL_CLAUSE_HAVING; int32_t code = translateExpr(pCxt, &pSelect->pHaving); - if (TSDB_CODE_SUCCESS == code && (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow)) { - code = checkExprForGroupBy(pCxt, &pSelect->pHaving); - } return code; } @@ -3629,6 +3637,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = translateOrderBy(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = checkPartitionGroupBy(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = checkAggColCoexist(pCxt, pSelect); } From 908fd4ff97bd2c1b00b85ef5b84fca1f264d4f4b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 5 May 2023 17:48:21 +0800 Subject: [PATCH 04/11] feat: process split vgroup msg imp --- source/dnode/mnode/impl/src/mndVgroup.c | 41 ++++--------------------- 1 file changed, 6 insertions(+), 35 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index d4be6bed73..e580c78383 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2103,7 +2103,7 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, return 0; } -static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { +int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = -1; STrans *pTrans = NULL; SSdbRaw *pRaw = NULL; @@ -2229,42 +2229,13 @@ _OVER: return code; } -static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SVgObj *pVgroup = NULL; - SDbObj *pDb = NULL; +extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq); - SSplitVgroupReq req = {0}; - if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } +static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); } - mInfo("vgId:%d, start to split", req.vgId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) { - goto _OVER; - } - - pVgroup = mndAcquireVgroup(pMnode, req.vgId); - if (pVgroup == NULL) goto _OVER; - - pDb = mndAcquireDb(pMnode, pVgroup->dbName); - if (pDb == NULL) goto _OVER; - - code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup); - if (code != 0) { - mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name); - goto _OVER; - } - - mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name); - -_OVER: - mndReleaseVgroup(pMnode, pVgroup); - mndReleaseDb(pMnode, pDb); - return code; -} +#ifndef TD_ENTERPRISE +int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; } +#endif static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pSrc, SDnodeObj *pDst) { From 76a20b2221b22e246ac4528e8e5f305f38e31cba Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 5 May 2023 18:57:15 +0800 Subject: [PATCH 05/11] enh: adjust enum ETrnExec --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 1 + source/dnode/mnode/impl/src/mndTrans.c | 4 +++- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index a7c4b2e70e..ef4a2b52bb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -60,7 +60,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (IsReq(pMsg)) { if (code != 0) { if (terrno != 0) code = terrno; - dGError("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(code), TMSG_INFO(pMsg->msgType)); + dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType)); } vmSendRsp(pMsg, code); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f547ce025d..e227848e9f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -118,7 +118,7 @@ typedef enum { } ETrnPolicy; typedef enum { - TRN_EXEC_PRARLLEL = 0, + TRN_EXEC_PARALLEL = 0, TRN_EXEC_SERIAL = 1, } ETrnExec; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 057e3efbbc..03434573c4 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -76,6 +76,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransSetSerial(STrans *pTrans); +void mndTransSetParallel(STrans *pTrans); void mndTransSetOper(STrans *pTrans, EOperType oper); int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 106eea0313..ccb0882bb0 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -643,7 +643,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; pTrans->conflict = conflict; - pTrans->exec = TRN_EXEC_PRARLLEL; + pTrans->exec = TRN_EXEC_PARALLEL; pTrans->createdTime = taosGetTimestampMs(); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); @@ -793,6 +793,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } +void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; } + void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; } static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { From 47791dfe2fd0cfbea9b93f5fc075e6c0dc1af4e3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 6 May 2023 11:02:20 +0800 Subject: [PATCH 06/11] fix: group by validation issue --- source/libs/parser/src/parTranslater.c | 16 ++++++++++++---- source/libs/parser/test/parSelectTest.cpp | 13 +++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index a2748c7e6d..1dcc18e5fb 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2289,20 +2289,22 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) return TSDB_CODE_SUCCESS; } -static int32_t checkPartitionGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) { +static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) { int32_t code = TSDB_CODE_SUCCESS; - if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) { + if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) { return code; } if (NULL != pSelect->pHaving) { code = checkExprForGroupBy(pCxt, &pSelect->pHaving); } +/* if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pProjectionList) { code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pOrderByList) { code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList); } +*/ return code; } @@ -2918,6 +2920,9 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_ORDER_BY; code = translateExprList(pCxt, pSelect->pOrderByList); } + if (TSDB_CODE_SUCCESS == code) { + code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList); + } return code; } @@ -3036,6 +3041,9 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = translateProjectionList(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); + } if (TSDB_CODE_SUCCESS == code) { code = translateFillValues(pCxt, pSelect); } @@ -3635,10 +3643,10 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect code = translateSelectList(pCxt, pSelect); } if (TSDB_CODE_SUCCESS == code) { - code = translateOrderBy(pCxt, pSelect); + code = checkHavingGroupBy(pCxt, pSelect); } if (TSDB_CODE_SUCCESS == code) { - code = checkPartitionGroupBy(pCxt, pSelect); + code = translateOrderBy(pCxt, pSelect); } if (TSDB_CODE_SUCCESS == code) { code = checkAggColCoexist(pCxt, pSelect); diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index ec6c69ea8d..2d8ce55b72 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -239,6 +239,19 @@ TEST_F(ParserSelectTest, groupBySemanticCheck) { run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 GROUP BY c1", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); } +TEST_F(ParserSelectTest, havingCheck) { + useDb("root", "test"); + + run("select tbname,count(*) from st1 partition by tbname having c1>0", TSDB_CODE_PAR_INVALID_OPTR_USAGE); + + run("select tbname,count(*) from st1 group by tbname having c1>0", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); + + run("select max(c1) from st1 group by tbname having c1>0"); + + run("select max(c1) from st1 partition by tbname having c1>0"); +} + + TEST_F(ParserSelectTest, orderBy) { useDb("root", "test"); From fced6270215aa2ba29a936ef1faaed722e25ab5d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 5 May 2023 19:14:34 +0800 Subject: [PATCH 07/11] enh: add mndTransCommitVgStatus --- source/dnode/mnode/impl/src/mndVgroup.c | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e580c78383..83989a22bb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2103,6 +2103,18 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, return 0; } +static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) { + SSdbRaw *pRaw = mndVgroupActionEncode(pVg); + if (pRaw == NULL) goto _err; + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err; + (void)sdbSetRawStatus(pRaw, vgStatus); + pRaw = NULL; + return 0; +_err: + sdbFreeRaw(pRaw); + return -1; +} + int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = -1; STrans *pTrans = NULL; @@ -2181,28 +2193,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro if (pDb->cfg.replications != newVg1.replica) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER; } else { - pRaw = mndVgroupActionEncode(&newVg1); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - pRaw = NULL; + if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER; } if (pDb->cfg.replications != newVg2.replica) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER; } else { - pRaw = mndVgroupActionEncode(&newVg2); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - pRaw = NULL; + if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER; } - pRaw = mndVgroupActionEncode(pVgroup); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - pRaw = NULL; + if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER; memcpy(&dbObj, pDb, sizeof(SDbObj)); if (dbObj.cfg.pRetensions != NULL) { From 028bfd34bf4abd69885af45224c55e839ace47dd Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Sat, 6 May 2023 14:59:27 +0800 Subject: [PATCH 08/11] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index d90db3cab0..634f50356d 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -227,7 +227,7 @@ T = 最新事件时间 - DELETE_MARK ## 流式计算支持的函数 1. 所有的 [单行函数](../function/#单行函数) 均可用于流计算。 -2. 以下 19 个聚合/选择函数 不能 应用在创建流计算的 SQL 语句,[系统信息函数](../function/#系统信息函数) 也不能用于流计算中。此外的其他类型的函数均可用于流计算。 +2. 以下 19 个聚合/选择函数 不能 应用在创建流计算的 SQL 语句。此外的其他类型的函数均可用于流计算。 - [leastsquares](../function/#leastsquares) - [percentile](../function/#percentile) From e48fdceb7c1b64e6f6e0976a6168041409ae88a3 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Sat, 6 May 2023 16:03:12 +0800 Subject: [PATCH 09/11] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index d99c5bdae4..43c49c03cd 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -147,7 +147,7 @@ In both of these methods, configuring the watermark is essential for obtaining a ## Supported functions -All [scalar functions](../function/#scalar-functions) are available in stream processing. All [System information functions](../function/#system-information-functions) are not allowed in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings: +All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings: - [leastsquares](../function/#leastsquares) - [percentile](../function/#percentile) - [top](../function/#top) From 1f25cc57cdbfebbc755adb35ed4c6d41ffb5af9c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 6 May 2023 16:39:40 +0800 Subject: [PATCH 10/11] fix: projection merge issue --- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/planner/src/planOptimizer.c | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 6544898be9..d1011cbf3a 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1070,7 +1070,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); - pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList); + pProject->ignoreGroupId = (pSelect->isSubquery && NULL == pProject->node.pLimit && NULL == pProject->node.pSlimit) ? true : (NULL == pSelect->pPartitionByList); pProject->node.groupAction = (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4f8b57de5f..5cb3591984 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2351,6 +2351,17 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } +static int32_t mergeProjectionsLogicNode(SLogicNode* pDstNode, SLogicNode* pSrcNode) { + SProjectLogicNode *pDstPro = (SProjectLogicNode*)pDstNode; + SProjectLogicNode *pSrcPro = (SProjectLogicNode*)pSrcNode; + + if (!pSrcPro->ignoreGroupId) { + pDstPro->ignoreGroupId = pSrcPro->ignoreGroupId; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); @@ -2360,8 +2371,11 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* if (TSDB_CODE_SUCCESS == code) { if (1 == LIST_LENGTH(pChild->pChildren)) { - SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0); - code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild); + code = mergeProjectionsLogicNode(pSelfNode, pChild); + if (TSDB_CODE_SUCCESS == code) { + SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0); + code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild); + } } else { // no grand child NODES_CLEAR_LIST(pSelfNode->pChildren); } From 758bcb901a1209f1638c641b451f88e19c61df63 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Sat, 6 May 2023 22:55:44 +0800 Subject: [PATCH 11/11] docs: fix example tmp (#21197) --- docs/examples/rust/nativeexample/examples/subscribe_demo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs index 7551ad46b1..d54bb60e93 100644 --- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> { taos.exec_many([ format!("DROP TOPIC IF EXISTS tmq_meters"), format!("DROP DATABASE IF EXISTS `{db}`"), - format!("CREATE DATABASE `{db}`"), + format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"), format!("USE `{db}`"), // create super table format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),