fix(stream): do not execute the check-status completion in the timer thread.
This commit is contained in:
parent
57ee97814f
commit
bf5d523116
|
@ -56,6 +56,7 @@ extern "C" {
|
|||
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
||||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
typedef struct SStreamQueue SStreamQueue;
|
||||
|
@ -886,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
|||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||
int64_t endTs, bool ready);
|
||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
||||
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
|
||||
void streamMetaRLock(SStreamMeta* pMeta);
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||
|
|
|
@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry {
|
|||
int32_t transId;
|
||||
} STaskUpdateEntry;
|
||||
|
||||
// Message body that consists solely of the common message head (SMsgHead); it carries no payload fields.
// NOTE(review): judging by the name, this is the response to a checkpoint-ready message - confirm against its users.
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||
SMsgHead head;
|
||||
} SMStreamCheckpointReadyRspMsg;
|
||||
|
||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.fillHistory);
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
|
@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||
|
||||
int64_t initTs = 0;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId};
|
||||
STaskId fId = {0};
|
||||
bool hasHistoryTask = false;
|
||||
|
||||
// todo extract method
|
||||
if (!isLeader) {
|
||||
// this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map.
|
||||
streamMetaRLock(pMeta);
|
||||
|
||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask != NULL) {
|
||||
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
if (hasHistoryTask) {
|
||||
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
|
||||
}
|
||||
|
||||
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
||||
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
||||
} else {
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
|
||||
return code;
|
||||
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
||||
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
||||
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||
}
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
streamMetaRLock(pMeta);
|
||||
|
||||
// let's try to find this task in hashmap
|
||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask != NULL) {
|
||||
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
if (hasHistoryTask) {
|
||||
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
|
||||
}
|
||||
} else { // not exist even in the hash map of meta, forget it
|
||||
streamMetaRUnLock(pMeta);
|
||||
}
|
||||
|
||||
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
|
||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||
|
||||
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
return code;
|
||||
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||
}
|
||||
|
||||
code = streamProcessCheckRsp(pTask, &rsp);
|
||||
|
@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||
SMsgHead head;
|
||||
} SMStreamCheckpointReadyRspMsg;
|
||||
|
||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
|
@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
||||
streamMetaStopAllTasks(pMeta);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
|
||||
|
|
|
@ -295,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
|||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
if (p != NULL) {
|
||||
if (reqId != p->reqId) {
|
||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
|
||||
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
|
||||
id, reqId, p->reqId, taskId);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_FAILED;
|
||||
|
@ -521,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
|||
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
|
||||
}
|
||||
|
||||
// Adding a failed status may trigger the restart procedure, which must NEVER be executed in the timer thread.
|
||||
// Restarting all tasks requires that no task has an active timer at that moment. Therefore, executing the
|
||||
// restart in the timer thread results in a deadlock.
|
||||
/**
 * Post an asynchronous STREAM_EXEC_T_ADD_FAILED_TASK run request to the stream queue.
 *
 * A SStreamTaskRunReq describing the task is allocated and handed to tmsgPutToQueue(), so that the
 * failed status is recorded by the consumer of STREAM_QUEUE instead of by the calling thread.
 *
 * @param pMsgCb   message callback set used to enqueue the request
 * @param vgId     vgroup id; stored in the request head and used in the log output
 * @param streamId stream id of the task to be reported as failed
 * @param taskId   task id of the task to be reported as failed
 * @return 0 on success; -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) if the request cannot be allocated;
 *         otherwise the non-zero code returned by tmsgPutToQueue().
 */
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    stError("vgId:%d failed to create msg to add failed s-task:0x%x async, code:%s", vgId, taskId, terrstr());
    return -1;
  }

  stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId);

  pRunReq->head.vgId = vgId;
  pRunReq->streamId = streamId;
  pRunReq->taskId = taskId;
  pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};

  // Do not report success when the request never reached the queue.
  int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
  if (code != 0) {
    // NOTE(review): pRunReq is intentionally not released here; presumably tmsgPutToQueue() frees
    // msg.pCont when it fails - confirm before adding an explicit rpcFreeCont().
    stError("vgId:%d failed to put add-failed-task msg of s-task:0x%x into stream queue, code:%d", vgId, taskId, code);
  }

  return code;
}
|
||||
|
||||
// this function is executed in timer thread
|
||||
void rspMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
@ -545,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||
}
|
||||
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return;
|
||||
|
@ -618,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
||||
streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||
}
|
||||
|
||||
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
taosArrayDestroy(pNotReadyList);
|
||||
|
|
|
@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx
|
|||
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||
*/
|
||||
int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||
static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
// merge multiple input data if possible in the input queue.
|
||||
|
|
|
@ -1706,4 +1706,32 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
 * Record a "launch failed" result for a task and, if it has one, for its related fill-history task.
 *
 * The task is looked up in pMeta->pTasksMap under the meta read lock. The fields needed for the
 * report are copied out and the lock is released BEFORE streamMetaAddTaskLaunchResult() is called,
 * which is the same order used by the other callers of that function.
 * NOTE(review): streamMetaAddTaskLaunchResult() presumably acquires the meta lock itself, so calling it
 * while the lock is still held could self-deadlock - confirm against its definition.
 *
 * @param pMeta    stream meta that owns the task hash map
 * @param streamId stream id of the failed task
 * @param taskId   task id of the failed task
 * @return TSDB_CODE_SUCCESS if the task was found and reported;
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST if the task is not present in the hash map.
 */
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int64_t startTs = 0;
  bool    hasHistoryTask = false;
  STaskId hId = {0};
  STaskId id = {.streamId = streamId, .taskId = taskId};

  stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);

  // Only a lookup is performed here, so the read lock is sufficient.
  streamMetaRLock(pMeta);

  SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (ppTask == NULL) {
    streamMetaRUnLock(pMeta);

    stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
            streamId, taskId, pMeta->vgId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // Copy everything needed while the task pointer is still protected by the lock.
  startTs = (*ppTask)->taskCheckInfo.startTs;
  hasHistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
  if (hasHistoryTask) {
    hId = (*ppTask)->hTaskInfo.id;
  }

  streamMetaRUnLock(pMeta);

  // Report the failed task, along with its related fill-history task if one exists.
  int64_t now = taosGetTimestampMs();
  streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
  if (hasHistoryTask) {
    streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
  }

  return TSDB_CODE_SUCCESS;
}
|
Loading…
Reference in New Issue