[TD-3604]<fix>subscribe hanlder overwrite

This commit is contained in:
Hui Li 2021-03-30 15:57:14 +08:00
parent 77e3675640
commit 9353aed068
1 changed files with 9 additions and 13 deletions

View File

@ -5868,6 +5868,7 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF
static void *subSubscribeProcess(void *sarg) { static void *subSubscribeProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
char subSqlstr[1024]; char subSqlstr[1024];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (winfo->taos == NULL) { if (winfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
@ -5913,9 +5914,8 @@ static void *subSubscribeProcess(void *sarg) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], winfo->threadID); g_queryInfo.superQueryInfo.result[i], winfo->threadID);
} }
g_queryInfo.superQueryInfo.tsub[i] = subscribeImpl( tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile);
winfo->taos, subSqlstr, topic, tmpFile); if (NULL == tsub[i]) {
if (NULL == g_queryInfo.superQueryInfo.tsub[i]) {
taos_close(winfo->taos); taos_close(winfo->taos);
return NULL; return NULL;
} }
@ -5932,7 +5932,7 @@ static void *subSubscribeProcess(void *sarg) {
continue; continue;
} }
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]); res = taos_consume(tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
@ -5947,8 +5947,7 @@ static void *subSubscribeProcess(void *sarg) {
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos); taos_close(winfo->taos);
@ -5957,6 +5956,7 @@ static void *subSubscribeProcess(void *sarg) {
static void *superSubscribeProcess(void *sarg) { static void *superSubscribeProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (winfo->taos == NULL) { if (winfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
@ -5999,10 +5999,7 @@ static void *superSubscribeProcess(void *sarg) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
} }
g_queryInfo.specifiedQueryInfo.tsub[i] = tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
subscribeImpl(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i],
topic, tmpFile);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) { if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
taos_close(winfo->taos); taos_close(winfo->taos);
return NULL; return NULL;
@ -6020,7 +6017,7 @@ static void *superSubscribeProcess(void *sarg) {
continue; continue;
} }
res = taos_consume(g_queryInfo.specifiedQueryInfo.tsub[i]); res = taos_consume(tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
@ -6034,8 +6031,7 @@ static void *superSubscribeProcess(void *sarg) {
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[i], taos_unsubscribe(tsub[i], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos); taos_close(winfo->taos);