fix(query): double free
This commit is contained in:
parent
fcc5527b88
commit
946acdcd9c
|
@ -13,7 +13,7 @@ IF (TD_LINUX)
|
||||||
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
|
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
|
||||||
|
|
||||||
add_executable(tmq "")
|
add_executable(tmq "")
|
||||||
add_executable(tstream "")
|
add_executable(stream_demo "")
|
||||||
add_executable(demoapi "")
|
add_executable(demoapi "")
|
||||||
|
|
||||||
target_sources(tmq
|
target_sources(tmq
|
||||||
|
@ -21,9 +21,9 @@ IF (TD_LINUX)
|
||||||
"tmq.c"
|
"tmq.c"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_sources(tstream
|
target_sources(stream_demo
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"tstream.c"
|
"stream_demo.c"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_sources(demoapi
|
target_sources(demoapi
|
||||||
|
@ -35,7 +35,7 @@ IF (TD_LINUX)
|
||||||
taos_static
|
taos_static
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(tstream
|
target_link_libraries(stream_demo
|
||||||
taos_static
|
taos_static
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ IF (TD_LINUX)
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(tstream
|
target_include_directories(stream_demo
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ IF (TD_LINUX)
|
||||||
)
|
)
|
||||||
|
|
||||||
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
|
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
|
||||||
SET_TARGET_PROPERTIES(tstream PROPERTIES OUTPUT_NAME tstream)
|
SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
|
||||||
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
|
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
IF (TD_DARWIN)
|
IF (TD_DARWIN)
|
||||||
|
|
|
@ -1,263 +0,0 @@
|
||||||
// sample code for TDengine subscribe/consume API
|
|
||||||
// to compile: gcc -o subscribe subscribe.c -ltaos
|
|
||||||
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include "../../../include/client/taos.h" // include TDengine header file
|
|
||||||
|
|
||||||
int nTotalRows;
|
|
||||||
|
|
||||||
void print_result(TAOS_RES* res, int blockFetch) {
|
|
||||||
TAOS_ROW row = NULL;
|
|
||||||
int num_fields = taos_num_fields(res);
|
|
||||||
TAOS_FIELD* fields = taos_fetch_fields(res);
|
|
||||||
int nRows = 0;
|
|
||||||
|
|
||||||
if (blockFetch) {
|
|
||||||
nRows = taos_fetch_block(res, &row);
|
|
||||||
//for (int i = 0; i < nRows; i++) {
|
|
||||||
// taos_print_row(buf, row + i, fields, num_fields);
|
|
||||||
// puts(buf);
|
|
||||||
//}
|
|
||||||
} else {
|
|
||||||
while ((row = taos_fetch_row(res))) {
|
|
||||||
char buf[4096] = {0};
|
|
||||||
taos_print_row(buf, row, fields, num_fields);
|
|
||||||
puts(buf);
|
|
||||||
nRows++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nTotalRows += nRows;
|
|
||||||
printf("%d rows consumed.\n", nRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
|
|
||||||
print_result(res, *(int*)param);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void check_row_count(int line, TAOS_RES* res, int expected) {
|
|
||||||
int actual = 0;
|
|
||||||
TAOS_ROW row;
|
|
||||||
while ((row = taos_fetch_row(res))) {
|
|
||||||
actual++;
|
|
||||||
}
|
|
||||||
if (actual != expected) {
|
|
||||||
printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
|
|
||||||
} else {
|
|
||||||
printf("line %d: %d rows consumed as expected\n", line, actual);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void do_query(TAOS* taos, const char* sql) {
|
|
||||||
TAOS_RES* res = taos_query(taos, sql);
|
|
||||||
taos_free_result(res);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void run_test(TAOS* taos) {
|
|
||||||
do_query(taos, "drop database if exists test;");
|
|
||||||
|
|
||||||
usleep(100000);
|
|
||||||
do_query(taos, "create database test;");
|
|
||||||
usleep(100000);
|
|
||||||
do_query(taos, "use test;");
|
|
||||||
|
|
||||||
usleep(100000);
|
|
||||||
do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
|
|
||||||
|
|
||||||
do_query(taos, "create table t0 using meters tags(0);");
|
|
||||||
do_query(taos, "create table t1 using meters tags(1);");
|
|
||||||
do_query(taos, "create table t2 using meters tags(2);");
|
|
||||||
do_query(taos, "create table t3 using meters tags(3);");
|
|
||||||
do_query(taos, "create table t4 using meters tags(4);");
|
|
||||||
do_query(taos, "create table t5 using meters tags(5);");
|
|
||||||
do_query(taos, "create table t6 using meters tags(6);");
|
|
||||||
do_query(taos, "create table t7 using meters tags(7);");
|
|
||||||
do_query(taos, "create table t8 using meters tags(8);");
|
|
||||||
do_query(taos, "create table t9 using meters tags(9);");
|
|
||||||
|
|
||||||
do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
|
|
||||||
do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
|
|
||||||
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
|
|
||||||
|
|
||||||
// super tables subscription
|
|
||||||
usleep(1000000);
|
|
||||||
|
|
||||||
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
|
|
||||||
TAOS_RES* res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 18);
|
|
||||||
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 0);
|
|
||||||
|
|
||||||
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
|
|
||||||
do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 2);
|
|
||||||
|
|
||||||
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
|
|
||||||
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 2);
|
|
||||||
|
|
||||||
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 1);
|
|
||||||
|
|
||||||
// keep progress information and restart subscription
|
|
||||||
taos_unsubscribe(tsub, 1);
|
|
||||||
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
|
|
||||||
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 24);
|
|
||||||
|
|
||||||
// keep progress information and continue previous subscription
|
|
||||||
taos_unsubscribe(tsub, 1);
|
|
||||||
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 0);
|
|
||||||
|
|
||||||
// don't keep progress information and continue previous subscription
|
|
||||||
taos_unsubscribe(tsub, 0);
|
|
||||||
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 24);
|
|
||||||
|
|
||||||
// single meter subscription
|
|
||||||
|
|
||||||
taos_unsubscribe(tsub, 0);
|
|
||||||
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 5);
|
|
||||||
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 0);
|
|
||||||
|
|
||||||
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
|
|
||||||
res = taos_consume(tsub);
|
|
||||||
check_row_count(__LINE__, res, 1);
|
|
||||||
|
|
||||||
taos_unsubscribe(tsub, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
|
||||||
const char* host = "127.0.0.1";
|
|
||||||
const char* user = "root";
|
|
||||||
const char* passwd = "taosdata";
|
|
||||||
const char* sql = "select * from meters;";
|
|
||||||
const char* topic = "test-multiple";
|
|
||||||
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
|
|
||||||
|
|
||||||
for (int i = 1; i < argc; i++) {
|
|
||||||
if (strncmp(argv[i], "-h=", 3) == 0) {
|
|
||||||
host = argv[i] + 3;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strncmp(argv[i], "-u=", 3) == 0) {
|
|
||||||
user = argv[i] + 3;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strncmp(argv[i], "-p=", 3) == 0) {
|
|
||||||
passwd = argv[i] + 3;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strcmp(argv[i], "-sync") == 0) {
|
|
||||||
async = 0;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strcmp(argv[i], "-restart") == 0) {
|
|
||||||
restart = 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strcmp(argv[i], "-single") == 0) {
|
|
||||||
sql = "select * from t0;";
|
|
||||||
topic = "test-single";
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strcmp(argv[i], "-nokeep") == 0) {
|
|
||||||
keep = 0;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strncmp(argv[i], "-sql=", 5) == 0) {
|
|
||||||
sql = argv[i] + 5;
|
|
||||||
topic = "test-custom";
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strcmp(argv[i], "-test") == 0) {
|
|
||||||
test = 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (strcmp(argv[i], "-block-fetch") == 0) {
|
|
||||||
blockFetch = 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS* taos = taos_connect(host, user, passwd, "", 0);
|
|
||||||
if (taos == NULL) {
|
|
||||||
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (test) {
|
|
||||||
run_test(taos);
|
|
||||||
taos_close(taos);
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
taos_select_db(taos, "test");
|
|
||||||
TAOS_SUB* tsub = NULL;
|
|
||||||
if (async) {
|
|
||||||
// create an asynchronized subscription, the callback function will be called every 1s
|
|
||||||
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
|
|
||||||
} else {
|
|
||||||
// create an synchronized subscription, need to call 'taos_consume' manually
|
|
||||||
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsub == NULL) {
|
|
||||||
printf("failed to create subscription.\n");
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (async) {
|
|
||||||
getchar();
|
|
||||||
} else while(1) {
|
|
||||||
TAOS_RES* res = taos_consume(tsub);
|
|
||||||
if (res == NULL) {
|
|
||||||
printf("failed to consume data.");
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
print_result(res, blockFetch);
|
|
||||||
getchar();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("total rows consumed: %d\n", nTotalRows);
|
|
||||||
taos_unsubscribe(tsub, keep);
|
|
||||||
taos_close(taos);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -176,8 +176,8 @@ tmq_t* build_consumer() {
|
||||||
|
|
||||||
tmq_list_t* build_topic_list() {
|
tmq_list_t* build_topic_list() {
|
||||||
tmq_list_t* topic_list = tmq_list_new();
|
tmq_list_t* topic_list = tmq_list_new();
|
||||||
tmq_list_append(topic_list, "topic_ctb_column");
|
/*tmq_list_append(topic_list, "topic_ctb_column");*/
|
||||||
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
|
tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");
|
||||||
return topic_list;
|
return topic_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +195,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
if (tmqmessage) {
|
if (tmqmessage) {
|
||||||
cnt++;
|
cnt++;
|
||||||
msg_process(tmqmessage);
|
msg_process(tmqmessage);
|
||||||
if (cnt >= 2) break;
|
/*if (cnt >= 2) break;*/
|
||||||
/*printf("get data\n");*/
|
/*printf("get data\n");*/
|
||||||
taos_free_result(tmqmessage);
|
taos_free_result(tmqmessage);
|
||||||
/*} else {*/
|
/*} else {*/
|
||||||
|
|
|
@ -219,7 +219,8 @@ typedef struct SRequestObj {
|
||||||
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
||||||
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
||||||
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
||||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
|
||||||
|
bool freeAfterUse);
|
||||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
||||||
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
||||||
|
|
||||||
|
@ -241,7 +242,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
|
||||||
taosMemoryFreeClear(msg->resInfo.length);
|
taosMemoryFreeClear(msg->resInfo.length);
|
||||||
taosMemoryFreeClear(msg->resInfo.convertBuf);
|
taosMemoryFreeClear(msg->resInfo.convertBuf);
|
||||||
}
|
}
|
||||||
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
|
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false);
|
||||||
return &msg->resInfo;
|
return &msg->resInfo;
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -319,7 +320,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
||||||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
|
||||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
||||||
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList);
|
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,7 +203,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
SRetrieveTableRsp* pRsp = NULL;
|
SRetrieveTableRsp* pRsp = NULL;
|
||||||
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
|
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
||||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
|
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -230,8 +230,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
||||||
SQueryNodeLoad *node1 = (SQueryNodeLoad *)elem1;
|
SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
|
||||||
SQueryNodeLoad *node2 = (SQueryNodeLoad *)elem2;
|
SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
|
||||||
|
|
||||||
if (node1->load < node2->load) {
|
if (node1->load < node2->load) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -240,7 +240,7 @@ int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
||||||
return node1->load > node2->load;
|
return node1->load > node2->load;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
|
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
|
||||||
taosThreadMutexLock(&pInfo->qnodeMutex);
|
taosThreadMutexLock(&pInfo->qnodeMutex);
|
||||||
if (pInfo->pQnodeList) {
|
if (pInfo->pQnodeList) {
|
||||||
taosArrayDestroy(pInfo->pQnodeList);
|
taosArrayDestroy(pInfo->pQnodeList);
|
||||||
|
@ -257,7 +257,7 @@ int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
||||||
SAppInstInfo*pInfo = pRequest->pTscObj->pAppInfo;
|
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->qnodeMutex);
|
taosThreadMutexLock(&pInfo->qnodeMutex);
|
||||||
|
@ -385,7 +385,6 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
|
||||||
return pRequest->code;
|
return pRequest->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
|
||||||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||||
|
|
||||||
|
@ -802,7 +801,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
|
||||||
SCatalog* pCatalog = NULL;
|
SCatalog* pCatalog = NULL;
|
||||||
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId, tstrerror(code));
|
tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
|
||||||
|
tstrerror(code));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,7 +815,6 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
|
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
|
||||||
assert(pMsg->info.ahandle != NULL);
|
assert(pMsg->info.ahandle != NULL);
|
||||||
|
@ -947,7 +946,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
|
pRequest->code =
|
||||||
|
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -969,9 +969,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
|
||||||
return pResultInfo->row;
|
return pResultInfo->row;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
//return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
|
// return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
@ -989,7 +988,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
|
pRequest->code =
|
||||||
|
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1059,7 +1059,8 @@ static char* parseTagDatatoJson(void* p) {
|
||||||
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
|
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
|
||||||
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
|
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
|
||||||
if (length < 0) {
|
if (length < 0) {
|
||||||
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, pTagVal->pData);
|
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
|
||||||
|
pTagVal->pData);
|
||||||
taosMemoryFree(tagJsonValue);
|
taosMemoryFree(tagJsonValue);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1277,10 +1278,11 @@ void resetConnectDB(STscObj* pTscObj) {
|
||||||
taosThreadMutexUnlock(&pTscObj->mutex);
|
taosThreadMutexUnlock(&pTscObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
|
||||||
|
bool freeAfterUse) {
|
||||||
assert(pResultInfo != NULL && pRsp != NULL);
|
assert(pResultInfo != NULL && pRsp != NULL);
|
||||||
|
|
||||||
taosMemoryFreeClear(pResultInfo->pRspMsg);
|
if (freeAfterUse) taosMemoryFreeClear(pResultInfo->pRspMsg);
|
||||||
|
|
||||||
pResultInfo->pRspMsg = (const char*)pRsp;
|
pResultInfo->pRspMsg = (const char*)pRsp;
|
||||||
pResultInfo->pData = (void*)pRsp->data;
|
pResultInfo->pData = (void*)pRsp->data;
|
||||||
|
|
Loading…
Reference in New Issue