[TD-1866]<fix> Do not release the result pointer until it is no longer consumed
This commit is contained in:
parent
7f5216b317
commit
e0caf7780b
|
@ -5,13 +5,13 @@
|
||||||
"port": 6030,
|
"port": 6030,
|
||||||
"user": "root",
|
"user": "root",
|
||||||
"password": "taosdata",
|
"password": "taosdata",
|
||||||
"databases": "db01",
|
"databases": "db",
|
||||||
"super_table_query":
|
"specified_table_query":
|
||||||
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes",
|
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes",
|
||||||
"sqls": [{"sql": "select avg(c1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}]
|
"sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}]
|
||||||
},
|
},
|
||||||
"sub_table_query":
|
"super_table_query":
|
||||||
{"stblname": "stb01", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes",
|
{"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes",
|
||||||
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}]
|
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4269,23 +4269,24 @@ void *subSubscribeProcess(void *sarg) {
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// start loop to consume result
|
// start loop to consume result
|
||||||
|
TAOS_RES* res = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
||||||
if (1 == g_queryInfo.subQueryInfo.subscribeMode) {
|
if (1 == g_queryInfo.subQueryInfo.subscribeMode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]);
|
res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]);
|
||||||
if (res) {
|
if (res) {
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID);
|
sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID);
|
||||||
}
|
}
|
||||||
getResult(res, tmpFile);
|
getResult(res, tmpFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
||||||
|
@ -4328,23 +4329,24 @@ void *superSubscribeProcess(void *sarg) {
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// start loop to consume result
|
// start loop to consume result
|
||||||
|
TAOS_RES* res = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||||
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
||||||
if (res) {
|
if (res) {
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
||||||
}
|
}
|
||||||
getResult(res, tmpFile);
|
getResult(res, tmpFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||||
|
|
Loading…
Reference in New Issue