commit
b1f869c192
|
@ -318,6 +318,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
|
|||
|
||||
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
|
||||
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
|
||||
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries);
|
||||
|
||||
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
|
||||
|
||||
|
|
|
@ -2459,11 +2459,48 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
|
|||
tfree(p);
|
||||
}
|
||||
|
||||
static void doConcurrentlySendSubQueries(SSqlObj* pSql) {
|
||||
SSubqueryState *pState = &pSql->subState;
|
||||
|
||||
// concurrently sent the query requests.
|
||||
const int32_t MAX_REQUEST_PER_TASK = 8;
|
||||
|
||||
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
|
||||
assert(numOfTasks >= 1);
|
||||
|
||||
int32_t num;
|
||||
if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) {
|
||||
num = MAX_REQUEST_PER_TASK;
|
||||
} else {
|
||||
num = pState->numOfSub / numOfTasks + 1;
|
||||
}
|
||||
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
|
||||
|
||||
for(int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = doSendQueryReqs;
|
||||
schedMsg.ahandle = (void*)pSql;
|
||||
|
||||
schedMsg.thandle = NULL;
|
||||
SPair* p = calloc(1, sizeof(SPair));
|
||||
p->first = j * num;
|
||||
|
||||
if (j == numOfTasks - 1) {
|
||||
p->second = pState->numOfSub;
|
||||
} else {
|
||||
p->second = (j + 1) * num;
|
||||
}
|
||||
|
||||
schedMsg.msg = p;
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
// pRes->code check only serves in launching metric sub-queries
|
||||
// pRes->code check only serves in launching super table sub-queries
|
||||
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
|
||||
return pRes->code;
|
||||
|
@ -2474,22 +2511,23 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
|
||||
pRes->qId = 0x1; // hack the qhandle check
|
||||
|
||||
const uint32_t nBufferSize = (1u << 18u); // 256KB
|
||||
const uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
SSubqueryState *pState = &pSql->subState;
|
||||
|
||||
pState->numOfSub = 0;
|
||||
if (pTableMetaInfo->pVgroupTables == NULL) {
|
||||
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
|
||||
} else {
|
||||
pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
|
||||
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
|
||||
int32_t ret = doInitSubState(pSql, numOfSub);
|
||||
if (ret != 0) {
|
||||
tscAsyncResultOnError(pSql);
|
||||
return ret;
|
||||
}
|
||||
|
||||
assert(pState->numOfSub > 0);
|
||||
|
||||
int32_t ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
|
||||
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
|
||||
if (ret != 0) {
|
||||
pRes->code = ret;
|
||||
tscAsyncResultOnError(pSql);
|
||||
|
@ -2499,32 +2537,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
|
||||
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
|
||||
if (pSql->pSubs == NULL) {
|
||||
tfree(pSql->pSubs);
|
||||
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub);
|
||||
|
||||
tscAsyncResultOnError(pSql);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (pState->states == NULL) {
|
||||
pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
|
||||
if (pState->states == NULL) {
|
||||
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub);
|
||||
|
||||
tscAsyncResultOnError(pSql);
|
||||
return ret;
|
||||
}
|
||||
|
||||
pthread_mutex_init(&pState->mutex, NULL);
|
||||
}
|
||||
|
||||
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
|
||||
tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
|
||||
|
||||
pRes->code = TSDB_CODE_SUCCESS;
|
||||
|
||||
int32_t i = 0;
|
||||
|
@ -2545,8 +2557,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
break;
|
||||
}
|
||||
|
||||
trs->subqueryIndex = i;
|
||||
trs->pParentSql = pSql;
|
||||
trs->subqueryIndex = i;
|
||||
trs->pParentSql = pSql;
|
||||
|
||||
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
|
||||
if (pNew == NULL) {
|
||||
|
@ -2582,39 +2594,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
return pRes->code;
|
||||
}
|
||||
|
||||
// concurrently sent the query requests.
|
||||
const int32_t MAX_REQUEST_PER_TASK = 8;
|
||||
|
||||
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
|
||||
assert(numOfTasks >= 1);
|
||||
|
||||
int32_t num;
|
||||
if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) {
|
||||
num = MAX_REQUEST_PER_TASK;
|
||||
} else {
|
||||
num = pState->numOfSub / numOfTasks + 1;
|
||||
}
|
||||
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
|
||||
|
||||
for(int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = doSendQueryReqs;
|
||||
schedMsg.ahandle = (void*)pSql;
|
||||
|
||||
schedMsg.thandle = NULL;
|
||||
SPair* p = calloc(1, sizeof(SPair));
|
||||
p->first = j * num;
|
||||
|
||||
if (j == numOfTasks - 1) {
|
||||
p->second = pState->numOfSub;
|
||||
} else {
|
||||
p->second = (j + 1) * num;
|
||||
}
|
||||
|
||||
schedMsg.msg = p;
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
doConcurrentlySendSubQueries(pSql);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -3944,6 +3944,21 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
|||
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
|
||||
}
|
||||
|
||||
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
|
||||
assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL);
|
||||
pSql->subState.numOfSub = numOfSubqueries;
|
||||
|
||||
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
||||
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
|
||||
|
||||
int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL);
|
||||
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// do execute the query according to the query execution plan
|
||||
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -3959,16 +3974,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
|
||||
assert(pSql->subState.numOfSub == 0);
|
||||
pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream);
|
||||
assert(pSql->pSubs == NULL);
|
||||
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
||||
assert(pSql->subState.states == NULL);
|
||||
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
|
||||
code = pthread_mutex_init(&pSql->subState.mutex, NULL);
|
||||
|
||||
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -4315,7 +4322,9 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
}
|
||||
|
||||
tfree(pSql->pSubs);
|
||||
tfree(pSql->subState.states);
|
||||
pSql->subState.numOfSub = 0;
|
||||
pthread_mutex_destroy(&pSql->subState.mutex);
|
||||
|
||||
pSql->fp = fp;
|
||||
|
||||
|
|
|
@ -112,14 +112,15 @@ void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfEle
|
|||
i += 1;
|
||||
}
|
||||
|
||||
assert(i == pData[numOfElems - 1] + 1);
|
||||
assert(i == pData[numOfElems - 1] + 1 && i <= size);
|
||||
|
||||
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
|
||||
int32_t srcIndex = pData[numOfElems - 1] + 1;
|
||||
|
||||
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
|
||||
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
|
||||
memmove(dst, src, pArray->elemSize * (pArray->size - numOfElems));
|
||||
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
|
||||
if (pArray->size - srcIndex > 0) {
|
||||
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
|
||||
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
|
||||
memmove(dst, src, pArray->elemSize * (pArray->size - srcIndex));
|
||||
}
|
||||
|
||||
pArray->size -= numOfElems;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue