diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e352cfb569..52221bdd44 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -342,6 +342,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) #define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3) #define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4) +#define TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB TAOS_DEF_ERROR_CODE(0, 0x03F5) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 939acd2b6e..350ecdd373 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1035,7 +1035,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = -1; - tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); + tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1043,7 +1043,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) goto FAIL; - tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); + tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1570,7 +1570,6 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { - /*tscDebug("call poll");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -1794,7 +1793,6 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - /*tscDebug("call poll1");*/ void* rspObj; int64_t startTime = taosGetTimestampMs(); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7ee688d220..05c0594339 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -164,7 +164,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { STREAM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, terrstr()); + mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, + terrstr()); taosMemoryFreeClear(pRow); return NULL; } @@ -624,6 +625,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + pDb = mndAcquireDb(pMnode, streamObj.sourceDb); + if (pDb->cfg.replications != 1) { + mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); + terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; + mndReleaseDb(pMnode, pDb); + pDb = NULL; + goto _OVER; + } + mndReleaseDb(pMnode, pDb); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); @@ -680,7 +691,6 @@ _OVER: } mndReleaseStream(pMnode, pStream); - mndReleaseDb(pMnode, pDb); tFreeSCMCreateStreamReq(&createStreamReq); tFreeStreamObj(&streamObj); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 09717c823e..4a941b3c20 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -738,7 +738,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); if (code != 0) { - tqError("cannot process tq delete req %s, since no such offset", pReq->subKey); + tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey); } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 656d775ea4..4b9dde5059 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -281,6 +281,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily does not support source db having replica > 1") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") diff --git a/tests/system-test/1-insert/drop.py b/tests/system-test/1-insert/drop.py index f8796bcf6a..c15a9bbc35 100644 --- a/tests/system-test/1-insert/drop.py +++ b/tests/system-test/1-insert/drop.py @@ -132,7 +132,7 @@ class TDTestCase: tdSql.execute(f'drop database {self.dbname}') def drop_stream_check(self): - tdSql.execute(f'create database {self.dbname} replica {self.replicaVar}') + tdSql.execute(f'create database {self.dbname} replica 1') tdSql.execute(f'use {self.dbname}') stbname = tdCom.getLongName(5,"letters") stream_name = tdCom.getLongName(5,"letters") @@ -158,4 +158,4 @@ class TDTestCase: tdLog.success("%s successfully executed" % __file__) tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file +tdCases.addLinux(__file__, TDTestCase())