refactor: move to msg.
This commit is contained in:
parent
131be84dec
commit
1c66f22c2b
|
@ -1,251 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_STREAMMSG_H
|
||||
#define TDENGINE_STREAMMSG_H
|
||||
|
||||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SStreamUpstreamEpInfo {
|
||||
int32_t nodeId;
|
||||
int32_t childId;
|
||||
int32_t taskId;
|
||||
SEpSet epSet;
|
||||
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
|
||||
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
||||
} SStreamUpstreamEpInfo;
|
||||
|
||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
|
||||
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo);
|
||||
|
||||
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
int32_t taskId;
|
||||
int32_t nodeId;
|
||||
SEpSet mgmtEps;
|
||||
int32_t mnodeId;
|
||||
int32_t transId;
|
||||
int8_t mndTrigger;
|
||||
int64_t expireTime;
|
||||
} SStreamCheckpointSourceReq;
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
|
||||
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
int32_t taskId;
|
||||
int32_t nodeId;
|
||||
int32_t mnodeId;
|
||||
int64_t expireTime;
|
||||
int8_t success;
|
||||
} SStreamCheckpointSourceRsp;
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
|
||||
|
||||
typedef struct SStreamTaskNodeUpdateMsg {
|
||||
int32_t transId; // to identify the msg
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
SArray* pNodeList; // SArray<SNodeUpdateInfo>
|
||||
} SStreamTaskNodeUpdateMsg;
|
||||
|
||||
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
|
||||
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
|
||||
|
||||
typedef struct {
|
||||
int64_t reqId;
|
||||
int64_t stage;
|
||||
int64_t streamId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t downstreamNodeId;
|
||||
int32_t downstreamTaskId;
|
||||
int32_t childId;
|
||||
} SStreamTaskCheckReq;
|
||||
|
||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
|
||||
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t reqId;
|
||||
int64_t streamId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t downstreamNodeId;
|
||||
int32_t downstreamTaskId;
|
||||
int32_t childId;
|
||||
int64_t oldStage;
|
||||
int8_t status;
|
||||
} SStreamTaskCheckRsp;
|
||||
|
||||
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
|
||||
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead msgHead;
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
int32_t downstreamTaskId;
|
||||
int32_t downstreamNodeId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t childId;
|
||||
} SStreamCheckpointReadyMsg;
|
||||
|
||||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
||||
|
||||
struct SStreamDispatchReq {
|
||||
int32_t type;
|
||||
int64_t stage; // nodeId from upstream task
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t msgId; // msg id to identify if the incoming msg from the same sender
|
||||
int32_t srcVgId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t upstreamChildId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t upstreamRelTaskId;
|
||||
int32_t blockNum;
|
||||
int64_t totalLen;
|
||||
SArray* dataLen; // SArray<int32_t>
|
||||
SArray* data; // SArray<SRetrieveTableRsp*>
|
||||
};
|
||||
|
||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq);
|
||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq);
|
||||
void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq);
|
||||
|
||||
struct SStreamRetrieveReq {
|
||||
int64_t streamId;
|
||||
int64_t reqId;
|
||||
int32_t srcTaskId;
|
||||
int32_t srcNodeId;
|
||||
int32_t dstTaskId;
|
||||
int32_t dstNodeId;
|
||||
int32_t retrieveLen;
|
||||
SRetrieveTableRsp* pRetrieve;
|
||||
};
|
||||
|
||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq);
|
||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq);
|
||||
void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq);
|
||||
|
||||
typedef struct SStreamTaskCheckpointReq {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t nodeId;
|
||||
} SStreamTaskCheckpointReq;
|
||||
|
||||
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
|
||||
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
|
||||
|
||||
typedef struct SStreamHbMsg {
|
||||
int32_t vgId;
|
||||
int32_t msgId;
|
||||
int64_t ts;
|
||||
int32_t numOfTasks;
|
||||
SArray* pTaskStatus; // SArray<STaskStatusEntry>
|
||||
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
|
||||
} SStreamHbMsg;
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq);
|
||||
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq);
|
||||
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int32_t msgId;
|
||||
} SMStreamHbRspMsg;
|
||||
|
||||
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);
|
||||
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp);
|
||||
|
||||
typedef struct SRetrieveChkptTriggerReq {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t downstreamNodeId;
|
||||
int64_t downstreamTaskId;
|
||||
} SRetrieveChkptTriggerReq;
|
||||
|
||||
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq);
|
||||
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq);
|
||||
|
||||
typedef struct SCheckpointTriggerRsp {
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t taskId;
|
||||
int32_t transId;
|
||||
int32_t rspCode;
|
||||
} SCheckpointTriggerRsp;
|
||||
|
||||
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp);
|
||||
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp);
|
||||
|
||||
typedef struct SCheckpointReport {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t nodeId;
|
||||
int64_t checkpointId;
|
||||
int64_t checkpointVer;
|
||||
int64_t checkpointTs;
|
||||
int32_t transId;
|
||||
int8_t dropHTask;
|
||||
} SCheckpointReport;
|
||||
|
||||
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq);
|
||||
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq);
|
||||
|
||||
typedef struct SRestoreCheckpointInfo {
|
||||
SMsgHead head;
|
||||
int64_t startTs;
|
||||
int64_t streamId;
|
||||
int64_t checkpointId; // latest checkpoint id
|
||||
int32_t transId; // transaction id of the update the consensus-checkpointId transaction
|
||||
int32_t taskId;
|
||||
int32_t nodeId;
|
||||
} SRestoreCheckpointInfo;
|
||||
|
||||
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
|
||||
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t reqType;
|
||||
} SStreamTaskRunReq;
|
||||
|
||||
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq);
|
||||
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_STREAMMSG_H
|
|
@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo {
|
|||
SInterval interval;
|
||||
} SSTaskBasicInfo;
|
||||
|
||||
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
|
||||
typedef struct SStreamDispatchReq SStreamDispatchReq;
|
||||
typedef struct STokenBucket STokenBucket;
|
||||
typedef struct SMetaHbInfo SMetaHbInfo;
|
||||
|
||||
typedef struct SDispatchMsgInfo {
|
||||
SStreamDispatchReq* pData; // current dispatch data
|
||||
|
||||
|
@ -626,11 +621,11 @@ typedef struct STaskStatusEntry {
|
|||
STaskCkptInfo checkpointInfo;
|
||||
} STaskStatusEntry;
|
||||
|
||||
typedef struct SNodeUpdateInfo {
|
||||
int32_t nodeId;
|
||||
SEpSet prevEp;
|
||||
SEpSet newEp;
|
||||
} SNodeUpdateInfo;
|
||||
//typedef struct SNodeUpdateInfo {
|
||||
// int32_t nodeId;
|
||||
// SEpSet prevEp;
|
||||
// SEpSet newEp;
|
||||
//} SNodeUpdateInfo;
|
||||
|
||||
typedef struct SStreamTaskState {
|
||||
ETaskStatus state;
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
aux_source_directory(src COMMON_SRC)
|
||||
aux_source_directory(src/msg COMMON_MSG_SRC)
|
||||
|
||||
LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC})
|
||||
|
||||
if(TD_ENTERPRISE)
|
||||
LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c)
|
||||
|
|
|
@ -15,8 +15,48 @@
|
|||
|
||||
#include "streamMsg.h"
|
||||
#include "os.h"
|
||||
#include "tstream.h"
|
||||
#include "streamInt.h"
|
||||
#include "tcommon.h"
|
||||
|
||||
typedef struct STaskId {
|
||||
int64_t streamId;
|
||||
int64_t taskId;
|
||||
} STaskId;
|
||||
|
||||
typedef struct STaskCkptInfo {
|
||||
int64_t latestId; // saved checkpoint id
|
||||
int64_t latestVer; // saved checkpoint ver
|
||||
int64_t latestTime; // latest checkpoint time
|
||||
int64_t latestSize; // latest checkpoint size
|
||||
int8_t remoteBackup; // latest checkpoint backup done
|
||||
int64_t activeId; // current active checkpoint id
|
||||
int32_t activeTransId; // checkpoint trans id
|
||||
int8_t failed; // denote if the checkpoint is failed or not
|
||||
int8_t consensusChkptId; // required the consensus-checkpointId
|
||||
int64_t consensusTs; //
|
||||
} STaskCkptInfo;
|
||||
|
||||
typedef struct STaskStatusEntry {
|
||||
STaskId id;
|
||||
int32_t status;
|
||||
int32_t statusLastDuration; // to record the last duration of current status
|
||||
int64_t stage;
|
||||
int32_t nodeId;
|
||||
SVersionRange verRange; // start/end version in WAL, only valid for source task
|
||||
int64_t processedVer; // only valid for source task
|
||||
double inputQUsed; // in MiB
|
||||
double inputRate;
|
||||
double procsThroughput; // duration between one element put into input queue and being processed.
|
||||
double procsTotal; // duration between one element put into input queue and being processed.
|
||||
double outputThroughput; // the size of dispatched result blocks in bytes
|
||||
double outputTotal; // the size of dispatched result blocks in bytes
|
||||
double sinkQuota; // existed quota size for sink task
|
||||
double sinkDataSize; // sink to dst data size
|
||||
int64_t startTime;
|
||||
int64_t startCheckpointId;
|
||||
int64_t startCheckpointVer;
|
||||
int64_t hTaskId;
|
||||
STaskCkptInfo checkpointInfo;
|
||||
} STaskStatusEntry;
|
||||
|
||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
|
||||
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
|
||||
|
@ -289,7 +329,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
|||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
|
||||
|
||||
if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
|
||||
stError("invalid dispatch req msg");
|
||||
uError("invalid dispatch req msg");
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
|
@ -697,179 +737,6 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
|
||||
int32_t taskId = pTask->hTaskInfo.id.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
|
||||
taskId = pTask->streamTaskId.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
|
||||
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||
int32_t taskId = 0;
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
|
||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->hTaskInfo.id.taskId = taskId;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->streamTaskId.taskId = taskId;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = -1;
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
|
||||
|
||||
if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pInfo == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
goto _exit;
|
||||
}
|
||||
if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
|
@ -46,7 +46,7 @@ if (${TD_LINUX})
|
|||
target_sources(tmsgTest
|
||||
PRIVATE
|
||||
"tmsgTest.cpp"
|
||||
"../src/tmsg.c"
|
||||
"../src/msg/tmsg.c"
|
||||
)
|
||||
target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/")
|
||||
target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main)
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "tstream.h"
|
||||
#include "ttimer.h"
|
||||
#include "wal.h"
|
||||
#include "streamMsg.h"
|
||||
|
||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
|
||||
|
@ -1303,4 +1304,178 @@ void streamTaskFreeRefId(int64_t* pRefId) {
|
|||
}
|
||||
|
||||
metaRefMgtRemove(pRefId);
|
||||
}
|
||||
|
||||
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
|
||||
int32_t taskId = pTask->hTaskInfo.id.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
|
||||
taskId = pTask->streamTaskId.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
|
||||
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||
int32_t taskId = 0;
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
|
||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->hTaskInfo.id.taskId = taskId;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->streamTaskId.taskId = taskId;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = -1;
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
|
||||
|
||||
if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pInfo == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
goto _exit;
|
||||
}
|
||||
if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
Loading…
Reference in New Issue