fix(stream): record the launch failure.

This commit is contained in:
Haojun Liao 2023-12-20 13:55:12 +08:00
parent 94b260a1f9
commit d25a323a4c
3 changed files with 5 additions and 1 deletions

View File

@ -324,6 +324,7 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int32_t timerActive; // timer is active int32_t timerActive; // timer is active
int8_t allowedAddInTimer; // allowed to add into timer
int32_t inScanHistorySentinel; int32_t inScanHistorySentinel;
} SStreamStatus; } SStreamStatus;

View File

@ -1071,6 +1071,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
tqScanWal(pTq); tqScanWal(pTq);
return 0; return 0;
} }
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if(code == 0 && taskId > 0){ if(code == 0 && taskId > 0){
tqScanWalAsync(pTq, false); tqScanWalAsync(pTq, false);

View File

@ -621,6 +621,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0,
taosGetTimestampMs(), false);
continue; continue;
} }
@ -637,7 +639,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
streamLaunchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
} }
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true); pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;