diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 89680f8f96..779fb416c6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -400,8 +400,9 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; SMetaHbInfo hbInfo; int32_t closedTask; + int32_t totalTasks; // this value should be increased when a new task is added into the meta int32_t chkptNotReadyTasks; - int64_t rid; + int64_t rid; int64_t chkpId; SArray* chkpSaved; @@ -711,6 +712,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); +int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index afae8131eb..000d6e9d5d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1687,7 +1687,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); + pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); + pMeta->totalTasks = pMeta->chkptNotReadyTasks; } total = taosArrayGetSize(pMeta->pTaskList); @@ -1798,19 +1799,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); } -// bool allStopped = true; -// int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); -// for(int32_t i = 0; i < numOfCount; ++i) { -// SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); -// -// int64_t keys1[2] = {pId->streamId, pId->taskId}; -// SStreamTask** p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1)); -// if ((*p)->status.taskStatus != TASK_STATUS__STOP) { -// allStopped = false; -// tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId); -// break; -// } -// } taosWUnLockLatch(&pMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 05b5379333..60d23663d0 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -39,6 +39,7 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { void tqUpdateNodeStage(STQ* pTq) { SSyncState state = syncGetState(pTq->pVnode->sync); pTq->pStreamMeta->stage = state.term; + tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); } static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0e88f12d3b..db94f32459 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -423,12 +423,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); taosRealPath(tdir, NULL, sizeof(tdir)); - // open sma - if (smaOpen(pVnode, rollback)) { - vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); - goto _err; - } - // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -436,6 +430,19 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC goto _err; } + // sma required the tq is initialized before the vnode open + pVnode->pTq = tqOpen(tdir, pVnode); + if (pVnode->pTq == NULL) { + vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + // open sma + if (smaOpen(pVnode, rollback)) { + vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + // vnode begin if (vnodeBegin(pVnode) < 0) { vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -450,12 +457,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC goto _err; } - pVnode->pTq = tqOpen(tdir, pVnode); - if (pVnode->pTq == NULL) { - vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); - goto _err; - } - if (rollback) { vnodeRollback(pVnode); } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ccbfdfe82b..77a7456745 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -67,7 +67,6 @@ static void streamSchedByTimer(void* param, void* tmrId) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); - streamMetaReleaseTask(NULL, pTask); return; } @@ -410,7 +409,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus); + qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus); } return 0; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b9f167c39b..ebf3ce8a30 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -181,8 +181,10 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); + if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); + pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta); + pMeta->totalTasks = pMeta->chkptNotReadyTasks; } taosWUnLockLatch(&pMeta->lock); @@ -272,6 +274,9 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { keys[1] = pId->taskId; SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if (p->info.fillHistory == 1) { + continue; + } int8_t prev = p->status.taskStatus; ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); @@ -304,36 +309,36 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = 0; -// if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { - // check for all tasks, and do generate the vnode-wide checkpoint data. - SStreamMeta* pMeta = pTask->pMeta; - int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); - ASSERT(remain >= 0); + // check for all tasks, and do generate the vnode-wide checkpoint data. + SStreamMeta* pMeta = pTask->pMeta; + int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); + ASSERT(remain >= 0); - if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state - qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); + if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state + qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); + pMeta->totalTasks = 0; - streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); - qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, - pTask->checkpointingId); - } else { - qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", - pMeta->vgId, pTask->id.idStr, remain, (int32_t)taosArrayGetSize(pMeta->pTaskList)); - } + streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); + streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); + qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, + pTask->checkpointingId); + } else { + qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, + pTask->id.idStr, remain, pMeta->totalTasks); + } - // send check point response to upstream task - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - code = streamTaskSendCheckpointSourceRsp(pTask); - } else { - code = streamTaskSendCheckpointReadyMsg(pTask); - } + // send check point response to upstream task + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } else { + code = streamTaskSendCheckpointReadyMsg(pTask); + } - if (code != TSDB_CODE_SUCCESS) { - // todo: let's retry send rsp to upstream/mnode - qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", - pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); - } + if (code != TSDB_CODE_SUCCESS) { + // todo: let's retry send rsp to upstream/mnode + qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, + pTask->checkpointingId, tstrerror(code)); + } return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index dfd9534a60..d4dd878250 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -258,6 +258,7 @@ void streamMetaClear(SStreamMeta* pMeta) { // release the ref by timer if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer + qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); taosTmrStop(p->schedTimer); p->triggerParam = 0; streamMetaReleaseTask(pMeta, p); @@ -401,6 +402,22 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { return (int32_t)size; } +int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { + int32_t num = 0; + size_t size = taosArrayGetSize(pMeta->pTaskList); + for (int32_t i = 0; i < size; ++i) { + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + int64_t keys[2] = {pId->streamId, pId->taskId}; + + SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if ((*p)->info.fillHistory == 0) { + num += 1; + } + } + + return num; +} + SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock);