refactor(query): do some internal refactor.
This commit is contained in:
parent
1ee075dce0
commit
6abc13e843
|
@ -874,8 +874,6 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
pRequest->metric.resultReady = taosGetTimestampUs();
|
|
||||||
|
|
||||||
if (pResult) {
|
if (pResult) {
|
||||||
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
||||||
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
||||||
|
@ -1059,7 +1057,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->metric.planEnd = taosGetTimestampUs();
|
pRequest->metric.planEnd = taosGetTimestampUs();
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
||||||
SArray* pNodeList = NULL;
|
SArray* pNodeList = NULL;
|
||||||
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||||
|
|
|
@ -817,7 +817,6 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
pRequest->metric.syntaxEnd = taosGetTimestampUs();
|
pRequest->metric.syntaxEnd = taosGetTimestampUs();
|
||||||
|
|
||||||
if (!updateMetaForce) {
|
if (!updateMetaForce) {
|
||||||
STscObj *pTscObj = pRequest->pTscObj;
|
|
||||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
if (NULL == pRequest->pQuery->pRoot) {
|
if (NULL == pRequest->pQuery->pRoot) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
|
@ -864,6 +863,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)param;
|
SRequestObj *pRequest = (SRequestObj *)param;
|
||||||
|
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
|
pRequest->metric.resultReady = taosGetTimestampUs();
|
||||||
|
|
||||||
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
||||||
tstrerror(code), pRequest->requestId);
|
tstrerror(code), pRequest->requestId);
|
||||||
|
|
|
@ -487,6 +487,8 @@ int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* tas
|
||||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
|
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
|
||||||
void* param) {
|
void* param) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
|
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
|
||||||
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
|
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
|
||||||
int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash);
|
int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash);
|
||||||
|
@ -634,12 +636,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
|
|
||||||
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId,
|
double el = (taosGetTimestampUs() - st)/1000.0;
|
||||||
taskNum, pReq->forceUpdate);
|
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms",
|
||||||
|
pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate, el);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
ctgFreeJob(*job);
|
ctgFreeJob(*job);
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue