diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 66a77839d4..4136585241 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1056,6 +1056,43 @@ typedef struct STaskDropRsp { int32_t code; } STaskDropRsp; +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igExists; + int32_t execLen; + void* executor; + int32_t sqlLen; + char* sql; +} SCreateTopicMsg; + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + int8_t igNotExists; +} SDropTopicMsg; + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + int8_t alterType; + SSchema schema; +} SAlterTopicMsg; + +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + uint64_t tuid; + int32_t sverson; + int32_t execLen; + char* executor; + int32_t sqlLen; + char* sql; +} SCreateTopicInternalMsg; + +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + uint64_t tuid; +} SDropTopicInternalMsg; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tname.h b/include/common/tname.h index de9e309b55..3ac7f8b27b 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -16,18 +16,22 @@ #ifndef TDENGINE_TNAME_H #define TDENGINE_TNAME_H +#include "tdef.h" + #define TSDB_DB_NAME_T 1 #define TSDB_TABLE_NAME_T 2 #define T_NAME_ACCT 0x1u #define T_NAME_DB 0x2u #define T_NAME_TABLE 0x4u +#define T_NAME_TOPIC 0x8u typedef struct SName { uint8_t type; //db_name_t, table_name_t int32_t acctId; char dbname[TSDB_DB_NAME_LEN]; char tname[TSDB_TABLE_NAME_LEN]; + char topicName[TSDB_TOPIC_NAME_LEN]; } SName; int32_t tNameExtractFullName(const SName* name, char* dst); diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 51d7ca7202..0f648b5150 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -16,6 +16,8 @@ #ifndef _TD_SDB_H_ #define _TD_SDB_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif @@ -159,11 +161,12 @@ typedef enum { SDB_USER = 5, SDB_AUTH = 6, SDB_ACCT = 7, - SDB_VGROUP = 8, - SDB_STB = 9, - SDB_DB = 10, - SDB_FUNC = 11, - SDB_MAX = 12 + SDB_TOPIC = 8, + SDB_VGROUP = 9, + SDB_STB = 10, + SDB_DB = 11, + SDB_FUNC = 12, + SDB_MAX = 13 } ESdbType; typedef struct SSdb SSdb; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4362950844..b6e8964b98 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -220,6 +220,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA) #define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB) #define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC) +#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AD) // mnode-func #define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0) @@ -234,6 +235,15 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) +// mnode-topic +#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) +#define TSDB_CODE_MND_TOPIC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E1) +#define TSDB_CODE_MND_TOO_MANY_TOPICS TAOS_DEF_ERROR_CODE(0, 0x03E2) +#define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E3) +#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x03E4) +#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5) +#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6) + // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401) diff --git a/include/util/tdef.h b/include/util/tdef.h index f3f3643268..64a169b4f1 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -163,6 +163,7 @@ do { \ #define TSDB_NODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string +#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_DB_NAME_LEN 65 #define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN) @@ -175,6 +176,7 @@ do { \ #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) +#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 05fb28e256..351fc20784 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -829,6 +829,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); if (pRsp != NULL) { + pRsp->ahandle = pMsg->ahandle; rpcSendResponse(pRsp); free(pRsp); } else { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fe01daa1a7..927b1e2e16 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -285,6 +285,21 @@ typedef struct { char payload[]; } SShowObj; +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + uint64_t dbUid; + int32_t version; + SRWLatch lock; + int32_t execLen; + void* executor; + int32_t sqlLen; + char* sql; +} STopicObj; + typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; char db[TSDB_FULL_DB_NAME_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index 58cae73c7f..48847dc6a3 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -25,6 +25,9 @@ extern "C" { int32_t mndInitStb(SMnode *pMnode); void mndCleanupStb(SMnode *pMnode); +SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); +void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h new file mode 100644 index 0000000000..62d213b8a2 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_TOPIC_H_ +#define _TD_MND_TOPIC_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitTopic(SMnode *pMnode); +void mndCleanupTopic(SMnode *pMnode); + +STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); +void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_TOPIC_H_*/ diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 306ebce599..1ed11cf834 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -541,6 +541,15 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { } static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; + + return 0; +} + +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; @@ -549,24 +558,57 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { - SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; +static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + STransAction action = {0}; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) return -1; + + action.pCont = pMsg; + action.contLen = sizeof(SAlterVnodeMsg); + action.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + } return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } +static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; -static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; -static int32_t mndSetUpdateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + if (pVgroup->dbUid == pNewDb->uid) { + if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + } + + sdbRelease(pSdb, pVgroup); + } + + return 0; +} static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); return terrno; @@ -579,11 +621,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO goto UPDATE_DB_OVER; } - if (mndSetUpdateDbUndoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto UPDATE_DB_OVER; - } - if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; @@ -594,11 +631,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO goto UPDATE_DB_OVER; } - if (mndSetUpdateDbUndoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto UPDATE_DB_OVER; - } - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; @@ -660,31 +692,87 @@ static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) return 0; } -static int32_t mndSetDropDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { - SSdbRaw *pUndoRaw = mndDbActionEncode(pDb); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; - - return 0; -} - static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); + if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED); + } + + sdbRelease(pSdb, pVgroup); + } + return 0; } -static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } +static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + STransAction action = {0}; + SVnodeGid * pVgid = pVgroup->vnodeGid + vn; -static int32_t mndSetDropDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) return -1; + + action.pCont = pMsg; + action.contLen = sizeof(SCreateVnodeMsg); + action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + } + + return 0; +} + +static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + if (mndBuildDropVgroupAction(pMnode, pTrans, pDb, pVgroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + } + + sdbRelease(pSdb, pVgroup); + } + + return 0; +} static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to drop since %s", pDb->name, terrstr()); return -1; @@ -697,11 +785,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { goto DROP_DB_OVER; } - if (mndSetDropDbUndoLogs(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto DROP_DB_OVER; @@ -712,11 +795,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { goto DROP_DB_OVER; } - if (mndSetDropDbUndoActions(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); goto DROP_DB_OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 5aec40766b..8b62c2f02d 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -484,6 +484,15 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { } } + //topic should have different name with stb + SStbObj *pTopic = mndAcquireStb(pMnode, pCreate->name); + if (pTopic != NULL) { + sdbRelease(pMnode->pSdb, pTopic); + terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC; + mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + SDbObj *pDb = mndAcquireDbByStb(pMnode, pCreate->name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -891,4 +900,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c new file mode 100644 index 0000000000..653c3c65fb --- /dev/null +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -0,0 +1,857 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndStb.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define TSDB_TOPIC_VER_NUMBER 1 +#define TSDB_TOPIC_RESERVE_SIZE 64 + +static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic); +static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); +static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic); +static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic); +static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pTopic, STopicObj *pNewTopic); +static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); +static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); +static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); +static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); + +int32_t mndInitTopic(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_TOPIC, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndTopicActionEncode, + .decodeFp = (SdbDecodeFp)mndTopicActionDecode, + .insertFp = (SdbInsertFp)mndTopicActionInsert, + .updateFp = (SdbUpdateFp)mndTopicActionUpdate, + .deleteFp = (SdbDeleteFp)mndTopicActionDelete}; + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_TOPIC, mndProcessCreateTopicMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_TOPIC, mndProcessAlterTopicMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_TOPIC, mndProcessDropTopicMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_TOPIC_IN_RSP, mndProcessCreateTopicInRsp); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_TOPIC_IN_RSP, mndProcessAlterTopicInRsp); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_TOPIC_IN_RSP, mndProcessDropTopicInRsp); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_TABLE_META, mndProcessTopicMetaMsg); + + /*mndAddShowMetaHandle(pMnode, TSDB_MGMT_TOPIC, mndGetTopicMeta);*/ + /*mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TOPIC, mndRetrieveTopic);*/ + /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TOPIC, mndCancelGetNextTopic);*/ + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_TOPIC, mndProcessCreateTopicMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_TOPIC_IN_RSP, mndProcessCreateTopicInRsp); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupTopic(SMnode *pMnode) {} + +static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { + int32_t size = sizeof(STopicObj) + TSDB_TOPIC_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, TSDB_TOPIC_VER_NUMBER, size); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN); + SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_FULL_DB_NAME_LEN); + SDB_SET_INT64(pRaw, dataPos, pTopic->createTime); + SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime); + SDB_SET_INT64(pRaw, dataPos, pTopic->uid); + SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid); + SDB_SET_INT32(pRaw, dataPos, pTopic->version); + SDB_SET_INT32(pRaw, dataPos, pTopic->execLen); + SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen); + SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen); + SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen); + + SDB_SET_RESERVE(pRaw, dataPos, TSDB_TOPIC_RESERVE_SIZE); + SDB_SET_DATALEN(pRaw, dataPos); + + return pRaw; +} + +static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != TSDB_TOPIC_VER_NUMBER) { + mError("failed to decode stable since %s", terrstr()); + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + return NULL; + } + + int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); + STopicObj *pTopic = sdbGetRowObj(pRow); + if (pTopic == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN); + SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_FULL_DB_NAME_LEN); + SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->createTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->updateTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid); + SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->dbUid); + SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->version); + SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->execLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->executor, pTopic->execLen); + SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen); + + SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TOPIC_RESERVE_SIZE); + + return pRow; +} + +static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic) { + mTrace("topic:%s, perform insert action", pTopic->name); + return 0; +} + +static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic) { + mTrace("topic:%s, perform delete action", pTopic->name); + return 0; +} + +static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj *pNewTopic) { + mTrace("topic:%s, perform update action", pOldTopic->name); + atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime); + atomic_exchange_32(&pOldTopic->version, pNewTopic->version); + + taosWLockLatch(&pOldTopic->lock); +#if 0 + + pOldTopic->numOfColumns = pNewTopic->numOfColumns; + pOldTopic->numOfTags = pNewTopic->numOfTags; + int32_t totalCols = pNewTopic->numOfTags + pNewTopic->numOfColumns; + int32_t totalSize = totalCols * sizeof(SSchema); + + if (pOldTopic->numOfTags + pOldTopic->numOfColumns < totalCols) { + void *pSchema = malloc(totalSize); + if (pSchema != NULL) { + free(pOldTopic->pSchema); + pOldTopic->pSchema = pSchema; + } + } + + memcpy(pOldTopic->pSchema, pNewTopic->pSchema, totalSize); + +#endif + taosWUnLockLatch(&pOldTopic->lock); + return 0; +} + +STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { + SSdb *pSdb = pMnode->pSdb; + STopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); + if (pTopic == NULL) { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + } + return pTopic; +} + +void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pTopic); +} + +static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { + SName name = {0}; + tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TOPIC); + + char db[TSDB_TABLE_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, db); + + return mndAcquireDb(pMnode, db); +} + +static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { + int32_t totalCols = 0; + int32_t contLen = sizeof(SCreateTopicInternalMsg) + pTopic->execLen + pTopic->sqlLen; + + SCreateTopicInternalMsg *pCreate = calloc(1, contLen); + if (pCreate == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCreate->head.contLen = htonl(contLen); + pCreate->head.vgId = htonl(pVgroup->vgId); + memcpy(pCreate->name, pTopic->name, TSDB_TABLE_FNAME_LEN); + pCreate->tuid = htobe64(pTopic->uid); + pCreate->sverson = htonl(pTopic->version); + + pCreate->sql = malloc(pTopic->sqlLen); + if(pCreate->sql == NULL) { + free(pCreate); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + memcpy(pCreate->sql, pTopic->sql, pTopic->sqlLen); + + pCreate->executor = malloc(pTopic->execLen); + if(pCreate->executor == NULL) { + free(pCreate); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + memcpy(pCreate->executor, pTopic->executor, pTopic->execLen); + + return pCreate; +} + +static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { + int32_t contLen = sizeof(SDropTopicInternalMsg); + + SDropTopicInternalMsg *pDrop = calloc(1, contLen); + if (pDrop == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pDrop->head.contLen = htonl(contLen); + pDrop->head.vgId = htonl(pVgroup->vgId); + memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN); + pDrop->tuid = htobe64(pTopic->uid); + + return pDrop; +} + +static int32_t mndCheckCreateTopicMsg(SCreateTopicMsg *pCreate) { + //deserialize and other stuff + return 0; +} + +static int32_t mndSetCreateTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { + SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateTopicUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { + SSdbRaw *pUndoRaw = mndTopicActionEncode(pTopic); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { + SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateTopicRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) continue; + + SCreateTopicInternalMsg *pMsg = mndBuildCreateTopicMsg(pMnode, pVgroup, pTopic); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pMsg; + action.contLen = htonl(pMsg->head.contLen); + action.msgType = TSDB_MSG_TYPE_CREATE_TOPIC_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndSetCreateTopicUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) continue; + + SDropTopicInternalMsg *pMsg = mndBuildDropTopicMsg(pMnode, pVgroup, pTopic); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pMsg; + action.contLen = sizeof(SDropTopicInternalMsg); + action.msgType = TSDB_MSG_TYPE_DROP_TOPIC_IN; + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *pCreate, SDbObj *pDb) { + STopicObj topicObj = {0}; + tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); + tstrncpy(topicObj.db, pDb->name, TSDB_FULL_DB_NAME_LEN); + topicObj.createTime = taosGetTimestampMs(); + topicObj.updateTime = topicObj.createTime; + topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + topicObj.dbUid = pDb->uid; + topicObj.version = 1; + +#if 0 + int32_t totalCols = topicObj.numOfColumns + topicObj.numOfTags; + int32_t totalSize = totalCols * sizeof(SSchema); + topicObj.sql = malloc(totalSize); + if (topicObj.sql == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(topicObj.sql, pCreate->sql, totalSize); +#endif + + int32_t code = 0; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + if (pTrans == NULL) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name); + + if (mndSetCreateTopicRedoLogs(pMnode, pTrans, pDb, &topicObj) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto CREATE_TOPIC_OVER; + } + + if (mndSetCreateTopicUndoLogs(pMnode, pTrans, pDb, &topicObj) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto CREATE_TOPIC_OVER; + } + + if (mndSetCreateTopicCommitLogs(pMnode, pTrans, pDb, &topicObj) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto CREATE_TOPIC_OVER; + } + + if (mndSetCreateTopicRedoActions(pMnode, pTrans, pDb, &topicObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto CREATE_TOPIC_OVER; + } + + if (mndSetCreateTopicUndoActions(pMnode, pTrans, pDb, &topicObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto CREATE_TOPIC_OVER; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + code = 0; + +CREATE_TOPIC_OVER: + mndTransDrop(pTrans); + return code; +} + +static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SCreateTopicMsg *pCreate = pMsg->rpcMsg.pCont; + + mDebug("topic:%s, start to create", pCreate->name); + + if (mndCheckCreateTopicMsg(pCreate) != 0) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + + STopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); + if (pTopic != NULL) { + sdbRelease(pMnode->pSdb, pTopic); + if (pCreate->igExists) { + mDebug("topic:%s, already exist, ignore exist is set", pCreate->name); + return 0; + } else { + terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; + mError("db:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + } + + //topic should have different name with stb + SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); + if (pStb != NULL) { + sdbRelease(pMnode->pSdb, pStb); + terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_STB; + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + + SDbObj *pDb = mndAcquireDbByTopic(pMnode, pCreate->name); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + + int32_t code = mndCreateTopic(pMnode, pMsg, pCreate, pDb); + mndReleaseDb(pMnode, pDb); + + if (code != 0) { + terrno = code; + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndCheckAlterTopicMsg(SAlterTopicMsg *pAlter) { + SSchema *pSchema = &pAlter->schema; + pSchema->colId = htonl(pSchema->colId); + pSchema->bytes = htonl(pSchema->bytes); + + if (pSchema->type <= 0) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + return -1; + } + if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + return -1; + } + if (pSchema->bytes <= 0) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + return -1; + } + if (pSchema->name[0] == 0) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + return -1; + } + + return 0; +} + +static int32_t mndUpdateTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pOldTopic, STopicObj *pNewTopic) { return 0; } + +static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SAlterTopicMsg *pAlter = pMsg->rpcMsg.pCont; + + mDebug("topic:%s, start to alter", pAlter->name); + + if (mndCheckAlterTopicMsg(pAlter) != 0) { + mError("topic:%s, failed to alter since %s", pAlter->name, terrstr()); + return -1; + } + + STopicObj *pTopic = mndAcquireTopic(pMnode, pAlter->name); + if (pTopic == NULL) { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("topic:%s, failed to alter since %s", pAlter->name, terrstr()); + return -1; + } + + STopicObj topicObj = {0}; + memcpy(&topicObj, pTopic, sizeof(STopicObj)); + + int32_t code = mndUpdateTopic(pMnode, pMsg, pTopic, &topicObj); + mndReleaseTopic(pMnode, pTopic); + + if (code != 0) { + mError("topic:%s, failed to alter since %s", pAlter->name, tstrerror(code)); + return code; + } + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndSetDropTopicRedoLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { + SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropTopicUndoLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { + SSdbRaw *pUndoRaw = mndTopicActionEncode(pTopic); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { + SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropTopicRedoActions(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { return 0; } + +static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { return 0; } + +static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + if (pTrans == NULL) { + mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); + return -1; + } + mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + + if (mndSetDropTopicRedoLogs(pMnode, pTrans, pTopic) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto DROP_TOPIC_OVER; + } + + if (mndSetDropTopicUndoLogs(pMnode, pTrans, pTopic) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto DROP_TOPIC_OVER; + } + + if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto DROP_TOPIC_OVER; + } + + if (mndSetDropTopicRedoActions(pMnode, pTrans, pTopic) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_TOPIC_OVER; + } + + if (mndSetDropTopicUndoActions(pMnode, pTrans, pTopic) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_TOPIC_OVER; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + goto DROP_TOPIC_OVER; + } + + code = 0; + +DROP_TOPIC_OVER: + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SDropTopicMsg *pDrop = pMsg->rpcMsg.pCont; + + mDebug("topic:%s, start to drop", pDrop->name); + + STopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); + if (pTopic == NULL) { + if (pDrop->igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); + return 0; + } else { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); + return -1; + } + } + + int32_t code = mndDropTopic(pMnode, pMsg, pTopic); + mndReleaseTopic(pMnode, pTopic); + + if (code != 0) { + terrno = code; + mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); + return -1; + } + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; + + mDebug("topic:%s, start to retrieve meta", pInfo->tableFname); + +#if 0 + SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname); + if (pTopic == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_INVALID_TOPIC; + mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + taosRLockLatch(&pTopic->lock); + int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags; + int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + + STableMetaMsg *pMeta = rpcMallocCont(contLen); + if (pMeta == NULL) { + taosRUnLockLatch(&pTopic->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN); + pMeta->numOfTags = htonl(pTopic->numOfTags); + pMeta->numOfColumns = htonl(pTopic->numOfColumns); + pMeta->precision = pDb->cfg.precision; + pMeta->tableType = TSDB_SUPER_TABLE; + pMeta->update = pDb->cfg.update; + pMeta->sversion = htonl(pTopic->version); + pMeta->tuid = htonl(pTopic->uid); + + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pMeta->pSchema[i]; + SSchema *pSrcSchema = &pTopic->pSchema[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + taosRUnLockLatch(&pTopic->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseTopic(pMnode, pTopic); + + pMsg->pCont = pMeta; + pMsg->contLen = contLen; + + mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); +#endif + return 0; +} + +static int32_t mndProcessCreateTopicInRsp(SMnodeMsg* pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { + SSdb *pSdb = pMnode->pSdb; + + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfTopics = 0; + void *pIter = NULL; + while (1) { + STopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (strcmp(pTopic->db, dbName) == 0) { + numOfTopics++; + } + + sdbRelease(pSdb, pTopic); + } + + *pNumOfTopics = numOfTopics; + return 0; +} + +static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) { + return -1; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tags"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htonl(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbFname, mndShowStr(pShow->type)); + + return 0; +} + +static void mndExtractTableName(char *tableId, char *name) { + int pos = -1; + int num = 0; + for (pos = 0; tableId[pos] != 0; ++pos) { + if (tableId[pos] == '.') num++; + if (num == 2) break; + } + + if (num == 2) { + strcpy(name, tableId + pos + 1); + } +} + +static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + STopicObj *pTopic = NULL; + int32_t cols = 0; + char *pWrite; + char prefix[64] = {0}; + + tstrncpy(prefix, pShow->db, 64); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = (int32_t)strlen(prefix); + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); + if (pShow->pIter == NULL) break; + + if (strncmp(pTopic->name, prefix, prefixLen) != 0) { + sdbRelease(pSdb, pTopic); + continue; + } + + cols = 0; + + char topicName[TSDB_TABLE_NAME_LEN] = {0}; + tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, topicName); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pTopic->createTime; + cols++; + + /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ + /**(int32_t *)pWrite = pTopic->numOfColumns;*/ + /*cols++;*/ + + /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ + /**(int32_t *)pWrite = pTopic->numOfTags;*/ + /*cols++;*/ + + numOfRows++; + sdbRelease(pSdb, pTopic); + } + + pShow->numOfReads += numOfRows; + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + return numOfRows; +} + +static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index b6781c5b21..1c0a9ad071 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -696,7 +696,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgReceived && pAction->errCode == 0) continue; + if (pAction->msgSent && !pAction->msgReceived) continue; + if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; int64_t signature = pTrans->id; signature = (signature << 32); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 463bb0ddb6..8ff2139314 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -337,8 +337,16 @@ static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 91572399ad..0199b0c583 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -27,6 +27,7 @@ #include "mndStb.h" #include "mndSync.h" #include "mndTelem.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -132,6 +133,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (pMnode->clusterId <= 0) { if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1; @@ -227,7 +229,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { } return 0; -} +} SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { mDebug("start to open mnode in %s", path); @@ -442,4 +444,4 @@ uint64_t mndGenerateUid(char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); uint64_t x = (us & 0x000000FFFFFFFFFF) << 24; return x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 1c8b82fd92..0c311b86b6 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -199,6 +199,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { SSdbRow *pRow = *ppRow; switch (pRow->status) { case SDB_STATUS_READY: + case SDB_STATUS_UPDATING: atomic_add_fetch_32(&pRow->refCount, 1); pRet = pRow->pObj; break; diff --git a/tests/script/general/db/basic1.sim b/tests/script/general/db/basic1.sim index dc62c33842..666a9dfd9e 100644 --- a/tests/script/general/db/basic1.sim +++ b/tests/script/general/db/basic1.sim @@ -1,7 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start -sleep 2000 sql connect print =============== create database diff --git a/tests/script/general/table/basic1.sim b/tests/script/general/table/basic1.sim index c26c3c33e4..ded8d79a3f 100644 --- a/tests/script/general/table/basic1.sim +++ b/tests/script/general/table/basic1.sim @@ -1,7 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start -sleep 2000 sql connect print =============== create database @@ -13,29 +12,24 @@ endi print $data00 $data01 $data02 -print =============== create normal table -sql create table d1.n1 (ts timestamp, i int) -sql show d1.tables -if $rows != 1 then - return -1 -endi - -print $data00 $data01 $data02 +sql use d1 print =============== create super table -sql create table d1.st (ts timestamp, i int) tags (j int) -sql show d1.stables +sql create table st (ts timestamp, i int) tags (j int) +sql show stables if $rows != 1 then return -1 endi print $data00 $data01 $data02 +return + print =============== create child table -sql create table d1.c1 using d1.st tags(1) -sql create table d1.c2 using d1.st tags(2) -sql show d1.tables -if $rows != 3 then +sql create table c1 using st tags(1) +sql create table c2 using st tags(2) +sql show tables +if $rows != 2 then return -1 endi @@ -44,12 +38,12 @@ print $data10 $data11 $data22 print $data20 $data11 $data22 print =============== insert data -sql insert into d1.n1 values(now+1s, 1) -sql insert into d1.n1 values(now+2s, 2) -sql insert into d1.n1 values(now+3s, 3) +sql insert into c1 values(now+1s, 1) +sql insert into c1 values(now+2s, 2) +sql insert into c1 values(now+3s, 3) print =============== query data -sql select * from d1.n1 +sql select * from c1 if $rows != 3 then return -1 endi