Merge pull request #20820 from taosdata/fix/TS-3081
fix:add tools to checkout tmq offset & add error log for tmq
This commit is contained in:
commit
0dd1721e57
|
@ -1151,6 +1151,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tsem_init(¶m.rspSem, 0, 0) != 0) {
|
if (tsem_init(¶m.rspSem, 0, 0) != 0) {
|
||||||
|
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1186,6 +1187,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
||||||
if (retryCnt++ > MAX_RETRY_COUNT) {
|
if (retryCnt++ > MAX_RETRY_COUNT) {
|
||||||
|
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
|
||||||
|
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,14 @@ add_executable(tmq_taosx_ci tmq_taosx_ci.c)
|
||||||
add_executable(write_raw_block_test write_raw_block_test.c)
|
add_executable(write_raw_block_test write_raw_block_test.c)
|
||||||
add_executable(sml_test sml_test.c)
|
add_executable(sml_test sml_test.c)
|
||||||
add_executable(get_db_name_test get_db_name_test.c)
|
add_executable(get_db_name_test get_db_name_test.c)
|
||||||
|
add_executable(tmq_offset tmqOffset.c)
|
||||||
|
target_link_libraries(
|
||||||
|
tmq_offset
|
||||||
|
PUBLIC taos_static
|
||||||
|
PUBLIC util
|
||||||
|
PUBLIC common
|
||||||
|
PUBLIC os
|
||||||
|
)
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
create_table
|
create_table
|
||||||
PUBLIC taos_static
|
PUBLIC taos_static
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
//
|
||||||
|
// Created by mingming wanng on 2023/4/7.
|
||||||
|
//
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t size;
|
||||||
|
} STqOffsetHead;
|
||||||
|
|
||||||
|
int32_t tqOffsetRestoreFromFile(const char* fname) {
|
||||||
|
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
|
||||||
|
if (pFile != NULL) {
|
||||||
|
STqOffsetHead head = {0};
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
|
||||||
|
if (code == 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
printf("code:%d != 0\n", code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int32_t size = htonl(head.size);
|
||||||
|
void* memBuf = taosMemoryCalloc(1, size);
|
||||||
|
if (memBuf == NULL) {
|
||||||
|
printf("memBuf == NULL\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
|
||||||
|
taosMemoryFree(memBuf);
|
||||||
|
printf("code:%d != size:%d\n", code, size);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
STqOffset offset;
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, memBuf, size);
|
||||||
|
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
||||||
|
taosMemoryFree(memBuf);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
printf("tDecodeSTqOffset error\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
printf("subkey:%s, type:%d, uid/version:%"PRId64", ts:%"PRId64"\n",
|
||||||
|
offset.subKey, offset.val.type, offset.val.uid, offset.val.ts);
|
||||||
|
taosMemoryFree(memBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
tqOffsetRestoreFromFile("offset-ver0");
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue