diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 04694b05fd..ba5219cf20 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,7 +801,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ int64_t* oldStage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); -bool streamTaskAllUpstreamClosed(SStreamTask* pTask); +bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); @@ -826,8 +826,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); -int32_t streamResetParamForScanHistory(SStreamTask* pTask); -void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); +void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); @@ -835,7 +834,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a53b322753..016e3e199e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -742,8 +742,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { return code; } - streamTaskOpenAllUpstreamInput(pTask); - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { STaskId taskId = {0}; if (pTask->info.fillHistory) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5a92677462..358af0b083 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -76,33 +76,6 @@ int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return 0; } -int32_t tqUpdateNodeEpsetAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr()); - return -1; - } - - tqDebug("vgId:%d update s-task:0x%x nodeEpset async", vgId, taskId); - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = STREAM_EXEC_T_UPDATE_TASK_EPSET; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(cb, STREAM_QUEUE, &msg); - return 0; -} - int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -728,10 +701,6 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); streamTaskResetStatus(*pTask); - -// if ((*pTask)->info.fillHistory == 1) { -// streamResetParamForScanHistory(*pTask); -// } } return 0; @@ -922,7 +891,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - streamTaskPause(pTask, pMeta); + streamTaskPause(pMeta, pTask); SStreamTask* pHistoryTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -939,7 +908,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pHistoryTask, pMeta); + streamTaskPause(pMeta, pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 1d1eda74f6..1e4b8996b6 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -120,7 +120,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); void streamTaskSetCheckpointFailedId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); -STaskId streamTaskExtractKey(const SStreamTask* pTask); +STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7d507a1394..32e724c156 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -243,18 +243,18 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // blocked. Note that there is no race condition here. if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + streamTaskCloseAllUpstreamInput(pTask, pReq->upstreamTaskId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + streamTaskCloseAllUpstreamInput(pTask, pReq->upstreamTaskId); // disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state STaskId* pRelTaskId = &pTask->streamTaskId; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId); if (pStreamTask != NULL) { atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); + streamTaskCloseAllUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); streamMetaReleaseTask(pMeta, pStreamTask); } @@ -300,28 +300,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); - if (num == 0) { - return; - } - - for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - pInfo->dataAllowed = true; - } - - pTask->upstreamInfo.numOfClosed = 0; - stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr); -} - -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); - if (pInfo != NULL) { - pInfo->dataAllowed = false; - } -} - SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < num; ++i) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ee2b70eebe..20e7be0509 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -47,6 +47,12 @@ struct SMetaHbInfo { int64_t hbStart; }; +typedef struct STaskInitTs { + int64_t start; + int64_t end; + bool success; +} STaskInitTs; + SMetaRefMgt gMetaRefMgt; void metaRefMgtInit(); @@ -581,7 +587,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - STaskId id = streamTaskExtractKey(pTask); + STaskId id = streamTaskGetTaskId(pTask); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p != NULL) { return 0; @@ -871,7 +877,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - STaskId id = streamTaskExtractKey(pTask); + STaskId id = streamTaskGetTaskId(pTask); taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); @@ -1407,37 +1413,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } } -int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { - streamMetaRLock(pMeta); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); - if (num == 0) { - streamMetaRUnLock(pMeta); - return TSDB_CODE_SUCCESS; - } - - // send hb msg to mnode before closing all tasks. - SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); - int32_t numOfTasks = taosArrayGetSize(pTaskList); - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - continue; - } - - streamTaskStop(pTask); - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - - streamMetaRUnLock(pMeta); - return 0; -} - int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; @@ -1512,6 +1487,37 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { return code; } +int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { + streamMetaRLock(pMeta); + + int32_t num = taosArrayGetSize(pMeta->pTaskList); + stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); + if (num == 0) { + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; + } + + // send hb msg to mnode before closing all tasks. + SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); + int32_t numOfTasks = taosArrayGetSize(pTaskList); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + streamTaskStop(pTask); + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + + streamMetaRUnLock(pMeta); + return 0; +} + int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t vgId = pMeta->vgId; stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId); @@ -1547,4 +1553,74 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas streamMetaReleaseTask(pMeta, pTask); return ret; +} + +static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { + int32_t vgId = pMeta->vgId; + void* pIter = NULL; + size_t keyLen = 0; + + stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), + succ ? "success" : "failed"); + + while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { + STaskInitTs* pInfo = pIter; + void* key = taosHashGetKey(pIter, &keyLen); + + SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); + if (pTask1 == NULL) { + stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); + } else { + stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, + (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + } + } +} + +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; + + streamMetaWLock(pMeta); + + if (pStartInfo->taskStarting != 1) { + int64_t el = endTs - startTs; + qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", + pMeta->vgId, taskId, ready, el); + streamMetaWUnLock(pMeta); + return 0; + } + + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; + + STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; + taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); + + int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); + + if (numOfRecv == numOfTotal) { + pStartInfo->readyTs = taosGetTimestampMs(); + pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; + + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 + ", readyTs:%" PRId64 " total elapsed time:%.2fs", + pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, + pStartInfo->elapsedTime / 1000.0); + + // print the initialization elapsed time and info + displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); + displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); + streamMetaResetStartInfo(pStartInfo); + streamMetaWUnLock(pMeta); + + pStartInfo->completeFn(pMeta); + } else { + streamMetaWUnLock(pMeta); + stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, + ready, numOfRecv, numOfTotal); + } + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 84f1dbb4d7..5e1566c1e1 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -35,12 +35,6 @@ typedef struct STaskRecheckInfo { void* checkTimer; } STaskRecheckInfo; -typedef struct STaskInitTs { - int64_t start; - int64_t end; - bool success; -} STaskInitTs; - static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); @@ -546,15 +540,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } -int32_t streamResetParamForScanHistory(SStreamTask* pTask) { - stDebug("s-task:%s reset operator option for scan-history data", pTask->id.idStr); - if (pTask->exec.pExecutor != NULL) { - return qResetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); - } else { - return TSDB_CODE_SUCCESS; - } -} - int32_t streamRestoreParam(SStreamTask* pTask) { stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr); return qRestoreStreamOperatorOption(pTask->exec.pExecutor); @@ -1134,97 +1119,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } } -void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); - - int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); - - // in case of fill-history task, stop the tsdb file scan operation. - if (pTask->info.fillHistory == 1) { - void* pExecutor = pTask->exec.pExecutor; - qKillTask(pExecutor, TSDB_CODE_SUCCESS); - } - - stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); -} - -void streamTaskResume(SStreamTask* pTask) { - SStreamTaskState prevState = *streamTaskGetStatus(pTask); - SStreamMeta* pMeta = pTask->pMeta; - - if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { - streamTaskRestoreStatus(pTask); - - char* pNew = streamTaskGetStatus(pTask)->name; - if (prevState.state == TASK_STATUS__PAUSE) { - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); - } else { - stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name); - } - } else { - stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); - } -} - -static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { - int32_t vgId = pMeta->vgId; - void* pIter = NULL; - size_t keyLen = 0; - - stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), - succ ? "success" : "failed"); - - while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { - STaskInitTs* pInfo = pIter; - void* key = taosHashGetKey(pIter, &keyLen); - - SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); - if (pTask1 == NULL) { - stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); - } else { - stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, - (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); - } - } -} - -int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, - int64_t endTs, bool ready) { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - STaskId id = {.streamId = streamId, .taskId = taskId}; - - streamMetaWLock(pMeta); - SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; - - STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; - taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); - - int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - - if (numOfRecv == numOfTotal) { - pStartInfo->readyTs = taosGetTimestampMs(); - pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, - pStartInfo->elapsedTime / 1000.0); - - // print the initialization elapsed time and info - displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); - displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); - streamMetaWUnLock(pMeta); - - pStartInfo->completeFn(pMeta); - } else { - streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, - ready, numOfRecv, numOfTotal); - } - - return TSDB_CODE_SUCCESS; -} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c14d355dc8..881f10502c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -30,6 +30,55 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { return 0; } +static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { + char buf[512] = {0}; + + if (pTask->info.nodeId == nodeId) { // execution task should be moved away + epsetAssign(&pTask->info.epSet, pEpSet); + EPSET_TO_STR(pEpSet, buf) + stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); + } + + // check for the dispath info and the upstream task info + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else if (level == TASK_LEVEL__AGG) { + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else { // TASK_LEVEL__SINK + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + } + + return 0; +} + +static void freeItem(void* p) { + SStreamContinueExecInfo* pInfo = p; + rpcFreeCont(pInfo->msg.pCont); +} + +static void freeUpstreamItem(void* p) { + SStreamChildEpInfo** pInfo = p; + taosMemoryFree(*pInfo); +} + +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { + SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); + if (pEpInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pEpInfo->childId = pTask->info.selfChildId; + pEpInfo->epSet = pTask->info.epSet; + pEpInfo->nodeId = pTask->info.nodeId; + pEpInfo->taskId = pTask->id.taskId; + pEpInfo->stage = -1; + + return pEpInfo; +} + SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); @@ -291,16 +340,6 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { return 0; } -static void freeItem(void* p) { - SStreamContinueExecInfo* pInfo = p; - rpcFreeCont(pInfo->msg.pCont); -} - -static void freeUpstreamItem(void* p) { - SStreamChildEpInfo** pInfo = p; - taosMemoryFree(*pInfo); -} - void tFreeStreamTask(SStreamTask* pTask) { char* p = NULL; int32_t taskId = pTask->id.taskId; @@ -475,14 +514,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } taosThreadMutexInit(&pTask->lock, &attr); - // if (pTask->info.fillHistory == 1) { - // // - // } else { - - // } - // if (streamTaskSetDb(pMeta, pTask) != 0) { - // return -1; - // } streamTaskOpenAllUpstreamInput(pTask); pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); @@ -509,22 +540,6 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { } } -static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { - SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); - if (pEpInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pEpInfo->childId = pTask->info.selfChildId; - pEpInfo->epSet = pTask->info.epSet; - pEpInfo->nodeId = pTask->info.nodeId; - pEpInfo->taskId = pTask->id.taskId; - pEpInfo->stage = -1; - - return pEpInfo; -} - int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); if (pEpInfo == NULL) { @@ -622,29 +637,6 @@ int32_t streamTaskStop(SStreamTask* pTask) { return 0; } -int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { - char buf[512] = {0}; - - if (pTask->info.nodeId == nodeId) { // execution task should be moved away - epsetAssign(&pTask->info.epSet, pEpSet); - EPSET_TO_STR(pEpSet, buf) - stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); - } - - // check for the dispath info and the upstream task info - int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - } - - return 0; -} - int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->execInfo; @@ -677,7 +669,29 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); } -bool streamTaskAllUpstreamClosed(SStreamTask* pTask) { +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); + if (num == 0) { + return; + } + + for (int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + pInfo->dataAllowed = true; + } + + pTask->upstreamInfo.numOfClosed = 0; + stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num); +} + +void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = false; + } +} + +bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) { return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList); } @@ -760,7 +774,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -STaskId streamTaskExtractKey(const SStreamTask* pTask) { +STaskId streamTaskGetTaskId(const SStreamTask* pTask) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; return id; } @@ -801,3 +815,36 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->chkpointTransId = pSrc->chkpointTransId; } +void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); + + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + + // in case of fill-history task, stop the tsdb file scan operation. + if (pTask->info.fillHistory == 1) { + void* pExecutor = pTask->exec.pExecutor; + qKillTask(pExecutor, TSDB_CODE_SUCCESS); + } + + stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); +} + +void streamTaskResume(SStreamTask* pTask) { + SStreamTaskState prevState = *streamTaskGetStatus(pTask); + SStreamMeta* pMeta = pTask->pMeta; + + if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { + streamTaskRestoreStatus(pTask); + + char* pNew = streamTaskGetStatus(pTask)->name; + if (prevState.state == TASK_STATUS__PAUSE) { + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); + } else { + stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name); + } + } else { + stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); + } +}