fix(stream):adjust status of ins_streams

This commit is contained in:
54liuyao 2024-12-17 17:59:53 +08:00
parent 1c6655f450
commit 63a6428e34
3 changed files with 207 additions and 14 deletions

View File

@ -1680,11 +1680,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid); mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);
if (pStream->status == STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;
}
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) { if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return code; return code;
@ -1773,7 +1768,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
// pause stream // pause stream
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__PAUSE;
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code) { if (code) {
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
@ -1824,11 +1818,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
} }
if (pStream->status != STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;
}
mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid); mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);

View File

@ -854,8 +854,7 @@ int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) {
return TSDB_CODE_MND_STREAM_NOT_EXIST; return TSDB_CODE_MND_STREAM_NOT_EXIST;
} }
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { static void mndShowStreamStatus(char *dst, int8_t status) {
int8_t status = atomic_load_8(&pStream->status);
if (status == STREAM_STATUS__NORMAL) { if (status == STREAM_STATUS__NORMAL) {
strcpy(dst, "ready"); strcpy(dst, "ready");
} else if (status == STREAM_STATUS__STOP) { } else if (status == STREAM_STATUS__STOP) {
@ -891,6 +890,41 @@ static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
varDataSetLen(pBuf, len + 2); varDataSetLen(pBuf, len + 2);
} }
static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamTaskIter *pIter = NULL;
bool isPaused = true;
taosRLockLatch(&pStream->lock);
code = createStreamTaskIter(pStream, &pIter);
TSDB_CHECK_CODE(code, lino, _end);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
TSDB_CHECK_CODE(code, lino, _end);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
if (pe->status != TASK_STATUS__PAUSE) {
isPaused = false;
}
}
(*pRes) = isPaused;
_end:
destroyStreamTaskIter(pIter);
taosRUnLockLatch(&pStream->lock);
if (code != TSDB_CODE_SUCCESS) {
mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code));
}
return code;
}
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) { int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
int32_t code = 0; int32_t code = 0;
int32_t cols = 0; int32_t cols = 0;
@ -939,7 +973,15 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0}; char status2[20] = {0};
mndShowStreamStatus(status2, pStream); bool isPaused = false;
code = isAllTaskPaused(pStream, &isPaused);
TSDB_CHECK_CODE(code, lino, _end);
int8_t streamStatus = atomic_load_8(&pStream->status);
if (isPaused) {
streamStatus = STREAM_STATUS__PAUSE;
}
mndShowStreamStatus(status2, streamStatus);
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

View File

@ -398,4 +398,166 @@ endi
print ===== step5 over print ===== step5 over
print ===== step6
sql drop database if exists test6;
sql create database test7 vgroups 1;
sql use test7;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create stream streams8 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt8 as select _wstart, count(*) c1 from st interval(10s);
sql create stream streams9 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt9 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
run tsim/stream/checkTaskStatus.sim
$loop_count = 0
loop7:
$loop_count = $loop_count + 1
if $loop_count == 40 then
return -1
endi
sleep 500
sql select status, * from information_schema.ins_streams where status != "ready";
if $rows != 0 then
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
goto loop7
endi
sql pause stream streams8;
sql pause stream streams9;
sql pause stream streams8;
sql pause stream streams9;
sleep 1000
sql pause stream streams8;
sql pause stream streams9;
sleep 1000
$loop_count = 0
loop80:
$loop_count = $loop_count + 1
if $loop_count == 40 then
print pause stream failed
goto end_step_6
endi
sleep 1000
sql select status, * from information_schema.ins_stream_tasks where status != "paused";
if $rows != 2 then
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
goto loop80
endi
$loop_count = 0
loop8:
$loop_count = $loop_count + 1
if $loop_count == 40 then
return -1
endi
sleep 1000
sql select status, * from information_schema.ins_streams where status == "paused";
if $rows != 2 then
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
goto loop8
endi
sql resume stream streams8;
sql resume stream streams9;
sql resume stream streams8;
sql resume stream streams9;
sleep 1000
sql resume stream streams8;
sql resume stream streams9;
sleep 1000
$loop_count = 0
loop90:
$loop_count = $loop_count + 1
if $loop_count == 40 then
print pause stream failed
goto end_step_6
endi
sleep 1000
sql select status, * from information_schema.ins_stream_tasks where status == "paused";
if $rows != 0 then
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
goto loop90
endi
$loop_count = 0
loop9:
$loop_count = $loop_count + 1
if $loop_count == 40 then
return -1
endi
sleep 1000
sql select status, * from information_schema.ins_streams where status != "paused";
if $rows != 2 then
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
goto loop9
endi
end_step_6:
print ===== step6 over
system sh/stop_dnodes.sh system sh/stop_dnodes.sh