From a64f13794129b85f39eb69ccf4bb122c985daae1 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 09:16:56 +0800 Subject: [PATCH 01/11] add subquery states to trace subquery --- src/client/inc/tscSubquery.h | 4 + src/client/inc/tsclient.h | 3 +- src/client/src/tscSubquery.c | 216 +++++++++++++++------ src/client/src/tscUtil.c | 2 + tests/script/general/http/restful_full.sim | 1 + tests/script/general/parser/join.sim | 3 +- 6 files changed, 167 insertions(+), 62 deletions(-) diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index d3996ccf7f..f45dd85817 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -43,6 +43,10 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql); char *getArithmeticInputSrc(void *param, const char *name, int32_t colId); +void tscSpinLock(int32_t *lock); + +void tscSpinUnlock(int32_t *lock); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a3a086ce77..27e9b2ced0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -317,7 +317,8 @@ typedef struct STscObj { } STscObj; typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery + int32_t subLock; + int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query } SSubqueryState; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d4f9620630..e39bd05c56 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -55,6 +55,72 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } + +void tscSpinLock(int32_t *lock) { + int i = 0; + while (atomic_val_compare_exchange_32(lock, 0, 1) != 0) { + if (++i % 100 == 0) { + sched_yield(); + } + } +} + +void tscSpinUnlock(int32_t *lock) { + if (atomic_val_compare_exchange_32(lock, 1, 0) != 1) { + assert(false); + } +} + + +static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { + assert(idx < subState->numOfSub); + assert(subState->states); + + tscSpinLock(&subState->subLock); + + tscDebug("subquery:%p,%d state set to %d", pSql, idx, state); + + subState->states[idx] = state; + + tscSpinUnlock(&subState->subLock); +} + +static bool allSubqueryDone(SSubqueryState *subState) { + bool done = true; + + //lock in caller + + for (int i = 0; i < subState->numOfSub; i++) { + if (0 == subState->states[i]) { + tscDebug("subquery:%d is NOT finished, total:%d", i, subState->numOfSub); + done = false; + break; + } else { + tscDebug("subquery:%d is finished, total:%d", i, subState->numOfSub); + } + } + + return done; +} + +static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { + assert(idx < subState->numOfSub); + + tscSpinLock(&subState->subLock); + + tscDebug("subquery:%p,%d state set to 1", pSql, idx); + + subState->states[idx] = 1; + + bool done = allSubqueryDone(subState); + + tscSpinUnlock(&subState->subLock); + + return done; +} + + + static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -367,10 +433,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); - //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - SSubqueryState* pState = &pSql->subState; - pState->numOfRemain = numOfSub; - bool success = true; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -403,6 +465,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { success = false; break; } + + subquerySetState(pNew, &pSql->subState, i, 0); tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; @@ -517,23 +581,23 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { SJoinSupporter* p = pSub->param; tscDestroyJoinSupporter(p); - if (pSub->res.code == TSDB_CODE_SUCCESS) { - taos_free_result(pSub); - } + taos_free_result(pSub); + pSql->pSubs[i] = NULL; } + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } -static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - assert(pSqlObj->subState.numOfRemain > 0); - - if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { - tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); +static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { + if (subAndCheckDone(pSqlSub, &pSqlObj->subState, pSupporter->subqueryIndex)) { + tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); freeJoinSubqueryObj(pSqlObj); + return; } - tscDestroyJoinSupporter(pSupporter); + //tscDestroyJoinSupporter(pSupporter); } // update the query time range according to the join results on timestamp @@ -785,7 +849,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); pParentSql->res.code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -802,7 +866,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p failed to malloc memory", pSql); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -844,9 +908,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); return; - } + } SArray *s1 = NULL, *s2 = NULL; int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); @@ -891,7 +956,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); pParentSql->subState.numOfSub = 2; - pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; + memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; @@ -922,7 +987,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); pParentSql->res.code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -937,7 +1002,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); @@ -955,7 +1020,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); @@ -1009,9 +1074,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { return; - } + } tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql); @@ -1088,9 +1153,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - assert(pState->numOfRemain > 0); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub); + if (!subAndCheckDone(pSql, pState, pSupporter->subqueryIndex)) { + tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub); return; } @@ -1205,16 +1269,6 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } - // get the number of subquery that need to retrieve the next vnode. - if (orderedPrjQuery) { - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; - if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - pSql->subState.numOfRemain++; - } - } - } - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -1242,9 +1296,13 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSub->cmd.command = TSDB_SQL_SELECT; pSub->fp = tscJoinQueryCallback; + subquerySetState(pSub, &pSql->subState, i, 0); + tscProcessSql(pSub); tryNextVnode = true; } else { + subquerySetState(pSub, &pSql->subState, i, 1); + tscDebug("%p no result in current subquery anymore", pSub); } } @@ -1270,7 +1328,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { // retrieve data from current vnode. tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); SJoinSupporter* pSupporter = NULL; - pSql->subState.numOfRemain = numOfFetch; + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSql1 = pSql->pSubs[i]; + if (pSql1 == NULL) { + continue; + } + + SSqlRes* pRes1 = &pSql1->res; + + if (pRes1->row >= pRes1->numOfRows) { + subquerySetState(pSql1, &pSql->subState, i, 0); + } + } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; @@ -1370,7 +1440,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // retrieve actual query results from vnode during the second stage join subquery if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -1383,7 +1453,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code)); pParentSql->res.code = code; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -1410,9 +1480,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { return; - } + } } tscSetupOutputColumnIndex(pParentSql); @@ -1424,6 +1494,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; + + subquerySetState(pSql, &pParentSql->subState, pSupporter->subqueryIndex, 0); + tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query // set the command flag must be after the semaphore been correctly set. @@ -1459,8 +1532,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[pSql->subState.numOfRemain++] = pNew; - assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub); + pSql->pSubs[tableIndex] = pNew; if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -1592,6 +1664,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { int32_t code = TSDB_CODE_SUCCESS; pSql->subState.numOfSub = pQueryInfo->numOfTables; + if (pSql->subState.states == NULL) { + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); + if (pSql->subState.states == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + } + bool hasEmptySub = false; tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); @@ -1627,7 +1707,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine + memset(pSql->subState.states + i, 1, sizeof(*pSql->subState.states) * (pSql->subState.numOfSub - i)); // the already sent request will continue and do not go to the error process routine break; } } @@ -1711,7 +1791,18 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pState->numOfRemain = pState->numOfSub; + if (pState->states == NULL) { + pState->states = calloc(pState->numOfSub, sizeof(*pState->states)); + if (pState->states == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscAsyncResultOnError(pSql); + tfree(pMemoryBuf); + return ret; + } + } + + memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); + pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; @@ -1860,7 +1951,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO assert(pSql != NULL); SSubqueryState* pState = &pParentSql->subState; - assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1891,14 +1981,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfSub - remain); + if (!subAndCheckDone(pSql, pState, subqueryIndex)) { + tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub); tscFreeRetrieveSup(pSql); return; - } + } // all subqueries are failed tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, @@ -1963,14 +2051,12 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p return; } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfSub - remain); + if (!subAndCheckDone(pSql, &pParentSql->subState, idx)) { + tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex); tscFreeRetrieveSup(pSql); return; - } + } // all sub-queries are returned, start to local merge process pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; @@ -2016,7 +2102,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSqlObj * pParentSql = trsupport->pParentSql; SSubqueryState* pState = &pParentSql->subState; - assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -2237,7 +2322,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } } - if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(tres, &pParentObj->subState, pSupporter->index)) { + tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub); return; } @@ -2271,6 +2357,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); + subquerySetState(pSql, &pParentObj->subState, i, 0); + tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); } } @@ -2285,7 +2373,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } pParentObj->cmd.parseFinished = false; - pParentObj->subState.numOfRemain = numOfFailed; tscResetSqlCmdObj(&pParentObj->cmd); @@ -2361,7 +2448,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { // the number of already initialized subqueries int32_t numOfSub = 0; - pSql->subState.numOfRemain = pSql->subState.numOfSub; + if (pSql->subState.states == NULL) { + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); + if (pSql->subState.states == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + } + + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { goto _error; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b44ebb3c98..e733012981 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -441,6 +441,8 @@ static void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } diff --git a/tests/script/general/http/restful_full.sim b/tests/script/general/http/restful_full.sim index 7e12d30ac9..645ebd2788 100644 --- a/tests/script/general/http/restful_full.sim +++ b/tests/script/general/http/restful_full.sim @@ -15,6 +15,7 @@ print =============== step1 - login system_content curl 127.0.0.1:7111/rest/ print 1-> $system_content if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then + print $system_content return -1 endi diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index d18a3d7676..56f115051c 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m $val = 100 if $rows != $val then + print $rows return -1 endi @@ -514,4 +515,4 @@ sql drop table tm2; sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a; sql drop database ux1; -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT From c04a4f171266a3211adac7237086449e7dcc34f4 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 14:33:01 +0800 Subject: [PATCH 02/11] fix bug --- src/client/inc/tsclient.h | 2 +- src/client/src/tscSubquery.c | 64 +++++++++++++++++++----------------- src/client/src/tscUtil.c | 6 +++- 3 files changed, 39 insertions(+), 33 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 27e9b2ced0..30d128f006 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -317,7 +317,7 @@ typedef struct STscObj { } STscObj; typedef struct SSubqueryState { - int32_t subLock; + pthread_mutex_t mutex; int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index e39bd05c56..51855f1575 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -55,34 +55,17 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } - -void tscSpinLock(int32_t *lock) { - int i = 0; - while (atomic_val_compare_exchange_32(lock, 0, 1) != 0) { - if (++i % 100 == 0) { - sched_yield(); - } - } -} - -void tscSpinUnlock(int32_t *lock) { - if (atomic_val_compare_exchange_32(lock, 1, 0) != 1) { - assert(false); - } -} - - static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { assert(idx < subState->numOfSub); assert(subState->states); - tscSpinLock(&subState->subLock); - + pthread_mutex_lock(&subState->mutex); + tscDebug("subquery:%p,%d state set to %d", pSql, idx, state); subState->states[idx] = state; - tscSpinUnlock(&subState->subLock); + pthread_mutex_unlock(&subState->mutex); } static bool allSubqueryDone(SSubqueryState *subState) { @@ -106,7 +89,7 @@ static bool allSubqueryDone(SSubqueryState *subState) { static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { assert(idx < subState->numOfSub); - tscSpinLock(&subState->subLock); + pthread_mutex_lock(&subState->mutex); tscDebug("subquery:%p,%d state set to 1", pSql, idx); @@ -114,7 +97,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { bool done = allSubqueryDone(subState); - tscSpinUnlock(&subState->subLock); + pthread_mutex_unlock(&subState->mutex); return done; } @@ -432,6 +415,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); + memset(&pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); bool success = true; @@ -466,7 +450,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { break; } - subquerySetState(pNew, &pSql->subState, i, 0); tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; @@ -585,8 +568,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + if (pSql->subState.states) { + pthread_mutex_destroy(&pSql->subState.mutex); + } + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } @@ -1269,6 +1257,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } + + if (orderedPrjQuery) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { + pSql->subState.states[i] = 0; + } else { + pSql->subState.states[i] = 1; + } + } + } + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -1296,13 +1297,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSub->cmd.command = TSDB_SQL_SELECT; pSub->fp = tscJoinQueryCallback; - subquerySetState(pSub, &pSql->subState, i, 0); - tscProcessSql(pSub); tryNextVnode = true; } else { - subquerySetState(pSub, &pSql->subState, i, 1); - tscDebug("%p no result in current subquery anymore", pSub); } } @@ -1494,8 +1491,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; - - subquerySetState(pSql, &pParentSql->subState, pSupporter->subqueryIndex, 0); tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query @@ -1670,6 +1665,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + + pthread_mutex_init(&pSql->subState.mutex, NULL); } bool hasEmptySub = false; @@ -1707,8 +1704,9 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - memset(pSql->subState.states + i, 1, sizeof(*pSql->subState.states) * (pSql->subState.numOfSub - i)); // the already sent request will continue and do not go to the error process routine - break; + pRes->code = code; + (*pSub->fp)(pSub->param, pSub, 0); + return; } } @@ -1799,6 +1797,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tfree(pMemoryBuf); return ret; } + + pthread_mutex_init(&pState->mutex, NULL); } memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); @@ -2454,6 +2454,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + + pthread_mutex_init(&pSql->subState.mutex, NULL); } memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e733012981..7b6c91d265 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -441,8 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + if (pSql->subState.states) { + pthread_mutex_destroy(&pSql->subState.mutex); + } + tfree(pSql->subState.states); - + pSql->subState.numOfSub = 0; } From 31ff3c0e3345e4d2922853bf474371731b2b3063 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 15:21:08 +0800 Subject: [PATCH 03/11] fix bug --- src/client/src/tscSubquery.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 6450c4e272..305b0306bc 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2791,4 +2791,4 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } return hasData; -} +} \ No newline at end of file From ef5e38e02524ee0ed510bb3dd6f0f795f94fa6ca Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 15:33:35 +0800 Subject: [PATCH 04/11] fix bug --- src/client/src/tscSubquery.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 305b0306bc..379d2a9f77 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -415,7 +415,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); - memset(&pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); bool success = true; @@ -2791,4 +2791,4 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } return hasData; -} \ No newline at end of file +} From d733e257c4bd01f7c58d9ceb6de9181885b21d03 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 16:04:03 +0800 Subject: [PATCH 05/11] fix bug --- src/client/src/tscSubquery.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 379d2a9f77..1c81dfa3b2 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -415,7 +415,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); - memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * numOfSub); bool success = true; From c61cebe96227761ba2497524809eef2dff334198 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 16:23:40 +0800 Subject: [PATCH 06/11] fix bug --- src/client/src/tscSubquery.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1c81dfa3b2..dccb836619 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -415,7 +415,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); - memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * numOfSub); bool success = true; @@ -527,6 +526,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } } + subquerySetState(pPrevSub, &pSql->subState, i, 0); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), From e29a53ef42555f55f4d2d228a101dcb159c807e7 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 14 Jan 2021 19:21:19 +0800 Subject: [PATCH 07/11] fix bug --- src/client/src/tscSubquery.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index dccb836619..5316e64035 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1264,8 +1264,6 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { pSql->subState.states[i] = 0; - } else { - pSql->subState.states[i] = 1; } } } @@ -1702,15 +1700,25 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pSql->fp)(pSql->param, pSql, 0); } else { + int fail = 0; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; + if (fail) { + (*pSub->fp)(pSub->param, pSub, 0); + continue; + } + if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { pRes->code = code; (*pSub->fp)(pSub->param, pSub, 0); - return; + fail = 1; } } + if(fail) { + return; + } + pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE; } From 695572ccb224d16a21d8fd6389a037394313ca74 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 15 Jan 2021 09:32:50 +0800 Subject: [PATCH 08/11] fix bug --- src/client/src/tscSubquery.c | 37 +++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5316e64035..7aa3913817 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -68,25 +68,28 @@ static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, i pthread_mutex_unlock(&subState->mutex); } -static bool allSubqueryDone(SSubqueryState *subState) { +static bool allSubqueryDone(SSqlObj *pParentSql) { bool done = true; + SSubqueryState *subState = &pParentSql->subState; //lock in caller for (int i = 0; i < subState->numOfSub; i++) { if (0 == subState->states[i]) { - tscDebug("subquery:%d is NOT finished, total:%d", i, subState->numOfSub); + tscDebug("subquery:%p,%d is NOT finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub); done = false; break; } else { - tscDebug("subquery:%d is finished, total:%d", i, subState->numOfSub); + tscDebug("subquery:%p,%d is finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub); } } return done; } -static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { +static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { + SSubqueryState *subState = &pParentSql->subState; + assert(idx < subState->numOfSub); pthread_mutex_lock(&subState->mutex); @@ -95,7 +98,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { subState->states[idx] = 1; - bool done = allSubqueryDone(subState); + bool done = allSubqueryDone(pParentSql); pthread_mutex_unlock(&subState->mutex); @@ -580,7 +583,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - if (subAndCheckDone(pSqlSub, &pSqlObj->subState, pSupporter->subqueryIndex)) { + if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) { tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); freeJoinSubqueryObj(pSqlObj); return; @@ -897,7 +900,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); return; } @@ -945,8 +948,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); pParentSql->subState.numOfSub = 2; + memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); - + tscDebug("%p reset all sub states to 0", pParentSql); + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; issueTSCompQuery(sub, sub->param, pParentSql); @@ -1063,7 +1068,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; } @@ -1142,7 +1147,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - if (!subAndCheckDone(pSql, pState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub); return; } @@ -1263,7 +1268,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - pSql->subState.states[i] = 0; + subquerySetState(pSub, &pSql->subState, i, 0); } } } @@ -1476,7 +1481,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { - if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; } } @@ -1830,6 +1835,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); + tscDebug("%p reset all sub states to 0", pSql); pRes->code = TSDB_CODE_SUCCESS; @@ -2009,7 +2015,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - if (!subAndCheckDone(pSql, pState, subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) { tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub); tscFreeRetrieveSup(pSql); @@ -2079,7 +2085,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p return; } - if (!subAndCheckDone(pSql, &pParentSql->subState, idx)) { + if (!subAndCheckDone(pSql, pParentSql, idx)) { tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex); tscFreeRetrieveSup(pSql); @@ -2350,7 +2356,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } } - if (!subAndCheckDone(tres, &pParentObj->subState, pSupporter->index)) { + if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) { tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub); return; } @@ -2487,6 +2493,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + tscDebug("%p reset all sub states to 0", pSql); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { From 87a6dd443acbfdadcc1c926a37feec674575ec43 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 15 Jan 2021 10:40:03 +0800 Subject: [PATCH 09/11] add debug log --- src/client/src/tscSubquery.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 7aa3913817..2ec03e47b3 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -76,11 +76,11 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { for (int i = 0; i < subState->numOfSub; i++) { if (0 == subState->states[i]) { - tscDebug("subquery:%p,%d is NOT finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub); + tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); done = false; break; } else { - tscDebug("subquery:%p,%d is finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub); + tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); } } @@ -94,7 +94,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { pthread_mutex_lock(&subState->mutex); - tscDebug("subquery:%p,%d state set to 1", pSql, idx); + tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx); subState->states[idx] = 1; From 68ef3a245f28e827244f5563374af167edba85a5 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 15 Jan 2021 11:28:45 +0800 Subject: [PATCH 10/11] add parent code check for callbacks --- src/client/src/tscSubquery.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2ec03e47b3..ee9526e297 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -833,6 +833,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); + quitAllSubquery(pSql, pParentSql, pSupporter); + + tscAsyncResultOnError(pParentSql); + + return; + } + // check for the error code firstly if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // todo retry if other subqueries are not failed @@ -974,6 +983,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)); + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); + quitAllSubquery(pSql, pParentSql, pSupporter); + + tscAsyncResultOnError(pParentSql); + + return; + } + // check for the error code firstly if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // todo retry if other subqueries are not failed yet @@ -1108,6 +1126,17 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SSqlRes* pRes = &pSql->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); + quitAllSubquery(pSql, pParentSql, pSupporter); + + tscAsyncResultOnError(pParentSql); + + return; + } + + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(numOfRows == taos_errno(pSql)); @@ -1672,6 +1701,9 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pthread_mutex_init(&pSql->subState.mutex, NULL); } + + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + tscDebug("%p reset all sub states to 0", pSql); bool hasEmptySub = false; From 4527694b441fbc06464e5f00484e9fa0f99762db Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 15 Jan 2021 18:11:43 +0800 Subject: [PATCH 11/11] remove error msg --- src/client/src/tscLocalMerge.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 4aa751574c..5c12acda2f 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1428,6 +1428,10 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { tscResetForNextRetrieve(pRes); if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed + if (pRes->code == TSDB_CODE_SUCCESS) { + return pRes->code; + } + tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code)); return pRes->code; }