enh: group by tbname optimize

This commit is contained in:
Xiaoyu Wang 2022-11-24 15:38:48 +08:00
parent 61a634e8c2
commit 5956d8f4fd
1 changed files with 29 additions and 25 deletions

View File

@ -51,9 +51,9 @@ typedef struct SSourceDataInfo {
const char* taskId; const char* taskId;
} SSourceDataInfo; } SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param); static void destroyExchangeOperatorInfo(void* param);
static void freeBlock(void* pParam); static void freeBlock(void* pParam);
static void freeSourceDataInfo(void* param); static void freeSourceDataInfo(void* param);
static void* setAllSourcesCompleted(SOperatorInfo* pOperator); static void* setAllSourcesCompleted(SOperatorInfo* pOperator);
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
@ -62,7 +62,8 @@ static int32_t getCompletedSources(const SArray* pArray);
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator); static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator); static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator); static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
bool holdDataInBuf);
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
@ -106,7 +107,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
@ -125,14 +126,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, try next %d/%" PRIzu, ", total:%.2f Kb, try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
i + 1, totalSources); totalSources);
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
@ -157,7 +158,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
} }
} }
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
} }
@ -298,17 +299,19 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
qAppendTaskStopInfo(pTaskInfo, &stopInfo); qAppendTaskStopInfo(pTaskInfo, &stopInfo);
pInfo->seqLoadData = true; pInfo->seqLoadData = pExNode->seqRecvData;
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
doDestroyExchangeOperatorInfo(pInfo); doDestroyExchangeOperatorInfo(pInfo);
} }
@ -379,7 +382,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
} else { } else {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
pSourceDataInfo->code = code; pSourceDataInfo->code = code;
qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
pExchangeInfo);
} }
pSourceDataInfo->status = EX_SOURCE_DATA_READY; pSourceDataInfo->status = EX_SOURCE_DATA_READY;
@ -427,14 +431,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
return pTaskInfo->code; return pTaskInfo->code;
} }
void* msg = taosMemoryCalloc(1, msgSize); void* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
return pTaskInfo->code; return pTaskInfo->code;
} }
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
@ -524,7 +528,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms", qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
pLoadInfo->totalElapsed / 1000.0); pLoadInfo->totalElapsed / 1000.0);
@ -574,7 +578,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsem_post(&pExchangeInfo->ready); tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -636,7 +640,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 " try next", ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->totalRows, pLoadInfo->totalRows);
@ -654,7 +658,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources); totalSources);
@ -663,7 +667,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64, ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pLoadInfo->totalRows, pLoadInfo->totalSize); pLoadInfo->totalRows, pLoadInfo->totalSize);
} }
@ -675,7 +679,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }