Merge pull request #15581 from taosdata/feature/3_liaohj
fix(query): fix memory leak.
This commit is contained in:
commit
4b4cc03fc6
|
@ -155,7 +155,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
|
||||||
|
|
||||||
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||||
|
|
||||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes);
|
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/);
|
||||||
|
|
||||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
||||||
|
|
||||||
|
|
|
@ -16,65 +16,9 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
|
|
||||||
pEp->port = 0;
|
|
||||||
strcpy(pEp->fqdn, ep);
|
|
||||||
|
|
||||||
char* temp = strchr(pEp->fqdn, ':');
|
|
||||||
if (temp) {
|
|
||||||
*temp = 0;
|
|
||||||
pEp->port = atoi(temp + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEp->port == 0) {
|
|
||||||
pEp->port = tsServerPort;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port) {
|
|
||||||
if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t index = pEpSet->numOfEps;
|
|
||||||
tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn));
|
|
||||||
pEpSet->eps[index].port = port;
|
|
||||||
pEpSet->numOfEps += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
|
|
||||||
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < s1->numOfEps; i++) {
|
|
||||||
if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
|
||||||
taosCorBeginWrite(&pEpSet->version);
|
|
||||||
pEpSet->epSet = *pNewEpSet;
|
|
||||||
taosCorEndWrite(&pEpSet->version);
|
|
||||||
}
|
|
||||||
|
|
||||||
SEpSet getEpSet_s(SCorEpSet* pEpSet) {
|
|
||||||
SEpSet ep = {0};
|
|
||||||
taosCorBeginRead(&pEpSet->version);
|
|
||||||
ep = pEpSet->epSet;
|
|
||||||
taosCorEndRead(&pEpSet->version);
|
|
||||||
|
|
||||||
return ep;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||||
ASSERT(pColumnInfoData != NULL);
|
ASSERT(pColumnInfoData != NULL);
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tname.h"
|
||||||
|
|
||||||
|
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
|
||||||
|
pEp->port = 0;
|
||||||
|
strcpy(pEp->fqdn, ep);
|
||||||
|
|
||||||
|
char* temp = strchr(pEp->fqdn, ':');
|
||||||
|
if (temp) {
|
||||||
|
*temp = 0;
|
||||||
|
pEp->port = atoi(temp + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEp->port == 0) {
|
||||||
|
pEp->port = tsServerPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port) {
|
||||||
|
if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t index = pEpSet->numOfEps;
|
||||||
|
tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn));
|
||||||
|
pEpSet->eps[index].port = port;
|
||||||
|
pEpSet->numOfEps += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
|
||||||
|
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < s1->numOfEps; i++) {
|
||||||
|
if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
||||||
|
taosCorBeginWrite(&pEpSet->version);
|
||||||
|
pEpSet->epSet = *pNewEpSet;
|
||||||
|
taosCorEndWrite(&pEpSet->version);
|
||||||
|
}
|
||||||
|
|
||||||
|
SEpSet getEpSet_s(SCorEpSet* pEpSet) {
|
||||||
|
SEpSet ep = {0};
|
||||||
|
taosCorBeginRead(&pEpSet->version);
|
||||||
|
ep = pEpSet->epSet;
|
||||||
|
taosCorEndRead(&pEpSet->version);
|
||||||
|
|
||||||
|
return ep;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,34 +20,6 @@
|
||||||
|
|
||||||
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
|
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
|
||||||
|
|
||||||
bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; }
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
// TODO refactor
|
|
||||||
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
|
|
||||||
if (numOfFilters == 0 || src == NULL) {
|
|
||||||
assert(src == NULL);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnFilterInfo* pFilter = taosMemoryCalloc(1, numOfFilters * sizeof(SColumnFilterInfo));
|
|
||||||
|
|
||||||
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
|
|
||||||
for (int32_t j = 0; j < numOfFilters; ++j) {
|
|
||||||
if (pFilter[j].filterstr) {
|
|
||||||
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
|
|
||||||
pFilter[j].pz = (int64_t) taosMemoryCalloc(1, len);
|
|
||||||
|
|
||||||
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(src->filterstr == 0 || src->filterstr == 1);
|
|
||||||
assert(!(src->lowerRelOptr == 0 && src->upperRelOptr == 0));
|
|
||||||
|
|
||||||
return pFilter;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
#if 0
|
#if 0
|
||||||
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
|
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
|
||||||
if (slidingTime == 0) {
|
if (slidingTime == 0) {
|
||||||
|
|
|
@ -855,7 +855,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin
|
||||||
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList);
|
SArray* pColList);
|
||||||
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
|
|
||||||
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
||||||
|
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||||
|
@ -986,9 +985,8 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
|
||||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
const char* sql, EOPTR_EXEC_MODEL model);
|
const char* sql, EOPTR_EXEC_MODEL model);
|
||||||
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
||||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
||||||
int32_t* resNum);
|
|
||||||
|
|
||||||
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
|
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
|
||||||
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
|
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
|
||||||
|
|
|
@ -496,11 +496,9 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||||
doDestroyTask(pTaskInfo);
|
doDestroyTask(pTaskInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
|
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int32_t capacity = 0;
|
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
|
||||||
|
|
||||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||||
|
|
|
@ -4613,42 +4613,29 @@ void releaseQueryBuf(size_t numOfTables) {
|
||||||
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
|
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
|
||||||
int32_t* resNum) {
|
SExplainExecInfo execInfo = {0};
|
||||||
if (*resNum >= *capacity) {
|
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
|
||||||
*capacity += 10;
|
|
||||||
|
|
||||||
*pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo));
|
pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
|
||||||
if (NULL == *pRes) {
|
pExplainInfo->startupCost = operatorInfo->cost.openCost;
|
||||||
qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo));
|
pExplainInfo->totalCost = operatorInfo->cost.totalCost;
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
pExplainInfo->verboseLen = 0;
|
||||||
}
|
pExplainInfo->verboseInfo = NULL;
|
||||||
}
|
|
||||||
|
|
||||||
SExplainExecInfo* pInfo = &(*pRes)[*resNum];
|
|
||||||
|
|
||||||
pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
|
|
||||||
pInfo->startupCost = operatorInfo->cost.openCost;
|
|
||||||
pInfo->totalCost = operatorInfo->cost.totalCost;
|
|
||||||
|
|
||||||
if (operatorInfo->fpSet.getExplainFn) {
|
if (operatorInfo->fpSet.getExplainFn) {
|
||||||
int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
|
int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
|
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
pInfo->verboseLen = 0;
|
|
||||||
pInfo->verboseInfo = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
++(*resNum);
|
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
|
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
|
||||||
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
|
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
|
||||||
if (code) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFreeClear(*pRes);
|
// taosMemoryFreeClear(*pRes);
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,21 @@ static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity
|
||||||
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
|
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
|
||||||
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
|
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
|
||||||
|
|
||||||
|
static void freeGroupKey(void* param) {
|
||||||
|
SGroupKeys* pKey = (SGroupKeys*) param;
|
||||||
|
taosMemoryFree(pKey->pData);
|
||||||
|
}
|
||||||
|
|
||||||
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
taosMemoryFreeClear(pInfo->keyBuf);
|
taosMemoryFreeClear(pInfo->keyBuf);
|
||||||
taosArrayDestroy(pInfo->pGroupCols);
|
taosArrayDestroy(pInfo->pGroupCols);
|
||||||
taosArrayDestroy(pInfo->pGroupColVals);
|
taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
|
||||||
cleanupExprSupp(&pInfo->scalarSup);
|
cleanupExprSupp(&pInfo->scalarSup);
|
||||||
|
|
||||||
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,8 +421,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
// pOperator->operatorType = OP_Groupby;
|
// pOperator->operatorType = OP_Groupby;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
||||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
|
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
|
||||||
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
|
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList);
|
||||||
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
|
||||||
void qwFreeFetchRsp(void *msg);
|
void qwFreeFetchRsp(void *msg);
|
||||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||||
|
|
|
@ -82,8 +82,9 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
|
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) {
|
||||||
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
|
SExplainExecInfo* pInfo = taosArrayGet(pExecList, 0);
|
||||||
|
SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};
|
||||||
|
|
||||||
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
|
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
|
||||||
void * pRsp = rpcMallocCont(contLen);
|
void * pRsp = rpcMallocCont(contLen);
|
||||||
|
@ -96,10 +97,9 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
|
||||||
.code = 0,
|
.code = 0,
|
||||||
.info = *pConn,
|
.info = *pConn,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcRsp.info.ahandle = NULL;
|
rpcRsp.info.ahandle = NULL;
|
||||||
|
|
||||||
tmsgSendRsp(&rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,18 +44,24 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
|
||||||
QW_RET(TSDB_CODE_SUCCESS);
|
QW_RET(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void freeItem(void* param) {
|
||||||
|
SExplainExecInfo* pInfo = param;
|
||||||
|
taosMemoryFree(pInfo->verboseInfo);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
qTaskInfo_t taskHandle = ctx->taskHandle;
|
qTaskInfo_t taskHandle = ctx->taskHandle;
|
||||||
|
|
||||||
if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
|
if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
|
||||||
if (ctx->explain) {
|
if (ctx->explain) {
|
||||||
SExplainExecInfo *execInfo = NULL;
|
SArray* execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
|
||||||
int32_t resNum = 0;
|
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
|
||||||
QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo));
|
|
||||||
|
|
||||||
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
|
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
|
||||||
connInfo.ahandle = NULL;
|
connInfo.ahandle = NULL;
|
||||||
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
|
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
|
||||||
|
taosArrayDestroyEx(execInfoList, freeItem);
|
||||||
|
QW_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ctx->needFetch) {
|
if (!ctx->needFetch) {
|
||||||
|
|
Loading…
Reference in New Issue