commit
4c2e4f2930
|
@ -2433,6 +2433,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
typedef struct SPair {
|
||||
int32_t first;
|
||||
int32_t second;
|
||||
} SPair;
|
||||
|
||||
static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
|
||||
SSqlObj* pSql = pSchedMsg->ahandle;
|
||||
SPair* p = pSchedMsg->msg;
|
||||
|
||||
for(int32_t i = p->first; i < p->second; ++i) {
|
||||
SSqlObj* pSub = pSql->pSubs[i];
|
||||
SRetrieveSupport* pSupport = pSub->param;
|
||||
|
||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
|
||||
tscBuildAndSendRequest(pSub, NULL);
|
||||
}
|
||||
|
||||
tfree(p);
|
||||
}
|
||||
|
||||
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
@ -2556,12 +2576,32 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
return pRes->code;
|
||||
}
|
||||
|
||||
for(int32_t j = 0; j < pState->numOfSub; ++j) {
|
||||
SSqlObj* pSub = pSql->pSubs[j];
|
||||
SRetrieveSupport* pSupport = pSub->param;
|
||||
// concurrently sent the query requests.
|
||||
const int32_t MAX_REQUEST_PER_TASK = 8;
|
||||
|
||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
|
||||
tscBuildAndSendRequest(pSub, NULL);
|
||||
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
|
||||
assert(numOfTasks >= 1);
|
||||
|
||||
int32_t num = (pState->numOfSub/numOfTasks) + 1;
|
||||
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
|
||||
|
||||
for(int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = doSendQueryReqs;
|
||||
schedMsg.ahandle = (void*)pSql;
|
||||
|
||||
schedMsg.thandle = NULL;
|
||||
SPair* p = calloc(1, sizeof(SPair));
|
||||
p->first = j * num;
|
||||
|
||||
if (j == numOfTasks - 1) {
|
||||
p->second = pState->numOfSub;
|
||||
} else {
|
||||
p->second = (j + 1) * num;
|
||||
}
|
||||
|
||||
schedMsg.msg = p;
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -4265,6 +4265,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
|||
}
|
||||
|
||||
qDebug("QInfo:0x%"PRIx64" set %d subscribe info", pQInfo->qId, total);
|
||||
|
||||
// Check if query is completed or not for stable query or normal table query respectively.
|
||||
if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) {
|
||||
setQueryStatus(pRuntimeEnv, QUERY_OVER);
|
||||
|
@ -7079,6 +7080,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
|||
qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count);
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
}
|
||||
|
||||
pRes->info.rows = count;
|
||||
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue