From dd22347992be00bcef2466decb07eda290e73914 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Apr 2023 17:05:23 +0800 Subject: [PATCH 1/4] fix:add tools to get tmq offset --- utils/test/c/CMakeLists.txt | 8 +++++ utils/test/c/tmqOffset.c | 64 +++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 utils/test/c/tmqOffset.c diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 6ca266c555..85378221c6 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -5,6 +5,14 @@ add_executable(create_table createTable.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) +add_executable(tmq_offset tmqOffset.c) +target_link_libraries( + tmq_offset + PUBLIC taos_static + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( create_table PUBLIC taos_static diff --git a/utils/test/c/tmqOffset.c b/utils/test/c/tmqOffset.c new file mode 100644 index 0000000000..50a9991a26 --- /dev/null +++ b/utils/test/c/tmqOffset.c @@ -0,0 +1,64 @@ +// +// Created by mingming wanng on 2023/4/7. +// +#include +#include +#include "taoserror.h" +#include "tlog.h" +#include "tmsg.h" + +typedef struct { + int32_t size; +} STqOffsetHead; + +int32_t tqOffsetRestoreFromFile(const char* fname) { + TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); + if (pFile != NULL) { + STqOffsetHead head = {0}; + int32_t code; + + while (1) { + if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if (code == 0) { + break; + } else { + printf("code:%d != 0\n", code); + return -1; + } + } + int32_t size = htonl(head.size); + void* memBuf = taosMemoryCalloc(1, size); + if (memBuf == NULL) { + printf("memBuf == NULL\n"); + return -1; + } + if ((code = taosReadFile(pFile, memBuf, size)) != size) { + taosMemoryFree(memBuf); + printf("code:%d != size:%d\n", code, size); + return -1; + } + STqOffset offset; + SDecoder decoder; + tDecoderInit(&decoder, memBuf, size); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + taosMemoryFree(memBuf); + tDecoderClear(&decoder); + printf("tDecodeSTqOffset error\n"); + return -1; + } + + tDecoderClear(&decoder); + printf("subkey:%s, type:%d, uid/version:%lld, ts:%lld\n", + offset.subKey, offset.val.type, offset.val.uid, offset.val.ts); + taosMemoryFree(memBuf); + } + + taosCloseFile(&pFile); + } + return 0; +} + +int main(int argc, char *argv[]) { + tqOffsetRestoreFromFile("offset-ver0"); + return 0; +} From 5f700d59bd63ca8f1323a01a4068c09154cc486f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Apr 2023 17:10:00 +0800 Subject: [PATCH 2/4] fix:add tools to get tmq offset --- utils/test/c/tmqOffset.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmqOffset.c b/utils/test/c/tmqOffset.c index 50a9991a26..5af56a6018 100644 --- a/utils/test/c/tmqOffset.c +++ b/utils/test/c/tmqOffset.c @@ -48,7 +48,7 @@ int32_t tqOffsetRestoreFromFile(const char* fname) { } tDecoderClear(&decoder); - printf("subkey:%s, type:%d, uid/version:%lld, ts:%lld\n", + printf("subkey:%s, type:%d, uid/version:%lld, ts:%"PRId64"\n", offset.subKey, offset.val.type, offset.val.uid, offset.val.ts); taosMemoryFree(memBuf); } From 9bb921d5a89e31ea40ec00a1e05d2b2e1cd1c6ce Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Apr 2023 17:10:57 +0800 Subject: [PATCH 3/4] fix:add tools to get tmq offset --- utils/test/c/tmqOffset.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmqOffset.c b/utils/test/c/tmqOffset.c index 5af56a6018..7225cb87bd 100644 --- a/utils/test/c/tmqOffset.c +++ b/utils/test/c/tmqOffset.c @@ -48,7 +48,7 @@ int32_t tqOffsetRestoreFromFile(const char* fname) { } tDecoderClear(&decoder); - printf("subkey:%s, type:%d, uid/version:%lld, ts:%"PRId64"\n", + printf("subkey:%s, type:%d, uid/version:%"PRId64", ts:%"PRId64"\n", offset.subKey, offset.val.type, offset.val.uid, offset.val.ts); taosMemoryFree(memBuf); } From 4f23578a79c2f64362b4bf7508a5975aeacaf99b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Apr 2023 19:53:11 +0800 Subject: [PATCH 4/4] fix:add err log --- source/client/src/clientTmq.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index befcb00ac7..0ee9a7bd7d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1151,6 +1151,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { }; if (tsem_init(¶m.rspSem, 0, 0) != 0) { + code = TSDB_CODE_TSC_INTERNAL_ERROR; goto FAIL; } @@ -1186,6 +1187,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + code = TSDB_CODE_TSC_INTERNAL_ERROR; goto FAIL; }