From a9ba061cd890f49592b6f4a849c21e3f991bfc8a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Aug 2021 11:00:04 +0800 Subject: [PATCH] [td-6216] Send the client query requests in parallel. --- src/client/src/tscSubquery.c | 54 +++++++++++++++++++++++++++++++----- src/query/src/qExecutor.c | 4 +++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index f3af685ceb..45bcf4095a 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2433,6 +2433,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { return terrno; } +typedef struct SPair { + int32_t first; + int32_t second; +} SPair; + +static void doSendQueryReqs(SSchedMsg* pSchedMsg) { + SSqlObj* pSql = pSchedMsg->ahandle; + SPair* p = pSchedMsg->msg; + + for(int32_t i = p->first; i < p->second; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + SRetrieveSupport* pSupport = pSub->param; + + tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); + tscBuildAndSendRequest(pSub, NULL); + } + + tfree(p); +} + int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2555,13 +2575,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { doCleanupSubqueries(pSql, i); return pRes->code; } - - for(int32_t j = 0; j < pState->numOfSub; ++j) { - SSqlObj* pSub = pSql->pSubs[j]; - SRetrieveSupport* pSupport = pSub->param; - - tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); - tscBuildAndSendRequest(pSub, NULL); + + // concurrently sent the query requests. + const int32_t MAX_REQUEST_PER_TASK = 8; + + int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; + assert(numOfTasks >= 1); + + int32_t num = (pState->numOfSub/numOfTasks) + 1; + tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); + + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doSendQueryReqs; + schedMsg.ahandle = (void*)pSql; + + schedMsg.thandle = NULL; + SPair* p = calloc(1, sizeof(SPair)); + p->first = j * num; + + if (j == numOfTasks - 1) { + p->second = pState->numOfSub; + } else { + p->second = (j + 1) * num; + } + + schedMsg.msg = p; + taosScheduleTask(tscQhandle, &schedMsg); } return TSDB_CODE_SUCCESS; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 303612fc8e..fdb7ae374b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7079,6 +7079,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count); } + if (pOperator->status == OP_EXEC_DONE) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + } + pRes->info.rows = count; return (pRes->info.rows == 0)? NULL:pInfo->pRes; }