Merge pull request #25434 from taosdata/fix/3_liaohj

fix(stream):update the check-rsp prcedure, to avoid repeatly start check-rsp procedure.
This commit is contained in:
Haojun Liao 2024-04-22 22:13:19 +08:00 committed by GitHub
commit ada1c6d4c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 93 additions and 63 deletions

View File

@ -444,6 +444,7 @@ typedef struct STaskCheckInfo {
int64_t startTs; int64_t startTs;
int32_t notReadyTasks; int32_t notReadyTasks;
int32_t inCheckProcess; int32_t inCheckProcess;
int32_t stopCheckProcess;
tmr_h checkRspTmr; tmr_h checkRspTmr;
TdThreadMutex checkInfoLock; TdThreadMutex checkInfoLock;
} STaskCheckInfo; } STaskCheckInfo;
@ -844,14 +845,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id); int32_t* pNotReady, const char* id);
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo); void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id);
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);

View File

@ -216,7 +216,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask); streamTaskResetStatus(pTask);
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
SStreamTask** ppHTask = NULL; SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -231,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
streamTaskResetStatus(*ppHTask); streamTaskResetStatus(*ppHTask);
streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
} }
} }

View File

@ -184,15 +184,10 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
return;
}
streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs());
// serialize streamProcessScanHistoryFinishRsp // serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
@ -206,8 +201,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
streamTaskStartMonitorCheckRsp(pTask);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
@ -226,11 +222,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
streamTaskStartMonitorCheckRsp(pTask);
} else { // for sink task, set it ready directly. } else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
doProcessDownstreamReadyRsp(pTask); doProcessDownstreamReadyRsp(pTask);
} }
} }
@ -405,7 +399,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (left == 0) { if (left == 0) {
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
streamTaskCompleteCheck(pInfo, id); streamTaskStopMonitorCheckRsp(pInfo, id);
} else { } else {
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);

View File

@ -534,6 +534,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr); int code = taosThreadMutexAttrInit(&attr);
if (code != 0) { if (code != 0) {
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
@ -563,6 +564,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pDownstreamUpdateList == NULL) { if (pOutputInfo->pDownstreamUpdateList == NULL) {
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
if (pTask->taskCheckInfo.pList == NULL) {
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -942,14 +951,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
return 0; return 0;
} }
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
if (pInfo->pList == NULL) {
pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
} else {
taosArrayClear(pInfo->pList); taosArrayClear(pInfo->pList);
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
pInfo->notReadyTasks = 1; pInfo->notReadyTasks = 1;
@ -959,8 +962,6 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
} }
pInfo->startTs = startTs; pInfo->startTs = startTs;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1014,39 +1015,33 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->inCheckProcess == 0) { if (pInfo->inCheckProcess == 0) {
pInfo->inCheckProcess = 1; pInfo->inCheckProcess = 1;
} else { } else {
ASSERT(pInfo->startTs > 0); ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs); stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set the in-check-procedure flag", id); stDebug("s-task:%s set the in-check-procedure flag", id);
return 0; return 0;
} }
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
if (!pInfo->inCheckProcess) { if (!pInfo->inCheckProcess) {
taosThreadMutexUnlock(&pInfo->checkInfoLock); stWarn("s-task:%s already not in-check-procedure", id);
return TSDB_CODE_SUCCESS;
} }
int64_t el = taosGetTimestampMs() - pInfo->startTs; int64_t el = taosGetTimestampMs() - pInfo->startTs;
stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el); stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0; pInfo->startTs = 0;
pInfo->inCheckProcess = 0;
pInfo->notReadyTasks = 0; pInfo->notReadyTasks = 0;
pInfo->inCheckProcess = 0;
pInfo->stopCheckProcess = 0;
taosArrayClear(pInfo->pList); taosArrayClear(pInfo->pList);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return 0; return 0;
} }
@ -1099,16 +1094,22 @@ static void rspMonitorFn(void* param, void* tmrId) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs; int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state; ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0; int32_t numOfReady = 0;
int32_t numOfFault = 0; int32_t numOfFault = 0;
const char* id = pTask->id.idStr; int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
stDebug("s-task:%s start to do check downstream rsp check", id); stDebug("s-task:%s start to do check downstream rsp check", id);
if (state == TASK_STATUS__STOP) { if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, id);
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
return; return;
@ -1117,7 +1118,11 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, id);
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return; return;
} }
@ -1127,8 +1132,8 @@ static void rspMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
vgId, ref); vgId, ref);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamTaskCompleteCheck(pInfo, id);
return; return;
} }
@ -1141,7 +1146,8 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (p->status == TASK_DOWNSTREAM_READY) { if (p->status == TASK_DOWNSTREAM_READY) {
numOfReady += 1; numOfReady += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId); stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
p->taskId);
numOfFault += 1; numOfFault += 1;
} else { // TASK_DOWNSTREAM_NOT_READY } else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet if (p->rspTs == 0) { // not response yet
@ -1149,7 +1155,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId); taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION } else { // el < CHECK_NOT_RSP_DURATION
// do nothing and continue waiting for their rsps numOfNotRsp += 1; // do nothing and continue waiting for their rsp
} }
} else { } else {
taosArrayPush(pNotReadyList, &p->taskId); taosArrayPush(pNotReadyList, &p->taskId);
@ -1160,33 +1166,35 @@ static void rspMonitorFn(void* param, void* tmrId) {
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
} }
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore // fault tasks detected, not try anymore
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
(numOfFault > 0)) { if ((numOfNotRsp == 0) && (numOfFault > 0)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);
streamTaskCompleteCheck(pInfo, id); streamTaskCompleteCheckRsp(pInfo, id);
return; return;
} }
// checking of downstream tasks has been stopped by other threads // checking of downstream tasks has been stopped by other threads
if (pInfo->inCheckProcess == 0) { if (pInfo->stopCheckProcess == 1) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
"timeout:%d, ready:%d ref:%d", "fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
@ -1238,25 +1246,53 @@ static void rspMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
} }
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);
} }
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL); STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
} else {
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return 0; return 0;
} }
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check rsp mon", id);
return TSDB_CODE_SUCCESS;
}
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) { void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
ASSERT(pInfo->inCheckProcess == 0); ASSERT(pInfo->inCheckProcess == 0);