From 5956d8f4fd992a9e010a196c4326c23ad4d2ebe2 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 24 Nov 2022 15:38:48 +0800 Subject: [PATCH] enh: group by tbname optimize --- source/libs/executor/src/exchangeoperator.c | 54 +++++++++++---------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 64c2b8c27f..b01a4b7871 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -51,9 +51,9 @@ typedef struct SSourceDataInfo { const char* taskId; } SSourceDataInfo; -static void destroyExchangeOperatorInfo(void* param); -static void freeBlock(void* pParam); -static void freeSourceDataInfo(void* param); +static void destroyExchangeOperatorInfo(void* param); +static void freeBlock(void* pParam); +static void freeSourceDataInfo(void* param); static void* setAllSourcesCompleted(SOperatorInfo* pOperator); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); @@ -62,7 +62,8 @@ static int32_t getCompletedSources(const SArray* pArray); static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator); static int32_t seqLoadRemoteData(SOperatorInfo* pOperator); static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator); -static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); +static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, + bool holdDataInBuf); static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, @@ -106,7 +107,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->numOfRows == 0) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); @@ -125,14 +126,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb, try next %d/%" PRIzu, + " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, - pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, - i + 1, totalSources); + pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, + totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -157,7 +158,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } - _error: +_error: pTaskInfo->code = code; } @@ -298,17 +299,19 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; qAppendTaskStopInfo(pTaskInfo, &stopInfo); - - pInfo->seqLoadData = true; + + pInfo->seqLoadData = pExNode->seqRecvData; pInfo->pTransporter = pTransporter; - setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, + pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); - pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); return pOperator; - _error: +_error: if (pInfo != NULL) { doDestroyExchangeOperatorInfo(pInfo); } @@ -379,7 +382,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), + pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; @@ -427,14 +431,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas taosMemoryFree(pWrapper); return pTaskInfo->code; } - + void* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; taosMemoryFree(pWrapper); return pTaskInfo->code; } - + if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; taosMemoryFree(pWrapper); @@ -524,7 +528,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pLoadInfo->totalElapsed / 1000.0); @@ -574,7 +578,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - + tsem_post(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } @@ -636,7 +640,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 " try next", + ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); @@ -654,7 +658,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -663,7 +667,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -675,7 +679,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } - _error: +_error: pTaskInfo->code = code; return code; }