From 5367f25b4d53caa9e88dba8eea1dce8cd15fe73a Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 19 Apr 2023 16:55:08 +0800 Subject: [PATCH 1/5] feat: the having clause can be used after the partition by clause or window clause --- source/libs/parser/src/parTranslater.c | 5 +++-- source/libs/planner/src/planLogicCreater.c | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c6e70798a7..25e92a55ec 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3035,12 +3035,13 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect } static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL == pSelect->pGroupByList && NULL != pSelect->pHaving) { + if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && + NULL != pSelect->pHaving) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); } pCxt->currClause = SQL_CLAUSE_HAVING; int32_t code = translateExpr(pCxt, &pSelect->pHaving); - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow)) { code = checkExprForGroupBy(pCxt, &pSelect->pHaving); } return code; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index c9ee83a647..6544898be9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -740,6 +740,13 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets); } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { + pWindow->node.pConditions = nodesCloneNode(pSelect->pHaving); + if (NULL == pWindow->node.pConditions) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + pSelect->hasAggFuncs = false; if (TSDB_CODE_SUCCESS == code) { @@ -1132,6 +1139,14 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving && !pSelect->hasAggFuncs && NULL == pSelect->pGroupByList && + NULL == pSelect->pWindow) { + pPartition->node.pConditions = nodesCloneNode(pSelect->pHaving); + if (NULL == pPartition->node.pConditions) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pPartition; } else { From 55c6f115b05105d7c3f455c9902fc233d9a6d6de Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 18 Apr 2023 10:58:19 +0800 Subject: [PATCH 2/5] enh: add applied-index in sync node logging msg --- source/libs/sync/src/syncUtil.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 056a597777..cf796c3862 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -202,21 +202,22 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo // restore error code terrno = errCode; - + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); + if (pNode != NULL) { taosPrintLog(flags, level, dflag, - "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 - ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 + "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", applied-index:%" PRId64 + ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s", - pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex, - logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, - pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, - pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, - pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, - bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr); + pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, appliedIndex, + logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, + aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, + pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, + pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr); } } From a21e0fe75e9a15e7423e2e51e137123b61455bc6 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 20 Apr 2023 10:21:50 +0800 Subject: [PATCH 3/5] enh: propose vnode commit synchronously --- include/common/tmsg.h | 2 +- source/dnode/vnode/src/inc/vnd.h | 1 - source/dnode/vnode/src/inc/vnodeInt.h | 4 ---- source/dnode/vnode/src/vnd/vnodeCommit.c | 14 +------------- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 -- source/dnode/vnode/src/vnd/vnodeSync.c | 5 ----- 6 files changed, 2 insertions(+), 26 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb2450e8f7..6d1d3ebce6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) { } static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || - (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); + (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT); } static inline bool syncUtilUserCommit(tmsg_t msgType) { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ae65e2ba3f..a67f246e73 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit); -void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52a..2576fce998 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -378,7 +378,6 @@ struct SVnode { STQ* pTq; SSink* pSink; tsem_t canCommit; - SVCommitSched commitSched; int64_t sync; TdThreadMutex lock; bool blocked; @@ -387,9 +386,6 @@ struct SVnode { int32_t blockSec; int64_t blockSeq; SQHandle* pQuery; -#if 0 - SRpcHandleInfo blockInfo; -#endif }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 3eb813f394..847125018c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -143,23 +143,13 @@ _exit: return code; } -void vnodeUpdCommitSched(SVnode *pVnode) { - int64_t randNum = taosRand(); - pVnode->commitSched.commitMs = taosGetMonoTimestampMs(); - pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs); -} - int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - SVCommitSched *pSched = &pVnode->commitSched; - int64_t nowMs = taosGetMonoTimestampMs(); bool diskAvail = osDataSpaceAvailable(); bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { - needCommit = - ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || - ((pVnode->inUse->size > 0) && atExit); + needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit); } taosThreadMutexUnlock(&pVnode->mutex); return needCommit; @@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); - vnodeUpdCommitSched(pVnode); - // persist wal before starting if (walPersist(pVnode->pWal) < 0) { vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c7d155be0d..e8f34d27ac 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -286,8 +286,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); - vnodeUpdCommitSched(pVnode); - int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d4a394b584..fb7f2901a1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak pVnode->blocked = true; pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSeq = seq; -#if 0 - pVnode->blockInfo = pMsg->info; -#endif } taosThreadMutexUnlock(&pVnode->lock); @@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) { } else { tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg); } - - vnodeUpdCommitSched(pVnode); } #if BATCH_ENABLE From a1ca9466cdba97d6d04a994e5bd548e34e7df30e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 20 Apr 2023 10:23:51 +0800 Subject: [PATCH 4/5] enh: adjust size limit of appyQ and negotiationWin --- include/util/taoserror.h | 1 + include/util/tdef.h | 5 ++++- source/libs/sync/src/syncPipeline.c | 10 +++++++++- source/util/src/terror.c | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ab89466a19..0d9292cc6b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -547,6 +547,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) #define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) +#define TSDB_CODE_SYN_NEGO_WIN_EXCEEDED TAOS_DEF_ERROR_CODE(0, 0X0918) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/include/util/tdef.h b/include/util/tdef.h index 2f86395dad..9dd454bb68 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -285,8 +285,11 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_MAX_REPLICA 5 + #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 -#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4) +#define TSDB_SYNC_LOG_BUFFER_RETENTION 256 +#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 +#define TSDB_SYNC_NEGOTIATION_WIN 512 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 6bebef77dc..5e6058ef03 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -53,8 +53,15 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _err; } + if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) { + terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED; + sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(), + index, pBuf->commitIndex); + goto _err; + } + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); - if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) { + if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) { terrno = TSDB_CODE_SYN_WRITE_STALL; sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64, pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex); @@ -83,6 +90,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt _err: syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); + taosMsleep(1); return -1; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 002d605793..34b09761c8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -426,6 +426,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restor TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGO_WIN_EXCEEDED, "Sync negotiation win exceeded") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq From 1af9b52d9b3250fd9841b17f66c8395f76db4533 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Thu, 20 Apr 2023 16:40:10 +0800 Subject: [PATCH 5/5] Update 02-database.md --- docs/zh/12-taos-sql/02-database.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/12-taos-sql/02-database.md b/docs/zh/12-taos-sql/02-database.md index b54998e08d..dd30635660 100644 --- a/docs/zh/12-taos-sql/02-database.md +++ b/docs/zh/12-taos-sql/02-database.md @@ -36,7 +36,6 @@ database_option: { | TSDB_PAGESIZE value | WAL_RETENTION_PERIOD value | WAL_RETENTION_SIZE value - | WAL_ROLL_PERIOD value | WAL_SEGMENT_SIZE value } ```