Merge branch 'develop' into nanosupport_TestCase_for_connectors
This commit is contained in:
commit
9274679008
|
@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
|
||||||
int dcol = 0;
|
int dcol = 0;
|
||||||
|
|
||||||
while (dcol < pCols->numOfCols) {
|
while (dcol < pCols->numOfCols) {
|
||||||
|
bool setCol = 0;
|
||||||
SDataCol *pDataCol = &(pCols->cols[dcol]);
|
SDataCol *pDataCol = &(pCols->cols[dcol]);
|
||||||
if (rcol >= schemaNCols(pSchema)) {
|
if (rcol >= schemaNCols(pSchema)) {
|
||||||
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
||||||
|
@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
|
||||||
STColumn *pRowCol = schemaColAt(pSchema, rcol);
|
STColumn *pRowCol = schemaColAt(pSchema, rcol);
|
||||||
if (pRowCol->colId == pDataCol->colId) {
|
if (pRowCol->colId == pDataCol->colId) {
|
||||||
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
|
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
|
||||||
|
if(!isNull(value, pDataCol->type)) setCol = 1;
|
||||||
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
|
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
|
||||||
dcol++;
|
dcol++;
|
||||||
rcol++;
|
rcol++;
|
||||||
} else if (pRowCol->colId < pDataCol->colId) {
|
} else if (pRowCol->colId < pDataCol->colId) {
|
||||||
rcol++;
|
rcol++;
|
||||||
} else {
|
} else {
|
||||||
if(forceSetNull) {
|
if(forceSetNull || setCol) {
|
||||||
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
||||||
}
|
}
|
||||||
dcol++;
|
dcol++;
|
||||||
|
@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
|
||||||
int nRowCols = kvRowNCols(row);
|
int nRowCols = kvRowNCols(row);
|
||||||
|
|
||||||
while (dcol < pCols->numOfCols) {
|
while (dcol < pCols->numOfCols) {
|
||||||
|
bool setCol = 0;
|
||||||
SDataCol *pDataCol = &(pCols->cols[dcol]);
|
SDataCol *pDataCol = &(pCols->cols[dcol]);
|
||||||
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
|
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
|
||||||
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
||||||
|
@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
|
||||||
|
|
||||||
if (colIdx->colId == pDataCol->colId) {
|
if (colIdx->colId == pDataCol->colId) {
|
||||||
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
|
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
|
||||||
|
if(!isNull(value, pDataCol->type)) setCol = 1;
|
||||||
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
|
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
|
||||||
++dcol;
|
++dcol;
|
||||||
++rcol;
|
++rcol;
|
||||||
} else if (colIdx->colId < pDataCol->colId) {
|
} else if (colIdx->colId < pDataCol->colId) {
|
||||||
++rcol;
|
++rcol;
|
||||||
} else {
|
} else {
|
||||||
if (forceSetNull) {
|
if(forceSetNull || setCol) {
|
||||||
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
|
||||||
}
|
}
|
||||||
++dcol;
|
++dcol;
|
||||||
|
@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: refactor this function to eliminate additional memory copy
|
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
|
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
|
||||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
||||||
ASSERT(target->numOfCols == source->numOfCols);
|
ASSERT(target->numOfCols == source->numOfCols);
|
||||||
|
@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
|
||||||
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
||||||
for (int i = 0; i < rowsToMerge; i++) {
|
for (int i = 0; i < rowsToMerge; i++) {
|
||||||
for (int j = 0; j < source->numOfCols; j++) {
|
for (int j = 0; j < source->numOfCols; j++) {
|
||||||
if (source->cols[j].len > 0) {
|
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
|
||||||
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
|
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
|
||||||
target->maxPoints);
|
target->maxPoints);
|
||||||
}
|
}
|
||||||
|
@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
||||||
if (key1 < key2) {
|
if (key1 < key2) {
|
||||||
for (int i = 0; i < src1->numOfCols; i++) {
|
for (int i = 0; i < src1->numOfCols; i++) {
|
||||||
ASSERT(target->cols[i].type == src1->cols[i].type);
|
ASSERT(target->cols[i].type == src1->cols[i].type);
|
||||||
if (src1->cols[i].len > 0) {
|
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
|
||||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
||||||
target->maxPoints);
|
target->maxPoints);
|
||||||
}
|
}
|
||||||
|
@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
||||||
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
|
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
|
||||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
||||||
target->maxPoints);
|
target->maxPoints);
|
||||||
|
} else if(target->cols[i].len > 0) {
|
||||||
|
dataColSetNullAt(&target->cols[i], target->numOfRows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
target->numOfRows++;
|
target->numOfRows++;
|
||||||
|
|
|
@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
|
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
|
||||||
bool isRowDel = false;
|
|
||||||
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
|
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
|
||||||
if (row == NULL || memRowKey(row) > maxKey) {
|
if (row == NULL || memRowKey(row) > maxKey) {
|
||||||
key2 = INT64_MAX;
|
key2 = INT64_MAX;
|
||||||
} else {
|
} else {
|
||||||
key2 = memRowKey(row);
|
key2 = memRowKey(row);
|
||||||
isRowDel = memRowDeleted(row);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
|
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
|
||||||
|
@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
||||||
pTarget->numOfRows++;
|
pTarget->numOfRows++;
|
||||||
(*iter)++;
|
(*iter)++;
|
||||||
} else if (key1 > key2) {
|
} else if (key1 > key2) {
|
||||||
if (!isRowDel) {
|
|
||||||
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
|
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
|
||||||
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
|
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
|
||||||
ASSERT(pSchema != NULL);
|
ASSERT(pSchema != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
|
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListIterNext(pCommitIter->pIter);
|
tSkipListIterNext(pCommitIter->pIter);
|
||||||
} else {
|
} else {
|
||||||
if (update) {
|
if (update != TD_ROW_OVERWRITE_UPDATE) {
|
||||||
if (!isRowDel) {
|
//copy disk data
|
||||||
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
|
|
||||||
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
|
|
||||||
ASSERT(pSchema != NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(!isRowDel);
|
|
||||||
|
|
||||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||||
//TODO: dataColAppendVal may fail
|
//TODO: dataColAppendVal may fail
|
||||||
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
||||||
pTarget->maxPoints);
|
pTarget->maxPoints);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTarget->numOfRows++;
|
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
|
||||||
|
}
|
||||||
|
if (update != TD_ROW_DISCARD_UPDATE) {
|
||||||
|
//copy mem data
|
||||||
|
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
|
||||||
|
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
|
||||||
|
ASSERT(pSchema != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
|
||||||
}
|
}
|
||||||
(*iter)++;
|
(*iter)++;
|
||||||
tSkipListIterNext(pCommitIter->pIter);
|
tSkipListIterNext(pCommitIter->pIter);
|
||||||
|
|
|
@ -17,7 +17,7 @@ go env -w GO111MODULE=on
|
||||||
go env -w GOPROXY=https://goproxy.io,direct
|
go env -w GOPROXY=https://goproxy.io,direct
|
||||||
|
|
||||||
bash ./case001/case001.sh $severIp $serverPort
|
bash ./case001/case001.sh $severIp $serverPort
|
||||||
#bash ./case002/case002.sh $severIp $serverPort
|
bash ./case002/case002.sh $severIp $serverPort
|
||||||
#bash ./case003/case003.sh $severIp $serverPort
|
#bash ./case003/case003.sh $severIp $serverPort
|
||||||
|
|
||||||
cd nanosupport
|
cd nanosupport
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
_ "github.com/taosdata/driver-go/taosSql"
|
_ "github.com/taosdata/driver-go/v2/taosSql"
|
||||||
"log"
|
"log"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
@ -62,6 +62,7 @@ func main() {
|
||||||
|
|
||||||
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
|
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
|
||||||
// open connect to taos server
|
// open connect to taos server
|
||||||
|
fmt.Printf("url:%s",url)
|
||||||
db, err := sql.Open(taosDriverName, url)
|
db, err := sql.Open(taosDriverName, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Open database error: %s\n", err)
|
log.Fatalf("Open database error: %s\n", err)
|
||||||
|
@ -167,17 +168,18 @@ func insert_data(db *sql.DB, demot string) {
|
||||||
|
|
||||||
func select_data(db *sql.DB, demot string) {
|
func select_data(db *sql.DB, demot string) {
|
||||||
st := time.Now().Nanosecond()
|
st := time.Now().Nanosecond()
|
||||||
|
fmt.Println(demot)
|
||||||
rows, err := db.Query("select * from ? ", demot) // go text mode
|
rows, err := db.Query("select * from ? ", demot) // go text mode
|
||||||
|
fmt.Println("end query",err)
|
||||||
checkErr(err, "select db.Query")
|
checkErr(err, "select db.Query")
|
||||||
|
|
||||||
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
|
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
|
||||||
var affectd int
|
var affectd int
|
||||||
|
|
||||||
//decoder := mahonia.NewDecoder("gbk") // 把原来ANSI格式的文本文件里的字符,用gbk进行解码。
|
//decoder := mahonia.NewDecoder("gbk") // 把原来ANSI格式的文本文件里的字符,用gbk进行解码。
|
||||||
|
fmt.Println("start next")
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var ts string
|
var ts time.Time
|
||||||
var name string
|
var name string
|
||||||
var id int
|
var id int
|
||||||
var len int8
|
var len int8
|
||||||
|
@ -187,6 +189,7 @@ func select_data(db *sql.DB, demot string) {
|
||||||
var dv float64
|
var dv float64
|
||||||
|
|
||||||
err = rows.Scan(&ts, &id, &name, &len, &flag, ¬es, &fv, &dv)
|
err = rows.Scan(&ts, &id, &name, &len, &flag, ¬es, &fv, &dv)
|
||||||
|
fmt.Println("rows:",err)
|
||||||
checkErr(err, "select rows.Scan")
|
checkErr(err, "select rows.Scan")
|
||||||
|
|
||||||
fmt.Printf("%s|\t", ts)
|
fmt.Printf("%s|\t", ts)
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
@echo off
|
||||||
|
echo ==== start run cases001.go
|
||||||
|
|
||||||
|
del go.*
|
||||||
|
go mod init demotest
|
||||||
|
go build
|
||||||
|
demotest.exe -h %1 -p %2
|
||||||
|
cd ..
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql/driver"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
taos "github.com/taosdata/driver-go/v2/af"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Subscribe_check(topic taos.Subscriber, check int) bool {
|
||||||
|
count := 0
|
||||||
|
rows, err := topic.Consume()
|
||||||
|
defer func() { rows.Close(); time.Sleep(time.Second) }()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(3)
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
values := make([]driver.Value, 2)
|
||||||
|
err := rows.Next(values)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
} else if err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, err)
|
||||||
|
os.Exit(4)
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if count == check {
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func main() {
|
||||||
|
ts := 1630461600000
|
||||||
|
db, err := taos.Open("127.0.0.1", "", "", "", 0)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
db.Exec("drop if exists database test")
|
||||||
|
db.Exec("create if not exists database test")
|
||||||
|
db.Exec("use test")
|
||||||
|
db.Exec("drop if exists database test")
|
||||||
|
db.Exec("create table test (ts timestamp ,level int)")
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+i, i)
|
||||||
|
db.Exec(sqlcmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("consumption 01.")
|
||||||
|
topic, err := db.Subscribe(false, "test", "select ts, level from test", time.Second)
|
||||||
|
if Subscribe_check(topic, 10) {
|
||||||
|
os.Exit(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("consumption 02: no new rows inserted")
|
||||||
|
if Subscribe_check(topic, 0) {
|
||||||
|
os.Exit(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("consumption 03: after one new rows inserted")
|
||||||
|
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10)
|
||||||
|
db.Exec(sqlcmd)
|
||||||
|
if Subscribe_check(topic, 1) {
|
||||||
|
os.Exit(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("consumption 04: keep progress and continue previous subscription")
|
||||||
|
topic.Unsubscribe(true)
|
||||||
|
topic, err = db.Subscribe(false, "test", "select ts, level from test", time.Second)
|
||||||
|
if Subscribe_check(topic, 0) {
|
||||||
|
os.Exit(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
echo "==== start run cases001.go"
|
||||||
|
|
||||||
|
set +e
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
script_dir="$(dirname $(readlink -f $0))"
|
||||||
|
#echo "pwd: $script_dir, para0: $0"
|
||||||
|
|
||||||
|
#execName=$0
|
||||||
|
#execName=`echo ${execName##*/}`
|
||||||
|
#goName=`echo ${execName%.*}`
|
||||||
|
|
||||||
|
###### step 3: start build
|
||||||
|
cd $script_dir
|
||||||
|
rm -f go.*
|
||||||
|
go mod init demotest > /dev/null 2>&1
|
||||||
|
go mod tidy > /dev/null 2>&1
|
||||||
|
go build > /dev/null 2>&1
|
||||||
|
sleep 1s
|
||||||
|
./demotest -h $1 -p $2
|
Loading…
Reference in New Issue