From 964811745432ca6f40033ef8c23e146d98f6a405 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 10 Jul 2023 11:28:30 +0800 Subject: [PATCH] fix:can not do assignment if in tsdb mode --- include/util/taoserror.h | 1 + source/client/src/clientTmq.c | 11 ++++++++++- source/util/src/terror.c | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 772a668f0f..ffeabc5684 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -771,6 +771,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) #define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) #define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) +#define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f514bb616c..c6a5f4c7a1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2576,6 +2576,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a // in case of snapshot is opened, no valid offset will return *numOfAssignment = taosArrayGetSize(pTopic->vgs); + for (int32_t j = 0; j < (*numOfAssignment); ++j) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); + if ((pClientVg->offsetInfo.currentOffset.type < TMQ_OFFSET__LOG && tmq->useSnapshot) || + pClientVg->offsetInfo.currentOffset.type > TMQ_OFFSET__LOG) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, pClientVg->offsetInfo.currentOffset.type); + code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; + goto end; + } + } *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment)); if (*assignment == NULL) { @@ -2783,7 +2792,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d2b9edf753..f0c7b22bb1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -628,6 +628,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")