Merge pull request #5627 from taosdata/hotfix/test
[TD-3604]<fix>subscribe hanlder overwrite
This commit is contained in:
commit
4acff5cea1
|
@ -5868,6 +5868,7 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF
|
|||
static void *subSubscribeProcess(void *sarg) {
|
||||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
char subSqlstr[1024];
|
||||
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
|
||||
|
||||
if (winfo->taos == NULL) {
|
||||
TAOS * taos = NULL;
|
||||
|
@ -5913,9 +5914,8 @@ static void *subSubscribeProcess(void *sarg) {
|
|||
sprintf(tmpFile, "%s-%d",
|
||||
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
||||
}
|
||||
g_queryInfo.superQueryInfo.tsub[i] = subscribeImpl(
|
||||
winfo->taos, subSqlstr, topic, tmpFile);
|
||||
if (NULL == g_queryInfo.superQueryInfo.tsub[i]) {
|
||||
tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile);
|
||||
if (NULL == tsub[i]) {
|
||||
taos_close(winfo->taos);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5932,7 +5932,7 @@ static void *subSubscribeProcess(void *sarg) {
|
|||
continue;
|
||||
}
|
||||
|
||||
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
||||
res = taos_consume(tsub[i]);
|
||||
if (res) {
|
||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
||||
|
@ -5947,8 +5947,7 @@ static void *subSubscribeProcess(void *sarg) {
|
|||
taos_free_result(res);
|
||||
|
||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
|
||||
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||
taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
taos_close(winfo->taos);
|
||||
|
@ -5957,6 +5956,7 @@ static void *subSubscribeProcess(void *sarg) {
|
|||
|
||||
static void *superSubscribeProcess(void *sarg) {
|
||||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
|
||||
|
||||
if (winfo->taos == NULL) {
|
||||
TAOS * taos = NULL;
|
||||
|
@ -5999,10 +5999,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
sprintf(tmpFile, "%s-%d",
|
||||
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
|
||||
}
|
||||
g_queryInfo.specifiedQueryInfo.tsub[i] =
|
||||
subscribeImpl(winfo->taos,
|
||||
g_queryInfo.specifiedQueryInfo.sql[i],
|
||||
topic, tmpFile);
|
||||
tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
|
||||
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
|
||||
taos_close(winfo->taos);
|
||||
return NULL;
|
||||
|
@ -6020,7 +6017,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
continue;
|
||||
}
|
||||
|
||||
res = taos_consume(g_queryInfo.specifiedQueryInfo.tsub[i]);
|
||||
res = taos_consume(tsub[i]);
|
||||
if (res) {
|
||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
|
||||
|
@ -6034,8 +6031,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
taos_free_result(res);
|
||||
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[i],
|
||||
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
taos_unsubscribe(tsub[i], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
taos_close(winfo->taos);
|
||||
|
|
Loading…
Reference in New Issue