From 603cc02671df200b81d142ecb8f6ed1dbcdb5dc4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 11 Nov 2022 14:37:31 +0800 Subject: [PATCH 1/2] refactor(tmq): add more strict drop condition check --- include/common/tmsg.h | 21 +++++++++------ source/dnode/mnode/impl/src/mndTopic.c | 37 +++++++++++++++++++------- source/dnode/vnode/src/tq/tq.c | 6 ++--- 3 files changed, 44 insertions(+), 20 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b16b5a2d4b..99c5c72e2f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->version); - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - return NULL; - } + if (pSW->nCols > 0) { + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } - for (int32_t i = 0; i < pSW->nCols; i++) { - buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + for (int32_t i = 0; i < pSW->nCols; i++) { + buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + } + } else { + pSW->pSchema = NULL; } return (void*)buf; } @@ -839,7 +843,7 @@ typedef struct { int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT - int64_t stateTs; // ms + int64_t stateTs; // ms } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e1ca1d2708..522036afa2 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (pIter == NULL) break; if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); @@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } } + + sz = taosArrayGetSize(pConsumer->rebNewTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->rebNewTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + + sz = taosArrayGetSize(pConsumer->rebRemovedTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + sdbRelease(pSdb, pConsumer); } @@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); -#if 0 - if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { - ASSERT(0); - mndTransDrop(pTrans); - mndReleaseTopic(pMnode, pTopic); - return -1; - } -#endif - // TODO check if rebalancing if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { /*ASSERT(0);*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9c377fe7f5..f75dce8231 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64 - ", version:%" PRId64 "", + tqDebug("tmq poll: consumer %" PRId64 + ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, - dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); tDeleteSMqDataRsp(&dataRsp); return code; From 265bb81b49b7a5013faad078d5c24fb0fda3cdff Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 11 Nov 2022 15:35:17 +0800 Subject: [PATCH 2/2] fix(query): set offset for empty record --- source/libs/executor/src/scanoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 945753cc53..a3dc128ced 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1773,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; } ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);