Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

commit 14b0b458e7
@@ -79,7 +79,7 @@ def pre_test(){
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
cmake .. -DBUILD_TEST=true > /dev/null
make -j4> /dev/null

'''

@@ -173,7 +173,7 @@ def pre_test_build_mac() {
'''
sh '''
cd ${WK}/debug
cmake ..
cmake .. -DBUILD_TEST=true
make -j8
'''
sh '''

@@ -302,7 +302,7 @@ def pre_test_build_win() {
set CL=/MP8
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake"
time /t
cmake .. -G "NMake Makefiles JOM" || exit 7
cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true || exit 7
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6"
time /t
jom -j 6 || exit 8
@@ -2,6 +2,12 @@
# Deps options
# =========================================================

option(
BUILD_TEST
"If build unit tests using googletest"
OFF
)

IF(${TD_WINDOWS})

MESSAGE("build pthread Win32")

@@ -46,12 +52,6 @@ IF(${TD_WINDOWS})
ON
)

option(
BUILD_TEST
"If build unit tests using googletest"
ON
)

option(
TDENGINE_3
"TDengine 3.x for taos-tools"

@@ -65,28 +65,8 @@ IF(${TD_WINDOWS})
)

ELSEIF (TD_DARWIN_64)
IF(${BUILD_TEST})
add_definitions(-DCOMPILER_SUPPORTS_CXX13)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ELSE ()
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++13" COMPILER_SUPPORTS_CXX13)
IF(${COMPILER_SUPPORTS_CXX13})
add_definitions(-DCOMPILER_SUPPORTS_CXX13)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ELSE ()
option(
BUILD_TEST
"If build unit tests using googletest"
OFF
)
ENDIF ()
ENDIF ()
@@ -1424,6 +1424,14 @@ typedef struct {
SExplainExecInfo* subplanInfo;
} SExplainRsp;

typedef struct {
SExplainRsp rsp;
uint64_t qId;
uint64_t tId;
int64_t rId;
int32_t eId;
} SExplainLocalRsp;

typedef struct STableScanAnalyzeInfo {
uint64_t totalRows;
uint64_t totalCheckedRows;

@@ -1438,6 +1446,7 @@ typedef struct STableScanAnalyzeInfo {

int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
void tFreeSExplainRsp(SExplainRsp *pRsp);

typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
@@ -29,6 +29,15 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;

typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);

typedef struct {
void *handle;
bool localExec;
localFetchFp fp;
SArray *explainRes;
} SLocalFetch;

typedef struct {
void* tqReader;
void* meta;

@@ -127,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
 * @param handle
 * @return
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds);
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);

/**
@@ -396,6 +396,7 @@ typedef struct SDownstreamSourceNode {
uint64_t schedId;
int32_t execId;
int32_t fetchMsgType;
bool localExec;
} SDownstreamSourceNode;

typedef struct SExchangePhysiNode {
@@ -52,6 +52,7 @@ typedef enum {
#define QUERY_POLICY_VNODE 1
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3
#define QUERY_POLICY_CLIENT 4

typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema

@@ -269,43 +270,43 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
@@ -29,6 +29,7 @@ enum {
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
NODE_TYPE_MNODE,
NODE_TYPE_CLIENT,
};

typedef struct SQWorkerCfg {

@@ -55,7 +56,24 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;

int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;

typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
void *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
} SQWMsg;


int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);

int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);

@@ -81,6 +99,10 @@ void qWorkerDestroy(void **qWorkerMgmt);

int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);

int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes);

int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes);

#ifdef __cplusplus
}
#endif
@@ -64,6 +64,7 @@ typedef bool (*schedulerChkKillFp)(void* param);

typedef struct SSchedulerReq {
bool syncReq;
bool localReq;
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;

@@ -483,6 +483,7 @@ enum {
#define SNODE_HANDLE -2
#define VNODE_HANDLE -3
#define BNODE_HANDLE -4
#define CLIENT_HANDLE -5

#define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONFIG_VALUE_LEN 64
@@ -1,6 +1,7 @@
#!/bin/bash
set -e
#set -x
set -v

# dockerbuild.sh
# -n [version number]

@@ -11,8 +12,9 @@ set -e
version=""
passWord=""
verType=""
dockerLatest="n"

while getopts "hn:p:V:" arg
while getopts "hn:p:V:a:" arg
do
case $arg in
n)

@@ -30,8 +32,14 @@ do
h)
echo "Usage: `basename $0` -n [version number] "
echo " -p [password for docker hub] "
echo " -V [stable |beta] "
echo " -a [y | n ] "
exit 0
;;
a)
#echo "dockerLatest=$OPTARG"
dockerLatest=$(echo $OPTARG)
;;
?) #unknow option
echo "unkonw argument"
exit 1

@@ -41,42 +49,55 @@ done
echo "version=${version}"

#docker manifest rm tdengine/tdengine
#docker manifest rm tdengine/tdengine:${version}
if [ "$verType" == "beta" ]; then
docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version}
docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest
docker manifest rm tdengine/tdengine-beta:${version}
docker manifest rm tdengine/tdengine-beta:latest
docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version}
docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest
docker manifest inspect tdengine/tdengine:latest
docker manifest inspect tdengine/tdengine:${version}
docker login -u tdengine -p ${passWord} #replace the docker registry username and password
docker manifest push tdengine/tdengine-beta:${version}
docker manifest push tdengine/tdengine-beta:latest
elif [ "$verType" == "stable" ]; then
docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version}
docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest
docker manifest rm tdengine/tdengine:latest
docker manifest rm tdengine/tdengine:${version}
docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version}
docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest
docker manifest inspect tdengine/tdengine:latest
docker manifest inspect tdengine/tdengine:${version}
docker login -u tdengine -p ${passWord} #replace the docker registry username and password
docker manifest push tdengine/tdengine:${version}
docker manifest push tdengine/tdengine:latest
else
if [ "$verType" == "stable" ]; then
verType=stable
dockerinput=TDengine-server-${version}-Linux-$cpuType.tar.gz
dockerinput_x64=TDengine-server-${version}-Linux-amd64.tar.gz
dockerim=tdengine/tdengine
dockeramd64=tdengine/tdengine-amd64
dockeraarch64=tdengine/tdengine-aarch64
dockeraarch32=tdengine/tdengine-aarch32
elif [ "$verType" == "beta" ];then
verType=beta
tagVal=ver-${version}-beta
dockerinput=TDengine-server-${version}-${verType}-Linux-$cpuType.tar.gz
dockerinput_x64=TDengine-server-${version}-${verType}-Linux-amd64.tar.gz
dockerim=tdengine/tdengine-beta
dockeramd64=tdengine/tdengine-amd64-beta
dockeraarch64=tdengine/tdengine-aarch64-beta
dockeraarch32=tdengine/tdengine-aarch32-beta
else
echo "unknow verType, nor stabel or beta"
exit 1
fi

# docker manifest create -a tdengine/${dockername}:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version}
# docker manifest create -a tdengine/${dockername}:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest
username="tdengine"

# docker login -u tdengine -p ${passWord} #replace the docker registry username and password
# generate docker verison
echo "generate ${dockerim}:${version}"
docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version}
docker manifest inspect ${dockerim}:${version}
docker manifest rm ${dockerim}:${version}
docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version}
docker manifest inspect ${dockerim}:${version}
docker login -u ${username} -p ${passWord}
docker manifest push ${dockerim}:${version}


# generate docker latest
echo "generate ${dockerim}:latest "

if [ ${dockerLatest} == 'y' ] ;then
echo "docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest"
docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest
docker manifest inspect ${dockerim}:latest
docker manifest rm ${dockerim}:latest
docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest
docker manifest inspect ${dockerim}:latest
docker login -u tdengine -p ${passWord} #replace the docker registry username and password
docker manifest push ${dockerim}:latest
docker pull tdengine/tdengine:latest

fi

# docker manifest push tdengine/tdengine:latest

# # how set latest version ???
@@ -149,26 +149,4 @@ rm -rf temp1.data
if [ ${dockerLatest} == 'y' ] ;then
docker tag tdengine/tdengine-${dockername}:${version} tdengine/tdengine-${dockername}:latest
docker push tdengine/tdengine-${dockername}:latest
echo ">>>>>>>>>>>>> check whether tdengine/tdengine-${dockername}:latest has been published correctly"
docker run -d --name doctestla -p 7030-7049:6030-6049 -p 7030-7049:6030-6049/udp tdengine/tdengine-${dockername}:latest
sleep 2
curl -u root:taosdata -d 'show variables;' 127.0.0.1:7041/rest/sql > temp2.data
version_latest=` cat temp2.data |jq .data| jq '.[]' |grep "version" -A 2 -B 1 | jq ".[1]" `
echo "${version_latest}"
if [ "${version_latest}" == "\"${version}\"" ] ; then
echo "docker version is right "
else
echo "docker version is wrong "
exit 1
fi
fi
rm -rf temp2.data

if [ -n "$(docker ps -aq)" ] ;then
echo "delte docker process"
docker stop $(docker ps -aq)
docker rm $(docker ps -aq)
fi

cd ${scriptDir}
rm -f ${pkgFile}
@@ -44,8 +44,6 @@ cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=tru
cmake --build .
rd /s /Q C:\TDengine
cmake --install .
for /r c:\TDengine %%i in (*.dll) do signtool sign /f D:\\123.pfx /p taosdata %%i
for /r c:\TDengine %%i in (*.exe) do signtool sign /f D:\\123.pfx /p taosdata %%i
if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1)
cd %package_dir%
iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release

@@ -53,7 +51,6 @@ if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% faile
iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1)

for /r ..\release %%i in (*.exe) do signtool sign /f d:\\123.pfx /p taosdata %%i
goto EXIT0

:USAGE
@@ -27,6 +27,7 @@
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "qworker.h"

#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
@@ -379,7 +379,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
}

bool qnodeRequired(SRequestObj* pRequest) {
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
return false;
}

@@ -483,6 +483,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {

int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
char *policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

int32_t dbNum = taosArrayGetSize(pDbVgList);
for (int32_t i = 0; i < dbNum; ++i) {

@@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr

int32_t vnodeNum = taosArrayGetSize(nodeList);
if (vnodeNum > 0) {
tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum);
tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
goto _return;
}

int32_t mnodeNum = taosArrayGetSize(pMnodeList);
if (mnodeNum <= 0) {
tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId);
tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
goto _return;
}

void* pData = taosArrayGet(pMnodeList, 0);
taosArrayAddBatch(nodeList, pData, mnodeNum);

tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);

_return:

@@ -561,7 +562,8 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
int32_t code = 0;

switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: {
case QUERY_POLICY_VNODE:
case QUERY_POLICY_CLIENT: {
if (pResultMeta) {
pDbVgList = taosArrayInit(4, POINTER_BYTES);

@@ -622,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
int32_t code = 0;

switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: {
case QUERY_POLICY_VNODE:
case QUERY_POLICY_CLIENT: {
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
if (dbNum > 0) {
SCatalog* pCtg = NULL;

@@ -682,6 +685,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = true,
.localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,

@@ -1061,6 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = false,
.localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
@@ -26,6 +26,7 @@
#include "tref.h"
#include "trpc.h"
#include "version.h"
#include "qworker.h"

#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
@@ -285,7 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
@@ -4365,7 +4365,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1;
if (pRsp->numOfPlans > 0) {
pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo));
pRsp->subplanInfo = taosMemoryCalloc(pRsp->numOfPlans, sizeof(SExplainExecInfo));
if (pRsp->subplanInfo == NULL) return -1;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {

@@ -4373,7 +4373,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t **)&pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0)
if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0)
return -1;
}

@@ -4383,6 +4383,19 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
return 0;
}

void tFreeSExplainRsp(SExplainRsp *pRsp) {
if (NULL == pRsp) {
return;
}

for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
SExplainExecInfo *pExec = pRsp->subplanInfo + i;
taosMemoryFree(pExec->verboseInfo);
}

taosMemoryFreeClear(pRsp->subplanInfo);
}

int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
@@ -170,7 +170,7 @@ _exit:
}

int32_t mndInitQuery(SMnode *pMnode) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
mError("failed to init qworker in mnode since %s", terrstr());
return -1;
}
@@ -26,7 +26,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return NULL;
}

if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) {
taosMemoryFreeClear(pQnode);
return NULL;
}
@@ -690,7 +690,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma

while (1) {
uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts);
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
break;
@@ -16,7 +16,7 @@
#include "vnd.h"

int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
}

void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
@@ -99,8 +99,10 @@ extern "C" {

typedef struct SExplainGroup {
int32_t nodeNum;
int32_t nodeIdx;
int32_t physiPlanExecNum;
int32_t physiPlanExecIdx;
bool singleChannel;
SRWLatch lock;
SSubplan *plan;
SArray *nodeExecInfo; //Array<SExplainRsp>
@@ -21,14 +21,14 @@
#include "tdatablock.h"

int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);

void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) {
return;
}

taosMemoryFreeClear(resNode->pExecInfo);
taosArrayDestroy(resNode->pExecInfo);

SNode *node = NULL;
FOREACH(node, resNode->pChildren) { qExplainFreeResNode((SExplainResNode *)node); }

@@ -56,8 +56,9 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
int32_t num = taosArrayGetSize(group->nodeExecInfo);
for (int32_t i = 0; i < num; ++i) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
taosMemoryFreeClear(rsp->subplanInfo);
tFreeSExplainRsp(rsp);
}
taosArrayDestroy(group->nodeExecInfo);
}

pIter = taosHashIterate(pCtx->groupHash, pIter);

@@ -66,6 +67,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {

taosHashCleanup(pCtx->groupHash);
taosArrayDestroy(pCtx->rows);
taosMemoryFreeClear(pCtx->tbuf);
taosMemoryFree(pCtx);
}

@@ -248,7 +250,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
return TSDB_CODE_SUCCESS;
}

int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group) {
int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) {
*pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo));
if (NULL == (*pExecInfo)) {
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);

@@ -256,17 +258,28 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
}

SExplainRsp *rsp = NULL;
for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i);
/*
if (group->singleChannel) {
if (0 == group->physiPlanExecIdx) {
group->nodeIdx = 0;
}

rsp = taosArrayGet(group->nodeExecInfo, group->nodeIdx++);
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}

taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
*/
taosArrayPush(*pExecInfo, rsp->subplanInfo);
} else {
for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i);
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}

taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
}
}

++group->physiPlanExecIdx;

@@ -291,7 +304,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai
resNode->pNode = pNode;

if (group->nodeExecInfo) {
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group));
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(pNode, &resNode->pExecInfo, group));
}

QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren));

@@ -801,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}

QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1));
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel));
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: {

@@ -1533,7 +1546,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32
return TSDB_CODE_SUCCESS;
}

int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) {
SExplainResNode *node = NULL;
int32_t code = 0;
SExplainCtx *ctx = (SExplainCtx *)pCtx;

@@ -1544,6 +1557,9 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}

group->singleChannel = singleChannel;
group->physiPlanExecIdx = 0;

QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node));

QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level));

@@ -1707,7 +1723,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
}

int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0, false));
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));

@@ -1723,7 +1739,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId));
if (NULL == group) {
qError("group %d not in groupHash", groupId);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}

@@ -1732,7 +1748,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp));
if (NULL == group->nodeExecInfo) {
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);

QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

@@ -1742,7 +1758,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
} else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) {
qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo),
group->nodeNum);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);

QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);

@@ -1751,13 +1767,14 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
if (group->physiPlanExecNum != pRspMsg->numOfPlans) {
qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum,
groupId);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);

QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}

taosArrayPush(group->nodeExecInfo, pRspMsg);

groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum);

taosWUnLockLatch(&group->lock);
@@ -183,6 +183,7 @@ typedef struct SExecTaskInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
} SExecTaskInfo;

enum {
@@ -479,10 +479,14 @@ static void freeBlock(void* param) {
blockDataDestroy(pBlock);
}

int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();

if (pLocal) {
memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
}

taosArrayClearEx(pResList, freeBlock);

int64_t curOwner = 0;
@@ -1776,17 +1776,27 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;

if (pSource->localExec) {
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemoryFree(pWrapper);
} else {
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}

SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);

@@ -1806,10 +1816,6 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
return pTaskInfo->code;
}

SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;

pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;

@@ -1819,6 +1825,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf

int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
}

return TSDB_CODE_SUCCESS;
}

@@ -3531,7 +3539,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
if (pHandle->vnode) {

@@ -4047,7 +4055,9 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);

if (!pTaskInfo->localFetch.localExec) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
}

taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
@@ -627,6 +627,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(execId);
COPY_SCALAR_FIELD(fetchMsgType);
COPY_SCALAR_FIELD(localExec);
return TSDB_CODE_SUCCESS;
}
@@ -29,7 +29,7 @@ extern "C" {
#include "executor.h"
#include "trpc.h"

#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2

@@ -83,22 +83,6 @@ typedef struct SQWDebug {

extern SQWDebug gQWDebug;

typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;

typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
char *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
} SQWMsg;

typedef struct SQWHbParam {
bool inUse;
int32_t qwrId;

@@ -133,6 +117,7 @@ typedef struct SQWTaskCtx {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int8_t localExec;
int32_t msgType;
int32_t fetchType;
int32_t execId;

@@ -150,6 +135,7 @@ typedef struct SQWTaskCtx {

int8_t events[QW_EVENT_MAX];

SArray *explainRes;
void *taskHandle;
void *sinkHandle;
STbVerInfo tbInfo;
@@ -42,7 +42,7 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList);
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
@@ -9,10 +9,10 @@
#include "tmsg.h"
#include "tname.h"

int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;

SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize));
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@@ -63,12 +63,26 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));

if (ctx->localExec) {
SExplainLocalRsp localRsp = {0};
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
localRsp.rsp.subplanInfo = pExec;
localRsp.qId = qId;
localRsp.tId = tId;
localRsp.rId = rId;
localRsp.eId = eId;
taosArrayPush(ctx->explainRes, &localRsp);
taosArrayDestroy(execInfoList);
} else {
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeItem);
QW_ERR_RET(code);
}
}

if (!ctx->needFetch) {
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);

@@ -86,6 +100,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t execNum = 0;
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes};

SArray *pResList = taosArrayInit(4, POINTER_BYTES);
while (true) {

@@ -94,8 +109,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
// if *taskHandle is NULL, it's killed right now
if (taskHandle) {
qwDbgSimulateSleep();

code = qExecTaskOpt(taskHandle, pResList, &useconds);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));

@@ -235,7 +249,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,

qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
if (NULL == rsp) {
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;

@@ -256,7 +270,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,

*dataLen += len;

QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));

output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, &output);

@@ -480,8 +494,7 @@ _return:
}

if (QW_PHASE_POST_QUERY == phase && ctx) {
ctx->queryRsped = true;

if (!ctx->localExec) {
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);

@@ -492,6 +505,9 @@ _return:
}
}

ctx->queryRsped = true;
}

if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);

@@ -518,11 +534,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {

int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;

QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));

@@ -562,6 +573,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
ctx->localExec = false;

// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);

@@ -584,19 +596,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}

// QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));

// queryRsped = true;

ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

if (pTaskInfo && sinkHandle) {
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
}

_return:

@@ -606,11 +611,6 @@ _return:
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

// if (!queryRsped) {
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//}

QW_RET(TSDB_CODE_SUCCESS);
}
@@ -1006,8 +1006,8 @@ _return:
QW_RET(TSDB_CODE_SUCCESS);
}

int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}

@@ -1030,22 +1030,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

if (cfg) {
mgmt->cfg = *cfg;
if (0 == mgmt->cfg.maxSchedulerNum) {
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
}
if (0 == mgmt->cfg.maxTaskNum) {
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
}
if (0 == mgmt->cfg.maxSchTaskNum) {
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
} else {
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}

mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);

@@ -1070,7 +1057,11 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW

mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
if (pMsgCb) {
mgmt->msgCb = *pMsgCb;
} else {
memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
}

mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
if (mgmt->refId < 0) {

@@ -1153,3 +1144,112 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt

return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) {
SQWorker *mgmt = (SQWorker*)pMgmt;
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SSubplan *plan = (SSubplan *)qwMsg->msg;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SReadHandle rHandle = {0};

QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
ctx->localExec = true;
ctx->explainRes = explainRes;

rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;

code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}

if (NULL == sinkHandle || NULL == pTaskInfo) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}

ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

taosMemoryFree(rHandle.pMsgCb);

input.code = code;
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
qwReleaseTaskCtx(mgmt, ctx);
}

QW_RET(code);
}

int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) {
SQWorker *mgmt = (SQWorker*)pMgmt;
int32_t code = 0;
int32_t dataLen = 0;
SQWTaskCtx *ctx = NULL;
void *rsp = NULL;
bool queryStop = false;

SQWPhaseInput input = {0};

QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));

QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

ctx->msgType = TDMT_SCH_MERGE_FETCH;
ctx->explainRes = explainRes;

SOutputData sOutput = {0};

while (true) {
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

if (NULL == rsp) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));

continue;
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}

break;
}
}

_return:

*pRsp = rsp;

input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);

QW_RET(code);
}
@@ -877,7 +877,7 @@ TEST(seqTest, normalCase) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);

@@ -913,7 +913,7 @@ TEST(seqTest, cancelFirst) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);

@@ -950,7 +950,7 @@ TEST(seqTest, randCase) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

int32_t t = 0;

@@ -1021,7 +1021,7 @@ TEST(seqTest, multithreadRand) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

TdThreadAttr thattr;

@@ -1084,7 +1084,7 @@ TEST(rcTest, shortExecshortDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

qwtTestMaxExecTaskUsec = 0;

@@ -1168,7 +1168,7 @@ TEST(rcTest, longExecshortDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

qwtTestMaxExecTaskUsec = 1000000;

@@ -1254,7 +1254,7 @@ TEST(rcTest, shortExeclongDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

qwtTestMaxExecTaskUsec = 0;

@@ -1338,7 +1338,7 @@ TEST(rcTest, dropTest) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);

tsem_init(&qwtTestQuerySem, 0, 0);
@@ -9,7 +9,7 @@ target_include_directories(

target_link_libraries(
scheduler
PUBLIC os util nodes planner qcom common catalog transport command
PUBLIC os util nodes planner qcom common catalog transport command qworker executor
)

if(${BUILD_TEST})
@@ -151,6 +151,7 @@ typedef struct SSchedulerMgmt {
SSchStat stat;
SRWLatch hbLock;
SHashObj *hbConnections;
void *queryMgmt;
} SSchedulerMgmt;

typedef struct SSchCallbackParamHeader {

@@ -235,8 +236,10 @@ typedef struct SSchTask {
typedef struct SSchJobAttr {
EExplainMode explainMode;
bool queryJob;
bool insertJob;
bool needFetch;
bool needFlowCtrl;
bool localExec;
} SSchJobAttr;

typedef struct {

@@ -304,6 +307,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task)))

#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)

@@ -326,8 +331,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)

#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob)
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))

@@ -502,6 +508,8 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);

extern SSchDebug gSCHDebug;
@@ -720,6 +720,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
}

pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->attr.localExec = pReq->localReq;
pJob->conn = *pReq->pConn;
if (pReq->sql) {
pJob->sql = strdup(pReq->sql);

@@ -72,6 +72,71 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
return TSDB_CODE_SUCCESS;
}

int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
int32_t code = 0;

SCH_ERR_JRET(rspCode);

if (NULL == msg) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}

if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}

taosMemoryFreeClear(msg);

return TSDB_CODE_SUCCESS;
}

SCH_ERR_JRET(schLaunchFetchTask(pJob));

taosMemoryFreeClear(msg);

return TSDB_CODE_SUCCESS;
}

if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}

atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));

if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}

SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

msg = NULL;
schProcessOnDataFetched(pJob);

_return:

taosMemoryFreeClear(msg);

SCH_RET(code);
}

int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));

if (pRsp) {
SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp));
}

return TSDB_CODE_SUCCESS;
}

// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;

@@ -301,65 +366,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa

SExplainRsp rsp = {0};
if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
taosMemoryFree(rsp.subplanInfo);
tFreeSExplainRsp(&rsp);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));

if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
break;
}
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}

if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}

taosMemoryFreeClear(msg);

return TSDB_CODE_SUCCESS;
}

SCH_ERR_JRET(schLaunchFetchTask(pJob));

taosMemoryFreeClear(msg);

return TSDB_CODE_SUCCESS;
}

if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
taosMemoryFreeClear(rsp);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}

atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));

if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}

SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

code = schProcessFetchRsp(pJob, pTask, msg, rspCode);
msg = NULL;

schProcessOnDataFetched(pJob);
SCH_ERR_JRET(code);
break;
}
case TDMT_SCH_DROP_TASK_RSP: {

@@ -20,6 +20,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "qworker.h"
#include "tglobal.h"

void schFreeTask(SSchJob *pJob, SSchTask *pTask) {

@@ -90,6 +91,10 @@ _return:
}

int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
}

SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,

@@ -230,7 +235,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_RET(errCode);
}

// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;

@@ -295,6 +299,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.execId = pTask->execId,
.addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
.localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->planLock);

@@ -825,6 +830,120 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
return TSDB_CODE_SUCCESS;
}

int32_t schHandleExplainRes(SArray *pExplainRes) {
int32_t code = 0;
int32_t resNum = taosArrayGetSize(pExplainRes);
if (resNum <= 0) {
goto _return;
}

SSchTask *pTask = NULL;
SSchJob *pJob = NULL;

for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);

qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);

pJob = schAcquireJob(localRsp->rId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
}

int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
schReleaseJob(pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}

code = schGetTaskInJob(pJob, localRsp->tId, &pTask);

if (TSDB_CODE_SUCCESS == code) {
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
}

schReleaseJob(pJob->refId);

qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code);

SCH_ERR_JRET(code);

localRsp->rsp.numOfPlans = 0;
localRsp->rsp.subplanInfo = NULL;
pTask = NULL;
pJob = NULL;
}

_return:

for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
tFreeSExplainRsp(&localRsp->rsp);
}

taosArrayDestroy(pExplainRes);

SCH_RET(code);
}

int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan;
int32_t code = 0;

if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_RET(code);
} else if (tsQueryPlannerTrace) {
char *msg = NULL;
int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg);
}
}

SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}

SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
}

int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
//SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (NULL == schMgmt.queryMgmt) {
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
}

SArray *explainRes = NULL;
SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
qwMsg.msg = pTask->plan;
qwMsg.msgType = pTask->plan->msgType;
qwMsg.connInfo.handle = pJob->conn.pTrans;

if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}

SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes));

if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}

SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
}

int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = schAcquireJob(pCtx->jobRid);

@@ -848,7 +967,8 @@ int32_t schLaunchTaskImpl(void *param) {
pTask->retryTimes++;
pTask->waitRetry = false;

SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE",
pTask->execId, pTask->retryTimes);

SCH_LOG_TASK_START_TS(pTask);

@@ -863,30 +983,11 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}

SSubplan *plan = pTask->plan;

if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_JRET(code);
} else if (tsQueryPlannerTrace) {
char *msg = NULL;
int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg);
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
} else {
SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
}
}

SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));

if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
}

SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));

_return:

@@ -980,6 +1081,29 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
}
}

int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
}

int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL;
SArray *explainRes = NULL;

if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}

SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes));

if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}

SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

return TSDB_CODE_SUCCESS;
}

// Note: no more error processing, handled in function internal
int32_t schLaunchFetchTask(SSchJob *pJob) {
int32_t code = 0;

@@ -990,7 +1114,11 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}

SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
} else {
SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
}

return TSDB_CODE_SUCCESS;

@@ -17,6 +17,7 @@
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "qworker.h"

SSchedulerMgmt schMgmt = {
.jobRef = -1,

@@ -192,4 +193,7 @@ void schedulerDestroy(void) {
schMgmt.hbConnections = NULL;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

qWorkerDestroy(&schMgmt.queryMgmt);
schMgmt.queryMgmt = NULL;
}

@@ -15,6 +15,6 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)

if(${BUILD_TEST})
if(BUILD_TEST AND BUILD_SYNC_TEST)
add_subdirectory(test)
endif(${BUILD_TEST})
endif()

@@ -517,3 +517,94 @@ python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
python3 ./test.py -f 2-query/sml.py -Q 3
python3 ./test.py -f 2-query/interp.py -Q 3


#------------querPolicy 4-----------

python3 ./test.py -f 2-query/between.py -Q 4
python3 ./test.py -f 2-query/distinct.py -Q 4
python3 ./test.py -f 2-query/varchar.py -Q 4
python3 ./test.py -f 2-query/ltrim.py -Q 4
python3 ./test.py -f 2-query/rtrim.py -Q 4
python3 ./test.py -f 2-query/length.py -Q 4
python3 ./test.py -f 2-query/char_length.py -Q 4
python3 ./test.py -f 2-query/upper.py -Q 4
python3 ./test.py -f 2-query/lower.py -Q 4
python3 ./test.py -f 2-query/join.py -Q 4
python3 ./test.py -f 2-query/join2.py -Q 4
python3 ./test.py -f 2-query/cast.py -Q 4
python3 ./test.py -f 2-query/substr.py -Q 4
python3 ./test.py -f 2-query/union.py -Q 4
python3 ./test.py -f 2-query/union1.py -Q 4
python3 ./test.py -f 2-query/concat.py -Q 4
python3 ./test.py -f 2-query/concat2.py -Q 4
python3 ./test.py -f 2-query/concat_ws.py -Q 4
python3 ./test.py -f 2-query/concat_ws2.py -Q 4
#python3 ./test.py -f 2-query/check_tsdb.py -Q 4
python3 ./test.py -f 2-query/spread.py -Q 4
python3 ./test.py -f 2-query/hyperloglog.py -Q 4
python3 ./test.py -f 2-query/explain.py -Q 4
python3 ./test.py -f 2-query/leastsquares.py -Q 4
python3 ./test.py -f 2-query/timezone.py -Q 4
python3 ./test.py -f 2-query/Now.py -Q 4
python3 ./test.py -f 2-query/Today.py -Q 4
python3 ./test.py -f 2-query/max.py -Q 4
python3 ./test.py -f 2-query/min.py -Q 4
python3 ./test.py -f 2-query/count.py -Q 4
#python3 ./test.py -f 2-query/last.py -Q 4
python3 ./test.py -f 2-query/first.py -Q 4
python3 ./test.py -f 2-query/To_iso8601.py -Q 4
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 4
python3 ./test.py -f 2-query/timetruncate.py -Q 4
python3 ./test.py -f 2-query/diff.py -Q 4
python3 ./test.py -f 2-query/Timediff.py -Q 4
python3 ./test.py -f 2-query/json_tag.py -Q 4
python3 ./test.py -f 2-query/top.py -Q 4
python3 ./test.py -f 2-query/bottom.py -Q 4
python3 ./test.py -f 2-query/percentile.py -Q 4
python3 ./test.py -f 2-query/apercentile.py -Q 4
python3 ./test.py -f 2-query/abs.py -Q 4
python3 ./test.py -f 2-query/ceil.py -Q 4
python3 ./test.py -f 2-query/floor.py -Q 4
python3 ./test.py -f 2-query/round.py -Q 4
python3 ./test.py -f 2-query/log.py -Q 4
python3 ./test.py -f 2-query/pow.py -Q 4
python3 ./test.py -f 2-query/sqrt.py -Q 4
python3 ./test.py -f 2-query/sin.py -Q 4
python3 ./test.py -f 2-query/cos.py -Q 4
python3 ./test.py -f 2-query/tan.py -Q 4
python3 ./test.py -f 2-query/arcsin.py -Q 4
python3 ./test.py -f 2-query/arccos.py -Q 4
python3 ./test.py -f 2-query/arctan.py -Q 4
python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 4
# python3 ./test.py -f 2-query/nestedQuery.py -Q 4
# python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
# python3 ./test.py -f 2-query/avg.py -Q 4
# python3 ./test.py -f 2-query/elapsed.py -Q 4
python3 ./test.py -f 2-query/csum.py -Q 4
#python3 ./test.py -f 2-query/mavg.py -Q 4
python3 ./test.py -f 2-query/sample.py -Q 4
python3 ./test.py -f 2-query/function_diff.py -Q 4
python3 ./test.py -f 2-query/unique.py -Q 4
python3 ./test.py -f 2-query/stateduration.py -Q 4
python3 ./test.py -f 2-query/function_stateduration.py -Q 4
python3 ./test.py -f 2-query/statecount.py -Q 4
python3 ./test.py -f 2-query/tail.py -Q 4
python3 ./test.py -f 2-query/ttl_comment.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_count.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_max.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_min.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_sum.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_spread.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_apercentile.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_avg.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_stddev.py -Q 4
python3 ./test.py -f 2-query/twa.py -Q 4
python3 ./test.py -f 2-query/irate.py -Q 4
python3 ./test.py -f 2-query/function_null.py -Q 4
python3 ./test.py -f 2-query/count_partition.py -Q 4
python3 ./test.py -f 2-query/max_partition.py -Q 4
python3 ./test.py -f 2-query/last_row.py -Q 4
python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
#python3 ./test.py -f 2-query/sml.py -Q 4
python3 ./test.py -f 2-query/interp.py -Q 4