diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6863e3c336..f2ba16ff47 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -97,7 +97,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_MNODE, "drop-mnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_SET_STANDBY, "set-mnode-standby", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL) @@ -239,6 +238,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 99b704826d..21d8405f58 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -59,7 +59,13 @@ void mndClose(SMnode *pMnode); * @param pMnode The mnode object. */ int32_t mndStart(SMnode *pMnode); -void mndStop(SMnode *pMnode); + +/** + * @brief Stop mnode + * + * @param pMnode The mnode object. + */ +void mndStop(SMnode *pMnode); /** * @brief Get mnode monitor info. @@ -71,17 +77,25 @@ void mndStop(SMnode *pMnode); * @return int32_t 0 for success, -1 for failure. */ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant); + +/** + * @brief Get mnode loads for status msg. + * + * @param pMnode The mnode object. + * @param pLoad + * @return int32_t 0 for success, -1 for failure. + */ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); /** - * @brief Process the read, write, sync request. + * @brief Process the rpc, sync request. * * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); +int32_t mndPreProcessMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f324d8fed1..a73b9b47f4 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -144,6 +144,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_LAST_MERGE, FUNCTION_TYPE_AVG_PARTIAL, FUNCTION_TYPE_AVG_MERGE, + FUNCTION_TYPE_STDDEV_PARTIAL, + FUNCTION_TYPE_STDDEV_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 76a290364e..e45e5bd160 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -65,6 +65,7 @@ void qDestroyQuery(SQuery* pQueryNode); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid); +void qCleanupKeywordsTable(); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qResetStmtDataBlock(void* block, bool keepBuf); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 1231fed867..43c88371f9 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -653,6 +653,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_DELETE_WHERE TAOS_DEF_ERROR_CODE(0, 0x2655) #define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656) #define TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2657) +#define TSDB_CODE_PAR_INVALID_WINDOW_PC TAOS_DEF_ERROR_CODE(0, 0x2658) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 83b481658e..40a58b4dc5 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -25,6 +25,7 @@ #include "tref.h" #include "trpc.h" #include "version.h" +#include "functionMgt.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -61,6 +62,9 @@ void taos_cleanup(void) { cleanupTaskQueue(); + fmFuncMgtDestroy(); + qCleanupKeywordsTable(); + id = clientConnRefPool; clientConnRefPool = -1; taosCloseRef(id); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index e91f4b8cf4..8cda8fcec3 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -155,7 +155,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -236,7 +235,10 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index ee2df5f089..7cd7da1aa9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -114,7 +114,8 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) { + pMsg->info.node = pMgmt->pMnode; + if (mndPreProcessMsg(pMsg) != 0) { dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 750006d05f..8d3f3e9284 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -374,6 +374,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 16220f101a..cc9bc5b634 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -124,8 +124,6 @@ void mndReleaseRpcRef(SMnode *pMnode); void mndSetRestore(SMnode *pMnode, bool restored); void mndSetStop(SMnode *pMnode); bool mndGetStop(SMnode *pMnode); -int32_t mndAcquireSyncRef(SMnode *pMnode); -void mndReleaseSyncRef(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h index 63530b403d..48e93c43fb 100644 --- a/source/dnode/mnode/impl/inc/mndSma.h +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -26,6 +26,8 @@ int32_t mndInitSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); +int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); +int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index cfe363fdf0..43263af573 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -935,6 +935,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER; + if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3b2eafced8..59599ee134 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -393,11 +393,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return TAOS_SYNC_OTHER_ERROR; } - if (mndAcquireSyncRef(pMnode) != 0) { - mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); - return TAOS_SYNC_OTHER_ERROR; - } - char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); @@ -450,7 +445,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_MND_SET_STANDBY) { + } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { code = syncSetStandby(pMgmt->sync); SRpcMsg rsp = {.code = code, .info = pMsg->info}; tmsgSendRsp(&rsp); @@ -491,7 +486,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_MND_SET_STANDBY) { + } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { code = syncSetStandby(pMgmt->sync); SRpcMsg rsp = {.code = code, .info = pMsg->info}; tmsgSendRsp(&rsp); @@ -501,7 +496,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } } - mndReleaseSyncRef(pMnode); return code; } @@ -754,24 +748,3 @@ void mndSetStop(SMnode *pMnode) { } bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; } - -int32_t mndAcquireSyncRef(SMnode *pMnode) { - int32_t code = 0; - taosThreadRwlockRdlock(&pMnode->lock); - if (pMnode->stopped) { - terrno = TSDB_CODE_APP_NOT_READY; - code = -1; - } else { - int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); - // mTrace("mnode sync is acquired, ref:%d", ref); - } - taosThreadRwlockUnlock(&pMnode->lock); - return code; -} - -void mndReleaseSyncRef(SMnode *pMnode) { - taosThreadRwlockRdlock(&pMnode->lock); - int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); - // mTrace("mnode sync is released, ref:%d", ref); - taosThreadRwlockUnlock(&pMnode->lock); -} diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index f9749a969d..f6cef945e2 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -55,7 +55,8 @@ int32_t mndInitMnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_MND_SET_STANDBY_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode); @@ -511,7 +512,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode .epSet = dropEpSet, .pCont = pReq, .contLen = contLen, - .msgType = TDMT_MND_SET_STANDBY, + .msgType = TDMT_SYNC_SET_MNODE_STANDBY, .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED, }; diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 12b39e5b78..5374f48e47 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,11 +18,10 @@ #include "mndMnode.h" #include "qworker.h" -int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg) { - if (TDMT_VND_QUERY != pMsg->msgType) { - return 0; - } +int32_t mndPreProcessMsg(SRpcMsg *pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) return 0; + SMnode *pMnode = pMsg->info.node; return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 3f0d20348b..a14eb78ffe 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -38,10 +38,10 @@ static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma); static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups); -static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); -static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); +static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq); +static int32_t mndProcessDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); -static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); +static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); @@ -56,8 +56,8 @@ int32_t mndInitSma(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSmaActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropSmaReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); @@ -79,7 +79,6 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) @@ -100,6 +99,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER) + if (pSma->exprLen > 0) { SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) } @@ -115,6 +115,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) + terrno = 0; _OVER: @@ -193,6 +194,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { } SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) + terrno = 0; _OVER: @@ -383,6 +385,25 @@ static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, S return 0; } +static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { + SStbObj stbObj = {0}; + taosRLockLatch(&pStb->lock); + memcpy(&stbObj, pStb, sizeof(SStbObj)); + taosRUnLockLatch(&pStb->lock); + stbObj.pColumns = NULL; + stbObj.pTags = NULL; + stbObj.updateTime = taosGetTimestampMs(); + stbObj.lock = 0; + stbObj.smaVer++; + + SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; @@ -457,7 +478,6 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, pSma->schemaTag.pSchema[0].flags = 0; snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId"); - int32_t smaContLen = 0; void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen); if (pSmaReq == NULL) return -1; @@ -560,6 +580,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; @@ -600,7 +621,7 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { return 0; } -static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq) { +static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SStbObj *pStb = NULL; @@ -782,13 +803,17 @@ static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SD } static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { - int32_t code = -1; - SVgObj *pVgroup = NULL; - STrans *pTrans = NULL; + int32_t code = -1; + SVgObj *pVgroup = NULL; + SStbObj *pStb = NULL; + STrans *pTrans = NULL; pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVgroup == NULL) goto _OVER; + pStb = mndAcquireStb(pMnode, pSma->stb); + if (pStb == NULL) goto _OVER; + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; @@ -799,6 +824,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -808,10 +834,78 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p _OVER: mndTransDrop(pTrans); mndReleaseVgroup(pMnode, pVgroup); + mndReleaseStb(pMnode, pStb); return code; } -static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq) { +int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->stbUid == pStb->uid) { + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; + mndReleaseVgroup(pMnode, pVgroup); + pVgroup = NULL; + } + + sdbRelease(pSdb, pSma); + } + + code = 0; + +_OVER: + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pSma); + mndReleaseVgroup(pMnode, pVgroup); + return code; +} + +int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->dbUid == pDb->uid) { + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + mndReleaseVgroup(pMnode, pVgroup); + pVgroup = NULL; + } + + sdbRelease(pSdb, pSma); + } + + code = 0; + +_OVER: + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pSma); + mndReleaseVgroup(pMnode, pVgroup); + return code; +} + +static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SUserObj *pUser = NULL; @@ -901,13 +995,13 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp } int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) { - int32_t code = 0; - SSmaObj *pSma = NULL; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + int32_t code = 0; + SSmaObj *pSma = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; STableIndexInfo info; - SStbObj* pStb = mndAcquireStb(pMnode, tbFName); + SStbObj *pStb = mndAcquireStb(pMnode, tbFName); if (NULL == pStb) { *exist = false; return TSDB_CODE_SUCCESS; @@ -936,14 +1030,14 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool info.dstTbUid = pSma->dstTbUid; info.dstVgId = pSma->dstVgId; - SVgObj* pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); + SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVg == NULL) { code = -1; sdbRelease(pSdb, pSma); return code; } info.epSet = mndGetVgroupEpset(pMnode, pVg); - + info.expr = taosMemoryMalloc(pSma->exprLen + 1); if (info.expr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -967,7 +1061,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool sdbRelease(pSdb, pSma); } - + return code; } @@ -1019,10 +1113,10 @@ _OVER: static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) { STableIndexReq indexReq = {0}; - SMnode *pMnode = pReq->info.node; - int32_t code = -1; + SMnode *pMnode = pReq->info.node; + int32_t code = -1; STableIndexRsp rsp = {0}; - bool exist = false; + bool exist = false; if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -1069,7 +1163,6 @@ _OVER: return code; } - static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6f89c97f83..3e91bfa926 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -23,6 +23,7 @@ #include "mndPerfSchema.h" #include "mndScheduler.h" #include "mndShow.h" +#include "mndSma.h" #include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" @@ -37,9 +38,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); -static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq); -static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq); -static int32_t mndProcessMDropStbReq(SRpcMsg *pReq); +static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); +static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); +static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); @@ -55,9 +56,9 @@ int32_t mndInitStb(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndStbActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); @@ -319,6 +320,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->tagVer = pNew->tagVer; pOld->colVer = pNew->colVer; + pOld->smaVer = pNew->smaVer; pOld->nextColId = pNew->nextColId; pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; @@ -362,7 +364,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { return mndAcquireDb(pMnode, db); } -static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) { +static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) { if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) { return -1; } else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) { @@ -396,14 +398,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, - req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, + STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, - req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, + STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } @@ -762,7 +764,7 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p return 0; } -static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq) { +static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SStbObj *pStb = NULL; @@ -1301,7 +1303,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) { - int ret; + int32_t ret; SEncoder ec = {0}; uint32_t contLen = 0; SMAlterStbRsp alterRsp = {0}; @@ -1420,7 +1422,7 @@ _OVER: return code; } -static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) { +static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SDbObj *pDb = NULL; @@ -1550,6 +1552,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; + if (mndDropSmasByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -1559,7 +1562,7 @@ _OVER: return code; } -static int32_t mndProcessMDropStbReq(SRpcMsg *pReq) { +static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SUserObj *pUser = NULL; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 6b2ee03524..804b4ef5d1 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -322,6 +322,33 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_ return pReq; } +void *mndBuildSetVnodeStandbyReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { + SSetStandbyReq standbyReq = {0}; + standbyReq.dnodeId = pDnode->id; + standbyReq.standby = 1; + + int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + contLen += sizeof(SMsgHead); + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), contLen, &standbyReq); + SMsgHead *pHead = pReq; + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + *pContLen = contLen; + return pReq; +} + void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SDropVnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; @@ -898,6 +925,39 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO return 0; } +static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, + SVnodeGid *pVgid, bool isRedo) { + STransAction action = {0}; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildSetVnodeStandbyReq(pMnode, pDnode, pDb, pVgroup, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_DROP_VNODE; + action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; + + if (isRedo) { + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } else { + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + + return 0; +} + int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo) { STransAction action = {0}; @@ -952,6 +1012,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVnodeGid del = newVg.vnodeGid[vnIndex]; newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica]; memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid)); + if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; @@ -1031,6 +1092,7 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid)); memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); + if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1; @@ -1341,12 +1403,14 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, S SVnodeGid del1 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1; + if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; SVnodeGid del2 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1; + if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del2, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; @@ -1396,6 +1460,7 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj } else if (newVg1.replica == 3) { SVnodeGid del1 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER; + if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 56e6e15c25..cd64bc8a9c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -375,6 +375,10 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { + ret = syncSetStandby(pVnode->sync); + SRpcMsg rsp = {.code = ret, .info = pMsg->info}; + tmsgSendRsp(&rsp); } else { vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); ret = TAOS_SYNC_OTHER_ERROR; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 1bb30934c6..f3060243ed 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -65,9 +65,12 @@ int32_t getAvgInfoSize(); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); +int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getStddevInfoSize(); bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4dae296f05..833f6db0c3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -184,6 +184,35 @@ static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t le return TSDB_CODE_SUCCESS; } +static int32_t translateStddevPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateStddevMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + + return TSDB_CODE_SUCCESS; +} + static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; @@ -422,7 +451,8 @@ static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t pValue->notReserved = true; // set result type - pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + pFunc->node.resType = + (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; } else { if (1 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -985,11 +1015,11 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { // first(col_list) will be rewritten as first(col) - if (2 != LIST_LENGTH(pFunc->pParameterList)) { //input has two params c0,ts, is this a bug? + if (2 != LIST_LENGTH(pFunc->pParameterList)) { // input has two params c0,ts, is this a bug? return TSDB_CODE_SUCCESS; } - SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); uint8_t paraType = ((SExprNode*)pPara)->resType.type; int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes; if (isPartial) { @@ -998,8 +1028,8 @@ static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32 "The parameters of first/last can only be columns"); } - pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, - .type = TSDB_DATA_TYPE_BINARY}; + pFunc->node.resType = + (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; } else { if (TSDB_DATA_TYPE_BINARY != paraType) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -1522,6 +1552,32 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = stddevFinalize, .invertFunc = stddevInvertFunction, .combineFunc = stddevCombine, + .pPartialFunc = "_stddev_partial", + .pMergeFunc = "_stddev_merge" + }, + { + .name = "_stddev_partial", + .type = FUNCTION_TYPE_STDDEV_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateStddevPartial, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunction, + .finalizeFunc = stddevPartialFinalize, + .invertFunc = stddevInvertFunction, + .combineFunc = stddevCombine, + }, + { + .name = "_stddev_merge", + .type = FUNCTION_TYPE_STDDEV_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateStddevMerge, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunctionMerge, + .finalizeFunc = stddevFinalize, + .invertFunc = stddevInvertFunction, + .combineFunc = stddevCombine, }, { .name = "leastsquares", @@ -1576,7 +1632,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "percentile", .type = FUNCTION_TYPE_PERCENTILE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_REPEAT_SCAN_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_REPEAT_SCAN_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translatePercentile, .getEnvFunc = getPercentileFuncEnv, .initFunc = percentileFunctionSetup, @@ -1737,7 +1793,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "elapsed", .type = FUNCTION_TYPE_ELAPSED, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .dataRequiredFunc = statisDataRequired, .translateFunc = translateElapsed, .getEnvFunc = getElapsedFuncEnv, @@ -1858,7 +1914,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "twa", .type = FUNCTION_TYPE_TWA, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateInNumOutDou, .getEnvFunc = getTwaFuncEnv, .initFunc = twaFunctionSetup, @@ -1944,7 +2000,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, @@ -1954,7 +2010,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "statecount", .type = FUNCTION_TYPE_STATE_COUNT, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateStateCount, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, @@ -1964,7 +2020,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "stateduration", .type = FUNCTION_TYPE_STATE_DURATION, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateStateDuration, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, @@ -1974,7 +2030,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "csum", .type = FUNCTION_TYPE_CSUM, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateCsum, .getEnvFunc = getCsumFuncEnv, .initFunc = functionSetup, @@ -1984,7 +2040,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "mavg", .type = FUNCTION_TYPE_MAVG, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateMavg, .getEnvFunc = getMavgFuncEnv, .initFunc = mavgFunctionSetup, @@ -1994,7 +2050,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "sample", .type = FUNCTION_TYPE_SAMPLE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateSample, .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, @@ -2004,7 +2060,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "tail", .type = FUNCTION_TYPE_TAIL, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateTail, .getEnvFunc = getTailFuncEnv, .initFunc = tailFunctionSetup, @@ -2014,7 +2070,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "unique", .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateUnique, .getEnvFunc = getUniqueFuncEnv, .initFunc = uniqueFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5ee9dbdee6..c6ab9f859d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -89,6 +89,7 @@ typedef struct SStddevRes { double dsum; int64_t isum; }; + int16_t type; } SStddevRes; typedef struct SLeastSQRInfo { @@ -1488,6 +1489,8 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return minMaxCombine(pDestCtx, pSourceCtx, 0); } +int32_t getStddevInfoSize() { return (int32_t)sizeof(SStddevRes); } + bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStddevRes); return true; @@ -1511,6 +1514,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pStddevRes->type = type; // computing based on the true data block SColumnInfoData* pCol = pInput->pData[0]; @@ -1627,6 +1631,39 @@ _stddev_over: return TSDB_CODE_SUCCESS; } +static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) { + pOutput->type = pInput->type; + if (IS_INTEGER_TYPE(pOutput->type)) { + pOutput->quadraticISum += pInput->quadraticISum; + pOutput->isum += pInput->isum; + } else { + pOutput->quadraticDSum += pInput->quadraticDSum; + pOutput->dsum += pInput->dsum; + } + + pOutput->count += pInput->count; + + return; +} + +int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); + + stddevTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + #define LIST_STDDEV_SUB_N(sumT, T) \ do { \ T* plist = (T*)pCol->pData; \ @@ -1692,9 +1729,10 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) { int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pStddevRes->type; double avg; + if (IS_INTEGER_TYPE(type)) { avg = pStddevRes->isum / ((double)pStddevRes->count); pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); @@ -1706,6 +1744,24 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getStddevInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index ca066c7056..daeea1ed68 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -230,9 +230,13 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { SRawExprNode* pRawExpr = (SRawExprNode*)pNode; SNode* pExpr = pRawExpr->pNode; if (nodesIsExprNode(pExpr)) { - int32_t len = TMIN(sizeof(((SExprNode*)pExpr)->aliasName) - 1, pRawExpr->n); - strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, len); - ((SExprNode*)pExpr)->aliasName[len] = '\0'; + if (QUERY_NODE_COLUMN == nodeType(pExpr)) { + strcpy(((SExprNode*)pExpr)->aliasName, ((SColumnNode*)pExpr)->colName); + } else { + int32_t len = TMIN(sizeof(((SExprNode*)pExpr)->aliasName) - 1, pRawExpr->n); + strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, len); + ((SExprNode*)pExpr)->aliasName[len] = '\0'; + } } taosMemoryFreeClear(pNode); return pExpr; @@ -801,7 +805,7 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti ((SDatabaseOptions*)pOptions)->pRetentions = pVal; break; case DB_OPTION_SCHEMALESS: -// ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); + // ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); ((SDatabaseOptions*)pOptions)->schemaless = 1; break; default: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index fcf2f57dec..52abeadac9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -537,13 +537,15 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, SNode* pNode; FOREACH(pNode, pProjectList) { SExprNode* pExpr = (SExprNode*)pNode; - if (0 == strcmp(pCol->colName, pExpr->aliasName) || - (isPrimaryKey((STempTableNode*)pTable, pNode) && isInternalPrimaryKey(pCol))) { + if (0 == strcmp(pCol->colName, pExpr->aliasName)) { if (*pFound) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AMBIGUOUS_COLUMN, pCol->colName); } setColumnInfoByExpr(pTable, pExpr, pColRef); *pFound = true; + } else if (isPrimaryKey((STempTableNode*)pTable, pNode) && isInternalPrimaryKey(pCol)) { + setColumnInfoByExpr(pTable, pExpr, pColRef); + *pFound = true; } } } @@ -662,9 +664,24 @@ static int32_t parseTimeFromValueNode(STranslateContext* pCxt, SValueNode* pVal) } char* pEnd = NULL; pVal->datum.i = taosStr2Int64(pVal->literal, &pEnd, 10); - return (NULL != pEnd && '\0' == *pEnd) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; + return (NULL != pEnd && '\0' == *pEnd) ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_WRONG_VALUE_TYPE; } else { - return TSDB_CODE_FAILED; + return TSDB_CODE_PAR_WRONG_VALUE_TYPE; + } +} + +static int32_t parseBoolFromValueNode(STranslateContext* pCxt, SValueNode* pVal) { + if (IS_VAR_DATA_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { + pVal->datum.b = (0 == strcasecmp(pVal->literal, "true")); + return TSDB_CODE_SUCCESS; + } else if (IS_INTEGER_TYPE(pVal->node.resType.type)) { + pVal->datum.b = (0 != taosStr2Int64(pVal->literal, NULL, 10)); + return TSDB_CODE_SUCCESS; + } else if (IS_FLOAT_TYPE(pVal->node.resType.type)) { + pVal->datum.b = (0 != taosStr2Double(pVal->literal, NULL)); + return TSDB_CODE_SUCCESS; + } else { + return TSDB_CODE_PAR_WRONG_VALUE_TYPE; } } @@ -685,7 +702,9 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD case TSDB_DATA_TYPE_NULL: break; case TSDB_DATA_TYPE_BOOL: - pVal->datum.b = (0 == strcasecmp(pVal->literal, "true")); + if (TSDB_CODE_SUCCESS != parseBoolFromValueNode(pCxt, pVal)) { + return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); + } *(bool*)&pVal->typeData = pVal->datum.b; break; case TSDB_DATA_TYPE_TINYINT: { @@ -1068,6 +1087,16 @@ static int32_t translateForbidFillFunc(STranslateContext* pCxt, SFunctionNode* p return TSDB_CODE_SUCCESS; } +static int32_t translateWindowPseudoColumnFunc(STranslateContext* pCxt, SFunctionNode* pFunc) { + if (!fmIsWindowPseudoColumnFunc(pFunc->funcId)) { + return TSDB_CODE_SUCCESS; + } + if (NULL == pCxt->pCurrSelectStmt->pWindow) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WINDOW_PC); + } + return TSDB_CODE_SUCCESS; +} + static void setFuncClassification(SSelectStmt* pSelect, SFunctionNode* pFunc) { if (NULL != pSelect) { pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId); @@ -1097,6 +1126,9 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) if (TSDB_CODE_SUCCESS == pCxt->errCode) { pCxt->errCode = translateForbidFillFunc(pCxt, pFunc); } + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pCxt->errCode = translateWindowPseudoColumnFunc(pCxt, pFunc); + } if (TSDB_CODE_SUCCESS == pCxt->errCode) { setFuncClassification(pCxt->pCurrSelectStmt, pFunc); } @@ -3106,7 +3138,11 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS pStmt->ignoreNotExists); } -static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) { +static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) { + SName tableName; + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pAlterReq->name); + pAlterReq->alterType = pStmt->alterType; + if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { pAlterReq->ttl = pStmt->pOptions->ttl; if ('\0' != pStmt->pOptions->comment[0]) { @@ -3154,15 +3190,45 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterR return TSDB_CODE_SUCCESS; } -static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) { - SMAlterStbReq alterReq = {0}; - SName tableName; - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name); - alterReq.alterType = pStmt->alterType; +static SSchema* getColSchema(STableMeta* pTableMeta, const char* pTagName) { + int32_t numOfFields = getNumOfTags(pTableMeta) + getNumOfColumns(pTableMeta); + for (int32_t i = 0; i < numOfFields; ++i) { + SSchema* pTagSchema = pTableMeta->schema + i; + if (0 == strcmp(pTagName, pTagSchema->name)) { + return pTagSchema; + } + } + return NULL; +} + +static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) { if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE); } - int32_t code = setAlterTableField(pStmt, &alterReq); + if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType || + TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType) { + STableMeta* pTableMeta = NULL; + int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); + if (TSDB_CODE_SUCCESS == code) { + SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName); + if (NULL == pSchema) { + code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName); + } else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type || + pSchema->bytes >= calcTypeBytes(pStmt->dataType)) { + code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL); + } + } + return code; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) { + SMAlterStbReq alterReq = {0}; + int32_t code = checkAlterSuperTable(pCxt, pStmt); + if (TSDB_CODE_SUCCESS == code) { + code = buildAlterSuperTableReq(pCxt, pStmt, &alterReq); + } if (TSDB_CODE_SUCCESS == code) { code = buildCmdMsg(pCxt, TDMT_MND_ALTER_STB, (FSerializeFunc)tSerializeSMAlterStbReq, &alterReq); } @@ -3835,7 +3901,7 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { code = translateDropSuperTable(pCxt, (SDropSuperTableStmt*)pNode); break; case QUERY_NODE_ALTER_TABLE_STMT: - code = translateAlterTable(pCxt, (SAlterTableStmt*)pNode); + code = translateAlterSuperTable(pCxt, (SAlterTableStmt*)pNode); break; case QUERY_NODE_CREATE_USER_STMT: code = translateCreateUser(pCxt, (SCreateUserStmt*)pNode); @@ -4445,10 +4511,35 @@ static int32_t translateTagVal(STranslateContext* pCxt, uint8_t precision, SSche ? pCxt->errCode : TSDB_CODE_SUCCESS); } else { + // return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pNode)->aliasName); return TSDB_CODE_FAILED; } } +static int32_t buildJsonTagVal(STranslateContext* pCxt, SSchema* pTagSchema, SValueNode* pVal, SArray* pTagArray, + STag** ppTag) { + if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { + return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal); + } + + return parseJsontoTagData(pVal->literal, pTagArray, ppTag, &pCxt->msgBuf); +} + +static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, SValueNode* pVal, SArray* pTagArray) { + if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) { + void* nodeVal = nodesGetValueFromNode(pVal); + STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; + if (IS_VAR_DATA_TYPE(pTagSchema->type)) { + val.pData = varDataVal(nodeVal); + val.nData = varDataLen(nodeVal); + } else { + memcpy(&val.i64, nodeVal, pTagSchema->bytes); + } + taosArrayPush(pTagArray, &val); + } + return TSDB_CODE_SUCCESS; +} + static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta, STag** ppTag) { int32_t numOfTags = getNumOfTags(pSuperTableMeta); @@ -4458,11 +4549,10 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla } SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal)); - if (!pTagArray) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY); + if (NULL == pTagArray) { + return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = TSDB_CODE_SUCCESS; - int16_t nTags = 0, nBufPos = 0; SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta); SNode * pTag = NULL, *pNode = NULL; bool isJson = false; @@ -4490,27 +4580,12 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla } else { REPLACE_LIST2_NODE(pVal); } - if (pTagSchema->type == TSDB_DATA_TYPE_JSON) { - if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { - code = buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal); - goto end; - } + if (pSchema->type == TSDB_DATA_TYPE_JSON) { isJson = true; - code = parseJsontoTagData(pVal->literal, pTagArray, ppTag, &pCxt->msgBuf); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + code = buildJsonTagVal(pCxt, pSchema, pVal, pTagArray, ppTag); } else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) { - void* nodeVal = nodesGetValueFromNode(pVal); - STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; - if (IS_VAR_DATA_TYPE(pTagSchema->type)) { - val.pData = varDataVal(nodeVal); - val.nData = varDataLen(nodeVal); - } else { - memcpy(&val.i64, nodeVal, pTagSchema->bytes); - } - taosArrayPush(pTagArray, &val); + code = buildNormalTagVal(pCxt, pSchema, pVal, pTagArray); } } @@ -4825,17 +4900,6 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { return rewriteToVnodeModifyOpStmt(pQuery, pBufArray); } -static SSchema* getColSchema(STableMeta* pTableMeta, const char* pTagName) { - int32_t numOfFields = getNumOfTags(pTableMeta) + getNumOfColumns(pTableMeta); - for (int32_t i = 0; i < numOfFields; ++i) { - SSchema* pTagSchema = pTableMeta->schema + i; - if (0 == strcmp(pTagName, pTagSchema->name)) { - return pTagSchema; - } - } - return NULL; -} - static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, SVAlterTbReq* pReq) { SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName); @@ -4853,6 +4917,10 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS return pCxt->errCode; } + if (IS_VAR_DATA_TYPE(pSchema->type) && strlen(pStmt->pVal->literal) > pSchema->bytes) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pStmt->pVal->literal); + } + pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) { if (pStmt->pVal->literal && diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index c3ff4c63a4..42e887bb9d 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -186,6 +186,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes"; case TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC: return "%s function not allowed in fill query"; + case TSDB_CODE_PAR_INVALID_WINDOW_PC: + return "_WSTARTTS, _WENDTS and _WDURATION can only be used in window queries"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 7c330158fa..538404798d 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -207,6 +207,8 @@ int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid) { return TSDB_CODE_FAILED; } +void qCleanupKeywordsTable() { taosCleanupKeywordsTable(); } + int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index 3729a2e1dc..b0c6181264 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -141,16 +141,18 @@ void generateTestT1(MockCatalogService* mcs) { * c2 | column | VARCHAR | 20 | * tag1 | tag | INT | 4 | * tag2 | tag | VARCHAR | 20 | + * tag3 | tag | TIMESTAMP | 8 | * Child Table: st1s1, st1s2 */ void generateTestST1(MockCatalogService* mcs) { - ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2) + ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 3) .setPrecision(TSDB_TIME_PRECISION_MILLI) .addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) .addColumn("c1", TSDB_DATA_TYPE_INT) .addColumn("c2", TSDB_DATA_TYPE_BINARY, 20) .addTag("tag1", TSDB_DATA_TYPE_INT) - .addTag("tag2", TSDB_DATA_TYPE_BINARY, 20); + .addTag("tag2", TSDB_DATA_TYPE_BINARY, 20) + .addTag("tag3", TSDB_DATA_TYPE_TIMESTAMP); builder.done(); mcs->createSubTable("test", "st1", "st1s1", 1); mcs->createSubTable("test", "st1", "st1s2", 2); @@ -189,17 +191,17 @@ void generateFunctions(MockCatalogService* mcs) { int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; } -int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo *pConn, const SName* pTableName, +int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta) { return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta); } -int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo *pConn, - const SName* pTableName, SVgroupInfo* vgInfo) { +int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, + SVgroupInfo* vgInfo) { return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo); } -int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, +int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pVgList) { return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList); } @@ -209,28 +211,26 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve return 0; } -int32_t __catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* dbFName, - SArray** pVgList) { +int32_t __catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SArray** pVgList) { return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList); } -int32_t __catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const char* dbFName, SDbCfgInfo* pDbCfg) { +int32_t __catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg) { return 0; } -int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo *pConn, const char* user, const char* dbFName, - AUTH_TYPE type, bool* pass) { +int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, + bool* pass) { *pass = true; return 0; } -int32_t __catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, - SFuncInfo* pInfo) { +int32_t __catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo) { return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo); } -int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, SRequestConnInfo *pConn, - const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { +int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, + STableMeta** pTableMeta, int32_t isSTable) { return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta); } diff --git a/source/libs/parser/test/parInitialATest.cpp b/source/libs/parser/test/parInitialATest.cpp index 3c1d931f37..cc23fbbc60 100644 --- a/source/libs/parser/test/parInitialATest.cpp +++ b/source/libs/parser/test/parInitialATest.cpp @@ -163,9 +163,9 @@ TEST_F(ParserInitialATest, alterSTable) { run("ALTER TABLE st1 DROP COLUMN c1"); clearAlterStbReq(); - setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR, - 20 + VARSTR_HEADER_SIZE); - run("ALTER TABLE st1 MODIFY COLUMN c1 VARCHAR(20)"); + setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c2", TSDB_DATA_TYPE_VARCHAR, + 30 + VARSTR_HEADER_SIZE); + run("ALTER TABLE st1 MODIFY COLUMN c2 VARCHAR(30)"); clearAlterStbReq(); // setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1"); @@ -179,9 +179,9 @@ TEST_F(ParserInitialATest, alterSTable) { run("ALTER TABLE st1 DROP TAG tag1"); clearAlterStbReq(); - setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, 1, "tag1", TSDB_DATA_TYPE_VARCHAR, - 20 + VARSTR_HEADER_SIZE); - run("ALTER TABLE st1 MODIFY TAG tag1 VARCHAR(20)"); + setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, 1, "tag2", TSDB_DATA_TYPE_VARCHAR, + 30 + VARSTR_HEADER_SIZE); + run("ALTER TABLE st1 MODIFY TAG tag2 VARCHAR(30)"); clearAlterStbReq(); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11"); @@ -196,6 +196,10 @@ TEST_F(ParserInitialATest, alterSTableSemanticCheck) { useDb("root", "test"); run("ALTER TABLE st1 RENAME COLUMN c1 cc1", TSDB_CODE_PAR_INVALID_ALTER_TABLE); + + run("ALTER TABLE st1 MODIFY COLUMN c2 NCHAR(10)", TSDB_CODE_PAR_INVALID_MODIFY_COL); + + run("ALTER TABLE st1 MODIFY TAG tag2 NCHAR(10)", TSDB_CODE_PAR_INVALID_MODIFY_COL); } TEST_F(ParserInitialATest, alterTable) { @@ -336,6 +340,8 @@ TEST_F(ParserInitialATest, alterTableSemanticCheck) { useDb("root", "test"); run("ALTER TABLE st1s1 RENAME COLUMN c1 cc1", TSDB_CODE_PAR_INVALID_ALTER_TABLE); + + run("ALTER TABLE st1s1 SET TAG tag2 = '123456789012345678901'", TSDB_CODE_PAR_WRONG_VALUE_TYPE); } TEST_F(ParserInitialATest, alterUser) { diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 919d4c041d..6cdc1e115e 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -548,12 +548,14 @@ TEST_F(ParserInitialCTest, createTable) { "a14 NCHAR(30), a15 VARCHAR(50)) " "TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) FILE_FACTOR 0.1"); - run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy')"); + run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy', NOW)"); run("CREATE TABLE " "IF NOT EXISTS test.t1 USING test.st1 (tag1, tag2) TAGS(1, 'abc') " "IF NOT EXISTS test.t2 USING test.st1 (tag1, tag2) TAGS(2, 'abc') " "IF NOT EXISTS test.t3 USING test.st1 (tag1, tag2) TAGS(3, 'abc') "); + + // run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy', NOW + 1S)"); } TEST_F(ParserInitialCTest, createTopic) { diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp index 29edca0d40..3ebf9a417b 100644 --- a/source/libs/parser/test/parInsertTest.cpp +++ b/source/libs/parser/test/parInsertTest.cpp @@ -245,8 +245,8 @@ TEST_F(InsertTest, autoCreateTableTest) { setDatabase("root", "test"); bind( - "insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, " - "\"guangzhou\")"); + "insert into st1s1 using st1 tags(1, 'wxy', now) " + "values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); ASSERT_EQ(run(), TSDB_CODE_SUCCESS); dumpReslut(); checkReslut(1, 3); @@ -257,8 +257,8 @@ TEST_F(InsertTest, autoCreateTableTest) { ASSERT_EQ(run(), TSDB_CODE_SUCCESS); bind( - "insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, " - "\"guangzhou\")"); + "insert into st1s1 using st1 tags(1, 'wxy', now) " + "values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); bind( diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 51d302fe12..c87520e262 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -219,6 +219,7 @@ TEST_F(ParserSelectTest, intervalSemanticCheck) { run("SELECT HISTOGRAM(c1, 'log_bin', '{\"start\": -33,\"factor\": 55,\"count\": 5,\"infinity\": false}', 1) FROM t1 " "WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' INTERVAL(10s) FILL(NULL)", TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC); + run("SELECT _WSTARTTS, _WENDTS, _WDURATION, sum(c1) FROM t1", TSDB_CODE_PAR_INVALID_WINDOW_PC); } TEST_F(ParserSelectTest, subquery) { @@ -231,13 +232,22 @@ TEST_F(ParserSelectTest, subquery) { run("SELECT SUM(a) FROM (SELECT MAX(c1) a, ts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)"); run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstartts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)"); + + run("SELECT _C0 FROM (SELECT _ROWTS, ts FROM st1s1)"); + + run("SELECT ts FROM (SELECT t1.ts FROM st1s1 t1)"); } TEST_F(ParserSelectTest, subquerySemanticCheck) { useDb("root", "test"); - run("SELECT SUM(a) FROM (SELECT MAX(c1) a FROM st1s1 INTERVAL(1m)) INTERVAL(1n)", TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY, - PARSER_STAGE_TRANSLATE); + run("SELECT SUM(a) FROM (SELECT MAX(c1) a FROM st1s1 INTERVAL(1m)) INTERVAL(1n)", + TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY); + + run("SELECT ts FROM (SELECT t1.ts AS ts, t2.ts FROM st1s1 t1, st1s2 t2 WHERE t1.ts = t2.ts)", + TSDB_CODE_PAR_AMBIGUOUS_COLUMN); + + run("SELECT ts FROM (SELECT ts AS c1 FROM st1s1 t1)", TSDB_CODE_PAR_INVALID_COLUMN); } TEST_F(ParserSelectTest, semanticCheck) { diff --git a/source/libs/parser/test/parTestUtil.cpp b/source/libs/parser/test/parTestUtil.cpp index 13ab116552..663f456cb8 100644 --- a/source/libs/parser/test/parTestUtil.cpp +++ b/source/libs/parser/test/parTestUtil.cpp @@ -62,7 +62,7 @@ int32_t getLogLevel() { return g_logLevel; } class ParserTestBaseImpl { public: - ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase) {} + ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase), sqlNo_(0) {} void login(const std::string& user) { caseEnv_.user_ = user; } @@ -73,6 +73,7 @@ class ParserTestBaseImpl { } void run(const string& sql, int32_t expect, ParserStage checkStage) { + ++sqlNo_; if (caseEnv_.nsql_ > 0) { --(caseEnv_.nsql_); return; @@ -174,7 +175,7 @@ class ParserTestBaseImpl { } void dump() { - cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl; + cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl; if (!res_.parsedAst_.empty()) { cout << "raw syntax tree : " << endl; cout << res_.parsedAst_ << endl; @@ -425,6 +426,7 @@ class ParserTestBaseImpl { stmtEnv stmtEnv_; stmtRes res_; ParserTestBase* pBase_; + int32_t sqlNo_; }; ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {} diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 2fc79255af..9e4967dd25 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -416,6 +416,8 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO pMergeAgg->node.pTargets = NULL; SNodeList* pChildren = pMergeAgg->node.pChildren; pMergeAgg->node.pChildren = NULL; + SNode* pConditions = pMergeAgg->node.pConditions; + pMergeAgg->node.pConditions = NULL; int32_t code = TSDB_CODE_SUCCESS; SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg); @@ -434,6 +436,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO } } if (TSDB_CODE_SUCCESS == code) { + pMergeAgg->node.pConditions = pConditions; pMergeAgg->node.pTargets = pTargets; pPartAgg->node.pChildren = pChildren; diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index 78d0f7b21f..9d937d7c91 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -82,4 +82,6 @@ TEST_F(PlanGroupByTest, stable) { run("SELECT COUNT(*) FROM st1"); run("SELECT COUNT(*) FROM st1 GROUP BY c1"); + + run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL"); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index db8a055362..206b0656f9 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -2,20 +2,20 @@ #======================b1-start=============== # ---- user -#./test.sh -f tsim/user/basic1.sim -#./test.sh -f tsim/user/pass_alter.sim -#./test.sh -f tsim/user/pass_len.sim -#./test.sh -f tsim/user/user_len.sim -#./test.sh -f tsim/user/privilege1.sim -#./test.sh -f tsim/user/privilege2.sim# +./test.sh -f tsim/user/basic1.sim +./test.sh -f tsim/user/pass_alter.sim +./test.sh -f tsim/user/pass_len.sim +./test.sh -f tsim/user/user_len.sim +./test.sh -f tsim/user/privilege1.sim +./test.sh -f tsim/user/privilege2.sim ## ---- db -#./test.sh -f tsim/db/create_all_options.sim -#./test.sh -f tsim/db/alter_option.sim -#./test.sh -f tsim/db/basic1.sim -#./test.sh -f tsim/db/basic2.sim -#./test.sh -f tsim/db/basic3.sim -#./test.sh -f tsim/db/basic6.sim +./test.sh -f tsim/db/create_all_options.sim +./test.sh -f tsim/db/alter_option.sim +./test.sh -f tsim/db/basic1.sim +./test.sh -f tsim/db/basic2.sim +./test.sh -f tsim/db/basic3.sim +./test.sh -f tsim/db/basic6.sim ./test.sh -f tsim/db/basic7.sim ./test.sh -f tsim/db/error1.sim ./test.sh -f tsim/db/taosdlog.sim @@ -78,6 +78,7 @@ # ./test.sh -f tsim/stream/state0.sim # ./test.sh -f tsim/stream/triggerInterval0.sim # ./test.sh -f tsim/stream/triggerSession0.sim +# ./test.sh -f tsim/stream/partitionby.sim # ---- transaction diff --git a/tests/script/tsim/stream/partitionby.sim b/tests/script/tsim/stream/partitionby.sim new file mode 100644 index 0000000000..df1e096551 --- /dev/null +++ b/tests/script/tsim/stream/partitionby.sim @@ -0,0 +1,68 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by ta,tb,tc interval(10s); + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); +$loop_count = 0 + +loop0: +sleep 300 +sql select * from streamtST1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 4 then +print =====rows=$rows +goto loop0 +endi + + +sql create database test1 vgroups 1; +sql use test1; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,2,3); +sql create table ts2 using st tags(1,3,4); +sql create table ts3 using st tags(1,4,5); + +sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(a) c2 from st partition by ta,tb,tc interval(10s); + + +sql insert into ts1 values(1648791211000,1,2,3); + +sql insert into ts2 values(1648791211000,1,2,3); + +$loop_count = 0 + +loop0: +sleep 300 +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then +print =====rows=$rows +goto loop0 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/system-test/fulltest.bat b/tests/system-test/fulltest.bat index db8c8299eb..59ddd3fb7d 100644 --- a/tests/system-test/fulltest.bat +++ b/tests/system-test/fulltest.bat @@ -6,7 +6,7 @@ python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udf_create.py -@REM python3 .\test.py -f 0-others\udf_restart_taosd.py +python3 .\test.py -f 0-others\udf_restart_taosd.py python3 .\test.py -f 0-others\cachelast.py python3 .\test.py -f 0-others\user_control.py @@ -38,7 +38,7 @@ python3 .\test.py -f 2-query\concat_ws.py python3 .\test.py -f 2-query\concat_ws2.py python3 .\test.py -f 2-query\check_tsdb.py python3 .\test.py -f 2-query\spread.py -@REM python3 .\test.py -f 2-query\hyperloglog.py +python3 .\test.py -f 2-query\hyperloglog.py python3 .\test.py -f 2-query\timezone.py python3 .\test.py -f 2-query\Now.py @@ -82,7 +82,7 @@ python3 .\test.py -f 2-query\elapsed.py python3 .\test.py -f 2-query\csum.py python3 .\test.py -f 2-query\mavg.py python3 .\test.py -f 2-query\diff.py -@REM python3 .\test.py -f 2-query\sample.py +python3 .\test.py -f 2-query\sample.py python3 .\test.py -f 2-query\function_diff.py python3 .\test.py -f 2-query\unique.py python3 .\test.py -f 2-query\stateduration.py @@ -91,7 +91,7 @@ python3 .\test.py -f 2-query\statecount.py python3 .\test.py -f 7-tmq\basic5.py python3 .\test.py -f 7-tmq\subscribeDb.py -@REM python3 .\test.py -f 7-tmq\subscribeDb0.py +python3 .\test.py -f 7-tmq\subscribeDb0.py python3 .\test.py -f 7-tmq\subscribeDb1.py python3 .\test.py -f 7-tmq\subscribeStb.py python3 .\test.py -f 7-tmq\subscribeStb0.py