diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9e036d7b25..d9a78029e2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -468,7 +468,7 @@ typedef struct { char* ast; char* physicalPlan; SSchemaWrapper schema; - int32_t refConsumerCnt; + // int32_t refConsumerCnt; } SMqTopicObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7cebeb35f5..c3eaeb73b2 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -414,6 +414,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto SUBSCRIBE_OVER; } +#if 0 // ref topic to prevent drop // TODO make topic complete SMqTopicObj topicObj = {0}; @@ -422,6 +423,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup, topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; +#endif mndReleaseTopic(pMnode, pTopic); } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 95d3383ee1..99ac9c729c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1044,9 +1044,9 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; - /*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ - /*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ - /*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER; + if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER; + if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser); diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 01516d03f2..6cbaca3c07 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -21,6 +21,7 @@ #include "mndMnode.h" #include "mndShow.h" #include "mndStb.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -188,7 +189,15 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { bool create = false; SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); if (pOffsetObj == NULL) { + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName); + if (pTopic == NULL) { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr()); + continue; + } pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj)); + pOffsetObj->dbUid = pTopic->dbUid; + mndReleaseTopic(pMnode, pTopic); memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN); create = true; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 58b51e4c54..d1404b96fe 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -286,7 +286,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); bool hasExtraSink = false; - if (totLevel == 2) { + if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) { SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); // add extra sink @@ -407,7 +407,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; - SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); + SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { sdbRelease(pSdb, pDb); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cbef1facdc..13071b5c53 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -393,6 +393,15 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq streamObj.trigger = pCreate->triggerType; streamObj.waterMark = pCreate->watermark; + if (streamObj.targetSTbName[0]) { + pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN); + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3f3f4f5b5d..1a8323e1a8 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -157,6 +157,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM int32_t vgId = pRebVg->pVgEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { + ASSERT(0); taosMemoryFree(buf); return -1; } @@ -451,6 +452,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu taosArrayPush(pConsumerNew->rebNewTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { + ASSERT(0); goto REB_FAIL; } } @@ -469,9 +471,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { + ASSERT(0); goto REB_FAIL; } } +#if 0 if (consumerNum) { char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; @@ -486,9 +490,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu pTopic->refConsumerCnt = topicObj.refConsumerCnt; mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup, topicObj.refConsumerCnt); - if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL; + if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) { + ASSERT(0); + goto REB_FAIL; + } } } +#endif // 4. TODO commit log: modification log @@ -496,7 +504,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu mndTransSetCb(pTrans, MQ_REB_TRANS_START_FUNC, MQ_REB_TRANS_STOP_FUNC, NULL, 0); // 6. execution - if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; + if (mndTransPrepare(pMnode, pTrans) != 0) { + ASSERT(0); + goto REB_FAIL; + } mndTransDrop(pTrans); return 0; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 2048c79847..720233d625 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -15,6 +15,7 @@ #include "mndTopic.h" #include "mndAuth.h" +#include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -121,7 +122,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); } - SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER); + /*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/ SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); @@ -221,7 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { pTopic->schema.pSchema = NULL; } - SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER); + /*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/ SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); @@ -253,7 +254,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); - atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt); + /*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/ /*taosWLockLatch(&pOldTopic->lock);*/ @@ -327,7 +328,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.version = 1; topicObj.sql = strdup(pCreate->sql); topicObj.sqlLen = strlen(pCreate->sql) + 1; - topicObj.refConsumerCnt = 0; + /*topicObj.refConsumerCnt = 0;*/ if (pCreate->ast && pCreate->ast[0]) { topicObj.ast = strdup(pCreate->ast); @@ -492,8 +493,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - /*SSdb *pSdb = pMnode->pSdb;*/ + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -513,12 +514,36 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } + void *pIter = NULL; + SMqConsumerObj *pConsumer; + while (1) { + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) break; + + if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->assignedTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name, + pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + sdbRelease(pSdb, pConsumer); + } + +#if 0 if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } +#endif STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq); if (pTrans == NULL) { diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index 56db157ab8..b86364b9c3 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -382,6 +382,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") + time.sleep(15) tdSql.query("drop topic %s"%topicName1) tdLog.printNoPrefix("======== test case 10 end ...... ") @@ -453,6 +454,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") + time.sleep(15) tdSql.query("drop topic %s"%topicName1) tdLog.printNoPrefix("======== test case 11 end ...... ")