From a9ba061cd890f49592b6f4a849c21e3f991bfc8a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Aug 2021 11:00:04 +0800 Subject: [PATCH 1/3] [td-6216] Send the client query requests in parallel. --- src/client/src/tscSubquery.c | 54 +++++++++++++++++++++++++++++++----- src/query/src/qExecutor.c | 4 +++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index f3af685ceb..45bcf4095a 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2433,6 +2433,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { return terrno; } +typedef struct SPair { + int32_t first; + int32_t second; +} SPair; + +static void doSendQueryReqs(SSchedMsg* pSchedMsg) { + SSqlObj* pSql = pSchedMsg->ahandle; + SPair* p = pSchedMsg->msg; + + for(int32_t i = p->first; i < p->second; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + SRetrieveSupport* pSupport = pSub->param; + + tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); + tscBuildAndSendRequest(pSub, NULL); + } + + tfree(p); +} + int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2555,13 +2575,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { doCleanupSubqueries(pSql, i); return pRes->code; } - - for(int32_t j = 0; j < pState->numOfSub; ++j) { - SSqlObj* pSub = pSql->pSubs[j]; - SRetrieveSupport* pSupport = pSub->param; - - tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); - tscBuildAndSendRequest(pSub, NULL); + + // concurrently sent the query requests. + const int32_t MAX_REQUEST_PER_TASK = 8; + + int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; + assert(numOfTasks >= 1); + + int32_t num = (pState->numOfSub/numOfTasks) + 1; + tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); + + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doSendQueryReqs; + schedMsg.ahandle = (void*)pSql; + + schedMsg.thandle = NULL; + SPair* p = calloc(1, sizeof(SPair)); + p->first = j * num; + + if (j == numOfTasks - 1) { + p->second = pState->numOfSub; + } else { + p->second = (j + 1) * num; + } + + schedMsg.msg = p; + taosScheduleTask(tscQhandle, &schedMsg); } return TSDB_CODE_SUCCESS; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 303612fc8e..fdb7ae374b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7079,6 +7079,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count); } + if (pOperator->status == OP_EXEC_DONE) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + } + pRes->info.rows = count; return (pRes->info.rows == 0)? NULL:pInfo->pRes; } From 5585b4be875ceef9e0b44ecafcd1ef003768d6a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Aug 2021 11:13:29 +0800 Subject: [PATCH 2/3] [td-6216] remove the unused code segment. --- src/query/src/qExecutor.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fdb7ae374b..3af41395f3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4265,6 +4265,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } qDebug("QInfo:0x%"PRIx64" set %d subscribe info", pQInfo->qId, total); + // Check if query is completed or not for stable query or normal table query respectively. if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { setQueryStatus(pRuntimeEnv, QUERY_OVER); @@ -7079,10 +7080,6 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count); } - if (pOperator->status == OP_EXEC_DONE) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - } - pRes->info.rows = count; return (pRes->info.rows == 0)? NULL:pInfo->pRes; } From ac3208736beabfdade859b8df64a5d70703ec4fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Aug 2021 11:25:33 +0800 Subject: [PATCH 3/3] [td-6216] add the removed code segment back. --- src/query/src/qExecutor.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3af41395f3..0e2aba1d24 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7080,6 +7080,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count); } + if (pOperator->status == OP_EXEC_DONE) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + } + pRes->info.rows = count; return (pRes->info.rows == 0)? NULL:pInfo->pRes; }