From 19b0de6582343bb1ef218eb1db48decf5950f991 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 13:49:40 +0800 Subject: [PATCH 1/4] fix(stream): add task node into update list if it is timeout for more than 100sec. --- include/libs/stream/tstream.h | 3 +- source/libs/stream/src/streamCheckStatus.c | 153 +++++++++++++-------- 2 files changed, 95 insertions(+), 61 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 260c1c54d2..48a6d046ea 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -435,6 +435,7 @@ typedef struct SUpstreamInfo { typedef struct SDownstreamStatusInfo { int64_t reqId; int32_t taskId; + int32_t vgId; int64_t rspTs; int32_t status; } SDownstreamStatusInfo; @@ -847,7 +848,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 42bf334fa3..00a78b7f81 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -27,15 +27,17 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); +static void 
setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); + static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); - static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; + const char* idstr = pTask->id.idStr; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -51,15 +53,14 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId); + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); @@ -71,24 +72,22 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { int32_t numOfVgs = taosArrayGetSize(vgInfo); 
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + idstr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId); + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // for sink task, set it ready directly. 
- stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); + stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); processDownstreamReadyRsp(pTask); } } @@ -158,8 +157,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { - SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; taosThreadMutexLock(&pInfo->checkInfoLock); @@ -447,6 +446,78 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i } } +void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) { + pReq->reqId = reqId; + pReq->downstreamTaskId = dstTaskId; + pReq->downstreamNodeId = dstNodeId; +} + +void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); + + ASSERT(pTask->status.downstreamReady == 0); + + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); + } + } + + pInfo->timeoutRetryCount += 1; + + // timeout more than 100 sec, add into node update list + if (pInfo->timeoutRetryCount > 
10) { + pInfo->timeoutRetryCount = 0; + + for(int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*) taosArrayGet(pTimeoutList, i); + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + addIntoNodeUpdateList(pTask, p->vgId); + stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", + id, vgId, p->taskId, p->vgId); + } + } + + stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); + } else { + stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + } +} + +void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfNotReady = taosArrayGetSize(pNotReadyList); + + ASSERT(pTask->status.downstreamReady == 0); + + // reset the info, and send the check msg to failure downstream again + for (int32_t i = 0; i < numOfNotReady; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); + } + } + + pInfo->notReadyRetryCount += 1; + stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); +} + void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamTaskState* pStat = streamTaskGetStatus(pTask); @@ -461,6 +532,7 @@ void rspMonitorFn(void* param, void* tmrId) { int32_t numOfNotRsp = 0; int32_t numOfNotReady = 0; int32_t numOfTimeout = 0; + int32_t total = taosArrayGetSize(pInfo->pList); stDebug("s-task:%s start to do check-downstream-rsp check in tmr", 
id); @@ -510,7 +582,7 @@ void rspMonitorFn(void* param, void* tmrId) { numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); if (numOfFault > 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( @@ -550,57 +622,18 @@ void rspMonitorFn(void* param, void* tmrId) { } if (numOfNotReady > 0) { // check to make sure not in recheck timer - ASSERT(pTask->status.downstreamReady == 0); - - // reset the info, and send the check msg to failure downstream again - for (int32_t i = 0; i < numOfNotReady; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - p->rspTs = 0; - p->status = -1; - doSendCheckMsg(pTask, p); - } - } - - pInfo->notReadyRetryCount += 1; - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, - numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); + handleNotReadyDownstreamTask(pTask, pNotReadyList); } - // todo add into node update list and send to mnode if (numOfTimeout > 0) { - ASSERT(pTask->status.downstreamReady == 0); - - for (int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - ASSERT(p->status == -1 && p->rspTs == 0); - doSendCheckMsg(pTask, p); - } - } - - pInfo->timeoutRetryCount += 1; - - // timeout more than 100 sec, add into node update list - if (pInfo->timeoutRetryCount > 10) { - pInfo->timeoutRetryCount = 0; - stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId, - numOfTimeout); - } else { - stDebug("s-task:%s vgId:%d %d 
downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); - } + handleTimeoutDownstreamTasks(pTask, pTimeoutList); } taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, - numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", + id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); From 88af03096652e571c5a292351d3bbf731d6c58ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 13:53:41 +0800 Subject: [PATCH 2/4] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 41 ++++++++++------------ 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 00a78b7f81..1135878929 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -18,7 +18,7 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" -#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec +#define CHECK_NOT_RSP_DURATION (10 * 1000) // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); @@ -29,15 +29,15 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, cons static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); -static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t
el, int32_t* numOfReady, int32_t* numOfFault, - int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); +static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; - const char* idstr = pTask->id.idStr; + const char* idstr = pTask->id.idStr; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -60,8 +60,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, - idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, - pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, + pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); @@ -71,8 +71,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); - stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - idstr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr, + numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for 
(int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); @@ -181,7 +181,6 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { taosThreadMutexLock(&pInfo->checkInfoLock); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { - taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; } @@ -306,10 +305,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { - if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 - " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -340,7 +337,8 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { pInfo->inCheckProcess = 1; } else { ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); + stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, + pInfo->startTs); return TSDB_CODE_FAILED; } @@ -388,9 +386,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { - req.reqId = p->reqId; - req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; - req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; + STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; + setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); @@ -403,12 +401,10 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (p->taskId == pVgInfo->taskId) { - req.reqId = p->reqId; - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; + setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, + " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); @@ -422,7 +418,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); if (p->status == TASK_DOWNSTREAM_READY) { @@ -476,8 +471,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { if (pInfo->timeoutRetryCount > 10) { pInfo->timeoutRetryCount = 0; - for(int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*) taosArrayGet(pTimeoutList, i); + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { addIntoNodeUpdateList(pTask, p->vgId); From 3961903fea459d04110e5a23e28cfe1270a710e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 14:00:03 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. 
--- include/libs/stream/tstream.h | 5 +-- source/libs/stream/src/streamCheckStatus.c | 42 ++++++++++++---------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 48a6d046ea..3c74a9fd7b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -848,12 +848,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); -int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, - int32_t* pNotReady, const char* id); -void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 1135878929..0f4662594f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -26,9 +26,13 @@ static void rspMonitorFn(void* param, void* tmrId); static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); +static int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); static void doSendCheckMsg(SStreamTask* 
pTask, SDownstreamStatusInfo* p); +static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList); +static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList); +static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, + int64_t reqId, int32_t* pNotReady, const char* id); static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); - static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); @@ -157,24 +161,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { - SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; - - taosThreadMutexLock(&pInfo->checkInfoLock); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } - - taosArrayPush(pInfo->pList, &info); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; -} - int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; @@ -375,6 +361,24 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* return 0; } +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; + + 
taosThreadMutexLock(&pInfo->checkInfoLock); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + + taosArrayPush(pInfo->pList, &info); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} + void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, From 8b53f766911a336715c72e03272085a5cf5ebeb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 15:56:34 +0800 Subject: [PATCH 4/4] fix(stream): discard the repeatly recv checkpoint-source msg. --- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/vnode/src/tq/tq.c | 14 ++++++- source/libs/stream/inc/streamInt.h | 24 ++++++----- source/libs/stream/src/streamBackendRocksdb.c | 12 +++--- source/libs/stream/src/streamCheckpoint.c | 40 +++++++++---------- source/libs/stream/src/streamQueue.c | 1 - 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d34e23c0ba..ba96dc0adf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 300; +int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 521f359f73..844aae0f57 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -847,7 +847,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { 
if (pIter == NULL) break; maxChkptId = TMAX(maxChkptId, pStream->checkpointId); - mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, + mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, pStream->checkpointId); sdbRelease(pSdb, pStream); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 567d61e27a..cf985cd164 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1156,14 +1156,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { - tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 - " transId:%d already received, ignore this msg and continue process checkpoint", + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 + " transId:%d already handled, ignore msg and continue process checkpoint", pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; + } else { // checkpoint already finished, and not in checkpoint status + if (req.checkpointId == pTask->chkInfo.checkpointId) { + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 + " transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + + return TSDB_CODE_SUCCESS; + } } streamProcessCheckpointSourceReq(pTask, &req); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index b3ed86cff8..44fb0706b8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -69,6 +69,7 @@ typedef struct { int64_t chkpId; char* dbPrefixPath; } SStreamTaskSnap; + struct STokenBucket { int32_t 
numCapacity; // total capacity, available token per second int32_t numOfToken; // total available tokens @@ -148,18 +149,19 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); -typedef enum UPLOAD_TYPE { - UPLOAD_DISABLE = -1, - UPLOAD_S3 = 0, - UPLOAD_RSYNC = 1, -} UPLOAD_TYPE; +typedef enum ECHECKPOINT_BACKUP_TYPE { + DATA_UPLOAD_DISABLE = -1, + DATA_UPLOAD_S3 = 0, + DATA_UPLOAD_RSYNC = 1, +} ECHECKPOINT_BACKUP_TYPE; -UPLOAD_TYPE getUploadType(); -int uploadCheckpoint(char* id, char* path); -int downloadCheckpoint(char* id, char* path); -int deleteCheckpoint(char* id); -int deleteCheckpointFile(char* id, char* name); -int downloadCheckpointByName(char* id, char* fname, char* dstName); +ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); + +int32_t streamTaskBackupCheckpoint(char* id, char* path); +int32_t downloadCheckpoint(char* id, char* path); +int32_t deleteCheckpoint(char* id); +int32_t deleteCheckpointFile(char* id, char* name); +int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 06093cbaf8..123458f372 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -376,10 +376,10 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char return code; } int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - UPLOAD_TYPE type = getUploadType(); - if (type == UPLOAD_S3) { + ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); + if (type == DATA_UPLOAD_S3) { return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath); - } else if (type == UPLOAD_RSYNC) { + } else if (type == DATA_UPLOAD_RSYNC) { return 
rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath); } return -1; @@ -2111,11 +2111,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { STaskDbWrapper* pDb = arg; - UPLOAD_TYPE utype = type; + ECHECKPOINT_BACKUP_TYPE utype = type; - if (utype == UPLOAD_RSYNC) { + if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); - } else if (utype == UPLOAD_S3) { + } else if (utype == DATA_UPLOAD_S3) { return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); } return -1; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8efd661d12..e6d7c2fde8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -19,7 +19,7 @@ #include "streamInt.h" typedef struct { - UPLOAD_TYPE type; + ECHECKPOINT_BACKUP_TYPE type; char* taskId; int64_t chkpId; @@ -416,7 +416,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) { return code; } -int32_t doUploadChkp(void* param) { +int32_t uploadCheckpointData(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; int32_t code = 0; @@ -426,13 +426,13 @@ int32_t doUploadChkp(void* param) { (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } - if (arg->type == UPLOAD_S3) { + if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); } } - if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) { + if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); } @@ 
-459,8 +459,8 @@ int32_t doUploadChkp(void* param) { int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { // async upload - UPLOAD_TYPE type = getUploadType(); - if (type == UPLOAD_DISABLE) { + ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); + if (type == DATA_UPLOAD_DISABLE) { return 0; } @@ -474,7 +474,7 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { arg->chkpId = chkpId; arg->pTask = pTask; - return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); + return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { @@ -558,7 +558,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -static int uploadCheckpointToS3(char* id, char* path) { +static int32_t uploadCheckpointToS3(char* id, char* path) { TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; @@ -590,8 +590,8 @@ static int uploadCheckpointToS3(char* id, char* path) { return 0; } -static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { - int code = 0; +static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { + int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { @@ -601,19 +601,19 @@ static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { return code; } -UPLOAD_TYPE getUploadType() { +ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { if (strlen(tsSnodeAddress) != 0) { - return UPLOAD_RSYNC; + return DATA_UPLOAD_RSYNC; } else if (tsS3StreamEnabled) { - return UPLOAD_S3; + return DATA_UPLOAD_S3; } else { - return UPLOAD_DISABLE; + return DATA_UPLOAD_DISABLE; } } -int uploadCheckpoint(char* id, char* path) { +int32_t streamTaskBackupCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 
|| strlen(path) >= PATH_MAX) { - stError("uploadCheckpoint parameters invalid"); + stError("streamTaskBackupCheckpoint parameters invalid"); return -1; } if (strlen(tsSnodeAddress) != 0) { @@ -625,7 +625,7 @@ int uploadCheckpoint(char* id, char* path) { } // fileName: CURRENT -int downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("uploadCheckpointByName parameters invalid"); return -1; @@ -638,7 +638,7 @@ int downloadCheckpointByName(char* id, char* fname, char* dstName) { return 0; } -int downloadCheckpoint(char* id, char* path) { +int32_t downloadCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("downloadCheckpoint parameters invalid"); return -1; @@ -651,7 +651,7 @@ int downloadCheckpoint(char* id, char* path) { return 0; } -int deleteCheckpoint(char* id) { +int32_t deleteCheckpoint(char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); return -1; @@ -664,7 +664,7 @@ int deleteCheckpoint(char* id) { return 0; } -int deleteCheckpointFile(char* id, char* name) { +int32_t deleteCheckpointFile(char* id, char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); char* tmp = object; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9f79501471..9e872a1aff 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -17,7 +17,6 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec -#define WAIT_FOR_DURATION 10 // todo refactor: // read data from input queue