qnode for policy 2/3
This commit is contained in:
parent
d862f2b49c
commit
5f4614d61a
|
@ -87,7 +87,7 @@ typedef struct SMetaData {
|
|||
SArray* pUdfList; // pRes = SFuncInfo*
|
||||
SArray* pIndex; // pRes = SIndexInfo*
|
||||
SArray* pUser; // pRes = bool*
|
||||
SArray* pQnodeList; // pRes = SQueryNodeAddr*
|
||||
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
|
||||
} SMetaData;
|
||||
|
||||
typedef struct SCatalogCfg {
|
||||
|
|
|
@ -47,6 +47,10 @@ typedef enum {
|
|||
TARGET_TYPE_OTHER,
|
||||
} ETargetType;
|
||||
|
||||
#define QUERY_POLICY_VNODE 1
|
||||
#define QUERY_POLICY_HYBRID 2
|
||||
#define QUERY_POLICY_QNODE 3
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags; // the number of tags in schema
|
||||
uint8_t precision; // the number of precision
|
||||
|
|
|
@ -128,6 +128,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
|
||||
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
|
||||
#define TSDB_CODE_TSC_QUERY_KILLED TAOS_DEF_ERROR_CODE(0, 0X0228)
|
||||
#define TSDB_CODE_TSC_NO_EXEC_NODE TAOS_DEF_ERROR_CODE(0, 0X0229)
|
||||
|
||||
// mnode-common
|
||||
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
#include "parser.h"
|
||||
#include "planner.h"
|
||||
#include "catalog.h"
|
||||
#include "query.h"
|
||||
#include "taos.h"
|
||||
#include "tcommon.h"
|
||||
|
@ -292,7 +293,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
|||
|
||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList);
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||
|
||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||
|
||||
|
@ -317,12 +318,13 @@ void hbMgrInitMqHbRspHandle();
|
|||
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery);
|
||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta);
|
||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
||||
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
|
||||
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
|
||||
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function
|
||||
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx
|
||||
bool qnodeRequired(SRequestObj* pRequest);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -307,6 +307,21 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool qnodeRequired(SRequestObj* pRequest) {
|
||||
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||
bool required = false;
|
||||
|
||||
taosThreadMutexLock(&pInfo->qnodeMutex);
|
||||
required = (NULL == pInfo->pQnodeList);
|
||||
taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
||||
|
||||
return required;
|
||||
}
|
||||
|
||||
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
||||
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||
int32_t code = 0;
|
||||
|
@ -337,7 +352,7 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) {
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||
|
||||
|
@ -349,12 +364,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
|||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||
|
||||
int32_t code = getQnodeList(pRequest, pNodeList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qCreateQueryPlan(&cxt, pPlan, *pNodeList);
|
||||
}
|
||||
|
||||
return code;
|
||||
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||
}
|
||||
|
||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
|
||||
|
@ -397,6 +407,195 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
|||
pResInfo->precision = precision;
|
||||
}
|
||||
|
||||
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
|
||||
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
|
||||
int32_t dbNum = taosArrayGetSize(pDbVgList);
|
||||
for (int32_t i = 0; i < dbNum; ++i) {
|
||||
SArray* pVg = taosArrayGetP(pDbVgList, i);
|
||||
int32_t vgNum = taosArrayGetSize(pVg);
|
||||
if (vgNum <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < vgNum; ++j) {
|
||||
SVgroupInfo* pInfo = taosArrayGet(pVg, j);
|
||||
SQueryNodeLoad load = {0};
|
||||
load.addr.nodeId = pInfo->vgId;
|
||||
load.addr.epSet = pInfo->epSet;
|
||||
|
||||
taosArrayPush(nodeList, &load);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeNum = taosArrayGetSize(nodeList);
|
||||
if (vnodeNum > 0) {
|
||||
tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
int32_t mnodeNum = taosArrayGetSize(pMnodeList);
|
||||
if (mnodeNum <= 0) {
|
||||
tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
void* pData = taosArrayGet(pMnodeList, 0);
|
||||
taosArrayAddBatch(nodeList, pData, mnodeNum);
|
||||
|
||||
tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
|
||||
|
||||
_return:
|
||||
|
||||
*pNodeList = nodeList;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
|
||||
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
|
||||
int32_t qNodeNum = taosArrayGetSize(pQnodeList);
|
||||
if (qNodeNum > 0) {
|
||||
void* pData = taosArrayGet(pQnodeList, 0);
|
||||
taosArrayAddBatch(nodeList, pData, qNodeNum);
|
||||
tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
int32_t mnodeNum = taosArrayGetSize(pMnodeList);
|
||||
if (mnodeNum <= 0) {
|
||||
tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
void* pData = taosArrayGet(pMnodeList, 0);
|
||||
taosArrayAddBatch(nodeList, pData, mnodeNum);
|
||||
|
||||
tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
|
||||
|
||||
_return:
|
||||
|
||||
*pNodeList = nodeList;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData *pResultMeta) {
|
||||
SArray* pDbVgList = NULL;
|
||||
SArray* pQnodeList = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
switch (tsQueryPolicy) {
|
||||
case QUERY_POLICY_VNODE: {
|
||||
if (pResultMeta) {
|
||||
pDbVgList = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
|
||||
for (int32_t i = 0; i < dbNum; ++i) {
|
||||
SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
|
||||
if (pRes->code || NULL == pRes->pRes) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayPush(pDbVgList, &pRes->pRes);
|
||||
}
|
||||
}
|
||||
|
||||
code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
|
||||
break;
|
||||
}
|
||||
case QUERY_POLICY_HYBRID:
|
||||
case QUERY_POLICY_QNODE: {
|
||||
if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
|
||||
SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
|
||||
if (pRes->code) {
|
||||
pQnodeList = NULL;
|
||||
} else {
|
||||
pQnodeList = taosArrayDup((SArray*)pRes->pRes);
|
||||
}
|
||||
} else {
|
||||
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
|
||||
taosThreadMutexLock(&pInst->qnodeMutex);
|
||||
if (pInst->pQnodeList) {
|
||||
pQnodeList = taosArrayDup(pInst->pQnodeList);
|
||||
}
|
||||
taosThreadMutexUnlock(&pInst->qnodeMutex);
|
||||
}
|
||||
|
||||
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
tscError("unknown query policy: %d", tsQueryPolicy);
|
||||
return TSDB_CODE_TSC_APP_ERROR;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pDbVgList);
|
||||
taosArrayDestroy(pQnodeList);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
|
||||
SArray* pDbVgList = NULL;
|
||||
SArray* pQnodeList = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
switch (tsQueryPolicy) {
|
||||
case QUERY_POLICY_VNODE: {
|
||||
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
|
||||
if (dbNum > 0) {
|
||||
SCatalog* pCtg = NULL;
|
||||
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
|
||||
code = catalogGetHandle(pInst->clusterId, &pCtg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
|
||||
SArray* pVgList = NULL;
|
||||
for (int32_t i = 0; i < dbNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pRequest->dbList, i);
|
||||
SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pInst->mgmtEp)};
|
||||
|
||||
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, &pVgList);
|
||||
if (code) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
taosArrayPush(pDbVgList, &pVgList);
|
||||
}
|
||||
}
|
||||
|
||||
code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
|
||||
break;
|
||||
}
|
||||
case QUERY_POLICY_HYBRID:
|
||||
case QUERY_POLICY_QNODE: {
|
||||
getQnodeList(pRequest, &pQnodeList);
|
||||
|
||||
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
tscError("unknown query policy: %d", tsQueryPolicy);
|
||||
return TSDB_CODE_TSC_APP_ERROR;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(pDbVgList);
|
||||
taosArrayDestroy(pQnodeList);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
||||
tsem_init(&schdRspSem, 0, 0);
|
||||
|
||||
|
@ -658,12 +857,16 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
code = execDdlQuery(pRequest, pQuery);
|
||||
break;
|
||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||
SArray* pNodeList = NULL;
|
||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pNodeList);
|
||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SArray* pNodeList = NULL;
|
||||
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
||||
|
||||
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
|
||||
taosArrayDestroy(pNodeList);
|
||||
}
|
||||
taosArrayDestroy(pNodeList);
|
||||
taosArrayDestroy(pMnodeList);
|
||||
break;
|
||||
}
|
||||
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
||||
|
@ -712,7 +915,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
|||
return launchQueryImpl(pRequest, pQuery, false, NULL);
|
||||
}
|
||||
|
||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta) {
|
||||
int32_t code = 0;
|
||||
|
||||
switch (pQuery->execMode) {
|
||||
|
@ -723,7 +926,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
code = asyncExecDdlQuery(pRequest, pQuery);
|
||||
break;
|
||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||
SArray* pNodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
|
||||
pRequest->type = pQuery->msgType;
|
||||
|
||||
|
@ -736,13 +939,16 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||
|
||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
|
||||
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pMnodeList);
|
||||
if (code) {
|
||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SArray* pNodeList = NULL;
|
||||
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||
|
||||
SRequestConnInfo conn = {
|
||||
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
|
||||
SSchedulerReq req = {.pConn = &conn,
|
||||
|
@ -754,6 +960,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
.cbParam = pRequest,
|
||||
.reqKilled = &pRequest->killed};
|
||||
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
||||
taosArrayDestroy(pNodeList);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
|
@ -761,7 +968,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
}
|
||||
|
||||
// todo not to be released here
|
||||
taosArrayDestroy(pNodeList);
|
||||
taosArrayDestroy(pMnodeList);
|
||||
break;
|
||||
}
|
||||
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
||||
|
|
|
@ -702,7 +702,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
|||
TSWAP(pRequest->tableList, (pQuery)->pTableList);
|
||||
|
||||
destorySqlParseWrapper(pWrapper);
|
||||
launchAsyncQuery(pRequest, pQuery);
|
||||
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||
} else {
|
||||
destorySqlParseWrapper(pWrapper);
|
||||
tscDebug("error happens, code:%d", code);
|
||||
|
@ -808,7 +808,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
|
||||
SQuery *pQuery = NULL;
|
||||
|
||||
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce};
|
||||
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
|
||||
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
|
|
@ -2274,7 +2274,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
|||
// info->affectedRows = taos_affected_rows(info->pRequest);
|
||||
// return info->pRequest->code;
|
||||
|
||||
launchAsyncQuery(info->pRequest, info->pQuery);
|
||||
launchAsyncQuery(info->pRequest, info->pQuery, NULL);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,10 +23,6 @@ extern "C" {
|
|||
#include "planner.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
#define QUERY_POLICY_VNODE 1
|
||||
#define QUERY_POLICY_HYBRID 2
|
||||
#define QUERY_POLICY_QNODE 3
|
||||
|
||||
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
|
||||
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
|
||||
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
|
||||
|
|
|
@ -458,8 +458,6 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -493,10 +491,6 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
|
||||
}
|
||||
if (pCxt->pExecNodeList) {
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
}
|
||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
||||
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
|
||||
|
@ -531,12 +525,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
pScan->accountId = pCxt->pPlanCxt->acctId;
|
||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
} else {
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
}
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
|
||||
|
@ -1373,8 +1364,6 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
|
|||
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||
pSubplan->msgType = pModify->msgType;
|
||||
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
|
||||
}
|
||||
|
||||
|
|
|
@ -652,7 +652,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
if (addNum <= 0) {
|
||||
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -677,7 +677,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
|
||||
SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -133,6 +133,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node")
|
||||
|
||||
// mnode-common
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
|
||||
|
|
Loading…
Reference in New Issue