diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h deleted file mode 100644 index 19b033a02e..0000000000 --- a/include/libs/stream/streamMsg.h +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_STREAMMSG_H -#define TDENGINE_STREAMMSG_H - -#include "tmsg.h" -#include "trpc.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct SStreamUpstreamEpInfo { - int32_t nodeId; - int32_t childId; - int32_t taskId; - SEpSet epSet; - bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it - int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer -} SStreamUpstreamEpInfo; - -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo); - -// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished -typedef struct { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; - int32_t nodeId; - SEpSet mgmtEps; - int32_t mnodeId; - int32_t transId; - int8_t mndTrigger; - int64_t expireTime; -} SStreamCheckpointSourceReq; - -int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); -int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); - -typedef struct { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; - int32_t nodeId; - int32_t mnodeId; - int64_t expireTime; - int8_t success; -} SStreamCheckpointSourceRsp; - -int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); - -typedef struct SStreamTaskNodeUpdateMsg { - int32_t transId; // to identify the msg - int64_t streamId; - int32_t taskId; - SArray* pNodeList; // SArray -} SStreamTaskNodeUpdateMsg; - -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); - -typedef struct { - int64_t reqId; - int64_t stage; - int64_t streamId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int32_t downstreamTaskId; - int32_t childId; -} SStreamTaskCheckReq; - -int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); -int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); - -typedef struct { - int64_t reqId; - int64_t streamId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int32_t downstreamTaskId; - int32_t childId; - int64_t oldStage; - int8_t status; -} SStreamTaskCheckRsp; - -int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); -int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); - -typedef struct { - SMsgHead msgHead; - int64_t streamId; - int64_t checkpointId; - int32_t downstreamTaskId; - int32_t downstreamNodeId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamCheckpointReadyMsg; - -int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); -int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); - -struct SStreamDispatchReq { - int32_t type; - int64_t stage; // nodeId from upstream task - int64_t streamId; - int32_t taskId; - int32_t msgId; // msg id to identify if the incoming msg from the same sender - int32_t srcVgId; - int32_t upstreamTaskId; - int32_t upstreamChildId; - int32_t upstreamNodeId; - int32_t upstreamRelTaskId; - int32_t blockNum; - int64_t totalLen; - SArray* dataLen; // SArray - SArray* data; // SArray -}; - -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq); -int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq); -void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq); - -struct SStreamRetrieveReq { - int64_t streamId; - int64_t reqId; - int32_t srcTaskId; - int32_t srcNodeId; - int32_t dstTaskId; - int32_t dstNodeId; - int32_t retrieveLen; - SRetrieveTableRsp* pRetrieve; -}; - -int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq); -int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq); -void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq); - -typedef struct SStreamTaskCheckpointReq { - int64_t streamId; - int32_t taskId; - int32_t nodeId; -} SStreamTaskCheckpointReq; - -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); - -typedef struct SStreamHbMsg { - int32_t vgId; - int32_t msgId; - int64_t ts; - int32_t numOfTasks; - SArray* pTaskStatus; // SArray - SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. -} SStreamHbMsg; - -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq); -void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); - -typedef struct { - SMsgHead head; - int32_t msgId; -} SMStreamHbRspMsg; - -int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); -int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp); - -typedef struct SRetrieveChkptTriggerReq { - SMsgHead head; - int64_t streamId; - int64_t checkpointId; - int32_t upstreamNodeId; - int32_t upstreamTaskId; - int32_t downstreamNodeId; - int64_t downstreamTaskId; -} SRetrieveChkptTriggerReq; - -int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq); -int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq); - -typedef struct SCheckpointTriggerRsp { - int64_t streamId; - int64_t checkpointId; - int32_t upstreamTaskId; - int32_t taskId; - int32_t transId; - int32_t rspCode; -} SCheckpointTriggerRsp; - -int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp); -int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp); - -typedef struct SCheckpointReport { - int64_t streamId; - int32_t taskId; - int32_t nodeId; - int64_t checkpointId; - int64_t checkpointVer; - int64_t checkpointTs; - int32_t transId; - int8_t dropHTask; -} SCheckpointReport; - -int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); -int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); - -typedef struct SRestoreCheckpointInfo { - SMsgHead head; - int64_t startTs; - int64_t streamId; - int64_t checkpointId; // latest checkpoint id - int32_t transId; // transaction id of the update the consensus-checkpointId transaction - int32_t taskId; - int32_t nodeId; -} SRestoreCheckpointInfo; - -int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); -int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); - -typedef struct { - SMsgHead head; - int64_t streamId; - int32_t taskId; - int32_t reqType; -} SStreamTaskRunReq; - -int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); -int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_STREAMMSG_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 05b3e21eba..6b8e9f12a6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo { SInterval interval; } SSTaskBasicInfo; -typedef struct SStreamRetrieveReq SStreamRetrieveReq; -typedef struct SStreamDispatchReq SStreamDispatchReq; -typedef struct STokenBucket STokenBucket; -typedef struct SMetaHbInfo SMetaHbInfo; - typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data @@ -626,11 +621,11 @@ typedef struct STaskStatusEntry { STaskCkptInfo checkpointInfo; } STaskStatusEntry; -typedef struct SNodeUpdateInfo { - int32_t nodeId; - SEpSet prevEp; - SEpSet newEp; -} SNodeUpdateInfo; +//typedef struct SNodeUpdateInfo { +// int32_t nodeId; +// SEpSet prevEp; +// SEpSet newEp; +//} SNodeUpdateInfo; typedef struct SStreamTaskState { ETaskStatus state; diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index f10eb6a611..39380a0644 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,4 +1,7 @@ aux_source_directory(src COMMON_SRC) +aux_source_directory(src/msg COMMON_MSG_SRC) + +LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC}) if(TD_ENTERPRISE) LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) diff --git a/source/libs/stream/src/streamMsg.c b/source/common/src/msg/streamMsg.c similarity index 77% rename from source/libs/stream/src/streamMsg.c rename to source/common/src/msg/streamMsg.c index 8e823b7738..c92ab52ac1 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -15,8 +15,48 @@ #include "streamMsg.h" #include "os.h" -#include "tstream.h" -#include "streamInt.h" +#include "tcommon.h" + +typedef struct STaskId { + int64_t streamId; + int64_t taskId; +} STaskId; + +typedef struct STaskCkptInfo { + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t latestSize; // latest checkpoint size + int8_t remoteBackup; // latest checkpoint backup done + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not + int8_t consensusChkptId; // required the consensus-checkpointId + int64_t consensusTs; // +} STaskCkptInfo; + +typedef struct STaskStatusEntry { + STaskId id; + int32_t status; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; + int32_t nodeId; + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + double inputQUsed; // in MiB + double inputRate; + double procsThroughput; // duration between one element put into input queue and being processed. + double procsTotal; // duration between one element put into input queue and being processed. + double outputThroughput; // the size of dispatched result blocks in bytes + double outputTotal; // the size of dispatched result blocks in bytes + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size + int64_t startTime; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t hTaskId; + STaskCkptInfo checkpointInfo; +} STaskStatusEntry; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId)); @@ -289,7 +329,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen)); if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { - stError("invalid dispatch req msg"); + uError("invalid dispatch req msg"); TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); } @@ -697,179 +737,6 @@ _exit: return code; } -int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); - int32_t taskId = pTask->hTaskInfo.id.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); - taskId = pTask->streamTaskId.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); - - int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); - TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); - TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); - - tEndEncode(pEncoder); -_exit: - return code; -} - -int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - int32_t taskId = 0; - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { - TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); - } - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->hTaskInfo.id.taskId = taskId; - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->streamTaskId.taskId = taskId; - - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); - - int32_t epSz = -1; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); - - if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); - if (pInfo == NULL) { - TAOS_CHECK_EXIT(terrno); - } - if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { - taosMemoryFreeClear(pInfo); - goto _exit; - } - if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); - pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { - TAOS_CHECK_EXIT(terrno); - } - TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); - } - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); - - tEndDecode(pDecoder); - -_exit: - return code; -} - int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { int32_t code = 0; int32_t lino; diff --git a/source/common/src/tmsg.c b/source/common/src/msg/tmsg.c similarity index 100% rename from source/common/src/tmsg.c rename to source/common/src/msg/tmsg.c diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 2fe3ef652d..bb12612273 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -46,7 +46,7 @@ if (${TD_LINUX}) target_sources(tmsgTest PRIVATE "tmsgTest.cpp" - "../src/tmsg.c" + "../src/msg/tmsg.c" ) target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/") target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a8019937ff..f46228fd47 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -22,6 +22,7 @@ #include "tstream.h" #include "ttimer.h" #include "wal.h" +#include "streamMsg.h" static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); @@ -1303,4 +1304,178 @@ void streamTaskFreeRefId(int64_t* pRefId) { } metaRefMgtRemove(pRefId); +} + + +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); + int32_t taskId = pTask->hTaskInfo.id.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); + taskId = pTask->streamTaskId.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); + + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); + TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); + TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); + + tEndEncode(pEncoder); +_exit: + return code; +} + +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->hTaskInfo.id.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->streamTaskId.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); + + int32_t epSz = -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); + + if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); + if (pInfo == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { + taosMemoryFreeClear(pInfo); + goto _exit; + } + if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); + } + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); + + tEndDecode(pDecoder); + +_exit: + return code; } \ No newline at end of file