diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 08fef72107..64c2b8c27f 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -299,7 +299,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; qAppendTaskStopInfo(pTaskInfo, &stopInfo); - pInfo->seqLoadData = false; + pInfo->seqLoadData = true; pInfo->pTransporter = pTransporter; setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -614,13 +614,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; + doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); if (pDataInfo->code != TSDB_CODE_SUCCESS) {