refactor(stream): refactor check msg retry procedure and be tolerant to the dnode disconnection during check downstream tasks.

This commit is contained in:
Haojun Liao 2024-04-18 15:55:21 +08:00
parent deee0c3793
commit e7dee4ac63
6 changed files with 362 additions and 121 deletions

View File

@ -432,6 +432,22 @@ typedef struct SUpstreamInfo {
int32_t numOfClosed; int32_t numOfClosed;
} SUpstreamInfo; } SUpstreamInfo;
typedef struct SDownstreamStatusInfo {
int64_t reqId;
int32_t taskId;
int64_t rspTs;
int32_t status;
} SDownstreamStatusInfo;
typedef struct STaskCheckInfo {
SArray* pList;
int64_t startTs;
int32_t notReadyTasks;
int32_t inCheckProcess;
tmr_h checkRspTmr;
TdThreadMutex checkInfoLock;
} STaskCheckInfo;
struct SStreamTask { struct SStreamTask {
int64_t ver; int64_t ver;
SStreamTaskId id; SStreamTaskId id;
@ -455,14 +471,12 @@ struct SStreamTask {
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList; SArray* pRspMsgList;
SUpstreamInfo upstreamInfo; SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
// the followings attributes don't be serialized // the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo; SScanhistorySchedInfo schedHistoryInfo;
int32_t notReadyTasks;
int32_t numOfWaitingUpstream; int32_t numOfWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt; int32_t refCnt;
int32_t transferStateAlignCnt; int32_t transferStateAlignCnt;
struct SStreamMeta* pMeta; struct SStreamMeta* pMeta;
@ -832,6 +846,15 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id);
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id);
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);

View File

@ -26,7 +26,7 @@
extern "C" { extern "C" {
#endif #endif
#define CHECK_DOWNSTREAM_INTERVAL 100 #define CHECK_RSP_INTERVAL 200
#define LAUNCH_HTASK_INTERVAL 100 #define LAUNCH_HTASK_INTERVAL 100
#define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define WAIT_FOR_MINIMAL_INTERVAL 100.00
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40

View File

@ -396,6 +396,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) {
taosMemoryFree(file); taosMemoryFree(file);
return code; return code;
} }
int32_t doUploadChkp(void* param) { int32_t doUploadChkp(void* param) {
SAsyncUploadArg* arg = param; SAsyncUploadArg* arg = param;
char* path = NULL; char* path = NULL;
@ -436,6 +437,7 @@ int32_t doUploadChkp(void* param) {
taosMemoryFree(arg); taosMemoryFree(arg);
return code; return code;
} }
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload // async upload
UPLOAD_TYPE type = getUploadType(); UPLOAD_TYPE type = getUploadType();

View File

@ -1617,7 +1617,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
void* pIter = NULL; void* pIter = NULL;
size_t keyLen = 0; size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed"); succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {

View File

@ -83,7 +83,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0; return 0;
} }
static void doReExecScanhistory(void* param, void* tmrId) { static void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1; pTask->schedHistoryInfo.numOfTicks -= 1;
@ -105,7 +105,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
// release the task. // release the task.
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
} else { } else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
} }
} }
@ -131,9 +131,9 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) { if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); pTask->schedHistoryInfo.pTimer = taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else { } else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -184,12 +184,20 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
return;
}
streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs());
// serialize streamProcessScanHistoryFinishRsp // serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
pTask->checkReqId = req.reqId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64, " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
@ -197,95 +205,35 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
streamTaskStartMonitorCheckRsp(pTask);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
if (pTask->checkReqIds == NULL) {
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
} else {
taosArrayClear(pTask->checkReqIds);
}
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
for (int32_t i = 0; i < numOfVgs; i++) { for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else {
streamTaskStartMonitorCheckRsp(pTask);
} else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
doProcessDownstreamReadyRsp(pTask); doProcessDownstreamReadyRsp(pTask);
} }
} }
static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
STaskRecheckInfo* pInfo = taosMemoryCalloc(1, sizeof(STaskRecheckInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pInfo->pTask = pTask;
pInfo->req = (SStreamTaskCheckReq){
.reqId = pRsp->reqId,
.streamId = pRsp->streamId,
.upstreamTaskId = pRsp->upstreamTaskId,
.upstreamNodeId = pRsp->upstreamNodeId,
.downstreamTaskId = pRsp->downstreamTaskId,
.downstreamNodeId = pRsp->downstreamNodeId,
.childId = pRsp->childId,
.stage = pTask->pMeta->stage,
};
return pInfo;
}
static void destroyRecheckInfo(STaskRecheckInfo* pInfo) {
if (pInfo != NULL) {
taosTmrStop(pInfo->checkTimer);
pInfo->checkTimer = NULL;
taosMemoryFree(pInfo);
}
}
static void recheckDownstreamTasks(void* param, void* tmrId) {
STaskRecheckInfo* pInfo = param;
SStreamTask* pTask = pInfo->pTask;
SStreamTaskCheckReq* pReq = &pInfo->req;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo->taskId == pReq->downstreamTaskId) {
stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pVgInfo->epSet);
}
}
}
destroyRecheckInfo(pInfo);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
}
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) { int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
@ -439,7 +387,12 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t total = streamTaskGetNumOfDownstream(pTask);
int32_t left = -1;
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id); stDebug("s-task:%s should stop, do not do check downstream again", id);
@ -447,47 +400,21 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
if (pRsp->status == TASK_DOWNSTREAM_READY) { if (pRsp->status == TASK_DOWNSTREAM_READY) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
bool found = false;
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
for (int32_t i = 0; i < numOfReqs; i++) {
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
if (reqId == pRsp->reqId) {
found = true;
break;
}
}
if (!found) { if (left == 0) {
return -1; doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
} streamTaskCompleteCheck(pInfo, id);
int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
ASSERT(left >= 0);
if (left == 0) {
pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);;
doProcessDownstreamReadyRsp(pTask);
} else {
int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else { } else {
ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
if (pRsp->reqId != pTask->checkReqId) { pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
return -1;
}
doProcessDownstreamReadyRsp(pTask);
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else { } else {
@ -499,7 +426,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
int32_t startTs = pTask->execInfo.checkTs; int32_t startTs = pTask->execInfo.checkTs;
int64_t now = taosGetTimestampMs();
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
// automatically set the related fill-history task to be failed. // automatically set the related fill-history task to be failed.
@ -507,13 +433,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
STaskId* pId = &pTask->hTaskInfo.id; STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
} }
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id, ASSERT(left > 0);
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer); pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
} }
} }

View File

@ -21,6 +21,8 @@
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
@ -113,6 +115,9 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
if (fillHistory) { if (fillHistory) {
ASSERT(hasFillhistory); ASSERT(hasFillhistory);
} }
@ -424,9 +429,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo); tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);
} }
streamTaskCleanCheckInfo(&pTask->taskCheckInfo);
if (pTask->pState) { if (pTask->pState) {
stDebug("s-task:0x%x start to free task state", taskId); stDebug("s-task:0x%x start to free task state", taskId);
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
@ -933,3 +939,289 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
tmsgSendReq(&pTask->info.mnodeEpset, &msg); tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0; return 0;
} }
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
if (pInfo->pList == NULL) {
pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
} else {
taosArrayClear(pInfo->pList);
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
pInfo->notReadyTasks = 1;
} else {
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
}
pInfo->startTs = startTs;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->taskId == taskId) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
}
taosArrayPush(pInfo->pList, &info);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->taskId == taskId) {
ASSERT(reqId == p->reqId);
p->status = status;
p->rspTs = rspTs;
// count down one, since it is ready now
if (p->status == TASK_DOWNSTREAM_READY) {
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
} else {
*pNotReady = pInfo->notReadyTasks;
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId);
return TSDB_CODE_FAILED;
}
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->inCheckProcess == 0) {
pInfo->inCheckProcess = 1;
} else {
ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set the in check procedure flag", id);
return 0;
}
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
if (!pInfo->inCheckProcess) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int64_t el = taosGetTimestampMs() - pInfo->startTs;
stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->inCheckProcess = 0;
pInfo->notReadyTasks = 0;
taosArrayClear(pInfo->pList);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return 0;
}
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
.stage = pTask->pMeta->stage,
};
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (p->taskId == pVgInfo->taskId) {
req.reqId = p->reqId;
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
}
}
}
static void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state;
int32_t numOfReady = 0;
int32_t numOfFault = 0;
stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr);
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
return;
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr,
pStat->name, vgId, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
return;
}
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->status == TASK_DOWNSTREAM_READY) {
numOfReady += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr,
p->taskId);
numOfFault += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
// do nothing and continue waiting for their rsps
}
} else {
taosArrayPush(pNotReadyList, &p->taskId);
}
}
}
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name);
}
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, ref:%d",
pTask->id.idStr, pStat->name, vgId, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
return;
}
if (numOfNotReady > 0) { // check to make sure not in recheck timer
ASSERT(pTask->status.downstreamReady == 0);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
}
stDebug("s-task:%s %d downstream tasks not ready, send check msg again", pTask->id.idStr, numOfNotReady);
}
if (numOfTimeout > 0) {
pInfo->startTs = now;
ASSERT(pTask->status.downstreamReady == 0);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
break;
}
}
}
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr,
numOfTimeout, now);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr,
numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
return 0;
}
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
ASSERT(pInfo->inCheckProcess == 0);
pInfo->pList = taosArrayDestroy(pInfo->pList);
if (pInfo->checkRspTmr != NULL) {
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
pInfo->checkRspTmr = NULL;
}
taosThreadMutexDestroy(&pInfo->checkInfoLock);
}