Merge pull request #13977 from taosdata/enh/taos.ref
feat: support all queries on Qnode
This commit is contained in:
commit
a8694bd863
|
@ -87,7 +87,7 @@ typedef struct SMetaData {
|
||||||
SArray* pUdfList; // pRes = SFuncInfo*
|
SArray* pUdfList; // pRes = SFuncInfo*
|
||||||
SArray* pIndex; // pRes = SIndexInfo*
|
SArray* pIndex; // pRes = SIndexInfo*
|
||||||
SArray* pUser; // pRes = bool*
|
SArray* pUser; // pRes = bool*
|
||||||
SArray* pQnodeList; // pRes = SQueryNodeAddr*
|
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
|
||||||
} SMetaData;
|
} SMetaData;
|
||||||
|
|
||||||
typedef struct SCatalogCfg {
|
typedef struct SCatalogCfg {
|
||||||
|
|
|
@ -47,6 +47,10 @@ typedef enum {
|
||||||
TARGET_TYPE_OTHER,
|
TARGET_TYPE_OTHER,
|
||||||
} ETargetType;
|
} ETargetType;
|
||||||
|
|
||||||
|
#define QUERY_POLICY_VNODE 1
|
||||||
|
#define QUERY_POLICY_HYBRID 2
|
||||||
|
#define QUERY_POLICY_QNODE 3
|
||||||
|
|
||||||
typedef struct STableComInfo {
|
typedef struct STableComInfo {
|
||||||
uint8_t numOfTags; // the number of tags in schema
|
uint8_t numOfTags; // the number of tags in schema
|
||||||
uint8_t precision; // the number of precision
|
uint8_t precision; // the number of precision
|
||||||
|
|
|
@ -128,6 +128,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
|
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
|
||||||
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
|
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
|
||||||
#define TSDB_CODE_TSC_QUERY_KILLED TAOS_DEF_ERROR_CODE(0, 0X0228)
|
#define TSDB_CODE_TSC_QUERY_KILLED TAOS_DEF_ERROR_CODE(0, 0X0228)
|
||||||
|
#define TSDB_CODE_TSC_NO_EXEC_NODE TAOS_DEF_ERROR_CODE(0, 0X0229)
|
||||||
|
|
||||||
// mnode-common
|
// mnode-common
|
||||||
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
|
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
|
||||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
||||||
|
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
|
#include "catalog.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
@ -294,7 +295,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
||||||
|
|
||||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||||
|
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList);
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||||
|
|
||||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||||
|
|
||||||
|
@ -319,12 +320,13 @@ void hbMgrInitMqHbRspHandle();
|
||||||
|
|
||||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
|
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
||||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery);
|
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta);
|
||||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
||||||
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
|
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
|
||||||
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
|
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
|
||||||
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function
|
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function
|
||||||
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx
|
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx
|
||||||
|
bool qnodeRequired(SRequestObj* pRequest);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -307,6 +307,21 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool qnodeRequired(SRequestObj* pRequest) {
|
||||||
|
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||||
|
bool required = false;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->qnodeMutex);
|
||||||
|
required = (NULL == pInfo->pQnodeList);
|
||||||
|
taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
||||||
|
|
||||||
|
return required;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
||||||
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -337,7 +352,7 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) {
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||||
pRequest->type = pQuery->msgType;
|
pRequest->type = pQuery->msgType;
|
||||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||||
|
|
||||||
|
@ -349,12 +364,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
||||||
.pMsg = pRequest->msgBuf,
|
.pMsg = pRequest->msgBuf,
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||||
|
|
||||||
int32_t code = getQnodeList(pRequest, pNodeList);
|
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = qCreateQueryPlan(&cxt, pPlan, *pNodeList);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
|
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
|
||||||
|
@ -397,6 +407,195 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
||||||
pResInfo->precision = precision;
|
pResInfo->precision = precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
|
||||||
|
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
|
|
||||||
|
int32_t dbNum = taosArrayGetSize(pDbVgList);
|
||||||
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
|
SArray* pVg = taosArrayGetP(pDbVgList, i);
|
||||||
|
int32_t vgNum = taosArrayGetSize(pVg);
|
||||||
|
if (vgNum <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < vgNum; ++j) {
|
||||||
|
SVgroupInfo* pInfo = taosArrayGet(pVg, j);
|
||||||
|
SQueryNodeLoad load = {0};
|
||||||
|
load.addr.nodeId = pInfo->vgId;
|
||||||
|
load.addr.epSet = pInfo->epSet;
|
||||||
|
|
||||||
|
taosArrayPush(nodeList, &load);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeNum = taosArrayGetSize(nodeList);
|
||||||
|
if (vnodeNum > 0) {
|
||||||
|
tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mnodeNum = taosArrayGetSize(pMnodeList);
|
||||||
|
if (mnodeNum <= 0) {
|
||||||
|
tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = taosArrayGet(pMnodeList, 0);
|
||||||
|
taosArrayAddBatch(nodeList, pData, mnodeNum);
|
||||||
|
|
||||||
|
tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
*pNodeList = nodeList;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
|
||||||
|
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
|
|
||||||
|
int32_t qNodeNum = taosArrayGetSize(pQnodeList);
|
||||||
|
if (qNodeNum > 0) {
|
||||||
|
void* pData = taosArrayGet(pQnodeList, 0);
|
||||||
|
taosArrayAddBatch(nodeList, pData, qNodeNum);
|
||||||
|
tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mnodeNum = taosArrayGetSize(pMnodeList);
|
||||||
|
if (mnodeNum <= 0) {
|
||||||
|
tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = taosArrayGet(pMnodeList, 0);
|
||||||
|
taosArrayAddBatch(nodeList, pData, mnodeNum);
|
||||||
|
|
||||||
|
tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
*pNodeList = nodeList;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData *pResultMeta) {
|
||||||
|
SArray* pDbVgList = NULL;
|
||||||
|
SArray* pQnodeList = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
switch (tsQueryPolicy) {
|
||||||
|
case QUERY_POLICY_VNODE: {
|
||||||
|
if (pResultMeta) {
|
||||||
|
pDbVgList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
|
int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
|
||||||
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
|
SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
|
||||||
|
if (pRes->code || NULL == pRes->pRes) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pDbVgList, &pRes->pRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case QUERY_POLICY_HYBRID:
|
||||||
|
case QUERY_POLICY_QNODE: {
|
||||||
|
if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
|
||||||
|
SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
|
||||||
|
if (pRes->code) {
|
||||||
|
pQnodeList = NULL;
|
||||||
|
} else {
|
||||||
|
pQnodeList = taosArrayDup((SArray*)pRes->pRes);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
|
||||||
|
taosThreadMutexLock(&pInst->qnodeMutex);
|
||||||
|
if (pInst->pQnodeList) {
|
||||||
|
pQnodeList = taosArrayDup(pInst->pQnodeList);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pInst->qnodeMutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
tscError("unknown query policy: %d", tsQueryPolicy);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pDbVgList);
|
||||||
|
taosArrayDestroy(pQnodeList);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
|
||||||
|
SArray* pDbVgList = NULL;
|
||||||
|
SArray* pQnodeList = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
switch (tsQueryPolicy) {
|
||||||
|
case QUERY_POLICY_VNODE: {
|
||||||
|
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
|
||||||
|
if (dbNum > 0) {
|
||||||
|
SCatalog* pCtg = NULL;
|
||||||
|
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
|
||||||
|
code = catalogGetHandle(pInst->clusterId, &pCtg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
|
||||||
|
SArray* pVgList = NULL;
|
||||||
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
|
char* dbFName = taosArrayGet(pRequest->dbList, i);
|
||||||
|
SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
|
||||||
|
.requestId = pRequest->requestId,
|
||||||
|
.requestObjRefId = pRequest->self,
|
||||||
|
.mgmtEps = getEpSet_s(&pInst->mgmtEp)};
|
||||||
|
|
||||||
|
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, &pVgList);
|
||||||
|
if (code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pDbVgList, &pVgList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case QUERY_POLICY_HYBRID:
|
||||||
|
case QUERY_POLICY_QNODE: {
|
||||||
|
getQnodeList(pRequest, &pQnodeList);
|
||||||
|
|
||||||
|
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
tscError("unknown query policy: %d", tsQueryPolicy);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
taosArrayDestroy(pDbVgList);
|
||||||
|
taosArrayDestroy(pQnodeList);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
||||||
tsem_init(&schdRspSem, 0, 0);
|
tsem_init(&schdRspSem, 0, 0);
|
||||||
|
|
||||||
|
@ -658,12 +857,16 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
code = execDdlQuery(pRequest, pQuery);
|
code = execDdlQuery(pRequest, pQuery);
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||||
SArray* pNodeList = NULL;
|
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pNodeList);
|
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SArray* pNodeList = NULL;
|
||||||
|
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
||||||
|
|
||||||
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
|
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
|
||||||
|
taosArrayDestroy(pNodeList);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pMnodeList);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
||||||
|
@ -712,7 +915,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
return launchQueryImpl(pRequest, pQuery, false, NULL);
|
return launchQueryImpl(pRequest, pQuery, false, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
switch (pQuery->execMode) {
|
switch (pQuery->execMode) {
|
||||||
|
@ -723,7 +926,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
code = asyncExecDdlQuery(pRequest, pQuery);
|
code = asyncExecDdlQuery(pRequest, pQuery);
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||||
SArray* pNodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
|
|
||||||
pRequest->type = pQuery->msgType;
|
pRequest->type = pQuery->msgType;
|
||||||
|
|
||||||
|
@ -736,13 +939,16 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||||
|
|
||||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||||
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
|
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pMnodeList);
|
||||||
if (code) {
|
if (code) {
|
||||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SArray* pNodeList = NULL;
|
||||||
|
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||||
|
|
||||||
SRequestConnInfo conn = {
|
SRequestConnInfo conn = {
|
||||||
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
|
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
|
||||||
SSchedulerReq req = {.pConn = &conn,
|
SSchedulerReq req = {.pConn = &conn,
|
||||||
|
@ -754,6 +960,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
.cbParam = pRequest,
|
.cbParam = pRequest,
|
||||||
.reqKilled = &pRequest->killed};
|
.reqKilled = &pRequest->killed};
|
||||||
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
taosArrayDestroy(pNodeList);
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
|
@ -761,7 +968,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo not to be released here
|
// todo not to be released here
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pMnodeList);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
||||||
|
|
|
@ -702,7 +702,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
TSWAP(pRequest->tableList, (pQuery)->pTableList);
|
TSWAP(pRequest->tableList, (pQuery)->pTableList);
|
||||||
|
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
launchAsyncQuery(pRequest, pQuery);
|
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||||
} else {
|
} else {
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
tscDebug("error happens, code:%d", code);
|
tscDebug("error happens, code:%d", code);
|
||||||
|
@ -808,7 +808,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
|
|
||||||
SQuery *pQuery = NULL;
|
SQuery *pQuery = NULL;
|
||||||
|
|
||||||
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce};
|
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
|
||||||
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
|
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -2274,7 +2274,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
// info->affectedRows = taos_affected_rows(info->pRequest);
|
// info->affectedRows = taos_affected_rows(info->pRequest);
|
||||||
// return info->pRequest->code;
|
// return info->pRequest->code;
|
||||||
|
|
||||||
launchAsyncQuery(info->pRequest, info->pQuery);
|
launchAsyncQuery(info->pRequest, info->pQuery, NULL);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,10 +23,6 @@ extern "C" {
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
|
||||||
#define QUERY_POLICY_VNODE 1
|
|
||||||
#define QUERY_POLICY_HYBRID 2
|
|
||||||
#define QUERY_POLICY_QNODE 3
|
|
||||||
|
|
||||||
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
|
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
|
||||||
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
|
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
|
||||||
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
|
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
|
||||||
|
|
|
@ -496,10 +496,6 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
|
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
|
||||||
}
|
}
|
||||||
if (pCxt->pExecNodeList) {
|
|
||||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
|
||||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
|
||||||
}
|
|
||||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||||
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
||||||
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
|
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
|
||||||
|
@ -535,12 +531,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
||||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
|
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
|
||||||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
|
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
|
||||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
|
||||||
} else {
|
|
||||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
|
||||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
|
||||||
}
|
}
|
||||||
|
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||||
|
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||||
|
|
||||||
|
@ -1432,8 +1425,6 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
|
||||||
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
|
||||||
pSubplan->msgType = pModify->msgType;
|
pSubplan->msgType = pModify->msgType;
|
||||||
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
|
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
|
||||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
|
||||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
|
||||||
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
|
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -652,7 +652,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
if (addNum <= 0) {
|
if (addNum <= 0) {
|
||||||
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
|
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -677,7 +677,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
|
SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,6 +133,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error"
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node")
|
||||||
|
|
||||||
// mnode-common
|
// mnode-common
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
|
||||||
|
|
Loading…
Reference in New Issue