From a8701889465e22d880aa41556768b90312fe8a37 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 4 Feb 2021 09:26:10 +0800 Subject: [PATCH 1/4] fix bug --- src/client/src/tscSubquery.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 96aae423d5..8da83f8e01 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -95,11 +95,21 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { pthread_mutex_lock(&subState->mutex); + bool done = allSubqueryDone(pParentSql); + + if (done) { + tscDebug("%p subquery:%p,%d all subs already done", pParentSql, pSql, idx); + + pthread_mutex_unlock(&subState->mutex); + + return false; + } + tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx); subState->states[idx] = 1; - bool done = allSubqueryDone(pParentSql); + done = allSubqueryDone(pParentSql); pthread_mutex_unlock(&subState->mutex); From cfaa9adc58f2d618e745f06f90e19fea705adc70 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 4 Feb 2021 16:13:55 +0800 Subject: [PATCH 2/4] fix bug --- src/client/src/tscServer.c | 2 -- src/client/src/tscSubquery.c | 11 ++++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b10b040c7b..b82662c05f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -450,7 +450,6 @@ int doProcessSql(SSqlObj *pSql) { } if (pRes->code != TSDB_CODE_SUCCESS) { - tscAsyncResultOnError(pSql); return pRes->code; } @@ -459,7 +458,6 @@ int doProcessSql(SSqlObj *pSql) { // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads. if (code != TSDB_CODE_SUCCESS) { pRes->code = code; - tscAsyncResultOnError(pSql); return code; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8da83f8e01..f8ccd3303b 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2287,11 +2287,12 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d", - trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex); + oriTrs->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex); pParentSql->res.code = terrno; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + oriTrs->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + tfree(trsupport); return pParentSql->res.code; } @@ -2299,9 +2300,13 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 // if failed to process sql, let following code handle the pSql if (ret == TSDB_CODE_SUCCESS) { + tscFreeRetrieveSup(pSql); taos_free_result(pSql); return ret; - } else { + } else { + pSql->pSubs[trsupport->subqueryIndex] = pSql; + tscFreeRetrieveSup(pNew); + taos_free_result(pNew); return ret; } } From f526932ad4837526f545f46945ce6462d47ce515 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 5 Feb 2021 09:24:02 +0800 Subject: [PATCH 3/4] fix bug --- src/client/src/tscAsync.c | 13 ++++++++++++- src/client/src/tscServer.c | 3 +++ src/client/src/tscSubquery.c | 22 ++++++++++++++++++---- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 8e5f621b37..797804252d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -273,7 +273,8 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { taosScheduleTask(tscQhandle, &schedMsg); } -void tscAsyncResultOnError(SSqlObj *pSql) { +static void tscAsyncResultCallback(SSchedMsg *pMsg) { + SSqlObj* pSql = pMsg->ahandle; if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); return; @@ -291,6 +292,16 @@ void tscAsyncResultOnError(SSqlObj *pSql) { (*pSql->fp)(pSql->param, pSql, pRes->code); } +void tscAsyncResultOnError(SSqlObj* pSql) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = tscAsyncResultCallback; + schedMsg.ahandle = pSql; + schedMsg.thandle = (void *)1; + schedMsg.msg = 0; + taosScheduleTask(tscQhandle, &schedMsg); +} + + int tscSendMsgToServer(SSqlObj *pSql); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b82662c05f..8277d28fc9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -450,6 +450,7 @@ int doProcessSql(SSqlObj *pSql) { } if (pRes->code != TSDB_CODE_SUCCESS) { + tscAsyncResultOnError(pSql); return pRes->code; } @@ -458,6 +459,7 @@ int doProcessSql(SSqlObj *pSql) { // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads. if (code != TSDB_CODE_SUCCESS) { pRes->code = code; + tscAsyncResultOnError(pSql); return code; } @@ -686,6 +688,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { + return TSDB_CODE_TSC_INVALID_SQL; SSqlCmd *pCmd = &pSql->cmd; int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index f8ccd3303b..27bd5b0616 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2255,7 +2255,9 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES * current query failed, and the retry count is less than the available * count, retry query clear previous retrieved data, then launch a new sub query */ -static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code) { +static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code, int32_t *sent) { + *sent = 0; + SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport)); if (trsupport == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -2298,6 +2300,8 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 int32_t ret = tscProcessSql(pNew); + *sent = 1; + // if failed to process sql, let following code handle the pSql if (ret == TSDB_CODE_SUCCESS) { tscFreeRetrieveSup(pSql); @@ -2343,7 +2347,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO subqueryIndex, tstrerror(pParentSql->res.code)); } else { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) { - if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { + int32_t sent = 0; + + tscReissueSubquery(trsupport, pSql, numOfRows, &sent); + if (sent) { return; } } else { // reach the maximum retry count, abort @@ -2497,7 +2504,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); - if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { + int32_t sent = 0; + + tscReissueSubquery(trsupport, pSql, numOfRows, &sent); + if (sent) { return; } } else { @@ -2619,7 +2629,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); - if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) { + + int32_t sent = 0; + + tscReissueSubquery(trsupport, pSql, code, &sent); + if (sent) { return; } } else { From fd6f387ec8fd549d41fd8ca3ab0ed1d848c23ff3 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 5 Feb 2021 09:27:10 +0800 Subject: [PATCH 4/4] fix bug --- src/client/src/tscServer.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8277d28fc9..b10b040c7b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -688,7 +688,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - return TSDB_CODE_TSC_INVALID_SQL; SSqlCmd *pCmd = &pSql->cmd; int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex);