diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 73a30a62f5..ebf3d83a1a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -372,11 +372,14 @@ typedef struct STaskParam { typedef struct SExchangeInfo { SArray *pSources; - uint64_t bytes; // total load bytes from remote tsem_t ready; void *pTransporter; SRetrieveTableRsp *pRsp; SSDataBlock *pResult; + int32_t current; + uint64_t rowsOfCurrentSource; + uint64_t bytes; // total load bytes from remote + uint64_t totalRows; } SExchangeInfo; typedef struct STableScanInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e963c49c86..02827ec32c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5144,72 +5144,110 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; *newgroup = false; - if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) { + + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + if (pExchangeInfo->current >= totalSources) { return NULL; } - SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { // todo handle malloc error - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _error; - } + SResFetchReq* pMsg = NULL; + SMsgSendInfo* pMsgSendInfo = NULL; - SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); - SEpSet epSet = {0}; - - epSet.numOfEps = pSource->addr.numOfEps; - epSet.port[0] = pSource->addr.epAddr[0].port; - tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); - - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _error; - } - - pMsgSendInfo->param = pExchangeInfo; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = TDMT_VND_FETCH; - pMsgSendInfo->fp = loadRemoteDataCallback; - - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pExchangeInfo->ready); - - if (pExchangeInfo->pRsp->numOfRows == 0) { - return NULL; - } - - SSDataBlock* pRes = pExchangeInfo->pResult; - char* pData = pExchangeInfo->pRsp->data; - - for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); - if (tmp == NULL) { + while(1) { + pMsg = calloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { // todo handle malloc error + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _error; } - size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; - memcpy(tmp, pData, len); + SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); + SEpSet epSet = {0}; - pColInfoData->pData = tmp; - pData += len; + epSet.numOfEps = pSource->addr.numOfEps; + epSet.port[0] = pSource->addr.epAddr[0].port; + tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); + + qDebug("QID:0x%" PRIx64 " build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources); + + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); + + // send the fetch remote task result reques + pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("QID:0x%" PRIx64 " prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; + } + + pMsgSendInfo->param = pExchangeInfo; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgType = TDMT_VND_FETCH; + pMsgSendInfo->fp = loadRemoteDataCallback; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + tsem_wait(&pExchangeInfo->ready); + + SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; + if (pRsp->numOfRows == 0) { + if (pExchangeInfo->current >= taosArrayGetSize(pExchangeInfo->pSources)) { + return NULL; + } + + qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, + pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows); + + pExchangeInfo->rowsOfCurrentSource = 0; + pExchangeInfo->current += 1; + continue; + } + + SSDataBlock* pRes = pExchangeInfo->pResult; + char* pData = pRsp->data; + + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); + if (tmp == NULL) { + goto _error; + } + + size_t len = pRsp->numOfRows * pColInfoData->info.bytes; + memcpy(tmp, pData, len); + + pColInfoData->pData = tmp; + pData += len; + } + + pRes->info.numOfCols = pOperator->numOfOutput; + pRes->info.rows = pRsp->numOfRows; + + pExchangeInfo->totalRows += pRsp->numOfRows; + pExchangeInfo->bytes += pRsp->compLen; + pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows; + + if (pRsp->completed == 1) { + qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes, + pExchangeInfo->current + 1, totalSources); + + pExchangeInfo->rowsOfCurrentSource = 0; + pExchangeInfo->current += 1; + } else { + qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes); + } + + return pExchangeInfo->pResult; } - pRes->info.numOfCols = pOperator->numOfOutput; - pRes->info.rows = pExchangeInfo->pRsp->numOfRows; - - return pExchangeInfo->pResult; - _error: tfree(pMsg); tfree(pMsgSendInfo); @@ -7719,7 +7757,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); } else if (pPhyNode->info.type == OP_StreamScan) { - size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo); }