Merge branch '3.0' into fix/TD-20286
This commit is contained in:
commit
9a5e82267e
|
@ -838,6 +838,7 @@ typedef struct {
|
|||
int64_t dbId;
|
||||
int32_t vgVersion;
|
||||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||
int64_t stateTs; // ms
|
||||
} SUseDbReq;
|
||||
|
||||
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
||||
|
@ -853,6 +854,7 @@ typedef struct {
|
|||
int8_t hashMethod;
|
||||
SArray* pVgroupInfos; // Array of SVgroupInfo
|
||||
int32_t errCode;
|
||||
int64_t stateTs; // ms
|
||||
} SUseDbRsp;
|
||||
|
||||
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
|
||||
|
|
|
@ -124,6 +124,7 @@ typedef struct SDbVgVersion {
|
|||
int64_t dbId;
|
||||
int32_t vgVersion;
|
||||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||
int64_t stateTs;
|
||||
} SDbVgVersion;
|
||||
|
||||
typedef struct STbSVersion {
|
||||
|
|
|
@ -13,7 +13,7 @@ def sync_source(branch_name) {
|
|||
git branch
|
||||
git pull || git pull
|
||||
git log | head -n 20
|
||||
git submodule update --init --recursive
|
||||
git clean -fxd
|
||||
'''
|
||||
return 1
|
||||
}
|
||||
|
@ -37,19 +37,24 @@ def build_run() {
|
|||
pipeline {
|
||||
agent none
|
||||
parameters {
|
||||
choice(
|
||||
name: 'sourcePath',
|
||||
choices: ['nas','web'],
|
||||
description: 'choice which way to download the installation pacakge;web is Office Web and nas means taos nas server '
|
||||
)
|
||||
string (
|
||||
name:'version',
|
||||
defaultValue:'3.0.0.1',
|
||||
defaultValue:'3.0.1.6',
|
||||
description: 'release version number,eg: 3.0.0.1 or 3.0.0.'
|
||||
)
|
||||
string (
|
||||
name:'baseVersion',
|
||||
defaultValue:'3.0.0.1',
|
||||
defaultValue:'3.0.1.6',
|
||||
description: 'This number of baseVerison is generally not modified.Now it is 3.0.0.1'
|
||||
)
|
||||
string (
|
||||
name:'toolsVersion',
|
||||
defaultValue:'2.1.2',
|
||||
defaultValue:'2.2.7',
|
||||
description: 'This number of baseVerison is generally not modified.Now it is 3.0.0.1'
|
||||
)
|
||||
string (
|
||||
|
@ -62,7 +67,7 @@ pipeline {
|
|||
WORK_DIR = '/var/lib/jenkins/workspace'
|
||||
TDINTERNAL_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal'
|
||||
TDENGINE_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal/community'
|
||||
BRANCH_NAME = 'test/chr/TD-14699'
|
||||
BRANCH_NAME = '3.0'
|
||||
|
||||
TD_SERVER_TAR = "TDengine-server-${version}-Linux-x64.tar.gz"
|
||||
BASE_TD_SERVER_TAR = "TDengine-server-${baseVersion}-Linux-x64.tar.gz"
|
||||
|
@ -104,17 +109,17 @@ pipeline {
|
|||
sync_source("${BRANCH_NAME}")
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
}
|
||||
|
@ -127,17 +132,17 @@ pipeline {
|
|||
sync_source("${BRANCH_NAME}")
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
dpkg -r tdengine
|
||||
'''
|
||||
|
@ -152,17 +157,17 @@ pipeline {
|
|||
sync_source("${BRANCH_NAME}")
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
}
|
||||
|
@ -175,17 +180,17 @@ pipeline {
|
|||
sync_source("${BRANCH_NAME}")
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
sudo rpm -e tdengine
|
||||
'''
|
||||
|
@ -199,7 +204,7 @@ pipeline {
|
|||
sync_source("${BRANCH_NAME}")
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_SERVER_ARM_TAR} ${version} ${BASE_TD_SERVER_ARM_TAR} ${baseVersion} server
|
||||
bash testpackage.sh ${TD_SERVER_ARM_TAR} ${version} ${BASE_TD_SERVER_ARM_TAR} ${baseVersion} server ${sourcePath}
|
||||
python3 checkPackageRuning.py
|
||||
'''
|
||||
}
|
||||
|
@ -215,7 +220,7 @@ pipeline {
|
|||
timeout(time: 30, unit: 'MINUTES'){
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_CLIENT_TAR} ${version} ${BASE_TD_CLIENT_TAR} ${baseVersion} client
|
||||
bash testpackage.sh ${TD_CLIENT_TAR} ${version} ${BASE_TD_CLIENT_TAR} ${baseVersion} client ${sourcePath}
|
||||
python3 checkPackageRuning.py 192.168.0.21
|
||||
'''
|
||||
}
|
||||
|
@ -227,7 +232,7 @@ pipeline {
|
|||
timeout(time: 30, unit: 'MINUTES'){
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_CLIENT_LITE_TAR} ${version} ${BASE_TD_CLIENT_LITE_TAR} ${baseVersion} client
|
||||
bash testpackage.sh ${TD_CLIENT_LITE_TAR} ${version} ${BASE_TD_CLIENT_LITE_TAR} ${baseVersion} client ${sourcePath}
|
||||
python3 checkPackageRuning.py 192.168.0.24
|
||||
'''
|
||||
}
|
||||
|
@ -241,7 +246,7 @@ pipeline {
|
|||
timeout(time: 30, unit: 'MINUTES'){
|
||||
sh '''
|
||||
cd ${TDENGINE_ROOT_DIR}/packaging
|
||||
bash testpackage.sh ${TD_CLIENT_ARM_TAR} ${version} ${BASE_TD_CLIENT_ARM_TAR} ${baseVersion} client
|
||||
bash testpackage.sh ${TD_CLIENT_ARM_TAR} ${version} ${BASE_TD_CLIENT_ARM_TAR} ${baseVersion} client ${sourcePath}
|
||||
python3 checkPackageRuning.py 192.168.0.21
|
||||
'''
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ version=$2
|
|||
originPackageName=$3
|
||||
originversion=$4
|
||||
testFile=$5
|
||||
# sourcePath:web/nas
|
||||
sourcePath=$6
|
||||
subFile="taos.tar.gz"
|
||||
|
||||
# Color setting
|
||||
|
@ -71,16 +73,23 @@ fi
|
|||
function wgetFile {
|
||||
|
||||
file=$1
|
||||
|
||||
if [ ! -f ${file} ];then
|
||||
echoColor BD "wget https://www.taosdata.com/assets-download/3.0/${file}"
|
||||
wget https://www.taosdata.com/assets-download/3.0/${file}
|
||||
else
|
||||
echoColor YD "${file} already exists and use new file "
|
||||
versionPath=$2
|
||||
sourceP=$3
|
||||
nasServerIP="192.168.1.131"
|
||||
packagePath="/nas/TDengine/v${versionPath}/community"
|
||||
if [ -f ${file} ];then
|
||||
echoColor YD "${file} already exists ,it will delete it and download it again "
|
||||
rm -rf ${file}
|
||||
echoColor BD "wget https://www.taosdata.com/assets-download/3.0/${file}"
|
||||
wget https://www.taosdata.com/assets-download/3.0/${file}
|
||||
fi
|
||||
|
||||
if [ ${sourceP} = 'web' ];then
|
||||
echoColor BD "====download====:wget https://www.taosdata.com/assets-download/3.0/${file}"
|
||||
wget https://www.taosdata.com/assets-download/3.0/${file}
|
||||
elif [ ${sourceP} = 'nas' ];then
|
||||
echoColor BD "====download====:scp root@${nasServerIP}:${packagePath}/${file} ."
|
||||
scp root@${nasServerIP}:${packagePath}/${file} .
|
||||
fi
|
||||
|
||||
}
|
||||
|
||||
function newPath {
|
||||
|
@ -142,8 +151,9 @@ if [ -d ${installPath}/${tdPath} ] ;then
|
|||
fi
|
||||
|
||||
echoColor G "===== download installPackage ====="
|
||||
cd ${installPath} && wgetFile ${packgeName}
|
||||
cd ${oriInstallPath} && wgetFile ${originPackageName}
|
||||
cd ${installPath} && wgetFile ${packgeName} ${version} ${sourcePath}
|
||||
cd ${oriInstallPath} && wgetFile ${originPackageName} ${originversion} ${sourcePath}
|
||||
|
||||
|
||||
cd ${installPath}
|
||||
cp -r ${scriptDir}/debRpmAutoInstall.sh .
|
||||
|
@ -193,7 +203,7 @@ elif [[ ${packgeName} =~ "tar" ]];then
|
|||
cd ${oriInstallPath}
|
||||
if [ ! -f {originPackageName} ];then
|
||||
echoColor YD "download base installPackage"
|
||||
wgetFile ${originPackageName}
|
||||
wgetFile ${originPackageName} ${originversion} ${sourcePath}
|
||||
fi
|
||||
echoColor YD "unzip the base installation package"
|
||||
echoColor BD "tar -xf ${originPackageName}" && tar -xf ${originPackageName}
|
||||
|
@ -238,14 +248,19 @@ cd ${installPath}
|
|||
if [[ ${packgeName} =~ "Lite" ]] || ([[ ${packgeName} =~ "x64" ]] && [[ ${packgeName} =~ "client" ]]) || ([[ ${packgeName} =~ "deb" ]] && [[ ${packgeName} =~ "server" ]]) || ([[ ${packgeName} =~ "rpm" ]] && [[ ${packgeName} =~ "server" ]]) ;then
|
||||
echoColor G "===== install taos-tools when package is lite or client ====="
|
||||
cd ${installPath}
|
||||
wgetFile taosTools-2.1.3-Linux-x64.tar.gz .
|
||||
if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then
|
||||
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
|
||||
tar xf taosTools-2.1.3-Linux-x64.tar.gz
|
||||
fi
|
||||
cd taosTools-2.1.3 && bash install-taostools.sh
|
||||
elif ([[ ${packgeName} =~ "arm64" ]] && [[ ${packgeName} =~ "client" ]]);then
|
||||
echoColor G "===== install taos-tools arm when package is arm64-client ====="
|
||||
cd ${installPath}
|
||||
wgetFile taosTools-2.1.3-Linux-arm64.tar.gz .
|
||||
if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then
|
||||
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
|
||||
tar xf taosTools-2.1.3-Linux-arm64.tar.gz
|
||||
fi
|
||||
|
||||
cd taosTools-2.1.3 && bash install-taostools.sh
|
||||
fi
|
||||
|
||||
|
|
|
@ -2238,6 +2238,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->stateTs) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2254,6 +2255,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->stateTs) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -2490,6 +2492,7 @@ int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) {
|
|||
}
|
||||
|
||||
if (tEncodeI32(pEncoder, pRsp->errCode) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->stateTs) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2555,6 +2558,7 @@ int32_t tDeserializeSUseDbRspImp(SDecoder *pDecoder, SUseDbRsp *pRsp) {
|
|||
}
|
||||
|
||||
if (tDecodeI32(pDecoder, &pRsp->errCode) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pRsp->stateTs) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -102,7 +102,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -116,7 +116,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TRIM_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -127,7 +127,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -151,7 +151,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -321,6 +321,7 @@ typedef struct {
|
|||
int32_t vgVersion;
|
||||
SDbCfg cfg;
|
||||
SRWLatch lock;
|
||||
int64_t stateTs;
|
||||
} SDbObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1182,6 +1182,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
|
|||
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
pRsp->uid = pDb->uid;
|
||||
pRsp->vgVersion = pDb->vgVersion;
|
||||
pRsp->stateTs = pDb->stateTs;
|
||||
pRsp->vgNum = taosArrayGetSize(pRsp->pVgroupInfos);
|
||||
pRsp->hashMethod = pDb->cfg.hashMethod;
|
||||
pRsp->hashPrefix = pDb->cfg.hashPrefix;
|
||||
|
@ -1234,6 +1235,8 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
mDebug("db:%s, process usedb req vgVersion:%d stateTs:%" PRId64 ", rsp vgVersion:%d stateTs:%" PRId64,
|
||||
usedbReq.db, usedbReq.vgVersion, usedbReq.stateTs, usedbRsp.vgVersion, usedbRsp.stateTs);
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
|
@ -1290,13 +1293,19 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
|||
|
||||
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
|
||||
|
||||
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
|
||||
mInfo("db:%s, version and numOfTable not changed", pDbVgVersion->dbFName);
|
||||
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable /* &&
|
||||
pDbVgVersion->stateTs == pDb->stateTs */) {
|
||||
mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
|
||||
" numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
|
||||
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
|
||||
pDb->vgVersion, pDb->stateTs, numOfTable);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
continue;
|
||||
} else {
|
||||
mInfo("db:%s, vgroup version changed from %d to %d", pDbVgVersion->dbFName, pDbVgVersion->vgVersion,
|
||||
pDb->vgVersion);
|
||||
mInfo("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
|
||||
" numOfTables:%d, changed to vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
|
||||
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
|
||||
pDb->vgVersion, pDb->stateTs, numOfTable);
|
||||
}
|
||||
|
||||
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||
|
@ -1310,6 +1319,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
|||
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
usedbRsp.uid = pDb->uid;
|
||||
usedbRsp.vgVersion = pDb->vgVersion;
|
||||
usedbRsp.stateTs = pDb->stateTs;
|
||||
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
|
||||
usedbRsp.hashMethod = pDb->cfg.hashMethod;
|
||||
usedbRsp.hashPrefix = pDb->cfg.hashPrefix;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndDnode.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndQnode.h"
|
||||
|
@ -376,8 +377,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
|||
if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) {
|
||||
if (pVgroup->vnodeGid[vg].syncState != pVload->syncState ||
|
||||
pVgroup->vnodeGid[vg].syncRestore != pVload->syncRestore) {
|
||||
mTrace("vgId:%d, role changed, old state:%s restored:%d new state:%s restored:%d", pVgroup->vgId,
|
||||
syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore,
|
||||
mInfo("vgId:%d, state changed by status msg, old state:%s restored:%d new state:%s restored:%d",
|
||||
pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore,
|
||||
syncStr(pVload->syncState), pVload->syncRestore);
|
||||
pVgroup->vnodeGid[vg].syncState = pVload->syncState;
|
||||
pVgroup->vnodeGid[vg].syncRestore = pVload->syncRestore;
|
||||
|
@ -387,7 +388,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
if (roleChanged) {
|
||||
// notify scheduler role has changed
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs);
|
||||
pDb->stateTs = curMs;
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -139,6 +139,63 @@ static void mndIncreaseUpTime(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
bool roleChanged = false;
|
||||
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
|
||||
if (pVgroup->vnodeGid[vg].dnodeId == dnodeId) {
|
||||
if (pVgroup->vnodeGid[vg].syncState != TAOS_SYNC_STATE_ERROR) {
|
||||
mInfo("vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0",
|
||||
pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore);
|
||||
pVgroup->vnodeGid[vg].syncState = TAOS_SYNC_STATE_ERROR;
|
||||
pVgroup->vnodeGid[vg].syncRestore = 0;
|
||||
roleChanged = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (roleChanged) {
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||
mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
|
||||
curMs);
|
||||
pDb->stateTs = curMs;
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
}
|
||||
|
||||
static void mndCheckDnodeOffline(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||
if (!online) {
|
||||
mInfo("dnode:%d, in offline state", pDnode->id);
|
||||
mndSetVgroupOffline(pMnode, pDnode->id, curMs);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
}
|
||||
|
||||
static void *mndThreadFp(void *param) {
|
||||
SMnode *pMnode = param;
|
||||
int64_t lastTime = 0;
|
||||
|
@ -174,6 +231,10 @@ static void *mndThreadFp(void *param) {
|
|||
if (sec % tsUptimeInterval == 0) {
|
||||
mndIncreaseUpTime(pMnode);
|
||||
}
|
||||
|
||||
if (sec % (tsStatusInterval * 5) == 0) {
|
||||
mndCheckDnodeOffline(pMnode);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
|
|
@ -366,9 +366,9 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
|
|||
tMapDataClear(&p->mapData);
|
||||
}
|
||||
|
||||
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
|
||||
static void destroyAllBlockScanInfo(SHashObj* pTableMap, bool clearEntry) {
|
||||
void* p = NULL;
|
||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||
while (clearEntry && ((p = taosHashIterate(pTableMap, p)) != NULL)) {
|
||||
clearBlockScanInfo(*(STableBlockScanInfo**)p);
|
||||
}
|
||||
|
||||
|
@ -3768,7 +3768,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
cleanupDataBlockIterator(&pReader->status.blockIter);
|
||||
|
||||
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||
destroyAllBlockScanInfo(pReader->status.pTableMap);
|
||||
destroyAllBlockScanInfo(pReader->status.pTableMap, (pReader->innerReader[0] == NULL) ? true : false);
|
||||
blockDataDestroy(pReader->pResBlock);
|
||||
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
|
||||
|
||||
|
|
|
@ -3383,6 +3383,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||
if (NULL == pOperator) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||
|
@ -3403,6 +3408,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
|
||||
if (NULL == pOperator) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||
|
|
|
@ -66,16 +66,36 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNod
|
|||
return pSubplan;
|
||||
}
|
||||
|
||||
static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, ESubplanType subplanType) {
|
||||
static bool splHasScan(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pNode->pChildren) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
|
||||
return true;
|
||||
}
|
||||
return splHasScan((SLogicNode*)pChild);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void splSetSubplanType(SLogicSubplan* pSubplan) {
|
||||
pSubplan->subplanType = splHasScan(pSubplan->pNode) ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE;
|
||||
}
|
||||
|
||||
static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
|
||||
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
return NULL;
|
||||
}
|
||||
pSubplan->id.queryId = pCxt->queryId;
|
||||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->subplanType = subplanType;
|
||||
pSubplan->pNode = pNode;
|
||||
pNode->pParent = NULL;
|
||||
splSetSubplanType(pSubplan);
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
|
@ -1204,7 +1224,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
|
|||
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pSplitNode->pChildren) {
|
||||
SLogicSubplan* pNewSubplan = splCreateSubplan(pCxt, (SLogicNode*)pChild, pUnionSubplan->subplanType);
|
||||
SLogicSubplan* pNewSubplan = splCreateSubplan(pCxt, (SLogicNode*)pChild);
|
||||
code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(NULL);
|
||||
|
@ -1390,10 +1410,9 @@ static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
|
||||
SLogicSubplan* pNewSubplan = NULL;
|
||||
SNodeList* pSubplanChildren = info.pSubplan->pChildren;
|
||||
ESubplanType subplanType = info.pSubplan->subplanType;
|
||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pNewSubplan = splCreateSubplan(pCxt, info.pQueryRoot, subplanType);
|
||||
pNewSubplan = splCreateSubplan(pCxt, info.pQueryRoot);
|
||||
if (NULL == pNewSubplan) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -912,8 +912,8 @@ int32_t filterDetachCnfGroups(SArray *group, SArray *left, SArray *right) {
|
|||
|
||||
if (taosArrayGetSize(left) <= 0) {
|
||||
if (taosArrayGetSize(right) <= 0) {
|
||||
fltError("both groups are empty");
|
||||
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
fltDebug("both groups are empty");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SFilterGroup *gp = NULL;
|
||||
|
|
|
@ -474,7 +474,7 @@ void* destroyConnPool(void* pool) {
|
|||
}
|
||||
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||
char key[32] = {0};
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
|
@ -525,7 +525,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
conn->status = ConnInPool;
|
||||
|
||||
if (conn->list == NULL) {
|
||||
char key[32] = {0};
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
|
||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
|
|
Loading…
Reference in New Issue