Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

This commit is contained in:
Hongze Cheng 2022-09-21 17:59:00 +08:00
commit 2a0732823c
11 changed files with 194 additions and 41 deletions

View File

@ -218,12 +218,12 @@ def pre_test_win(){
if (env.CHANGE_URL =~ /\/TDengine\//) {
bat '''
cd %WIN_INTERNAL_ROOT%
git pull
git pull origin ''' + env.CHANGE_TARGET + '''
'''
bat '''
cd %WIN_COMMUNITY_ROOT%
git remote prune origin
git pull
git pull origin ''' + env.CHANGE_TARGET + '''
'''
bat '''
cd %WIN_COMMUNITY_ROOT%
@ -236,7 +236,7 @@ def pre_test_win(){
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
bat '''
cd %WIN_INTERNAL_ROOT%
git pull
git pull origin ''' + env.CHANGE_TARGET + '''
'''
bat '''
cd %WIN_INTERNAL_ROOT%

View File

@ -6,6 +6,9 @@ description: TDengine release history, Release Notes and download links.
import Release from "/components/ReleaseV3";
## 3.0.1.2
<Release type="tdengine" version="3.0.1.2" />
## 3.0.1.1

View File

@ -6,6 +6,10 @@ description: taosTools release history, Release Notes, download links.
import Release from "/components/ReleaseV3";
## 2.2.2
<Release type="tools" version="2.2.2" />
## 2.2.0
<Release type="tools" version="2.2.0" />

View File

@ -6,6 +6,9 @@ description: TDengine 发布历史、Release Notes 及下载链接
import Release from "/components/ReleaseV3";
## 3.0.1.2
<Release type="tdengine" version="3.0.1.2" />
## 3.0.1.1

View File

@ -6,6 +6,10 @@ description: taosTools 的发布历史、Release Notes 和下载链接
import Release from "/components/ReleaseV3";
## 2.2.2
<Release type="tools" version="2.2.2" />
## 2.2.0
<Release type="tools" version="2.2.0" />

View File

@ -1687,6 +1687,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
while (1) {
if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
return NULL;
}

View File

@ -1695,6 +1695,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
}
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
@ -3073,6 +3074,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
}
}
@ -3109,9 +3111,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
TSKEY maxTs = INT64_MIN;
TSKEY minTs = INT64_MAX;
@ -3175,6 +3174,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
}
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
@ -5755,8 +5757,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
SStreamState* pState = pTaskInfo->streamInfo.pState;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
@ -5805,36 +5805,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator->status = OP_RES_TO_RETURN;
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pOperator);

View File

@ -80,6 +80,7 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg)
ASSERT(xFree);
for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) {
tdbDebug("tdbPage/destroy/free ovfl cell: %p/%p", pPage->apOvfl[iOvfl], pPage);
tdbOsFree(pPage->apOvfl[iOvfl]);
}
@ -152,7 +153,7 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
pNewCell = (SCell *)tdbOsMalloc(szCell);
memcpy(pNewCell, pCell, szCell);
tdbDebug("tdbPage/new ovfl cell: %p", pNewCell);
tdbDebug("tdbPage/insert/new ovfl cell: %p/%p", pNewCell, pPage);
pPage->apOvfl[iOvfl] = pNewCell;
pPage->aiOvfl[iOvfl] = idx;
@ -202,7 +203,7 @@ int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) {
if (pPage->aiOvfl[iOvfl] == idx) {
// remove the over flow cell
tdbOsFree(pPage->apOvfl[iOvfl]);
tdbDebug("tdbPage/free ovfl cell: %p", pPage->apOvfl[iOvfl]);
tdbDebug("tdbPage/drop/free ovfl cell: %p", pPage->apOvfl[iOvfl]);
for (; (++iOvfl) < pPage->nOverflow;) {
pPage->aiOvfl[iOvfl - 1] = pPage->aiOvfl[iOvfl] - 1;
pPage->apOvfl[iOvfl - 1] = pPage->apOvfl[iOvfl];
@ -255,6 +256,7 @@ void tdbPageCopy(SPage *pFromPage, SPage *pToPage, int deepCopyOvfl) {
int szCell = (*pFromPage->xCellSize)(pFromPage, pFromPage->apOvfl[iOvfl], 0, NULL, NULL);
pNewCell = (SCell *)tdbOsMalloc(szCell);
memcpy(pNewCell, pFromPage->apOvfl[iOvfl], szCell);
tdbDebug("tdbPage/copy/new ovfl cell: %p/%p/%p", pNewCell, pToPage, pFromPage);
}
pToPage->apOvfl[iOvfl] = pNewCell;

View File

@ -79,9 +79,11 @@ fi
ulimit -c unlimited
TMP_DIR=$WORKDIR/tmp
SOURCEDIR=$WORKDIR/src
MOUNT_DIR=""
packageName="TDengine-server-3.0.1.0-Linux-x64.tar.gz"
rm -rf ${TMP_DIR}/thread_volume/$thread_no/sim
mkdir -p $SOURCEDIR
mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim
mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump
rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/*
@ -90,6 +92,11 @@ if [ ! -d "${TMP_DIR}/thread_volume/$thread_no/$exec_dir" ]; then
echo "cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/"
cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/
fi
if [ ! -f "${SOURCEDIR}/${packageName}" ]; then
wget -P ${SOURCEDIR} https://taosdata.com/assets-download/3.0/${packageName}
fi
MOUNT_DIR="$TMP_DIR/thread_volume/$thread_no/$exec_dir:$CONTAINER_TESTDIR/tests/$exec_dir"
echo "$thread_no -> ${exec_dir}:$cmd"
coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname`
@ -97,6 +104,7 @@ coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname`
docker run \
-v $REP_MOUNT_PARAM \
-v $MOUNT_DIR \
-v ${SOURCEDIR}:/usr/local/src/ \
-v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \
-v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \
-v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \

View File

@ -0,0 +1,157 @@
from urllib.parse import uses_relative
import taos
import sys
import os
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
class TDTestCase:
def caseDescription(self):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.0
'''
return
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def getCfgPath(self):
buildPath = self.getBuildPath()
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
cfgPath = buildPath + "/../sim/dnode1/cfg/"
else:
cfgPath = buildPath + "/../sim/dnode1/cfg/"
return cfgPath
def installTaosd(self,bPath,cPath):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath="/usr/local/src/"
packageName="TDengine-server-3.0.1.0-Linux-x64.tar.gz"
os.system(f"cd {packagePath} && tar xvf TDengine-server-3.0.1.0-Linux-x64.tar.gz && cd TDengine-server-3.0.1.0 && ./install.sh -e no " )
tdDnodes.stop(1)
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
sleep(1)
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os.system(f" cd {bPath} && make install ")
def run(self):
bPath=self.getBuildPath()
cPath=self.getCfgPath()
dbname = "test"
stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath)
tableNumbers=100
recordNumbers1=100
recordNumbers2=1000
tdsqlF=tdCom.newTdSql()
print(tdsqlF)
tdsqlF.query(f"SELECT SERVER_VERSION();")
print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
oldServerVersion=tdsqlF.queryResult[0][0]
tdLog.info(f"Base server version is {oldServerVersion}")
tdsqlF.query(f"SELECT CLIENT_VERSION();")
# the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
oldClientVersion=tdsqlF.queryResult[0][0]
tdLog.info(f"Base client version is {oldClientVersion}")
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}")
tdLog.info(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
sleep(3)
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os.system("pkill taosd")
sleep(1)
tdLog.printNoPrefix("==========step2:update new version ")
self.buildTaosd(bPath)
tdDnodes.start(1)
sleep(1)
tdsql=tdCom.newTdSql()
print(tdsql)
tdsql.query(f"SELECT SERVER_VERSION();")
nowServerVersion=tdsql.queryResult[0][0]
tdLog.info(f"New server version is {nowServerVersion}")
tdsql.query(f"SELECT CLIENT_VERSION();")
nowClientVersion=tdsql.queryResult[0][0]
tdLog.info(f"New client version is {nowClientVersion}")
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql=tdCom.newTdSql()
tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542")
tdsql.execute("drop database if exists db")
tdsql.execute("create database db")
tdsql.execute("use db")
tdsql.execute("create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);")
tdsql.execute("insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);")
tdsql.error(" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.error(" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.execute("insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);")
tdsql.query("select * from db.ct3")
tdsql.checkData(0,1,13)
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14)
tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows
for i in range(qRows) :
if tdsql.queryResult[i][0]=="retentions" :
return True
else:
return False
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -17,6 +17,7 @@ python3 ./test.py -f 0-others/udf_cfg2.py
python3 ./test.py -f 0-others/sysinfo.py
python3 ./test.py -f 0-others/user_control.py
python3 ./test.py -f 0-others/fsync.py
python3 ./test.py -f 0-others/compatibility.py
python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py