fix(stream): check for not ready task when rsp returned.

This commit is contained in:
Haojun Liao 2024-01-06 19:43:39 +08:00
parent cfda97beef
commit 02f6c971fd
3 changed files with 80 additions and 16 deletions

View File

@ -56,6 +56,7 @@ extern "C" {
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;

View File

@ -77,6 +77,33 @@ int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
return 0; return 0;
} }
int32_t tqUpdateNodeEpsetAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr());
return -1;
}
tqDebug("vgId:%d update s-task:0x%x nodeEpset async", vgId, taskId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = STREAM_EXEC_T_UPDATE_TASK_EPSET;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
return 0;
}
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -443,12 +470,22 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
} }
static void setParam(SStreamTask* pTask, int64_t* initTs, bool* hasHTask, STaskId* pId) {
*initTs = pTask->execInfo.init;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
*hasHTask = true;
pId->streamId = pTask->hTaskInfo.id.streamId;
pId->taskId = pTask->hTaskInfo.id.taskId;
}
}
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t code = TSDB_CODE_SUCCESS;
int32_t code;
SStreamTaskCheckRsp rsp; SStreamTaskCheckRsp rsp;
SDecoder decoder; SDecoder decoder;
@ -467,35 +504,60 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
int64_t initTs = 0; int64_t initTs = 0;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
if (!isLeader) { STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId};
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); STaskId fId = {0};
if (pTask != NULL) { bool hasHistoryTask = false;
initTs = pTask->execInfo.init;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { // todo extract method
STaskId* pId = &pTask->hTaskInfo.id; if (!isLeader) {
streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, initTs, now, false); // this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map.
streamMetaRLock(pMeta);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false);
} }
streamMetaReleaseTask(pMeta, pTask); tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
} else {
streamMetaRUnLock(pMeta);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return code; return code;
} }
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, now, false); streamMetaRLock(pMeta);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
} else {
streamMetaRUnLock(pMeta);
}
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId); rsp.streamId, rsp.upstreamTaskId, vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
// failed to find the related fill-history task code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return -1; return code;
} }
code = streamProcessCheckRsp(pTask, &rsp); code = streamProcessCheckRsp(pTask, &rsp);

View File

@ -1472,6 +1472,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
// fill-history task can only be launched by related stream tasks. // fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo; STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;
} }