refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-07 23:43:21 +08:00
parent b6debd985b
commit cdeb4462fb
9 changed files with 255 additions and 245 deletions

View File

@ -0,0 +1,175 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_STREAMMSG_H
#define TDENGINE_STREAMMSG_H
typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
} SStreamChildEpInfo;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
SEpSet mgmtEps;
int32_t mnodeId;
int32_t transId;
int8_t mndTrigger;
int64_t expireTime;
} SStreamCheckpointSourceReq;
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
int32_t mnodeId;
int64_t expireTime;
int8_t success;
} SStreamCheckpointSourceRsp;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
typedef struct SStreamTaskNodeUpdateMsg {
int32_t transId; // to identify the msg
int64_t streamId;
int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo>
} SStreamTaskNodeUpdateMsg;
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
typedef struct {
int64_t reqId;
int64_t stage;
int64_t streamId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
} SStreamTaskCheckReq;
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
typedef struct {
int64_t reqId;
int64_t streamId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
int64_t oldStage;
int8_t status;
} SStreamTaskCheckRsp;
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
typedef struct {
SMsgHead msgHead;
int64_t streamId;
int64_t checkpointId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamCheckpointReadyMsg;
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
struct SStreamDispatchReq {
int32_t type;
int64_t stage; // nodeId from upstream task
int64_t streamId;
int32_t taskId;
int32_t msgId; // msg id to identify if the incoming msg from the same sender
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
int32_t upstreamRelTaskId;
int32_t blockNum;
int64_t totalLen;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
};
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq);
void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq);
struct SStreamRetrieveReq {
int64_t streamId;
int64_t reqId;
int32_t srcTaskId;
int32_t srcNodeId;
int32_t dstTaskId;
int32_t dstNodeId;
int32_t retrieveLen;
SRetrieveTableRsp* pRetrieve;
};
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq);
void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq);
typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SStreamTaskCheckpointReq;
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int32_t reqType;
} SStreamTaskRunReq;
#endif // TDENGINE_STREAMMSG_H

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_H_
#define _STREAM_H_
#ifndef TDENGINE_TSTREAM_H
#define TDENGINE_TSTREAM_H
#include "os.h"
#include "streamState.h"
@ -24,6 +24,7 @@
#include "tmsgcb.h"
#include "tqueue.h"
#include "ttimer.h"
#include "streammsg.h"
#ifdef __cplusplus
extern "C" {
@ -150,6 +151,8 @@ typedef enum EStreamTaskEvent {
TASK_EVENT_DROPPING = 0xA,
} EStreamTaskEvent;
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
typedef struct {
int8_t type;
} SStreamQueueItem;
@ -194,30 +197,6 @@ struct SStreamQueueNode {
SStreamQueueNode* next;
};
typedef struct {
SStreamQueueNode* head;
int64_t size;
} SStreamQueueRes;
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes);
void streamQueueResClear(SStreamQueueRes* pRes);
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode);
#endif
typedef struct {
SStreamQueueNode* pHead;
} SStreamQueue1;
#if 0
bool streamQueueHasTask(const SStreamQueue1* pQueue);
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
@ -283,15 +262,6 @@ typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
} SStreamChildEpInfo;
typedef struct STaskId {
int64_t streamId;
int64_t taskId;
@ -330,7 +300,6 @@ typedef struct SStreamStatus {
int64_t lastExecTs; // last exec time stamp
int32_t inScanHistorySentinel;
bool appendTranstateBlock; // has append the transfer state data block already
bool supplementaryWalscan; // complete the supplementary wal scan or not
} SStreamStatus;
typedef struct SDataRange {
@ -349,6 +318,7 @@ typedef struct SSTaskBasicInfo {
int64_t triggerParam; // in msec
} SSTaskBasicInfo;
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
@ -560,9 +530,6 @@ typedef struct STaskUpdateEntry {
int32_t transId;
} STaskUpdateEntry;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
@ -573,35 +540,12 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);
// stream task queue related API
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const SStreamQueue* pQueue);
typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int32_t reqType;
} SStreamTaskRunReq;
struct SStreamDispatchReq {
int32_t type;
int64_t stage; // nodeId from upstream task
int64_t streamId;
int32_t taskId;
int32_t msgId; // msg id to identify if the incoming msg from the same sender
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
int32_t upstreamRelTaskId;
int32_t blockNum;
int64_t totalLen;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
};
typedef struct {
int64_t streamId;
int32_t upstreamNodeId;
@ -613,17 +557,6 @@ typedef struct {
int64_t stage;
} SStreamDispatchRsp;
typedef struct {
int64_t streamId;
int64_t reqId;
int32_t srcTaskId;
int32_t srcNodeId;
int32_t dstTaskId;
int32_t dstNodeId;
int32_t retrieveLen;
SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;
typedef struct {
int64_t streamId;
int32_t childId;
@ -631,29 +564,6 @@ typedef struct {
int32_t rspToTaskId;
} SStreamRetrieveRsp;
typedef struct {
int64_t reqId;
int64_t stage;
int64_t streamId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
} SStreamTaskCheckReq;
typedef struct {
int64_t reqId;
int64_t streamId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
int64_t oldStage;
int8_t status;
} SStreamTaskCheckRsp;
typedef struct {
SMsgHead msgHead;
int64_t streamId;
@ -661,48 +571,6 @@ typedef struct {
int8_t igUntreated;
} SStreamScanHistoryReq;
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
SEpSet mgmtEps;
int32_t mnodeId;
int32_t transId;
int8_t mndTrigger;
int64_t expireTime;
} SStreamCheckpointSourceReq;
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
int32_t mnodeId;
int64_t expireTime;
int8_t success;
} SStreamCheckpointSourceRsp;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
typedef struct {
SMsgHead msgHead;
int64_t streamId;
int64_t checkpointId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamCheckpointReadyMsg;
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
typedef struct STaskCkptInfo {
int64_t latestId; // saved checkpoint id
int64_t latestVer; // saved checkpoint ver
@ -733,77 +601,26 @@ typedef struct STaskStatusEntry {
STaskCkptInfo checkpointInfo;
} STaskStatusEntry;
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void streamMetaClearHbMsg(SStreamHbMsg* pMsg);
typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;
typedef struct SStreamTaskNodeUpdateMsg {
int32_t transId; // to identify the msg
int64_t streamId;
int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo>
} SStreamTaskNodeUpdateMsg;
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
typedef struct SStreamTaskState {
ETaskStatus state;
char* name;
} SStreamTaskState;
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SStreamTaskCheckpointReq;
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
// dispatch related
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamExecTask(SStreamTask* pTask);
int32_t streamResumeTask(SStreamTask* pTask);
int32_t streamTrySchedExec(SStreamTask* pTask);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
int32_t streamTaskResumeInFuture(SStreamTask* pTask);
void streamTaskClearSchedIdleInfo(SStreamTask* pTask);
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime);
bool streamTaskShouldStop(const SStreamTask* pStatus);
bool streamTaskShouldPause(const SStreamTask* pStatus);
@ -817,29 +634,38 @@ void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask);
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
// stream task sched
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamExecTask(SStreamTask* pTask);
int32_t streamResumeTask(SStreamTask* pTask);
int32_t streamTrySchedExec(SStreamTask* pTask);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
int32_t streamTaskResumeInFuture(SStreamTask* pTask);
void streamTaskClearSchedIdleInfo(SStreamTask* pTask);
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime);
// check downstream status
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
void streamTaskSendCheckMsg(SStreamTask* pTask);
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo,
int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
// check downstream status
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
// fill-history task
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
@ -859,11 +685,6 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
@ -899,9 +720,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId,
int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs);
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
@ -914,6 +734,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
// timer
tmr_h streamTimerGetInstance();
// checkpoint
@ -924,17 +746,26 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode);
int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode);
// stream task state machine, and event handling
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn,
void* param);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn,
void* param);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
// stream task retrieve related API
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq *req);
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
#ifdef __cplusplus
}
#endif
#endif /* ifndef _STREAM_H_ */
#endif /* ifndef TDENGINE_TSTREAM_H */

View File

@ -235,7 +235,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
streamMetaClearHbMsg(&req);
tCleanupStreamHbMsg(&req);
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
@ -349,7 +349,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedTasks);
taosArrayDestroy(pOrphanTasks);

View File

@ -1084,7 +1084,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return code;
}
@ -1093,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1103,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1114,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1130,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1148,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
@ -1179,7 +1179,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
@ -1202,7 +1202,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return code;
}

View File

@ -303,7 +303,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) {
return -1;
}
tDeleteStreamDispatchReq(&req);
tCleanupStreamDispatchReq(&req);
streamMetaReleaseTask(pMeta, pTask);
return 0;
} else {
@ -335,7 +335,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);
tmsgSendRsp(&rsp);
tDeleteStreamDispatchReq(&req);
tCleanupStreamDispatchReq(&req);
return 0;
}
@ -379,7 +379,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
if (pTask == NULL) {
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.dstTaskId);
tDeleteStreamRetrieveReq(&req);
tCleanupStreamRetrieveReq(&req);
return -1;
}
@ -389,14 +389,14 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
} else {
req.srcNodeId = pTask->info.nodeId;
req.srcTaskId = pTask->id.taskId;
code = broadcastRetrieveMsg(pTask, &req);
code = streamTaskBroadcastRetrieveReq(pTask, &req);
}
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
sendRetrieveRsp(&req, &rsp);
streamTaskSendRetrieveRsp(&req, &rsp);
streamMetaReleaseTask(pMeta, pTask);
tDeleteStreamRetrieveReq(&req);
tCleanupStreamRetrieveReq(&req);
return code;
}
@ -415,7 +415,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
streamTaskProcessCheckMsg(pMeta, &req, &rsp);
return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
@ -451,7 +451,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
code = streamProcessCheckRsp(pTask, &rsp);
code = streamTaskProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pMeta, pTask);
return code;
}

View File

@ -101,6 +101,8 @@ extern int32_t taskDbWrapperId;
int32_t streamTimerInit();
void streamTimerCleanUp();
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
@ -122,6 +124,8 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);

View File

@ -137,7 +137,7 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
int64_t now = taosGetTimestampMs();
@ -202,7 +202,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;

View File

@ -129,7 +129,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
return TSDB_CODE_SUCCESS;
}
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroy(pReq->dataLen);
}
@ -162,9 +162,9 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
return 0;
}
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
@ -176,7 +176,7 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
tmsgSendRsp(pRsp);
}
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req) {
int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* req) {
int32_t code = 0;
void* buf = NULL;
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
@ -259,7 +259,7 @@ int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock)
return code;
}
code = broadcastRetrieveMsg(pTask, &req);
code = streamTaskBroadcastRetrieveReq(pTask, &req);
taosMemoryFree(req.pRetrieve);
return code;
@ -832,7 +832,7 @@ FAIL:
return code;
}
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode) {
int32_t len = 0;
int32_t code = 0;
@ -874,7 +874,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
SStreamChkptReadyInfo info = {0};
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));

View File

@ -1045,7 +1045,7 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
return false;
}
void streamMetaClearHbMsg(SStreamHbMsg* pMsg) {
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
if (pMsg == NULL) {
return;
}
@ -1203,7 +1203,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
}
_end:
streamMetaClearHbMsg(&hbMsg);
tCleanupStreamHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS;
}