Merge pull request #29397 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch
This commit is contained in:
Shengliang Guan 2024-12-30 14:06:17 +08:00 committed by GitHub
commit 057f67c90b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 149 additions and 8 deletions

View File

@ -1415,7 +1415,8 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
SVAlterTbReq vAlterTbReq = {0};
SVAlterTbRsp vAlterTbRsp = {0};
SDecoder dc = {0};
int32_t rcode = 0;
int32_t code = 0;
int32_t lino = 0;
int32_t ret;
SEncoder ec = {0};
STableMetaRsp vMetaRsp = {0};
@ -1431,7 +1432,6 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
rcode = -1;
goto _exit;
}
@ -1439,7 +1439,6 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
vAlterTbRsp.code = terrno;
tDecoderClear(&dc);
rcode = -1;
goto _exit;
}
tDecoderClear(&dc);
@ -1449,6 +1448,31 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
vAlterTbRsp.pMeta = &vMetaRsp;
}
if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, vAlterTbReq.tbName);
if (uid == 0) {
vError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pVnode), __func__, __FILE__, __LINE__,
vAlterTbReq.tbName);
goto _exit;
}
SArray* tbUids = taosArrayInit(4, sizeof(int64_t));
void* p = taosArrayPush(tbUids, &uid);
TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
vDebug("vgId:%d, remove tags value altered table:%s from query table list", TD_VID(pVnode), vAlterTbReq.tbName);
if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, false)) < 0) {
vError("vgId:%d, failed to remove tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}
vDebug("vgId:%d, try to add table:%s in query table list", TD_VID(pVnode), vAlterTbReq.tbName);
if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, true)) < 0) {
vError("vgId:%d, failed to add tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}
taosArrayDestroy(tbUids);
}
_exit:
taosArrayDestroy(vAlterTbReq.pMultiTag);
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
@ -1457,6 +1481,7 @@ _exit:
if (tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp) != 0) {
vError("vgId:%d, failed to encode alter table response", TD_VID(pVnode));
}
tEncoderClear(&ec);
if (vMetaRsp.pSchemas) {
taosMemoryFree(vMetaRsp.pSchemas);

View File

@ -18,7 +18,7 @@
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
#define CHECK_NOT_RSP_DURATION 60 * 1000 // 60 sec
static void processDownstreamReadyRsp(SStreamTask* pTask);
static void rspMonitorFn(void* param, void* tmrId);
@ -660,7 +660,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
// timeout more than 600 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
@ -674,7 +674,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 600sec, add into nodeUpdate list",
id, vgId, p->taskId, p->vgId);
}
}

View File

@ -36,7 +36,7 @@
"insert_mode": "taosc",
"line_protocol": "line",
"childtable_limit": -10,
"childtable_offset": 10,
"childtable_offset": 0,
"insert_rows": 20,
"insert_interval": 0,
"interlace_rows": 0,

View File

@ -36,7 +36,7 @@
"insert_mode": "taosc",
"line_protocol": "line",
"childtable_limit": -10,
"childtable_offset": 10,
"childtable_offset": 0,
"insert_rows": 20,
"insert_interval": 0,
"interlace_rows": 0,

View File

@ -1387,6 +1387,8 @@
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/basic5.sim
,,y,script,./test.sh -f tsim/stream/tag.sim
,,y,script,./test.sh -f tsim/stream/snodeCheck.sim
,,y,script,./test.sh -f tsim/stream/concurrentcheckpt.sim
,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim

View File

@ -0,0 +1,110 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
print step1
print =============== create database
sql create database test vgroups 2;
sql use test;
sql create table st1(ts timestamp, a int, b int , c int, d double) tags(x int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 WATERMARK 100s into streamt as select _wstart as s, count(*) c1 from st1 where x>=2 interval(60s) ;
run tsim/stream/checkTaskStatus.sim
sql insert into t2 values(1648791213000,0,1,1,1.0);
sql insert into t2 values(1648791213001,9,2,2,1.1);
sql insert into t2 values(1648791213009,0,3,3,1.0);
sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);
sleep 300
sql select * from streamt;
if $data01 != 3 then
return -1
endi
sql alter table t1 set tag x=3;
sql insert into t1 values(1648791233000,0,1,1,1.0);
sql insert into t1 values(1648791233001,9,2,2,1.1);
sql insert into t1 values(1648791233009,0,3,3,1.0);
sleep 1000
sql select * from streamt;
if $data01 != 6 then
return -1
endi
sql alter table t1 set tag x=1;
sql alter table t2 set tag x=1;
sql insert into t1 values(1648791243000,0,1,1,1.0);
sql insert into t1 values(1648791243001,9,2,2,1.1);
sql select * from streamt;
if $data01 != 6 then
return -1
endi
#$loop_count = 0
#loop2:
#
#sleep 300
#print 1 sql select * from streamt;
#sql select * from streamt;
#
#print $data00 $data01 $data02 $data03
#print $data10 $data11 $data12 $data13
#
#$loop_count = $loop_count + 1
#if $loop_count == 10 then
# return -1
#endi
#
## row 0
#if $data01 != 3 then
# print ======data01=$data01
# goto loop2
#endi
#
#if $data02 != 6 then
# print ======data02=$data02
# goto loop2
#endi
#
#if $data03 != 3 then
# print ======data03=$data03
# goto loop2
#endi
#
## row 1
#if $data11 != 3 then
# print ======data11=$data11
# goto loop2
#endi
#
#if $data12 != 6 then
# print ======data12=$data12
# goto loop2
#endi
#
#if $data13 != 3 then
# print ======data13=$data13
# goto loop2
#endi
#
print tag end
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -111,6 +111,10 @@ run tsim/stream/distributeInterval0.sim
run tsim/stream/distributeSession0.sim
run tsim/stream/state0.sim
run tsim/stream/basic2.sim
run tsim/stream/basic3.sim
run tsim/stream/basic4.sim
run tsim/stream/basic5.sim
run tsim/stream/tag.sim
run tsim/stream/concurrentcheckpt.sim
run tsim/insert/basic1.sim
run tsim/insert/commit-merge0.sim