Merge branch 'main' into test/TS-4994-3.0m

This commit is contained in:
Shengliang Guan 2024-12-29 22:17:20 +08:00
commit 3d19bc451c
12 changed files with 326 additions and 10 deletions

66
.github/workflows/taosd-ci-build.yml vendored Normal file
View File

@ -0,0 +1,66 @@
# CI build workflow: compile TDengine (with taosKeeper), install it,
# start taosd + taosadapter, and smoke-test with taosBenchmark.
name: TDengine Build

on:
  pull_request:
    branches:
      - 'main'
      - '3.0'
      - '3.1'

# Cancel an in-flight run for the same ref when a new push arrives.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  build:
    runs-on: ubuntu-latest
    name: Run unit tests

    steps:
      - name: Checkout the repository
        uses: actions/checkout@v4

      # Go is required to build taosKeeper (BUILD_KEEPER=true below).
      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          # Quoted so YAML does not parse the version as a number
          # (e.g. unquoted 1.20 would become 1.2).
          go-version: '1.18'

      - name: Install system dependencies
        run: |
          sudo apt update -y
          sudo apt install -y build-essential cmake \
            libgeos-dev libjansson-dev libsnappy-dev liblzma-dev libz-dev \
            zlib1g pkg-config libssl-dev gawk

      - name: Build and install TDengine
        run: |
          mkdir debug && cd debug
          cmake .. -DBUILD_HTTP=false -DBUILD_JDBC=false \
            -DBUILD_TOOLS=true -DBUILD_TEST=off \
            -DBUILD_KEEPER=true -DBUILD_DEPENDENCY_TESTS=false
          make -j 4
          sudo make install
          which taosd
          which taosadapter
          which taoskeeper

      - name: Start taosd
        run: |
          cp /etc/taos/taos.cfg ./
          # NOTE: `sudo echo ... >> file` does not elevate the redirection
          # (the shell opens the file as the unprivileged user); use tee -a
          # so the append itself runs under sudo.
          echo "supportVnodes 256" | sudo tee -a taos.cfg
          nohup sudo taosd -c taos.cfg &

      - name: Start taosadapter
        run: nohup sudo taosadapter &

      - name: Run tests with taosBenchmark
        run: |
          taosBenchmark -t 10 -n 10 -y
          taos -s "select count(*) from test.meters"

      - name: Clean up
        if: always()
        run: |
          if pgrep taosd; then sudo pkill taosd; fi
          if pgrep taosadapter; then sudo pkill taosadapter; fi

View File

@ -10,6 +10,7 @@
</p>
<p>
[![Build Status](https://github.com/taosdata/TDengine/actions/workflows/taosd-ci-build.yml/badge.svg)](https://github.com/taosdata/TDengine/actions/workflows/taosd-ci-build.yml)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=3.0)](https://coveralls.io/github/taosdata/TDengine?branch=3.0)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4201/badge)](https://bestpractices.coreinfrastructure.org/projects/4201)
<br />

View File

@ -680,7 +680,7 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe
return 0;
}
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
int32_t code = 0;
int32_t lino = 0;
@ -690,6 +690,8 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
TFileSetArray *fsetArray = fs->fSetArrTmp;
STFileSet *fset = NULL;
const STFileOp *op;
int32_t fid = INT32_MIN;
TSKEY now = taosGetTimestampMs();
TARRAY2_FOREACH_PTR(opArray, op) {
if (!fset || fset->fid != op->fid) {
STFileSet tfset = {.fid = op->fid};
@ -708,6 +710,15 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
code = tsdbTFileSetEdit(fs->tsdb, fset, op);
TSDB_CHECK_CODE(code, lino, _exit);
if (fid != op->fid) {
fid = op->fid;
if (etype == TSDB_FEDIT_COMMIT) {
fset->lastCommit = now;
} else if (etype == TSDB_FEDIT_COMPACT) {
fset->lastCompact = now;
}
}
}
// remove empty empty stt level and empty file set
@ -864,7 +875,7 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
fs->etype = etype;
// edit
code = edit_fs(fs, opArray);
code = edit_fs(fs, opArray, etype);
TSDB_CHECK_CODE(code, lino, _exit);
// save fs
@ -1288,6 +1299,12 @@ int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
return TSDB_CODE_SUCCESS;
}
// Policy hook: decide whether the given file set should be compacted.
// The real implementation is provided by the enterprise build.
extern bool tsdbShouldCompact(const STFileSet *pFileSet);
#ifndef TD_ENTERPRISE
// Community-edition stub: never suggests compaction.
bool tsdbShouldCompact(const STFileSet *pFileSet) { return false; }
#endif
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
STsdb *pTsdb = pReader->pTsdb;
int32_t code = TSDB_CODE_SUCCESS;
@ -1311,7 +1328,7 @@ static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
// get file set details
pReader->fid = pReader->pFileSet->fid;
tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
pReader->lastCompactTime = 0; // TODO
pReader->lastCompactTime = pReader->pFileSet->lastCompact;
pReader->totalSize = 0;
for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
STFileObj *fobj = pReader->pFileSet->farr[i];
@ -1375,7 +1392,7 @@ int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *fie
fieldName = "should_compact";
if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
*(char *)value = 0; // TODO
*(char *)value = tsdbShouldCompact(pReader->pFileSet);
return TSDB_CODE_SUCCESS;
}

View File

@ -273,6 +273,15 @@ int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json) {
if (code) return code;
}
// about compact and commit
if (cJSON_AddNumberToObject(json, "last compact", fset->lastCompact) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cJSON_AddNumberToObject(json, "last commit", fset->lastCommit) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
@ -324,6 +333,20 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
} else {
return TSDB_CODE_FILE_CORRUPTED;
}
// about compact and commit
item1 = cJSON_GetObjectItem(json, "last compact");
if (cJSON_IsNumber(item1)) {
(*fset)->lastCompact = item1->valuedouble;
} else {
(*fset)->lastCompact = 0;
}
item1 = cJSON_GetObjectItem(json, "last commit");
if (cJSON_IsNumber(item1)) {
(*fset)->lastCommit = item1->valuedouble;
} else {
(*fset)->lastCommit = 0;
}
return 0;
}
@ -467,6 +490,9 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f
}
}
fset2->lastCompact = fset1->lastCompact;
fset2->lastCommit = fset1->lastCommit;
return 0;
}
@ -522,6 +548,9 @@ int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **f
if (code) return code;
}
(*fset)->lastCompact = fset1->lastCompact;
(*fset)->lastCommit = fset1->lastCommit;
return 0;
}
@ -617,6 +646,9 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
}
}
(*fset)->lastCompact = fset1->lastCompact;
(*fset)->lastCommit = fset1->lastCommit;
return 0;
}

View File

@ -92,6 +92,8 @@ struct STFileSet {
int64_t maxVerValid;
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array
TSKEY lastCompact;
TSKEY lastCommit;
bool mergeScheduled;
SVATaskID mergeTask;

View File

@ -1415,7 +1415,8 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
SVAlterTbReq vAlterTbReq = {0};
SVAlterTbRsp vAlterTbRsp = {0};
SDecoder dc = {0};
int32_t rcode = 0;
int32_t code = 0;
int32_t lino = 0;
int32_t ret;
SEncoder ec = {0};
STableMetaRsp vMetaRsp = {0};
@ -1431,7 +1432,6 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
rcode = -1;
goto _exit;
}
@ -1439,7 +1439,6 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
vAlterTbRsp.code = terrno;
tDecoderClear(&dc);
rcode = -1;
goto _exit;
}
tDecoderClear(&dc);
@ -1449,6 +1448,31 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
vAlterTbRsp.pMeta = &vMetaRsp;
}
if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, vAlterTbReq.tbName);
if (uid == 0) {
vError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pVnode), __func__, __FILE__, __LINE__,
vAlterTbReq.tbName);
goto _exit;
}
SArray* tbUids = taosArrayInit(4, sizeof(int64_t));
void* p = taosArrayPush(tbUids, &uid);
TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
vDebug("vgId:%d, remove tags value altered table:%s from query table list", TD_VID(pVnode), vAlterTbReq.tbName);
if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, false)) < 0) {
vError("vgId:%d, failed to remove tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}
vDebug("vgId:%d, try to add table:%s in query table list", TD_VID(pVnode), vAlterTbReq.tbName);
if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, true)) < 0) {
vError("vgId:%d, failed to add tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}
taosArrayDestroy(tbUids);
}
_exit:
taosArrayDestroy(vAlterTbReq.pMultiTag);
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
@ -1457,6 +1481,7 @@ _exit:
if (tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp) != 0) {
vError("vgId:%d, failed to encode alter table response", TD_VID(pVnode));
}
tEncoderClear(&ec);
if (vMetaRsp.pSchemas) {
taosMemoryFree(vMetaRsp.pSchemas);

View File

@ -2269,6 +2269,8 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
if (ret) {
if (ret == TSDB_CODE_NOT_FOUND) {
// no more scan entry
setOperatorCompleted(pOperator);
pAPI->tsdReader.fileSetReaderClose(&pInfo->pFileSetReader);
break;
} else {
code = ret;

View File

@ -18,7 +18,7 @@
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
#define CHECK_NOT_RSP_DURATION 60 * 1000 // 60 sec
static void processDownstreamReadyRsp(SStreamTask* pTask);
static void rspMonitorFn(void* param, void* tmrId);
@ -660,7 +660,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
// timeout more than 600 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
@ -674,7 +674,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 600sec, add into nodeUpdate list",
id, vgId, p->taskId, p->vgId);
}
}

View File

@ -482,6 +482,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/ins_filesets.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/grant.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R
@ -1383,6 +1384,8 @@
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/basic5.sim
,,y,script,./test.sh -f tsim/stream/tag.sim
,,y,script,./test.sh -f tsim/stream/snodeCheck.sim
,,y,script,./test.sh -f tsim/stream/concurrentcheckpt.sim
,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim

View File

@ -0,0 +1,110 @@
# Test: altering a table's tag value must move the table in/out of a
# stream's tag-filtered (where x>=2) source set, and the stream result
# table must reflect only rows inserted while the filter matched.
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect

print step1
print =============== create database
sql create database test  vgroups 2;
sql use test;

sql create table st1(ts timestamp, a int, b int , c int, d double) tags(x int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);

# Stream only consumes child tables whose tag x >= 2 (initially just t2).
sql create stream streams1 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 WATERMARK 100s  into streamt as select  _wstart as s, count(*) c1  from st1 where x>=2 interval(60s) ;

run tsim/stream/checkTaskStatus.sim

# t2 (x=2) matches the filter; these 3 rows should be counted.
sql insert into t2 values(1648791213000,0,1,1,1.0);
sql insert into t2 values(1648791213001,9,2,2,1.1);
sql insert into t2 values(1648791213009,0,3,3,1.0);

# t1 (x=1) does NOT match; these 3 rows must be ignored.
sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);

sleep 300

sql select * from streamt;

# Only t2's 3 rows counted so far.
if $data01 != 3 then
  return -1
endi

# Bring t1 into the filter (x=3 >= 2); subsequent t1 rows count.
sql alter table t1 set tag x=3;

sql insert into t1 values(1648791233000,0,1,1,1.0);
sql insert into t1 values(1648791233001,9,2,2,1.1);
sql insert into t1 values(1648791233009,0,3,3,1.0);

sleep 1000

sql select * from streamt;

# 3 (t2) + 3 (t1 after the tag change) = 6.
if $data01 != 6 then
  return -1
endi

# Move both tables out of the filter; new inserts must not be counted.
sql alter table t1 set tag x=1;
sql alter table t2 set tag x=1;

sql insert into t1 values(1648791243000,0,1,1,1.0);
sql insert into t1 values(1648791243001,9,2,2,1.1);

sql select * from streamt;

# Count stays at 6 since neither table matches anymore.
if $data01 != 6 then
  return -1
endi

#$loop_count = 0
#loop2:
#
#sleep 300
#print 1 sql select * from streamt;
#sql select * from streamt;
#
#print $data00 $data01 $data02 $data03
#print $data10 $data11 $data12 $data13
#
#$loop_count = $loop_count + 1
#if $loop_count == 10 then
#  return -1
#endi
#
## row 0
#if $data01 != 3 then
#  print ======data01=$data01
#  goto loop2
#endi
#
#if $data02 != 6 then
#  print ======data02=$data02
#  goto loop2
#endi
#
#if $data03 != 3 then
#  print ======data03=$data03
#  goto loop2
#endi
#
## row 1
#if $data11 != 3 then
#  print ======data11=$data11
#  goto loop2
#endi
#
#if $data12 != 6 then
#  print ======data12=$data12
#  goto loop2
#endi
#
#if $data13 != 3 then
#  print ======data13=$data13
#  goto loop2
#endi
#

print tag end

system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -111,6 +111,10 @@ run tsim/stream/distributeInterval0.sim
run tsim/stream/distributeSession0.sim
run tsim/stream/state0.sim
run tsim/stream/basic2.sim
run tsim/stream/basic3.sim
run tsim/stream/basic4.sim
run tsim/stream/basic5.sim
run tsim/stream/tag.sim
run tsim/stream/concurrentcheckpt.sim
run tsim/insert/basic1.sim
run tsim/insert/commit-merge0.sim

View File

@ -0,0 +1,54 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import re
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
    # System-test case for the information_schema.ins_filesets view:
    # after inserting one row and flushing, exactly one file set should
    # be reported for database 'db'.

    def init(self, conn, logSql, replicaVar=1):
        # Harness entry point: record replica count and bind the shared
        # tdSql helper to this connection's cursor.
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())

    def run(self):
        tdSql.execute('create database db vgroups 1')
        tdSql.execute('use db')
        tdSql.execute('create table t1 (ts timestamp, a int, b int)')
        tdSql.execute('insert into t1 values(\'2024-12-27 14:00:00\', 1, 2)')
        # Flush so the buffered row is persisted as an on-disk file set.
        tdSql.execute('flush database db')
        tdLog.sleep(5)  # give the flush time to complete before querying
        # NOTE(review): `rows` is unused; checkRows below validates the count.
        rows = tdSql.query('select * from information_schema.ins_filesets')
        tdSql.checkRows(1)
        tdSql.checkEqual(tdSql.getData(0, 0), 'db')  # column 0: database name
        # Columns 1 and 2: presumably vgroup id and fileset id for the
        # inserted timestamp's time range — TODO confirm column semantics.
        tdSql.checkEqual(tdSql.getData(0, 1), 2)
        tdSql.checkEqual(tdSql.getData(0, 2), 2008)
        # Time/flag columns left unchecked (timezone-dependent). If these
        # are re-enabled, note the method is checkEqual (lower-case c):
        # tdSql.checkEqual(str(tdSql.getData(0, 3)), '2024-12-23 08:00:00.000')
        # tdSql.checkEqual(str(tdSql.getData(0, 4)), '2025-01-02 07:59:59.999')
        # tdSql.checkEqual(tdSql.getData(0, 6), '1970-01-01 08:00:00.000')
        # tdSql.checkEqual(tdSql.getData(0, 7), False)
        tdDnodes.stopAll()

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())