From 335de5359412f87517ac64fb0a9a6ddc33b502f4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 29 Dec 2021 17:53:43 +0800 Subject: [PATCH] add mndconsumer --- include/common/tmsg.h | 90 ++++- include/common/tmsgdef.h | 6 +- include/dnode/mnode/sdb/sdb.h | 14 +- source/client/src/clientImpl.c | 20 +- source/client/test/clientTests.cpp | 2 + source/dnode/mnode/impl/inc/mndConsumer.h | 38 +++ source/dnode/mnode/impl/inc/mndDef.h | 26 ++ source/dnode/mnode/impl/src/mndConsumer.c | 373 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 385 +--------------------- 9 files changed, 563 insertions(+), 391 deletions(-) create mode 100644 source/dnode/mnode/impl/inc/mndConsumer.h create mode 100644 source/dnode/mnode/impl/src/mndConsumer.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c644bfda9e..60444cac45 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -318,6 +318,18 @@ typedef struct SEpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SEpSet; +static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { + if(buf == NULL) return sizeof(SEpSet); + memcpy(buf, pEp, sizeof(SEpSet)); + //TODO: endian conversion + return sizeof(SEpSet); +} + +static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEpSet) { + memcpy(pEpSet, buf, sizeof(SEpSet)); + return buf; +} + typedef struct { int32_t acctId; int64_t clusterId; @@ -1079,24 +1091,92 @@ typedef struct STaskDropRsp { typedef struct { int8_t igExists; char* name; - char* phyPlan; + char* physicalPlan; + char* logicalPlan; } SCMCreateTopicReq; static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { int tlen = 0; tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedI8(buf, pReq->igExists); - tlen += taosEncodeString(buf, pReq->phyPlan); + tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += taosEncodeString(buf, pReq->logicalPlan); return tlen; } static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { - buf = taosDecodeFixedI8(buf, &(pReq->igExists)); - buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeString(buf, &(pReq->phyPlan)); + buf = taosDecodeFixedI8(buf, &pReq->igExists); + buf = taosDecodeString(buf, &pReq->name); + buf = taosDecodeString(buf, &pReq->physicalPlan); + buf = taosDecodeString(buf, &pReq->logicalPlan); return buf; } +typedef struct { + int64_t topicId; +} SCMCreateTopicRsp; + +static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->topicId); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->topicId); + return buf; +} + +typedef struct { + char* topicName; + char* consumerGroup; + int64_t consumerId; +} SCMSubscribeReq; + +static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { + int tlen = 0; + tlen += taosEncodeString(buf, pReq->topicName); + tlen += taosEncodeString(buf, pReq->consumerGroup); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { + buf = taosDecodeString(buf, &pReq->topicName); + buf = taosDecodeString(buf, &pReq->consumerGroup); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + return buf; +} + +typedef struct { + int32_t vgId; + SEpSet pEpSet; +} SCMSubscribeRsp; + +static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pRsp->vgId); + tlen += taosEncodeSEpSet(buf, &pRsp->pEpSet); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { + buf = taosDecodeFixedI32(buf, &pRsp->vgId); + buf = taosDecodeSEpSet(buf, &pRsp->pEpSet); + return buf; +} + +typedef struct { + int64_t topicId; + int64_t consumerId; + int64_t consumerGroupId; + int64_t offset; +} SMVSubscribeReq; + +typedef struct { + int64_t newOffset; +} SMVSubscribeRsp; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9aa4325d58..659be7b96b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -116,9 +116,10 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) @@ -146,6 +147,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) @@ -156,4 +158,4 @@ enum { #if defined(TD_MSG_NUMBER_) TDMT_MAX #endif -}; \ No newline at end of file +}; diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 0f648b5150..037b75c0ba 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -161,12 +161,14 @@ typedef enum { SDB_USER = 5, SDB_AUTH = 6, SDB_ACCT = 7, - SDB_TOPIC = 8, - SDB_VGROUP = 9, - SDB_STB = 10, - SDB_DB = 11, - SDB_FUNC = 12, - SDB_MAX = 13 + SDB_CONSUMER = 8, + SDB_CGROUP = 9, + SDB_TOPIC = 10, + SDB_VGROUP = 11, + SDB_STB = 12, + SDB_DB = 13, + SDB_FUNC = 14, + SDB_MAX = 15 } ESdbType; typedef struct SSdb SSdb; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b40c718d51..c051249d38 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -240,6 +240,9 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + +//temporary disabled until planner ready +#if 0 CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); //TODO: check sql valid @@ -249,15 +252,24 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq if(dagStr == NULL) { //TODO } +#endif SCMCreateTopicReq req = { .name = (char*)name, .igExists = 0, - .phyPlan = dagStr, + /*.physicalPlan = dagStr,*/ + .physicalPlan = (char*)sql, + .logicalPlan = "", }; - void* buf = NULL; - int tlen = tSerializeSCMCreateTopicReq(&buf, &req); + int tlen = tSerializeSCMCreateTopicReq(NULL, &req); + void* buf = malloc(tlen); + if(buf == NULL) { + goto _return; + } + void* abuf = buf; + tSerializeSCMCreateTopicReq(&abuf, &req); + /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; @@ -269,8 +281,6 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq tsem_wait(&pRequest->body.rspSem); - destroySendMsgInfo(body); - _return: qDestroyQuery(pQuery); qDestroyQueryDag(pDag); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index de494cb031..a1307e0314 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,6 +49,7 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } +#if 0 TEST(testCase, connect_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -398,6 +399,7 @@ TEST(testCase, drop_stable_Test) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, create_topic_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h new file mode 100644 index 0000000000..60f186d7d2 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_CONSUMER_H_ +#define _TD_MND_CONSUMER_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitConsumer(SMnode *pMnode); +void mndCleanupConsumer(SMnode *pMnode); + +SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); +void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer); + +SCGroupObj *mndAcquireCGroup(SMnode *pMnode, char *consumerGroup); +void mndReleaseCGroup(SMnode *pMnode, SCGroupObj *pCGroup); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_CONSUMER_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ea0fe46302..cb2fe5cc83 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -305,8 +305,34 @@ typedef struct { void* executor; int32_t sqlLen; char* sql; + char* logicalPlan; + char* physicalPlan; } STopicObj; +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + //uint64_t dbUid; + int32_t version; + SRWLatch lock; + +} SConsumerObj; + +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + //uint64_t dbUid; + int32_t version; + SRWLatch lock; + +} SCGroupObj; + typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c new file mode 100644 index 0000000000..9e7cdbf09e --- /dev/null +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -0,0 +1,373 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndConsumer.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndTopic.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define MND_CONSUMER_VER_NUMBER 1 +#define MND_CONSUMER_RESERVE_SIZE 64 + +static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer); +static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); +static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer); +static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer); +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pConsumer, SConsumerObj *pNewConsumer); +static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); + +int32_t mndInitConsumer(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_CONSUMER, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndConsumerActionEncode, + .decodeFp = (SdbDecodeFp)mndConsumerActionDecode, + .insertFp = (SdbInsertFp)mndConsumerActionInsert, + .updateFp = (SdbUpdateFp)mndConsumerActionUpdate, + .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); + mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp); + mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq); + mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupConsumer(SMnode *pMnode) {} + +static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { + int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); + SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); + SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime); + SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime); + SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); + /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ + SDB_SET_INT32(pRaw, dataPos, pConsumer->version); + + SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); + SDB_SET_DATALEN(pRaw, dataPos); + + return pRaw; +} + +static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != MND_CONSUMER_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode consumer since %s", terrstr()); + return NULL; + } + + int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); + SConsumerObj *pConsumer = sdbGetRowObj(pRow); + if (pConsumer == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); + SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); + /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ + SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version); + + SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); + + return pRow; +} + +static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { + mTrace("consumer:%s, perform insert action", pConsumer->name); + return 0; +} + +static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { + mTrace("consumer:%s, perform delete action", pConsumer->name); + return 0; +} + +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { + mTrace("consumer:%s, perform update action", pOldConsumer->name); + atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime); + atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version); + + taosWLockLatch(&pOldConsumer->lock); + + // TODO handle update + + taosWUnLockLatch(&pOldConsumer->lock); + return 0; +} + +SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { + SSdb *pSdb = pMnode->pSdb; + SConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); + if (pConsumer == NULL) { + /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + } + return pConsumer; +} + +void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pConsumer); +} + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMSubscribeReq *pSubscribe; + tDeserializeSCMSubscribeReq(msgStr, pSubscribe); + // add consumerGroupId -> list to sdb + // add consumerId -> list to sdb + // add consumer -> list to sdb + return 0; +} + +static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg) { + mndTransProcessRsp(pMsg); + return 0; +} + +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; + + mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname); + +#if 0 + SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname); + if (pConsumer == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_INVALID_CONSUMER; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + taosRLockLatch(&pConsumer->lock); + int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; + int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + + STableMetaMsg *pMeta = rpcMallocCont(contLen); + if (pMeta == NULL) { + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN); + pMeta->numOfTags = htonl(pConsumer->numOfTags); + pMeta->numOfColumns = htonl(pConsumer->numOfColumns); + pMeta->precision = pDb->cfg.precision; + pMeta->tableType = TSDB_SUPER_TABLE; + pMeta->update = pDb->cfg.update; + pMeta->sversion = htonl(pConsumer->version); + pMeta->tuid = htonl(pConsumer->uid); + + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pMeta->pSchema[i]; + SSchema *pSrcSchema = &pConsumer->pSchema[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + + pMsg->pCont = pMeta; + pMsg->contLen = contLen; + + mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); +#endif + return 0; +} + +static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) { + SSdb *pSdb = pMnode->pSdb; + + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfConsumers = 0; + void *pIter = NULL; + while (1) { + SConsumerObj *pConsumer = NULL; + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) break; + + if (strcmp(pConsumer->db, dbName) == 0) { + numOfConsumers++; + } + + sdbRelease(pSdb, pConsumer); + } + + *pNumOfConsumers = numOfConsumers; + return 0; +} + +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) { + return -1; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tags"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htonl(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbFname, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SConsumerObj *pConsumer = NULL; + int32_t cols = 0; + char *pWrite; + char prefix[64] = {0}; + + tstrncpy(prefix, pShow->db, 64); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = (int32_t)strlen(prefix); + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); + if (pShow->pIter == NULL) break; + + if (strncmp(pConsumer->name, prefix, prefixLen) != 0) { + sdbRelease(pSdb, pConsumer); + continue; + } + + cols = 0; + + char consumerName[TSDB_TABLE_NAME_LEN] = {0}; + tstrncpy(consumerName, pConsumer->name + prefixLen, TSDB_TABLE_NAME_LEN); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, consumerName); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pConsumer->createTime; + cols++; + + /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ + /**(int32_t *)pWrite = pConsumer->numOfColumns;*/ + /*cols++;*/ + + /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ + /**(int32_t *)pWrite = pConsumer->numOfTags;*/ + /*cols++;*/ + + numOfRows++; + sdbRelease(pSdb, pConsumer); + } + + pShow->numOfReads += numOfRows; + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + return numOfRows; +} + +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 597142d326..49c96967e6 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -33,10 +33,7 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pTopic, STopicObj *pNewTopic); static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); @@ -53,19 +50,8 @@ int32_t mndInitTopic(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTopicActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_TOPIC, mndProcessAlterTopicMsg); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicMsg); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_TOPIC_RSP, mndProcessCreateTopicInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_TOPIC_RSP, mndProcessAlterTopicInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_TABLE_META, mndProcessTopicMetaMsg); - - /*mndAddShowMetaHandle(pMnode, TSDB_MGMT_TOPIC, mndGetTopicMeta);*/ - /*mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TOPIC, mndRetrieveTopic);*/ - /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TOPIC, mndCancelGetNextTopic);*/ - - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_TOPIC_RSP, mndProcessCreateTopicInRsp); return sdbSetTable(pMnode->pSdb, table); } @@ -145,24 +131,9 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj atomic_exchange_32(&pOldTopic->version, pNewTopic->version); taosWLockLatch(&pOldTopic->lock); -#if 0 + + //TODO handle update - pOldTopic->numOfColumns = pNewTopic->numOfColumns; - pOldTopic->numOfTags = pNewTopic->numOfTags; - int32_t totalCols = pNewTopic->numOfTags + pNewTopic->numOfColumns; - int32_t totalSize = totalCols * sizeof(SSchema); - - if (pOldTopic->numOfTags + pOldTopic->numOfColumns < totalCols) { - void *pSchema = malloc(totalSize); - if (pSchema != NULL) { - free(pOldTopic->pSchema); - pOldTopic->pSchema = pSchema; - } - } - - memcpy(pOldTopic->pSchema, pNewTopic->pSchema, totalSize); - -#endif taosWUnLockLatch(&pOldTopic->lock); return 0; } @@ -191,41 +162,6 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } -static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { - int32_t totalCols = 0; - int32_t contLen = sizeof(SCreateTopicInternalMsg) + pTopic->execLen + pTopic->sqlLen; - - SCreateTopicInternalMsg *pCreate = calloc(1, contLen); - if (pCreate == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pCreate->head.contLen = htonl(contLen); - pCreate->head.vgId = htonl(pVgroup->vgId); - memcpy(pCreate->name, pTopic->name, TSDB_TABLE_FNAME_LEN); - pCreate->tuid = htobe64(pTopic->uid); - pCreate->sverson = htonl(pTopic->version); - - pCreate->sql = malloc(pTopic->sqlLen); - if (pCreate->sql == NULL) { - free(pCreate); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - memcpy(pCreate->sql, pTopic->sql, pTopic->sqlLen); - - pCreate->executor = malloc(pTopic->execLen); - if (pCreate->executor == NULL) { - free(pCreate); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - memcpy(pCreate->executor, pTopic->executor, pTopic->execLen); - - return pCreate; -} - static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { int32_t contLen = sizeof(SDropTopicInternalMsg); @@ -243,109 +179,12 @@ static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgro return pDrop; } -static int32_t mndCheckCreateTopicMsg(SCreateTopicMsg *pCreate) { +static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { // deserialize and other stuff return 0; } -static int32_t mndSetCreateTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; - - return 0; -} - -static int32_t mndSetCreateTopicUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdbRaw *pUndoRaw = mndTopicActionEncode(pTopic); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; - - return 0; -} - -static int32_t mndSetCreateTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; - - return 0; -} - -static int32_t mndSetCreateTopicRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; - - SCreateTopicInternalMsg *pMsg = mndBuildCreateTopicMsg(pMnode, pVgroup, pTopic); - if (pMsg == NULL) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pMsg; - action.contLen = htonl(pMsg->head.contLen); - action.msgType = TDMT_VND_CREATE_TOPIC; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - return -1; - } - sdbRelease(pSdb, pVgroup); - } - - return 0; -} - -static int32_t mndSetCreateTopicUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; - - SDropTopicInternalMsg *pMsg = mndBuildDropTopicMsg(pMnode, pVgroup, pTopic); - if (pMsg == NULL) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pMsg; - action.contLen = sizeof(SDropTopicInternalMsg); - action.msgType = TDMT_VND_DROP_TOPIC; - if (mndTransAppendUndoAction(pTrans, &action) != 0) { - free(pMsg); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - return -1; - } - sdbRelease(pSdb, pVgroup); - } - - return 0; -} - -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { STopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -355,66 +194,17 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg * topicObj.dbUid = pDb->uid; topicObj.version = 1; -#if 0 - int32_t totalCols = topicObj.numOfColumns + topicObj.numOfTags; - int32_t totalSize = totalCols * sizeof(SSchema); - topicObj.sql = malloc(totalSize); - if (topicObj.sql == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - memcpy(topicObj.sql, pCreate->sql, totalSize); -#endif - - int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } - mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name); - - if (mndSetCreateTopicRedoLogs(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicUndoLogs(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicCommitLogs(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicRedoActions(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicUndoActions(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - code = 0; - -CREATE_TOPIC_OVER: - mndTransDrop(pTrans); - return code; + SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); + if (pTopicRaw == NULL) return -1; + if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; + return sdbWrite(pMnode->pSdb, pTopicRaw); } static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SCreateTopicMsg *pCreate = pMsg->rpcMsg.pCont; + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMCreateTopicReq* pCreate; + tDeserializeSCMCreateTopicReq(msgStr, pCreate); mDebug("topic:%s, start to create", pCreate->name); @@ -436,15 +226,6 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { } } - // topic should have different name with stb - SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); - if (pStb != NULL) { - sdbRelease(pMnode->pSdb, pStb); - terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_STB; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pCreate->name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -464,144 +245,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndCheckAlterTopicMsg(SAlterTopicMsg *pAlter) { - SSchema *pSchema = &pAlter->schema; - pSchema->colId = htonl(pSchema->colId); - pSchema->bytes = htonl(pSchema->bytes); - - if (pSchema->type <= 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - if (pSchema->bytes <= 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - if (pSchema->name[0] == 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - - return 0; -} - -static int32_t mndUpdateTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pOldTopic, STopicObj *pNewTopic) { return 0; } - -static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SAlterTopicMsg *pAlter = pMsg->rpcMsg.pCont; - - mDebug("topic:%s, start to alter", pAlter->name); - - if (mndCheckAlterTopicMsg(pAlter) != 0) { - mError("topic:%s, failed to alter since %s", pAlter->name, terrstr()); - return -1; - } - - STopicObj *pTopic = mndAcquireTopic(pMnode, pAlter->name); - if (pTopic == NULL) { - terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("topic:%s, failed to alter since %s", pAlter->name, terrstr()); - return -1; - } - - STopicObj topicObj = {0}; - memcpy(&topicObj, pTopic, sizeof(STopicObj)); - - int32_t code = mndUpdateTopic(pMnode, pMsg, pTopic, &topicObj); - mndReleaseTopic(pMnode, pTopic); - - if (code != 0) { - mError("topic:%s, failed to alter since %s", pAlter->name, tstrerror(code)); - return code; - } - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -} - -static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); - return 0; -} - -static int32_t mndSetDropTopicRedoLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { - SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; - - return 0; -} - -static int32_t mndSetDropTopicUndoLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { - SSdbRaw *pUndoRaw = mndTopicActionEncode(pTopic); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; - - return 0; -} - -static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { - SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; - - return 0; -} - -static int32_t mndSetDropTopicRedoActions(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { return 0; } - -static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { return 0; } - static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - return -1; - } - mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); - - if (mndSetDropTopicRedoLogs(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicUndoLogs(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicRedoActions(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicUndoActions(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - code = 0; - -DROP_TOPIC_OVER: - mndTransDrop(pTrans); return 0; } @@ -705,11 +349,6 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); - return 0; -} - static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb;