diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4cb233f71e..27c24669cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4558,6 +4558,7 @@ void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); +void createResultBlock(const SArray* pExprInfo, SExchangeInfo* pInfo, const SOperatorInfo* pOperator, size_t size); static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; #if 0 @@ -4930,7 +4931,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } -void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { +void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; assert(pMsg->ahandle != NULL); @@ -4958,13 +4959,14 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; *newgroup = false; - if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) { return NULL; } SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); if (NULL == pMsg) { // todo handle malloc error + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); @@ -4983,6 +4985,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } pMsgSendInfo->param = pExchangeInfo; @@ -4993,7 +4997,6 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pExchangeInfo->ready); if (pExchangeInfo->pRsp->numOfRows == 0) { @@ -5007,7 +5010,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); if (tmp == NULL) { - // todo + goto _error; } size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; @@ -5021,8 +5024,17 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pRes->info.rows = pExchangeInfo->pRsp->numOfRows; return pExchangeInfo->pResult; + + _error: + tfree(pMsg); + tfree(pMsgSendInfo); + + terrno = pTaskInfo->code; + return NULL; } +static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); + SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -5038,21 +5050,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* assert(taosArrayGetSize(pInfo->pSources) > 0); size_t size = taosArrayGetSize(pExprInfo); - pInfo->pResult = calloc(1, sizeof(SSDataBlock)); - pInfo->pResult->pDataBlock = taosArrayInit(pOperator->numOfOutput, sizeof(SColumnInfoData)); - - SArray* pResult = pInfo->pResult->pDataBlock; - for(int32_t i = 0; i < size; ++i) { - SColumnInfoData colInfoData = {0}; - SExprInfo* p = taosArrayGetP(pExprInfo, i); - - SSchema* pSchema = &p->base.resSchema; - colInfoData.info.type = pSchema->type; - colInfoData.info.colId = pSchema->colId; - colInfoData.info.bytes = pSchema->bytes; - - taosArrayPush(pResult, &colInfoData); - } + pInfo->pResult = createResultDataBlock(pExprInfo); pOperator->name = "ExchangeOperator"; pOperator->operatorType = OP_Exchange; @@ -5064,13 +5062,13 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->exec = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; - { + { // todo refactor SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processMsgFromServer; + rpcInit.cfp = processRspMsg; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)"root"; @@ -5088,6 +5086,31 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* return pOperator; } +SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { + SSDataBlock* pResBlock = calloc(1, sizeof(SSDataBlock)); + if (pResBlock == NULL) { + return NULL; + } + + size_t numOfCols = taosArrayGetSize(pExprInfo); + pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + + SArray* pResult = pResBlock->pDataBlock; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfoData = {0}; + SExprInfo* p = taosArrayGetP(pExprInfo, i); + + SSchema* pSchema = &p->base.resSchema; + colInfoData.info.type = pSchema->type; + colInfoData.info.colId = pSchema->colId; + colInfoData.info.bytes = pSchema->bytes; + + taosArrayPush(pResult, &colInfoData); + } + + return pResBlock; +} + SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0 && numOfOutput > 0);