Merge pull request #7474 from taosdata/fix/20
[td-6229] Send the query request in parallel if the number of subquer…
This commit is contained in:
commit
2751ee2215
|
@ -15,8 +15,9 @@
|
|||
#define _GNU_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "texpr.h"
|
||||
|
||||
#include "tsched.h"
|
||||
#include "qTsbuf.h"
|
||||
#include "tcompare.h"
|
||||
#include "tscLog.h"
|
||||
|
@ -2423,6 +2424,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
typedef struct SPair {
|
||||
int32_t first;
|
||||
int32_t second;
|
||||
} SPair;
|
||||
|
||||
static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
|
||||
SSqlObj* pSql = pSchedMsg->ahandle;
|
||||
SPair* p = pSchedMsg->msg;
|
||||
|
||||
for(int32_t i = p->first; i < p->second; ++i) {
|
||||
SSqlObj* pSub = pSql->pSubs[i];
|
||||
SRetrieveSupport* pSupport = pSub->param;
|
||||
|
||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
|
||||
tscProcessSql(pSub);
|
||||
}
|
||||
|
||||
tfree(p);
|
||||
}
|
||||
|
||||
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
@ -2546,13 +2567,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
doCleanupSubqueries(pSql, i);
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
for(int32_t j = 0; j < pState->numOfSub; ++j) {
|
||||
SSqlObj* pSub = pSql->pSubs[j];
|
||||
SRetrieveSupport* pSupport = pSub->param;
|
||||
|
||||
tscDebug("0x%"PRIx64" sub:%p launch subquery, orderOfSub:%d.", pSql->self, pSub, pSupport->subqueryIndex);
|
||||
tscProcessSql(pSub);
|
||||
|
||||
// concurrently sent the query requests.
|
||||
const int32_t MAX_REQUEST_PER_TASK = 8;
|
||||
|
||||
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
|
||||
assert(numOfTasks >= 1);
|
||||
|
||||
int32_t num = (pState->numOfSub/numOfTasks) + 1;
|
||||
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
|
||||
|
||||
for(int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = doSendQueryReqs;
|
||||
schedMsg.ahandle = (void*)pSql;
|
||||
|
||||
schedMsg.thandle = NULL;
|
||||
SPair* p = calloc(1, sizeof(SPair));
|
||||
p->first = j * num;
|
||||
|
||||
if (j == numOfTasks - 1) {
|
||||
p->second = pState->numOfSub;
|
||||
} else {
|
||||
p->second = (j + 1) * num;
|
||||
}
|
||||
|
||||
schedMsg.msg = p;
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue