Merge pull request #23743 from taosdata/fix/pause_stream

fix(stream): remove invalid assert
This commit is contained in:
Haojun Liao 2023-11-17 23:48:47 +08:00 committed by GitHub
commit 1b4f187ded
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 22 deletions

View File

@ -831,7 +831,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready);
void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);

View File

@ -947,9 +947,10 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pTq->pStreamMeta->vgId; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t code; int32_t code;
SStreamTaskCheckRsp rsp; SStreamTaskCheckRsp rsp;
@ -969,21 +970,23 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return code; return code;
} }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId); rsp.streamId, rsp.upstreamTaskId, pMeta->vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return -1; return -1;
} }
code = streamProcessCheckRsp(pTask, &rsp); code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }

View File

@ -100,7 +100,8 @@ int32_t tqStartStreamTasks(STQ* pTq) {
streamLaunchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
} }
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;
} }

View File

@ -487,7 +487,6 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
if (ref > 0) { if (ref > 0) {
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) { } else if (ref == 0) {
ASSERT(streamTaskShouldStop(pTask));
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
} else if (ref < 0) { } else if (ref < 0) {
@ -1071,7 +1070,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId,
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
streamMetaWLock(pMeta); streamMetaWLock(pMeta);

View File

@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p); pTask->id.idStr, numOfDowns, el, p);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -410,7 +411,6 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id); stDebug("s-task:%s should stop, do not do check downstream again", id);
@ -470,8 +470,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
taosGetTimestampMs(), false);
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
@ -1067,14 +1067,12 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
} }
} }
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
SStreamMeta* pMeta = pTask->pMeta; int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
STaskId id = streamTaskExtractKey(pTask);
STaskStartInfo* pStartInfo = &pMeta->startInfo;
SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
@ -1086,9 +1084,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs", ", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0); pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info // print the initialization elapsed time and info