fix(stream): check the right return code for concurrent checkpoint trans.

This commit is contained in:
Haojun Liao 2024-11-04 19:38:18 +08:00
parent 28465ecbe3
commit d75d22eb3c
3 changed files with 89 additions and 4 deletions

View File

@ -1646,6 +1646,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "checkpointInterval");
tsStreamCheckpointInterval = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "concurrentCheckpoint");
tsMaxConcurrentCheckpoint = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamSinkDataRate");
tsSinkDataRate = pItem->fval;

View File

@ -1284,9 +1284,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
void* p = taosArrayPush(pList, &in);
if (p) {
int32_t currentSize = taosArrayGetSize(pList);
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %ds(%" PRId64
"s) beyond concurrently launch threshold:%d",
pStream->name, pStream->uid, tsStreamCheckpointInterval, duration / 1000, currentSize);
mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
"s), concurrently launch threshold:%d",
pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
tsMaxConcurrentCheckpoint);
} else {
mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
}
@ -1338,7 +1339,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
sdbRelease(pSdb, p);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
started += 1;
if (started >= capacity) {
@ -1346,6 +1347,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
(started + numOfCheckpointTrans));
break;
}
} else {
mError("failed to start checkpoint trans, code:%s", tstrerror(code));
}
}
}

View File

@ -0,0 +1,79 @@
# Checks how many stream checkpoints are reported as sharing a checkpoint id,
# first with concurrentCheckpoint left unset and then with it set to 2.
# Both steps create three identical streams on one super table, wait, and then
# inspect checkpoint_id in information_schema.ins_stream_tasks.
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c supportVnodes -v 1
print ========== step1
# NOTE(review): concurrentCheckpoint is not configured for this step, so the
# server default applies (presumably 1 -- confirm against the server config).
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database abc1 vgroups 1;
sql use abc1;
sql create table st1(ts timestamp, k int) tags(a int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
sql insert into t1 values(now, 1);
sql create stream str1 trigger at_once into str_dst1 as select count(*) from st1 interval(30s);
sql create stream str2 trigger at_once into str_dst2 as select count(*) from st1 interval(30s);
sql create stream str3 trigger at_once into str_dst3 as select count(*) from st1 interval(30s);
print ============== create 3 streams, check the concurrently checkpoint
# Wait for the checkpoint timer to fire (presumably milliseconds, i.e. 3 minutes -- confirm).
sleep 180000
# Rows are sorted by checkpoint_id, so any two tasks sharing an id end up adjacent.
sql select task_id, checkpoint_id from information_schema.ins_stream_tasks order by checkpoint_id;
print $data01 $data11 $data21
# Column 1 of rows 0/1 and rows 1/2 must differ: no two tasks may share a checkpoint id here.
if $data01 == $data11 then
print not allowed 2 checkpoints start concurrently
return -1
endi
if $data11 == $data21 then
print not allowed 2 checkpoints start concurrently
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print ========== concurrent checkpoint is set 2
# Redeploy dnode1 from scratch with concurrentCheckpoint set to 2.
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c concurrentCheckpoint -v 2
# dnode1 is started exactly once for this step.
system sh/exec.sh -n dnode1 -s start
print ========== step2
sql connect
sql create database abc1 vgroups 1;
sql use abc1;
sql create table st1(ts timestamp, k int) tags(a int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
sql insert into t1 values(now, 1);
sql create stream str1 trigger at_once into str_dst1 as select count(*) from st1 interval(30s);
sql create stream str2 trigger at_once into str_dst2 as select count(*) from st1 interval(30s);
sql create stream str3 trigger at_once into str_dst3 as select count(*) from st1 interval(30s);
print ============== create 3 streams, check the concurrently checkpoint
# Wait for the checkpoint timer to fire (presumably milliseconds, i.e. 3 minutes -- confirm).
sleep 180000
# Count tasks per checkpoint_id, smallest group first.
sql select count(*) a, checkpoint_id from information_schema.ins_stream_tasks group by checkpoint_id order by a;
print $data00 $data01
print $data10 $data11
# Expected grouping: one checkpoint id held by a single task, another shared by two tasks.
if $data00 != 1 then
print expect 1, actual $data00
return -1
endi
if $data10 != 2 then
print expect 2, actual $data10
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT