From 6f4fb506a0efccc66a69f181616b1214c23c7a09 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 16 Dec 2023 17:43:41 +0800 Subject: [PATCH 1/4] fix stream backend crash when transfer --- source/libs/stream/src/streamBackendRocksdb.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 630650025d..1bc402ef6c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2289,7 +2289,10 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { } } if (pState != NULL && idx != -1) { - STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + if (wrapper == NULL) { + return -1; + } rocksdb_column_family_handle_t* cf = NULL; taosThreadMutexLock(&wrapper->mutex); From 5aa575acfa4244584eaf490bd9a5c507f2c7978a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 16 Dec 2023 18:03:32 +0800 Subject: [PATCH 2/4] fix stream backend crash when transfer --- source/libs/stream/src/streamBackendRocksdb.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1bc402ef6c..3cf45066be 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2293,11 +2293,10 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { if (wrapper == NULL) { return -1; } - rocksdb_column_family_handle_t* cf = NULL; taosThreadMutexLock(&wrapper->mutex); - cf = wrapper->pCf[idx]; + rocksdb_column_family_handle_t* cf = wrapper->pCf[idx]; if (cf == NULL) { char* err = NULL; cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err); From 0840e6a0d8075df5f59a093af41892f50ce9fb31 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 18 Dec 2023 11:19:15 +0800 Subject: [PATCH 3/4] fix stream backend crash when transfer --- source/libs/stream/src/streamMeta.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 23cb6f5a35..c06dae76d7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -260,9 +260,17 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { } STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId); - if (pBackend == NULL) { - taosThreadMutexUnlock(&pMeta->backendMutex); - return -1; + while (1) { + if (pBackend == NULL) { + taosThreadMutexUnlock(&pMeta->backendMutex); + taosMsleep(1000); + stDebug("backed holded by other task, restart later, path: %s, key: %s", pMeta->path, key); + } else { + taosThreadMutexUnlock(&pMeta->backendMutex); + break; + } + taosThreadMutexLock(&pMeta->backendMutex); + pBackend = taskDbOpen(pMeta->path, key, chkpId); } int64_t tref = taosAddRef(taskDbWrapperId, pBackend); @@ -794,7 +802,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { } int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { - TBC* pCur = NULL; + TBC* pCur = NULL; void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL; From 1dea66d4f5bc020c4da80ec0503ff5ff6b810350 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 18 Dec 2023 15:48:01 +0800 Subject: [PATCH 4/4] fix stream backend crash when transfer --- source/libs/stream/src/streamStart.c | 88 +++++++++++++++------------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 309f377621..c0dbd45225 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -14,14 +14,14 @@ */ #include "streamInt.h" +#include "streamsm.h" #include "trpc.h" #include "ttimer.h" #include "wal.h" -#include "streamsm.h" -#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms -#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec -#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE) +#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms +#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec +#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE) typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; @@ -56,7 +56,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", - pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p); + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p); } ASSERT(pTask->status.downstreamReady == 0); @@ -77,7 +77,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { initScanHistoryReq(pTask, &req, igUntreated); int32_t len = sizeof(SStreamScanHistoryReq); - void* serializedReq = rpcMallocCont(len); + void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { return -1; } @@ -96,7 +96,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { SStreamTask* pTask = param; pTask->schedHistoryInfo.numOfTicks -= 1; - char* p = NULL; + char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { streamMetaReleaseTask(pTask->pMeta, pTask); @@ -115,8 +115,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer); + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } } @@ -135,7 +134,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) pTask->schedHistoryInfo.numOfTicks = numOfTicks; int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); + stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); if (pTask->schedHistoryInfo.pTimer == NULL) { pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); @@ -158,7 +157,7 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) { } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { - int32_t level = pTask->info.taskLevel; + int32_t level = pTask->info.taskLevel; ETaskStatus status = streamTaskGetStatus(pTask, NULL); ASSERT(pTask->status.downstreamReady == 1 && @@ -213,7 +212,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); @@ -263,7 +262,7 @@ static void destroyRecheckInfo(STaskRecheckInfo* pInfo) { static void recheckDownstreamTasks(void* param, void* tmrId) { STaskRecheckInfo* pInfo = param; - SStreamTask* pTask = pInfo->pTask; + SStreamTask* pTask = pInfo->pTask; SStreamTaskCheckReq* pReq = &pInfo->req; @@ -290,7 +289,8 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); } -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, + int64_t* oldStage) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); @@ -330,7 +330,7 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { streamTaskSetReady(pTask); streamTaskSetRangeStreamCalc(pTask); - char* p = NULL; + char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); ASSERT(status == TASK_STATUS__READY); @@ -356,7 +356,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { streamTaskSetReady(pTask); streamTaskSetRangeStreamCalc(pTask); - char* p = NULL; + char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); @@ -374,7 +374,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event; if (pTask->info.fillHistory == 0) { - event = HAS_RELATED_FILLHISTORY_TASK(pTask)? TASK_EVENT_INIT_STREAM_SCANHIST:TASK_EVENT_INIT; + event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; } else { event = TASK_EVENT_INIT_SCANHIST; } @@ -418,8 +418,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (pRsp->status == TASK_DOWNSTREAM_READY) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - - bool found = false; + bool found = false; int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); for (int32_t i = 0; i < numOfReqs; i++) { int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); @@ -457,10 +456,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } else { // not ready, wait for 100ms and retry if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { - stError( - "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", " - "not check wait for downstream task nodeUpdate, and all tasks restart", - id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); + stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 + ", current stage:%" PRId64 + ", " + "not check wait for downstream task nodeUpdate, and all tasks restart", + id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { stError( @@ -476,17 +476,22 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; + int64_t current = taosGetTimestampMs(); SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, - taosGetTimestampMs(), false); - streamMetaReleaseTask(pTask->pMeta, pHTask); + if (pHTask != NULL) { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current, + false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false); + } } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id, + stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer); } @@ -496,7 +501,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, - SRpcHandleInfo *pRpcInfo, int32_t taskId) { + SRpcHandleInfo* pRpcInfo, int32_t taskId) { SEncoder encoder; int32_t code; int32_t len; @@ -533,11 +538,11 @@ int32_t streamRestoreParam(SStreamTask* pTask) { } // source -int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { +int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); } -int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { +int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); } @@ -567,7 +572,7 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; - pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; + pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; taosArrayPush(pTranstate->blocks, pBlock); taosMemoryFree(pBlock); @@ -601,8 +606,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ETaskStatus status = streamTaskGetStatus(pTask, &p); if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) { - stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", - id, p, pReq->upstreamTaskId); + stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p, + pReq->upstreamTaskId); void* pBuf = NULL; int32_t len = 0; @@ -612,8 +617,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); tmsgSendRsp(&msg); - stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, - taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); + stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel, + pReq->upstreamTaskId, pReq->upstreamNodeId); return 0; } @@ -642,13 +647,13 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory // mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks. if (taskLevel == TASK_LEVEL__AGG) { - /*int32_t code = */streamTaskScanHistoryDataComplete(pTask); + /*int32_t code = */ streamTaskScanHistoryDataComplete(pTask); } else { // for sink task, set normal streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); } } else { - stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", - id, pReq->upstreamTaskId, pReq->childId, left); + stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id, + pReq->upstreamTaskId, pReq->childId, left); } return 0; @@ -687,9 +692,9 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 - " verRange:%" PRId64 " - %" PRId64", init:%"PRId64, - pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, - pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init); + " verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64, + pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer, pHTask->execInfo.init); } else { stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); } @@ -724,7 +729,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { - SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; pHTaskInfo->tickCount -= 1;