Merge branch 'develop' into xiaoping/add_test_case
This commit is contained in:
commit
1dd928dd63
|
@ -179,19 +179,20 @@ taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s);
|
||||||
|
|
||||||
### TDengine服务器支持的平台列表
|
### TDengine服务器支持的平台列表
|
||||||
|
|
||||||
| | **CentOS 6/7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** |
|
| | **CentOS 6/7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **华为 EulerOS** |
|
||||||
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- |
|
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- | --------------------- |
|
||||||
| X64 | ● | ● | | ○ | ● | ● |
|
| X64 | ● | ● | | ○ | ● | ● | ● |
|
||||||
| 树莓派 ARM32 | | ● | ● | | | |
|
| 树莓派 ARM32 | | ● | ● | | | | |
|
||||||
| 龙芯 MIPS64 | | | ● | | | |
|
| 龙芯 MIPS64 | | | ● | | | | |
|
||||||
| 鲲鹏 ARM64 | | ○ | ○ | | ● | |
|
| 鲲鹏 ARM64 | | ○ | ○ | | ● | | |
|
||||||
| 申威 Alpha64 | | | ○ | ● | | |
|
| 申威 Alpha64 | | | ○ | ● | | | |
|
||||||
| 飞腾 ARM64 | | ○ 优麒麟 | | | | |
|
| 飞腾 ARM64 | | ○ 优麒麟 | | | | | |
|
||||||
| 海光 X64 | ● | ● | ● | ○ | ● | ● |
|
| 海光 X64 | ● | ● | ● | ○ | ● | ● | |
|
||||||
| 瑞芯微 ARM64/32 | | | ○ | | | |
|
| 瑞芯微 ARM64/32 | | | ○ | | | | |
|
||||||
| 全志 ARM64/32 | | | ○ | | | |
|
| 全志 ARM64/32 | | | ○ | | | | |
|
||||||
| 炬力 ARM64/32 | | | ○ | | | |
|
| 炬力 ARM64/32 | | | ○ | | | | |
|
||||||
| TI ARM32 | | | ○ | | | |
|
| TI ARM32 | | | ○ | | | | |
|
||||||
|
| 华为云 ARM64 | | | | | | | ● |
|
||||||
|
|
||||||
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
|
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
|
||||||
|
|
||||||
|
|
|
@ -200,6 +200,7 @@ static bool isPointInterpoQuery(SQuery *pQuery);
|
||||||
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
|
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
|
||||||
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
|
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
|
||||||
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
|
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
|
||||||
|
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
|
||||||
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
|
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
|
||||||
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
|
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
|
||||||
int32_t groupIndex);
|
int32_t groupIndex);
|
||||||
|
@ -1330,6 +1331,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
|
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
|
||||||
int16_t bytes = pColInfoData->info.bytes;
|
int16_t bytes = pColInfoData->info.bytes;
|
||||||
int16_t type = pColInfoData->info.type;
|
int16_t type = pColInfoData->info.type;
|
||||||
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
qError("QInfo:%"PRIu64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
|
qError("QInfo:%"PRIu64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
|
||||||
|
@ -1350,6 +1352,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
|
||||||
|
|
||||||
memcpy(pInfo->prevData, val, bytes);
|
memcpy(pInfo->prevData, val, bytes);
|
||||||
|
|
||||||
|
if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) {
|
||||||
|
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
|
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
@ -3396,6 +3402,42 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
int32_t numOfExprs = pQuery->numOfOutput;
|
||||||
|
for(int32_t i = 0; i < numOfExprs; ++i) {
|
||||||
|
SExprInfo* pExprInfo = &(pExpr[i]);
|
||||||
|
if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
|
||||||
|
|
||||||
|
pCtx[i].param[0].arr = NULL;
|
||||||
|
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
|
||||||
|
|
||||||
|
// TODO use hash to speedup this loop
|
||||||
|
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
|
||||||
|
for (int32_t j = 0; j < numOfGroup; ++j) {
|
||||||
|
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
|
||||||
|
if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) {
|
||||||
|
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
|
||||||
|
for (int32_t k = 0; k < numOfCols; ++k) {
|
||||||
|
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
|
||||||
|
if (pres->colId == pFuncMsg->colInfo.colId) {
|
||||||
|
pCtx[i].param[0].arr = pres->pResult;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* There are two cases to handle:
|
* There are two cases to handle:
|
||||||
*
|
*
|
||||||
|
|
|
@ -16,45 +16,47 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
_ "github.com/taosdata/driver-go/taosSql"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"flag"
|
|
||||||
"math/rand"
|
_ "github.com/taosdata/driver-go/taosSql"
|
||||||
|
|
||||||
//"golang.org/x/sys/unix"
|
//"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxLocationSize = 32
|
maxLocationSize = 32
|
||||||
maxSqlBufSize = 65480
|
//maxSqlBufSize = 65480
|
||||||
)
|
)
|
||||||
|
|
||||||
var locations = [maxLocationSize]string {
|
var locations = [maxLocationSize]string{
|
||||||
"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
|
"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
|
||||||
"HangZhou", "Tianjin", "Wuhan", "Changsha",
|
"HangZhou", "Tianjin", "Wuhan", "Changsha",
|
||||||
"Nanjing", "Xian"}
|
"Nanjing", "Xian"}
|
||||||
|
|
||||||
type config struct {
|
type config struct {
|
||||||
hostName string
|
hostName string
|
||||||
serverPort int
|
serverPort int
|
||||||
user string
|
user string
|
||||||
password string
|
password string
|
||||||
dbName string
|
dbName string
|
||||||
supTblName string
|
supTblName string
|
||||||
tablePrefix string
|
tablePrefix string
|
||||||
numOftables int
|
numOftables int
|
||||||
numOfRecordsPerTable int
|
numOfRecordsPerTable int
|
||||||
numOfRecordsPerReq int
|
numOfRecordsPerReq int
|
||||||
numOfThreads int
|
numOfThreads int
|
||||||
startTimestamp string
|
startTimestamp string
|
||||||
startTs int64
|
startTs int64
|
||||||
|
|
||||||
keep int
|
keep int
|
||||||
days int
|
days int
|
||||||
}
|
}
|
||||||
|
|
||||||
var configPara config
|
var configPara config
|
||||||
|
@ -62,7 +64,7 @@ var taosDriverName = "taosSql"
|
||||||
var url string
|
var url string
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.StringVar(&configPara.hostName, "h", "127.0.0.1","The host to connect to TDengine server.")
|
flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
|
||||||
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
|
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
|
||||||
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
|
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
|
||||||
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
|
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
|
||||||
|
@ -80,14 +82,14 @@ func init() {
|
||||||
configPara.supTblName = "meters"
|
configPara.supTblName = "meters"
|
||||||
|
|
||||||
startTs, err := time.ParseInLocation("2006-01-02 15:04:05", configPara.startTimestamp, time.Local)
|
startTs, err := time.ParseInLocation("2006-01-02 15:04:05", configPara.startTimestamp, time.Local)
|
||||||
if err==nil {
|
if err == nil {
|
||||||
configPara.startTs = startTs.UnixNano() / 1e6
|
configPara.startTs = startTs.UnixNano() / 1e6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func printAllArgs() {
|
func printAllArgs() {
|
||||||
fmt.Printf("\n============= args parse result: =============\n")
|
fmt.Printf("\n============= args parse result: =============\n")
|
||||||
fmt.Printf("hostName: %v\n", configPara.hostName)
|
fmt.Printf("hostName: %v\n", configPara.hostName)
|
||||||
fmt.Printf("serverPort: %v\n", configPara.serverPort)
|
fmt.Printf("serverPort: %v\n", configPara.serverPort)
|
||||||
fmt.Printf("usr: %v\n", configPara.user)
|
fmt.Printf("usr: %v\n", configPara.user)
|
||||||
fmt.Printf("password: %v\n", configPara.password)
|
fmt.Printf("password: %v\n", configPara.password)
|
||||||
|
@ -104,10 +106,10 @@ func printAllArgs() {
|
||||||
func main() {
|
func main() {
|
||||||
printAllArgs()
|
printAllArgs()
|
||||||
fmt.Printf("Please press enter key to continue....\n")
|
fmt.Printf("Please press enter key to continue....\n")
|
||||||
fmt.Scanln()
|
_, _ = fmt.Scanln()
|
||||||
|
|
||||||
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
|
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
|
||||||
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
|
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
|
||||||
// open connect to taos server
|
// open connect to taos server
|
||||||
//db, err := sql.Open(taosDriverName, url)
|
//db, err := sql.Open(taosDriverName, url)
|
||||||
//if err != nil {
|
//if err != nil {
|
||||||
|
@ -115,7 +117,7 @@ func main() {
|
||||||
// os.Exit(1)
|
// os.Exit(1)
|
||||||
//}
|
//}
|
||||||
//defer db.Close()
|
//defer db.Close()
|
||||||
rand.Seed(time.Now().Unix())
|
rand.Seed(time.Now().Unix())
|
||||||
|
|
||||||
createDatabase(configPara.dbName, configPara.supTblName)
|
createDatabase(configPara.dbName, configPara.supTblName)
|
||||||
fmt.Printf("======== create database success! ========\n\n")
|
fmt.Printf("======== create database success! ========\n\n")
|
||||||
|
@ -138,7 +140,7 @@ func main() {
|
||||||
func createDatabase(dbName string, supTblName string) {
|
func createDatabase(dbName string, supTblName string) {
|
||||||
db, err := sql.Open(taosDriverName, url)
|
db, err := sql.Open(taosDriverName, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Open database error: %s\n", err)
|
fmt.Printf("Open database error: %s\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
@ -165,27 +167,27 @@ func createDatabase(dbName string, supTblName string) {
|
||||||
checkErr(err, sqlStr)
|
checkErr(err, sqlStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func multiThreadCreateTable(threads int, ntables int, dbName string, tablePrefix string) {
|
func multiThreadCreateTable(threads int, nTables int, dbName string, tablePrefix string) {
|
||||||
st := time.Now().UnixNano()
|
st := time.Now().UnixNano()
|
||||||
|
|
||||||
if (threads < 1) {
|
if threads < 1 {
|
||||||
threads = 1;
|
threads = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
a := ntables / threads;
|
a := nTables / threads
|
||||||
if (a < 1) {
|
if a < 1 {
|
||||||
threads = ntables;
|
threads = nTables
|
||||||
a = 1;
|
a = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
b := ntables % threads;
|
b := nTables % threads
|
||||||
|
|
||||||
last := 0;
|
last := 0
|
||||||
endTblId := 0
|
endTblId := 0
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i := 0; i < threads; i++ {
|
for i := 0; i < threads; i++ {
|
||||||
startTblId := last
|
startTblId := last
|
||||||
if (i < b ) {
|
if i < b {
|
||||||
endTblId = last + a
|
endTblId = last + a
|
||||||
} else {
|
} else {
|
||||||
endTblId = last + a - 1
|
endTblId = last + a - 1
|
||||||
|
@ -206,42 +208,43 @@ func createTable(dbName string, childTblPrefix string, startTblId int, endTblId
|
||||||
|
|
||||||
db, err := sql.Open(taosDriverName, url)
|
db, err := sql.Open(taosDriverName, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Open database error: %s\n", err)
|
fmt.Printf("Open database error: %s\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
for i := startTblId; i <= endTblId; i++ {
|
for i := startTblId; i <= endTblId; i++ {
|
||||||
sqlStr := "create table if not exists " + dbName + "." + childTblPrefix + strconv.Itoa(i) + " using " + dbName + ".meters tags('" + locations[i%maxLocationSize] + "', " + strconv.Itoa(i) + ");"
|
sqlStr := "create table if not exists " + dbName + "." + childTblPrefix + strconv.Itoa(i) + " using " + dbName + ".meters tags('" + locations[i%maxLocationSize] + "', " + strconv.Itoa(i) + ");"
|
||||||
//fmt.Printf("sqlStr: %v\n", sqlStr)
|
//fmt.Printf("sqlStr: %v\n", sqlStr)
|
||||||
_, err = db.Exec(sqlStr)
|
_, err = db.Exec(sqlStr)
|
||||||
checkErr(err, sqlStr)
|
checkErr(err, sqlStr)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
runtime.Goexit()
|
runtime.Goexit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateRowData(ts int64) string {
|
func generateRowData(ts int64) string {
|
||||||
voltage := rand.Int() % 1000
|
voltage := rand.Int() % 1000
|
||||||
current := 200 + rand.Float32()
|
current := 200 + rand.Float32()
|
||||||
phase := rand.Float32()
|
phase := rand.Float32()
|
||||||
values := "( " + strconv.FormatInt(ts, 10) + ", " + strconv.FormatFloat(float64(current), 'f', 6, 64) + ", " + strconv.Itoa(voltage) + ", " + strconv.FormatFloat(float64(phase), 'f', 6, 64) + " ) "
|
values := "( " + strconv.FormatInt(ts, 10) + ", " + strconv.FormatFloat(float64(current), 'f', 6, 64) + ", " + strconv.Itoa(voltage) + ", " + strconv.FormatFloat(float64(phase), 'f', 6, 64) + " ) "
|
||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
func insertData(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
|
func insertData(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
|
||||||
//fmt.Printf("subThread[%d]: insert data to table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
|
//fmt.Printf("subThread[%d]: insert data to table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
|
||||||
// windows.GetCurrentThreadId()
|
// windows.GetCurrentThreadId()
|
||||||
|
|
||||||
db, err := sql.Open(taosDriverName, url)
|
db, err := sql.Open(taosDriverName, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Open database error: %s\n", err)
|
fmt.Printf("Open database error: %s\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
tmpTs := configPara.startTs;
|
tmpTs := configPara.startTs
|
||||||
//rand.New(rand.NewSource(time.Now().UnixNano()))
|
//rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
for tID := startTblId; tID <= endTblId; tID++{
|
for tID := startTblId; tID <= endTblId; tID++ {
|
||||||
totalNum := 0
|
totalNum := 0
|
||||||
for {
|
for {
|
||||||
sqlStr := "insert into " + dbName + "." + childTblPrefix + strconv.Itoa(tID) + " values "
|
sqlStr := "insert into " + dbName + "." + childTblPrefix + strconv.Itoa(tID) + " values "
|
||||||
|
@ -249,13 +252,13 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
|
||||||
for {
|
for {
|
||||||
tmpTs += 1000
|
tmpTs += 1000
|
||||||
valuesOfRow := generateRowData(tmpTs)
|
valuesOfRow := generateRowData(tmpTs)
|
||||||
currRowNum += 1
|
currRowNum += 1
|
||||||
totalNum += 1
|
totalNum += 1
|
||||||
|
|
||||||
sqlStr = fmt.Sprintf("%s %s", sqlStr, valuesOfRow)
|
sqlStr = fmt.Sprintf("%s %s", sqlStr, valuesOfRow)
|
||||||
|
|
||||||
if (currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable) {
|
if currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,12 +268,12 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
|
||||||
count, err := res.RowsAffected()
|
count, err := res.RowsAffected()
|
||||||
checkErr(err, "rows affected")
|
checkErr(err, "rows affected")
|
||||||
|
|
||||||
if (count != int64(currRowNum)) {
|
if count != int64(currRowNum) {
|
||||||
fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
|
fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalNum >= configPara.numOfRecordsPerTable) {
|
if totalNum >= configPara.numOfRecordsPerTable {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -279,44 +282,46 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
|
||||||
wg.Done()
|
wg.Done()
|
||||||
runtime.Goexit()
|
runtime.Goexit()
|
||||||
}
|
}
|
||||||
func multiThreadInsertData(threads int, ntables int, dbName string, tablePrefix string) {
|
|
||||||
|
func multiThreadInsertData(threads int, nTables int, dbName string, tablePrefix string) {
|
||||||
st := time.Now().UnixNano()
|
st := time.Now().UnixNano()
|
||||||
|
|
||||||
if (threads < 1) {
|
if threads < 1 {
|
||||||
threads = 1;
|
threads = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
a := ntables / threads;
|
a := nTables / threads
|
||||||
if (a < 1) {
|
if a < 1 {
|
||||||
threads = ntables;
|
threads = nTables
|
||||||
a = 1;
|
a = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
b := ntables % threads;
|
b := nTables % threads
|
||||||
|
|
||||||
last := 0;
|
last := 0
|
||||||
endTblId := 0
|
endTblId := 0
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i := 0; i < threads; i++ {
|
for i := 0; i < threads; i++ {
|
||||||
startTblId := last
|
startTblId := last
|
||||||
if (i < b ) {
|
if i < b {
|
||||||
endTblId = last + a
|
endTblId = last + a
|
||||||
} else {
|
} else {
|
||||||
endTblId = last + a - 1
|
endTblId = last + a - 1
|
||||||
}
|
}
|
||||||
last = endTblId + 1
|
last = endTblId + 1
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go insertData(dbName, tablePrefix, startTblId , endTblId, &wg)
|
go insertData(dbName, tablePrefix, startTblId, endTblId, &wg)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
et := time.Now().UnixNano()
|
et := time.Now().UnixNano()
|
||||||
fmt.Printf("insert data spent duration: %6.6fs\n", (float32(et-st))/1e9)
|
fmt.Printf("insert data spent duration: %6.6fs\n", (float32(et-st))/1e9)
|
||||||
}
|
}
|
||||||
func selectTest(dbName string, tbPrefix string, supTblName string){
|
|
||||||
|
func selectTest(dbName string, tbPrefix string, supTblName string) {
|
||||||
db, err := sql.Open(taosDriverName, url)
|
db, err := sql.Open(taosDriverName, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Open database error: %s\n", err)
|
fmt.Printf("Open database error: %s\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
@ -332,12 +337,12 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
|
||||||
fmt.Printf("query sql: %s\n", sqlStr)
|
fmt.Printf("query sql: %s\n", sqlStr)
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
ts string
|
ts string
|
||||||
current float32
|
current float32
|
||||||
voltage int
|
voltage int
|
||||||
phase float32
|
phase float32
|
||||||
location string
|
location string
|
||||||
groupid int
|
groupid int
|
||||||
)
|
)
|
||||||
err := rows.Scan(&ts, ¤t, &voltage, &phase, &location, &groupid)
|
err := rows.Scan(&ts, ¤t, &voltage, &phase, &location, &groupid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -352,7 +357,7 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
|
||||||
}
|
}
|
||||||
|
|
||||||
// select sql 2
|
// select sql 2
|
||||||
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa( rand.Int() % configPara.numOftables)
|
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa(rand.Int()%configPara.numOftables)
|
||||||
rows, err = db.Query(sqlStr)
|
rows, err = db.Query(sqlStr)
|
||||||
checkErr(err, sqlStr)
|
checkErr(err, sqlStr)
|
||||||
|
|
||||||
|
@ -360,9 +365,9 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
|
||||||
fmt.Printf("\nquery sql: %s\n", sqlStr)
|
fmt.Printf("\nquery sql: %s\n", sqlStr)
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
voltageAvg float32
|
voltageAvg float32
|
||||||
voltageMin int
|
voltageMin int
|
||||||
voltageMax int
|
voltageMax int
|
||||||
)
|
)
|
||||||
err := rows.Scan(&voltageAvg, &voltageMin, &voltageMax)
|
err := rows.Scan(&voltageAvg, &voltageMin, &voltageMax)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -385,10 +390,10 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
|
||||||
fmt.Printf("\nquery sql: %s\n", sqlStr)
|
fmt.Printf("\nquery sql: %s\n", sqlStr)
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
lastTs string
|
lastTs string
|
||||||
lastCurrent float32
|
lastCurrent float32
|
||||||
lastVoltage int
|
lastVoltage int
|
||||||
lastPhase float32
|
lastPhase float32
|
||||||
)
|
)
|
||||||
err := rows.Scan(&lastTs, &lastCurrent, &lastVoltage, &lastPhase)
|
err := rows.Scan(&lastTs, &lastCurrent, &lastVoltage, &lastPhase)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue