From 63494de7f92fbf55dfa5d7f7f952f776242872b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Jan 2024 10:28:07 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 26 +++++++++++------------ source/libs/stream/src/stream.c | 6 +++--- source/libs/stream/src/streamCheckpoint.c | 5 +++++ source/libs/stream/src/streamMeta.c | 12 +++++------ source/libs/stream/src/streamState.c | 1 - source/libs/stream/src/streamTask.c | 2 +- 7 files changed, 29 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5bdf3ab75f..a0cff35623 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -834,7 +834,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); -void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 016e3e199e..1fc7402363 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1124,6 +1124,19 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) pRsp->info.handle = NULL; SStreamCheckpointSourceReq req = {0}; + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs + return code; + } + tDecoderClear(&decoder); + if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; @@ -1140,19 +1153,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); - SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); - tmsgSendRsp(&rsp); // error occurs - return code; - } - tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 32e724c156..75af5a9fc5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -243,18 +243,18 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // blocked. Note that there is no race condition here. if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseAllUpstreamInput(pTask, pReq->upstreamTaskId); + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseAllUpstreamInput(pTask, pReq->upstreamTaskId); + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); // disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state STaskId* pRelTaskId = &pTask->streamTaskId; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId); if (pStreamTask != NULL) { atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseAllUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); + streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); streamMetaReleaseTask(pMeta, pStreamTask); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 60e4d74b43..006202fdd1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -185,6 +185,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int64_t checkpointId = pDataBlock->info.version; const char* id = pTask->id.idStr; int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pTask->pMeta->vgId; + + stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64 + ", current checkpointingId:%" PRId64, + id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, checkpointId); // set task status if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bb5cb6e832..1f45e9b94c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1308,28 +1308,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-rlock", pMeta->vgId); +// stTrace("vgId:%d meta-rlock", pMeta->vgId); taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-runlock", pMeta->vgId); +// stTrace("vgId:%d meta-runlock", pMeta->vgId); int32_t code = taosThreadRwlockUnlock(&pMeta->lock); if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { - stDebug("vgId:%d meta-runlock completed", pMeta->vgId); +// stDebug("vgId:%d meta-runlock completed", pMeta->vgId); } } void streamMetaWLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-wlock", pMeta->vgId); +// stTrace("vgId:%d meta-wlock", pMeta->vgId); taosThreadRwlockWrlock(&pMeta->lock); - stTrace("vgId:%d meta-wlock completed", pMeta->vgId); +// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-wunlock", pMeta->vgId); +// stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosThreadRwlockUnlock(&pMeta->lock); } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 776a9db522..19b7359981 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -435,7 +435,6 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) { // todo refactor - stDebug("streamStateReleaseBuf"); if (!pVal) { return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9e981a79c7..2497e8dd8e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -684,7 +684,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num); } -void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId) { +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); if (pInfo != NULL) { pInfo->dataAllowed = false;