fix:modify the interface of tmq meta
This commit is contained in:
parent
00bb6ed309
commit
ccdda33ff8
|
@ -28,8 +28,9 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
printf("db: %s\n", tmq_get_db_name(msg));
|
printf("db: %s\n", tmq_get_db_name(msg));
|
||||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||||
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
||||||
tmq_raw_data* raw = tmq_get_raw_meta(msg);
|
tmq_raw_data raw = {0};
|
||||||
if (raw) {
|
int32_t code = tmq_get_raw_meta(msg, &raw);
|
||||||
|
if (code == 0) {
|
||||||
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -53,7 +54,6 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
printf("write raw data: %s\n", tmq_err2str(ret));
|
printf("write raw data: %s\n", tmq_err2str(ret));
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
tmq_free_raw_meta(raw);
|
|
||||||
char* result = tmq_get_json_meta(msg);
|
char* result = tmq_get_json_meta(msg);
|
||||||
if (result) {
|
if (result) {
|
||||||
printf("meta result: %s\n", result);
|
printf("meta result: %s\n", result);
|
||||||
|
|
|
@ -45,10 +45,10 @@ typedef enum {
|
||||||
NOTIFY_CMD_ID_BUTT,
|
NOTIFY_CMD_ID_BUTT,
|
||||||
} NOTIFY_CMD_ID;
|
} NOTIFY_CMD_ID;
|
||||||
|
|
||||||
typedef enum enumQUERY_TYPE {
|
typedef enum enumQUERY_TYPE {
|
||||||
NO_INSERT_TYPE,
|
NO_INSERT_TYPE,
|
||||||
INSERT_TYPE,
|
INSERT_TYPE,
|
||||||
QUERY_TYPE_BUT
|
QUERY_TYPE_BUT
|
||||||
} QUERY_TYPE;
|
} QUERY_TYPE;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -597,9 +597,10 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
tmq_get_topic_name(msg), vgroupId);
|
tmq_get_topic_name(msg), vgroupId);
|
||||||
|
|
||||||
{
|
{
|
||||||
tmq_raw_data *raw = tmq_get_raw_meta(msg);
|
tmq_raw_data raw = {0};
|
||||||
|
int32_t code = tmq_get_raw_meta(msg, &raw);
|
||||||
|
|
||||||
if(raw){
|
if(code == TSDB_CODE_SUCCESS){
|
||||||
TAOS_RES* pRes = taos_query(pInfo->taos, "use metadb");
|
TAOS_RES* pRes = taos_query(pInfo->taos, "use metadb");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
pError("error when use metadb, reason:%s\n", taos_errstr(pRes));
|
pError("error when use metadb, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -609,10 +610,9 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
taosFprintfFile(g_fp, "raw:%p\n", raw);
|
taosFprintfFile(g_fp, "raw:%p\n", &raw);
|
||||||
|
|
||||||
int32_t ret = taos_write_raw_meta(pInfo->taos, raw);
|
taos_write_raw_meta(pInfo->taos, raw);
|
||||||
taosMemoryFree(raw);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char* result = tmq_get_json_meta(msg);
|
char* result = tmq_get_json_meta(msg);
|
||||||
|
@ -1169,23 +1169,23 @@ void* ombConsumeThreadFunc(void* param) {
|
||||||
|
|
||||||
|
|
||||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
|
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
|
||||||
TAOS_RES *res = taos_query(taos, command);
|
TAOS_RES *res = taos_query(taos, command);
|
||||||
int32_t code = taos_errno(res);
|
int32_t code = taos_errno(res);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
|
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (INSERT_TYPE == type) {
|
if (INSERT_TYPE == type) {
|
||||||
int affectedRows = taos_affected_rows(res);
|
int affectedRows = taos_affected_rows(res);
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
return affectedRows;
|
return affectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* ombProduceThreadFunc(void* param) {
|
void* ombProduceThreadFunc(void* param) {
|
||||||
|
|
Loading…
Reference in New Issue