diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h new file mode 100644 index 0000000000..d39a9b0cba --- /dev/null +++ b/include/libs/stream/streammsg.h @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_STREAMMSG_H +#define TDENGINE_STREAMMSG_H + +typedef struct SStreamChildEpInfo { + int32_t nodeId; + int32_t childId; + int32_t taskId; + SEpSet epSet; + bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it + int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer +} SStreamChildEpInfo; + +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); + +// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished +typedef struct { + int64_t streamId; + int64_t checkpointId; + int32_t taskId; + int32_t nodeId; + SEpSet mgmtEps; + int32_t mnodeId; + int32_t transId; + int8_t mndTrigger; + int64_t expireTime; +} SStreamCheckpointSourceReq; + +typedef struct { + int64_t streamId; + int64_t checkpointId; + int32_t taskId; + int32_t nodeId; + int32_t mnodeId; + int64_t expireTime; + int8_t success; +} SStreamCheckpointSourceRsp; + +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); + +int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); + +typedef struct SStreamTaskNodeUpdateMsg { + int32_t transId; // to identify the msg + int64_t streamId; + int32_t taskId; + SArray* pNodeList; // SArray +} SStreamTaskNodeUpdateMsg; + +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); + +typedef struct { + int64_t reqId; + int64_t stage; + int64_t streamId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; + int32_t childId; +} SStreamTaskCheckReq; + +int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); +int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); + +typedef struct { + int64_t reqId; + int64_t streamId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; + int32_t childId; + int64_t oldStage; + int8_t status; +} SStreamTaskCheckRsp; + +int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); +int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); + +typedef struct { + SMsgHead msgHead; + int64_t streamId; + int64_t checkpointId; + int32_t downstreamTaskId; + int32_t downstreamNodeId; + int32_t upstreamTaskId; + int32_t upstreamNodeId; + int32_t childId; +} SStreamCheckpointReadyMsg; + +int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); +int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); + +struct SStreamDispatchReq { + int32_t type; + int64_t stage; // nodeId from upstream task + int64_t streamId; + int32_t taskId; + int32_t msgId; // msg id to identify if the incoming msg from the same sender + int32_t srcVgId; + int32_t upstreamTaskId; + int32_t upstreamChildId; + int32_t upstreamNodeId; + int32_t upstreamRelTaskId; + int32_t blockNum; + int64_t totalLen; + SArray* dataLen; // SArray + SArray* data; // SArray +}; + +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq); +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq); +void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq); + +struct SStreamRetrieveReq { + int64_t streamId; + int64_t reqId; + int32_t srcTaskId; + int32_t srcNodeId; + int32_t dstTaskId; + int32_t dstNodeId; + int32_t retrieveLen; + SRetrieveTableRsp* pRetrieve; +}; + +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq); +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq); +void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq); + +typedef struct SStreamTaskCheckpointReq { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SStreamTaskCheckpointReq; + +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); + +typedef struct SStreamHbMsg { + int32_t vgId; + int32_t numOfTasks; + SArray* pTaskStatus; // SArray + SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. +} SStreamHbMsg; + +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); +void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); + +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; + int32_t reqType; +} SStreamTaskRunReq; + +#endif // TDENGINE_STREAMMSG_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b4636120b..27a9168a99 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _STREAM_H_ -#define _STREAM_H_ +#ifndef TDENGINE_TSTREAM_H +#define TDENGINE_TSTREAM_H #include "os.h" #include "streamState.h" @@ -24,6 +24,7 @@ #include "tmsgcb.h" #include "tqueue.h" #include "ttimer.h" +#include "streammsg.h" #ifdef __cplusplus extern "C" { @@ -150,6 +151,8 @@ typedef enum EStreamTaskEvent { TASK_EVENT_DROPPING = 0xA, } EStreamTaskEvent; +typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); + typedef struct { int8_t type; } SStreamQueueItem; @@ -194,30 +197,6 @@ struct SStreamQueueNode { SStreamQueueNode* next; }; -typedef struct { - SStreamQueueNode* head; - int64_t size; -} SStreamQueueRes; - -#if 0 -bool streamQueueResEmpty(const SStreamQueueRes* pRes); -int64_t streamQueueResSize(const SStreamQueueRes* pRes); -SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes); -SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes); -void streamQueueResClear(SStreamQueueRes* pRes); -SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode); -#endif - -typedef struct { - SStreamQueueNode* pHead; -} SStreamQueue1; - -#if 0 -bool streamQueueHasTask(const SStreamQueue1* pQueue); -int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); -SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); -#endif - SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); @@ -283,15 +262,6 @@ typedef struct { int8_t reserved; } STaskSinkFetch; -typedef struct SStreamChildEpInfo { - int32_t nodeId; - int32_t childId; - int32_t taskId; - SEpSet epSet; - bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it - int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer -} SStreamChildEpInfo; - typedef struct STaskId { int64_t streamId; int64_t taskId; @@ -330,7 +300,6 @@ typedef struct SStreamStatus { int64_t lastExecTs; // last exec time stamp int32_t inScanHistorySentinel; bool appendTranstateBlock; // has append the transfer state data block already - bool supplementaryWalscan; // complete the supplementary wal scan or not } SStreamStatus; typedef struct SDataRange { @@ -349,6 +318,7 @@ typedef struct SSTaskBasicInfo { int64_t triggerParam; // in msec } SSTaskBasicInfo; +typedef struct SStreamRetrieveReq SStreamRetrieveReq; typedef struct SStreamDispatchReq SStreamDispatchReq; typedef struct STokenBucket STokenBucket; typedef struct SMetaHbInfo SMetaHbInfo; @@ -560,9 +530,6 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); - SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); @@ -573,35 +540,12 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); +// stream task queue related API int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); bool streamQueueIsFull(const SStreamQueue* pQueue); -typedef struct { - SMsgHead head; - int64_t streamId; - int32_t taskId; - int32_t reqType; -} SStreamTaskRunReq; - -struct SStreamDispatchReq { - int32_t type; - int64_t stage; // nodeId from upstream task - int64_t streamId; - int32_t taskId; - int32_t msgId; // msg id to identify if the incoming msg from the same sender - int32_t srcVgId; - int32_t upstreamTaskId; - int32_t upstreamChildId; - int32_t upstreamNodeId; - int32_t upstreamRelTaskId; - int32_t blockNum; - int64_t totalLen; - SArray* dataLen; // SArray - SArray* data; // SArray -}; - typedef struct { int64_t streamId; int32_t upstreamNodeId; @@ -613,17 +557,6 @@ typedef struct { int64_t stage; } SStreamDispatchRsp; -typedef struct { - int64_t streamId; - int64_t reqId; - int32_t srcTaskId; - int32_t srcNodeId; - int32_t dstTaskId; - int32_t dstNodeId; - int32_t retrieveLen; - SRetrieveTableRsp* pRetrieve; -} SStreamRetrieveReq; - typedef struct { int64_t streamId; int32_t childId; @@ -631,29 +564,6 @@ typedef struct { int32_t rspToTaskId; } SStreamRetrieveRsp; -typedef struct { - int64_t reqId; - int64_t stage; - int64_t streamId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int32_t downstreamTaskId; - int32_t childId; -} SStreamTaskCheckReq; - -typedef struct { - int64_t reqId; - int64_t streamId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int32_t downstreamTaskId; - int32_t childId; - int64_t oldStage; - int8_t status; -} SStreamTaskCheckRsp; - typedef struct { SMsgHead msgHead; int64_t streamId; @@ -661,48 +571,6 @@ typedef struct { int8_t igUntreated; } SStreamScanHistoryReq; -// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished -typedef struct { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; - int32_t nodeId; - SEpSet mgmtEps; - int32_t mnodeId; - int32_t transId; - int8_t mndTrigger; - int64_t expireTime; -} SStreamCheckpointSourceReq; - -typedef struct { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; - int32_t nodeId; - int32_t mnodeId; - int64_t expireTime; - int8_t success; -} SStreamCheckpointSourceRsp; - -int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); -int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); - -int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); - -typedef struct { - SMsgHead msgHead; - int64_t streamId; - int64_t checkpointId; - int32_t downstreamTaskId; - int32_t downstreamNodeId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamCheckpointReadyMsg; - -int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); -int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); - typedef struct STaskCkptInfo { int64_t latestId; // saved checkpoint id int64_t latestVer; // saved checkpoint ver @@ -733,77 +601,26 @@ typedef struct STaskStatusEntry { STaskCkptInfo checkpointInfo; } STaskStatusEntry; -typedef struct SStreamHbMsg { - int32_t vgId; - int32_t numOfTasks; - SArray* pTaskStatus; // SArray - SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. -} SStreamHbMsg; - -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); -void streamMetaClearHbMsg(SStreamHbMsg* pMsg); - typedef struct SNodeUpdateInfo { int32_t nodeId; SEpSet prevEp; SEpSet newEp; } SNodeUpdateInfo; -typedef struct SStreamTaskNodeUpdateMsg { - int32_t transId; // to identify the msg - int64_t streamId; - int32_t taskId; - SArray* pNodeList; // SArray -} SStreamTaskNodeUpdateMsg; - -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); - typedef struct SStreamTaskState { ETaskStatus state; char* name; } SStreamTaskState; -int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); -int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); - -int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); -int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); - -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq); -int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); -void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); - -int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); -void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); - -typedef struct SStreamTaskCheckpointReq { - int64_t streamId; - int32_t taskId; - int32_t nodeId; -} SStreamTaskCheckpointReq; - -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); - int32_t streamSetupScheduleTrigger(SStreamTask* pTask); +// dispatch related int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); -int32_t streamExecTask(SStreamTask* pTask); -int32_t streamResumeTask(SStreamTask* pTask); -int32_t streamTrySchedExec(SStreamTask* pTask); -int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); -int32_t streamTaskResumeInFuture(SStreamTask* pTask); -void streamTaskClearSchedIdleInfo(SStreamTask* pTask); -void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime); bool streamTaskShouldStop(const SStreamTask* pStatus); bool streamTaskShouldPause(const SStreamTask* pStatus); @@ -817,29 +634,38 @@ void streamTaskResetStatus(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask); ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); -void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); - -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); + +// stream task sched bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt); -int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); - -typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); -int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); -int32_t streamTaskRestoreStatus(SStreamTask* pTask); +int32_t streamExecTask(SStreamTask* pTask); +int32_t streamResumeTask(SStreamTask* pTask); +int32_t streamTrySchedExec(SStreamTask* pTask); +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); +int32_t streamTaskResumeInFuture(SStreamTask* pTask); +void streamTaskClearSchedIdleInfo(SStreamTask* pTask); +void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime); +// check downstream status +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); void streamTaskSendCheckMsg(SStreamTask* pTask); void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp); -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, - int32_t taskId); -int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); +int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo* pRpcInfo, int32_t taskId); +int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); + +// check downstream status +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); + +// fill-history task int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); @@ -859,11 +685,6 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); -int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); - -int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); -int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); -void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); @@ -899,9 +720,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); -void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, - int64_t startTs); - +void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, + int64_t startTs); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); @@ -914,6 +734,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); + +// timer tmr_h streamTimerGetInstance(); // checkpoint @@ -924,17 +746,26 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); -int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, - int32_t setCode); +int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, + int32_t setCode); +// stream task state machine, and event handling SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); void* streamDestroyStateMachine(SStreamTaskSM* pSM); +int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); +int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, + void* param); +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, + void* param); +int32_t streamTaskRestoreStatus(SStreamTask* pTask); -int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req); -void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); +// stream task retrieve related API +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq); +int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq *req); +void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); #ifdef __cplusplus } #endif -#endif /* ifndef _STREAM_H_ */ +#endif /* ifndef TDENGINE_TSTREAM_H */ diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 6a381ad31e..a81a391c3d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -235,7 +235,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { - streamMetaClearHbMsg(&req); + tCleanupStreamHbMsg(&req); tDecoderClear(&decoder); terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -349,7 +349,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } taosThreadMutexUnlock(&execInfo.lock); - streamMetaClearHbMsg(&req); + tCleanupStreamHbMsg(&req); taosArrayDestroy(pFailedTasks); taosArrayDestroy(pOrphanTasks); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ed8d06080a..79f53e6dec 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1084,7 +1084,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return code; } @@ -1093,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1103,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1114,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1130,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1148,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1179,7 +1179,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1202,7 +1202,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return code; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4d78f6826c..62c3b06b65 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -303,7 +303,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) { return -1; } - tDeleteStreamDispatchReq(&req); + tCleanupStreamDispatchReq(&req); streamMetaReleaseTask(pMeta, pTask); return 0; } else { @@ -335,7 +335,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId); tmsgSendRsp(&rsp); - tDeleteStreamDispatchReq(&req); + tCleanupStreamDispatchReq(&req); return 0; } @@ -379,7 +379,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (pTask == NULL) { tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.dstTaskId); - tDeleteStreamRetrieveReq(&req); + tCleanupStreamRetrieveReq(&req); return -1; } @@ -389,14 +389,14 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } else { req.srcNodeId = pTask->info.nodeId; req.srcTaskId = pTask->id.taskId; - code = broadcastRetrieveMsg(pTask, &req); + code = streamTaskBroadcastRetrieveReq(pTask, &req); } SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - sendRetrieveRsp(&req, &rsp); + streamTaskSendRetrieveRsp(&req, &rsp); streamMetaReleaseTask(pMeta, pTask); - tDeleteStreamRetrieveReq(&req); + tCleanupStreamRetrieveReq(&req); return code; } @@ -415,7 +415,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tDecoderClear(&decoder); streamTaskProcessCheckMsg(pMeta, &req, &rsp); - return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId); + return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId); } int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { @@ -451,7 +451,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } - code = streamProcessCheckRsp(pTask, &rsp); + code = streamTaskProcessCheckRsp(pTask, &rsp); streamMetaReleaseTask(pMeta, pTask); return code; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index ceb6cd9739..949bc2e60b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -101,6 +101,8 @@ extern int32_t taskDbWrapperId; int32_t streamTimerInit(); void streamTimerCleanUp(); +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); + void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); @@ -122,6 +124,8 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); +int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); + void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 9c3908c833..1401dba820 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -137,7 +137,7 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS } -int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { +int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); int64_t now = taosGetTimestampMs(); @@ -202,7 +202,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, +int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId) { SEncoder encoder; int32_t code; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d0fc923f83..f7245acc55 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -129,7 +129,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas return TSDB_CODE_SUCCESS; } -void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) { +void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { taosArrayDestroyP(pReq->data, taosMemoryFree); taosArrayDestroy(pReq->dataLen); } @@ -162,9 +162,9 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { return 0; } -void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } +void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } -void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ +void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -176,7 +176,7 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ tmsgSendRsp(pRsp); } -int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req) { +int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* req) { int32_t code = 0; void* buf = NULL; int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); @@ -259,7 +259,7 @@ int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock) return code; } - code = broadcastRetrieveMsg(pTask, &req); + code = streamTaskBroadcastRetrieveReq(pTask, &req); taosMemoryFree(req.pRetrieve); return code; @@ -832,7 +832,7 @@ FAIL: return code; } -int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, +int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode) { int32_t len = 0; int32_t code = 0; @@ -874,7 +874,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { SStreamChkptReadyInfo info = {0}; - buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); + streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); if (pTask->pReadyMsgList == NULL) { pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8218319309..877590c4b6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1045,7 +1045,7 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { return false; } -void streamMetaClearHbMsg(SStreamHbMsg* pMsg) { +void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { if (pMsg == NULL) { return; } @@ -1203,7 +1203,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } _end: - streamMetaClearHbMsg(&hbMsg); + tCleanupStreamHbMsg(&hbMsg); return TSDB_CODE_SUCCESS; }