fix(stream): check the right return code for concurrent checkpoint trans.

This commit is contained in:
Haojun Liao 2024-11-04 19:38:18 +08:00
parent f61e79ec1d
commit 0614efdffd
3 changed files with 89 additions and 4 deletions

View File

@ -1646,6 +1646,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "checkpointInterval"); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "checkpointInterval");
tsStreamCheckpointInterval = pItem->i32; tsStreamCheckpointInterval = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "concurrentCheckpoint");
tsMaxConcurrentCheckpoint = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamSinkDataRate"); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamSinkDataRate");
tsSinkDataRate = pItem->fval; tsSinkDataRate = pItem->fval;

View File

@ -1294,9 +1294,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
void* p = taosArrayPush(pList, &in); void* p = taosArrayPush(pList, &in);
if (p) { if (p) {
int32_t currentSize = taosArrayGetSize(pList); int32_t currentSize = taosArrayGetSize(pList);
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %ds(%" PRId64 mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
"s) beyond concurrently launch threshold:%d", "s), concurrently launch threshold:%d",
pStream->name, pStream->uid, tsStreamCheckpointInterval, duration / 1000, currentSize); pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
tsMaxConcurrentCheckpoint);
} else { } else {
mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid); mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
} }
@ -1348,7 +1349,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
sdbRelease(pSdb, p); sdbRelease(pSdb, p);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
started += 1; started += 1;
if (started >= capacity) { if (started >= capacity) {
@ -1356,6 +1357,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
(started + numOfCheckpointTrans)); (started + numOfCheckpointTrans));
break; break;
} }
} else {
mError("failed to start checkpoint trans, code:%s", tstrerror(code));
} }
} }
} }

View File

@ -0,0 +1,79 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c supportVnodes -v 1
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database abc1 vgroups 1;
sql use abc1;
sql create table st1(ts timestamp, k int) tags(a int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
sql insert into t1 values(now, 1);
sql create stream str1 trigger at_once into str_dst1 as select count(*) from st1 interval(30s);
sql create stream str2 trigger at_once into str_dst2 as select count(*) from st1 interval(30s);
sql create stream str3 trigger at_once into str_dst3 as select count(*) from st1 interval(30s);
print ============== create 3 streams, check the concurrently checkpoint
sleep 180000
sql select task_id, checkpoint_id from information_schema.ins_stream_tasks order by checkpoint_id;
print $data01 $data11 $data21
if $data01 == $data11 then
print not allowed 2 checkpoint start completed
return -1
endi
if $data11 == $data21 then
print not allowed 2 checkpoints start concurrently
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print ========== concurrent checkpoint is set 2
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c concurrentCheckpoint -v 2
system sh/exec.sh -n dnode1 -s start
print ========== step2
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database abc1 vgroups 1;
sql use abc1;
sql create table st1(ts timestamp, k int) tags(a int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
sql insert into t1 values(now, 1);
sql create stream str1 trigger at_once into str_dst1 as select count(*) from st1 interval(30s);
sql create stream str2 trigger at_once into str_dst2 as select count(*) from st1 interval(30s);
sql create stream str3 trigger at_once into str_dst3 as select count(*) from st1 interval(30s);
print ============== create 3 streams, check the concurrently checkpoint
sleep 180000
sql select count(*) a, checkpoint_id from information_schema.ins_stream_tasks group by checkpoint_id order by a;
print $data00 $data01
print $data10 $data11
if $data00 != 1 then
print expect 1, actual $data00
return -1
endi
if $data10 != 2 then
print expect 2, actual $data10
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT