diff --git a/include/libs/stream/streamMsg.h b/include/common/streamMsg.h similarity index 85% rename from include/libs/stream/streamMsg.h rename to include/common/streamMsg.h index 0ceaa93a72..3db92ba58d 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/common/streamMsg.h @@ -17,12 +17,23 @@ #define TDENGINE_STREAMMSG_H #include "tmsg.h" -#include "trpc.h" +//#include "trpc.h" #ifdef __cplusplus extern "C" { #endif +typedef struct SStreamRetrieveReq SStreamRetrieveReq; +typedef struct SStreamDispatchReq SStreamDispatchReq; +typedef struct STokenBucket STokenBucket; +typedef struct SMetaHbInfo SMetaHbInfo; + +typedef struct SNodeUpdateInfo { + int32_t nodeId; + SEpSet prevEp; + SEpSet newEp; +} SNodeUpdateInfo; + typedef struct SStreamUpstreamEpInfo { int32_t nodeId; int32_t childId; @@ -170,8 +181,8 @@ typedef struct SStreamHbMsg { SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. } SStreamHbMsg; -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq); void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); typedef struct { @@ -179,6 +190,9 @@ typedef struct { int32_t msgId; } SMStreamHbRspMsg; +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp); + typedef struct SRetrieveChkptTriggerReq { SMsgHead head; int64_t streamId; @@ -189,6 +203,9 @@ typedef struct SRetrieveChkptTriggerReq { int64_t downstreamTaskId; } SRetrieveChkptTriggerReq; +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq); +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq); + typedef struct SCheckpointTriggerRsp { int64_t streamId; int64_t checkpointId; @@ -198,6 +215,9 @@ typedef struct SCheckpointTriggerRsp { int32_t rspCode; } SCheckpointTriggerRsp; +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp); +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp); + typedef struct SCheckpointReport { int64_t streamId; int32_t taskId; @@ -222,7 +242,7 @@ typedef struct SRestoreCheckpointInfo { int32_t nodeId; } SRestoreCheckpointInfo; -int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); typedef struct { @@ -232,10 +252,8 @@ typedef struct { int32_t reqType; } SStreamTaskRunReq; -typedef struct SCheckpointConsensusEntry { - SRestoreCheckpointInfo req; - int64_t ts; -} SCheckpointConsensusEntry; +int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index acf1d5e182..2294cf6f73 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3798,7 +3798,14 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq, SVResetStreamTaskReq; +} SVPauseStreamTaskReq; + +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; + int64_t chkptId; +} SVResetStreamTaskReq; typedef struct { char name[TSDB_STREAM_FNAME_LEN]; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2cf791c8da..6b8e9f12a6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo { SInterval interval; } SSTaskBasicInfo; -typedef struct SStreamRetrieveReq SStreamRetrieveReq; -typedef struct SStreamDispatchReq SStreamDispatchReq; -typedef struct STokenBucket STokenBucket; -typedef struct SMetaHbInfo SMetaHbInfo; - typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data @@ -626,11 +621,11 @@ typedef struct STaskStatusEntry { STaskCkptInfo checkpointInfo; } STaskStatusEntry; -typedef struct SNodeUpdateInfo { - int32_t nodeId; - SEpSet prevEp; - SEpSet newEp; -} SNodeUpdateInfo; +//typedef struct SNodeUpdateInfo { +// int32_t nodeId; +// SEpSet prevEp; +// SEpSet newEp; +//} SNodeUpdateInfo; typedef struct SStreamTaskState { ETaskStatus state; @@ -643,6 +638,11 @@ typedef struct SCheckpointConsensusInfo { int64_t streamId; } SCheckpointConsensusInfo; +typedef struct SCheckpointConsensusEntry { + SRestoreCheckpointInfo req; + int64_t ts; +} SCheckpointConsensusEntry; + void streamSetupScheduleTrigger(SStreamTask* pTask); // dispatch related @@ -718,6 +718,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pInfo, int32_t code); +void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2c811495fd..e33af33d0e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1011,6 +1011,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) #define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) #define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108) +#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index f10eb6a611..39380a0644 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,4 +1,7 @@ aux_source_directory(src COMMON_SRC) +aux_source_directory(src/msg COMMON_MSG_SRC) + +LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC}) if(TD_ENTERPRISE) LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) diff --git a/source/libs/stream/src/streamMsg.c b/source/common/src/msg/streamMsg.c similarity index 75% rename from source/libs/stream/src/streamMsg.c rename to source/common/src/msg/streamMsg.c index 193daa0cc4..c92ab52ac1 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -15,8 +15,48 @@ #include "streamMsg.h" #include "os.h" -#include "tstream.h" -#include "streamInt.h" +#include "tcommon.h" + +typedef struct STaskId { + int64_t streamId; + int64_t taskId; +} STaskId; + +typedef struct STaskCkptInfo { + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t latestSize; // latest checkpoint size + int8_t remoteBackup; // latest checkpoint backup done + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not + int8_t consensusChkptId; // required the consensus-checkpointId + int64_t consensusTs; // +} STaskCkptInfo; + +typedef struct STaskStatusEntry { + STaskId id; + int32_t status; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; + int32_t nodeId; + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + double inputQUsed; // in MiB + double inputRate; + double procsThroughput; // duration between one element put into input queue and being processed. + double procsTotal; // duration between one element put into input queue and being processed. + double outputThroughput; // the size of dispatched result blocks in bytes + double outputTotal; // the size of dispatched result blocks in bytes + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size + int64_t startTime; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t hTaskId; + STaskCkptInfo checkpointInfo; +} STaskStatusEntry; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId)); @@ -289,7 +329,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen)); if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { - stError("invalid dispatch req msg"); + uError("invalid dispatch req msg"); TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); } @@ -605,173 +645,92 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { pMsg->numOfTasks = -1; } -int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { +int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) { int32_t code = 0; int32_t lino; TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); - int32_t taskId = pTask->hTaskInfo.id.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); - taskId = pTask->streamTaskId.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); - - int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); - TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); - TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); - + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId)); tEndEncode(pEncoder); + _exit: return code; } -int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - int32_t taskId = 0; +int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) { int32_t code = 0; int32_t lino; TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { - TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); - } + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId)); + tEndDecode(pDecoder); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); +_exit: + return code; +} - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); +int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) { + int32_t code = 0; + int32_t lino; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId)); + tEndEncode(pEncoder); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); +_exit: + return code; +} - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->hTaskInfo.id.taskId = taskId; +int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) { + int32_t code = 0; + int32_t lino; - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->streamTaskId.taskId = taskId; + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId)); + tEndDecode(pDecoder); - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); +_exit: + return code; +} - int32_t epSz = -1; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); +int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) { + int32_t code = 0; + int32_t lino; - if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); - if (pInfo == NULL) { - TAOS_CHECK_EXIT(terrno); - } - if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { - taosMemoryFreeClear(pInfo); - goto _exit; - } - if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode)); + tEndEncode(pEncoder); - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); - } +_exit: + return code; +} - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); - pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { - TAOS_CHECK_EXIT(terrno); - } - TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); - } - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); +int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) { + int32_t code = 0; + int32_t lino; + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode)); tEndDecode(pDecoder); _exit: @@ -830,11 +789,7 @@ int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpoin tEndEncode(pEncoder); _exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } + return code; } int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { @@ -853,3 +808,31 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* _exit: return code; } + +int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType)); + tEndDecode(pDecoder); + +_exit: + return code; +} \ No newline at end of file diff --git a/source/common/src/tmsg.c b/source/common/src/msg/tmsg.c similarity index 100% rename from source/common/src/tmsg.c rename to source/common/src/msg/tmsg.c diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 2fe3ef652d..bb12612273 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -46,7 +46,7 @@ if (${TD_LINUX}) target_sources(tmsgTest PRIVATE "tmsgTest.cpp" - "../src/tmsg.c" + "../src/msg/tmsg.c" ) target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/") target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index c9155f536c..b519d8509a 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -56,6 +56,7 @@ typedef struct SStreamTransMgmt { typedef struct SStreamTaskResetMsg { int64_t streamId; int32_t transId; + int64_t checkpointId; } SStreamTaskResetMsg; typedef struct SChkptReportInfo { @@ -142,9 +143,9 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); -int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts); int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream); int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 81db427afd..6336cd6e49 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2434,7 +2434,12 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, pReport->taskId, p->checkpointId, pReport->checkpointId); - memcpy(p, pReport, sizeof(STaskChkptInfo)); + // update the checkpoint report info + p->checkpointId = pReport->checkpointId; + p->ts = pReport->checkpointTs; + p->version = pReport->checkpointVer; + p->transId = pReport->transId; + p->dropHTask = pReport->dropHTask; } else { mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId); } diff --git a/source/dnode/mnode/impl/src/mndStreamErrorInjection.c b/source/dnode/mnode/impl/src/mndStreamErrorInjection.c new file mode 100644 index 0000000000..c68416369d --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamErrorInjection.c @@ -0,0 +1,72 @@ +#include "mndTrans.h" + +uint32_t seed = 0; + +static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + return rpcMsg; + } + + rpcMsg.info.traceId.rootId = traceId; + rpcMsg.info.notFreeAhandle = 1; + + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + return rpcMsg; +} + +void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) { + if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) || + (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) || + (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) { + if (seed == 0) { + seed = taosGetTimestampSec(); + } + + uint32_t v = taosRandR(&seed); + int32_t choseItem = v % 5; + + if (choseItem == 0) { + // 1. one of update-checkpoint not send, restart and send it again + taosMsleep(5000); + if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) { + mError( + "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will " + "rollback***"); + exit(-1); + } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT + mError( + "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will " + "not started***"); + } else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE + mError( + "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will " + "started after restart***"); + exit(-1); + } + } else if (choseItem == 1) { + // 2. repeat send update chkpt msg + mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***"); + + mError("***repeat 1***"); + SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature); + int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1); + + mError("***repeat 2***"); + SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature); + code = tmsgSendReq(&pAction->epSet, &rpcMsg2); + + mError("***repeat 3***"); + SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature); + code = tmsgSendReq(&pAction->epSet, &rpcMsg3); + } else if (choseItem == 2) { + // 3. sleep 40s and then send msg + mError("***idle for 30s, and then send msg***"); + taosMsleep(30000); + } else { + // do nothing + // mInfo("no error triggered"); + } + } +} diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 941956ae2b..4b3db28aa1 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -24,7 +24,7 @@ typedef struct SFailedCheckpointInfo { static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode); static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); -static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); +static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); @@ -68,7 +68,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) { } } -int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) { STrans *pTrans = NULL; int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint", &pTrans); @@ -84,7 +84,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return code; } - code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream); + code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId); if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -115,7 +115,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return code; } -int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) { +int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) { int32_t size = sizeof(SStreamTaskResetMsg); int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans); @@ -135,8 +135,9 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail } - SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId}; + SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId}; + // let's remember that this trans had been killed already void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p); if (px == NULL) { mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1); @@ -150,6 +151,7 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t pReq->streamId = streamId; pReq->transId = transId; + pReq->checkpointId = checkpointId; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size}; int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -234,7 +236,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { } else { mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, pStream->uid, pMsg->transId); - code = mndCreateStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId); } } @@ -379,9 +381,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { - mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); + mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); - terrno = TSDB_CODE_INVALID_MSG; + // return directly and after the vnode to continue to send the next HbMsg. + terrno = TSDB_CODE_SUCCESS; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); @@ -495,10 +498,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } - mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", - pInfo->checkpointId, pInfo->transId); + mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64 + " transId:%d failed issue task-reset trans to reset all tasks status", + pInfo->streamUid, pInfo->checkpointId, pInfo->transId); - code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId); + code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId); if (code) { mError("failed to create reset task trans, code:%s", tstrerror(code)); } @@ -549,12 +553,37 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr } void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { - SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)}; - rsp.pCont = rpcMallocCont(rsp.contLen); + int32_t ret = 0; + int32_t tlen = 0; + void *buf = NULL; - SMStreamHbRspMsg *pMsg = rsp.pCont; - pMsg->head.vgId = htonl(vgId); - pMsg->msgId = msgId; + const SMStreamHbRspMsg msg = {.msgId = msgId}; + + tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret); + if (ret < 0) { + mError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno)); + return; + } + + ((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + mError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + return; + } + tEncoderClear(&encoder); + + SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf}; tmsgSendRsp(&rsp); pRpcInfo->handle = NULL; // disable auto rsp diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 139ea4f147..5ccb626609 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -295,7 +295,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas return code; } -static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { +static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) { SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), @@ -306,6 +306,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + pReq->chkptId = chkptId; SEpSet epset = {0}; bool hasEpset = false; @@ -544,7 +545,7 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p return 0; } -int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) { SStreamTaskIter *pIter = NULL; taosWLockLatch(&pStream->lock); @@ -564,7 +565,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * return code; } - code = doSetResetAction(pMnode, pTrans, pTask); + code = doSetResetAction(pMnode, pTrans, pTask, chkptId); if (code != TSDB_CODE_SUCCESS) { destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); @@ -606,7 +607,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p tEncoderInit(&encoder, abuf, tlen); code = tEncodeRestoreCheckpointInfo(&encoder, &req); tEncoderClear(&encoder); - if (code == -1) { + if (code < 0) { taosMemoryFree(pBuf); return code; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f9b7644af4..615c383f07 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1521,74 +1521,4 @@ int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { mError("snode not existed when trying to create stream in db with multiple replica"); return TSDB_CODE_SNODE_NOT_DEPLOYED; } -} - -uint32_t seed = 0; -static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; - rpcMsg.pCont = rpcMallocCont(pAction->contLen); - if (rpcMsg.pCont == NULL) { - return rpcMsg; - } - - rpcMsg.info.traceId.rootId = traceId; - rpcMsg.info.notFreeAhandle = 1; - - memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - return rpcMsg; -} - -void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) { - if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) || - (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) || - (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) { - if (seed == 0) { - seed = taosGetTimestampSec(); - } - - uint32_t v = taosRandR(&seed); - int32_t choseItem = v % 5; - - if (choseItem == 0) { - // 1. one of update-checkpoint not send, restart and send it again - taosMsleep(5000); - if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) { - mError( - "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will " - "rollback***"); - exit(-1); - } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT - mError( - "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will " - "not started***"); - } else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE - mError( - "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will " - "started after restart***"); - exit(-1); - } - } else if (choseItem == 1) { - // 2. repeat send update chkpt msg - mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***"); - - mError("***repeat 1***"); - SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature); - int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1); - - mError("***repeat 2***"); - SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature); - code = tmsgSendReq(&pAction->epSet, &rpcMsg2); - - mError("***repeat 3***"); - SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature); - code = tmsgSendReq(&pAction->epSet, &rpcMsg3); - } else if (choseItem == 2) { - // 3. sleep 40s and then send msg - mError("***idle for 30s, and then send msg***"); - taosMsleep(30000); - } else { - // do nothing - // mInfo("no error triggered"); - } - } } \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index d508cf7390..45bc4c2ce2 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -246,7 +246,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { px = taosArrayPush(pStream->tasks, &pLevel); ASSERT(px != NULL); - code = mndCreateStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream, 1); ASSERT(code != 0); tFreeStreamObj(pStream); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6195899566..a234777441 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1009,21 +1009,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t code = 0; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + + SStreamTaskRunReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + + tDecoderClear(&decoder); // extracted submit data from wal files for all tasks - if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { + if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { return tqScanWal(pTq); } - int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); if (code) { tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } // let's continue scan data in the wal files - if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) { + if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { code = tqScanWalAsync(pTq, false); // it's ok to failed if (code) { tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); @@ -1297,7 +1310,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; + SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1318,10 +1331,23 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, - (int32_t)pReq->downstreamTaskId); + SRetrieveChkptTriggerReq req = {0}; + + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; + + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId, + req.downstreamTaskId); return TSDB_CODE_STREAM_NOT_LEADER; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 326e8d4ada..3f67503454 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -828,14 +828,25 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { - SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; - int32_t type = pReq->reqType; - int32_t vgId = pMeta->vgId; - int32_t code = 0; + SStreamTaskRunReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + tDecoderClear(&decoder); + + int32_t type = req.reqType; if (type == STREAM_EXEC_T_START_ONE_TASK) { - code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { code = streamMetaStartAllTasks(pMeta); @@ -847,11 +858,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead code = streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { - code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask != NULL && (code == 0)) { char* pStatus = NULL; @@ -873,7 +884,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { @@ -890,7 +901,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return 0; } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this - tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId); + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId); return code; } } @@ -939,7 +950,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { } int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; + SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg; SStreamTask* pTask = NULL; int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); @@ -954,17 +965,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { streamMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, true); + streamTaskSetFailedCheckpointId(pTask, pReq->chkptId); + // clear flag set during do checkpoint, and open inputQ for all upstream tasks SStreamTaskState pState = streamTaskGetStatus(pTask); if (pState.state == TASK_STATUS__CK) { - int32_t tranId = 0; - int64_t activeChkId = 0; - streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId); - - tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", - pTask->id.idStr, activeChkId, tranId); - streamTaskSetStatusReady(pTask); + tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr); } else if (pState.state == TASK_STATUS__UNINIT) { // tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); // tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); @@ -980,25 +987,36 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { } int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; + SRetrieveChkptTriggerReq req = {0}; + SStreamTask* pTask = NULL; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask); + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 + tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64 " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId); + pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64, - pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId); + req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId); if (pTask->status.downstreamReady != 1) { tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready", - pTask->id.idStr, (int32_t)pReq->downstreamTaskId); + pTask->id.idStr, (int32_t)req.downstreamTaskId); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1010,19 +1028,19 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); - if (checkpointId != pReq->checkpointId) { + if (checkpointId != req.checkpointId) { tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64 " req:%" PRId64, - pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId); + pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_INVALID_MSG; } - if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { + if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) { // re-send the lost checkpoint-trigger msg to downstream task tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, - (int32_t)pReq->downstreamTaskId, checkpointId, transId); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + (int32_t)req.downstreamTaskId, checkpointId, transId); + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; @@ -1036,7 +1054,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now @@ -1048,7 +1066,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); - code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, + code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } @@ -1057,23 +1075,34 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SCheckpointTriggerRsp rsp = {0}; + SStreamTask* pTask = NULL; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder = {0}; - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask); + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0) { + tDecoderClear(&decoder); + tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId); + return TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&decoder); + + int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask); if (pTask == NULL || (code != 0)) { tqError( "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pRsp->taskId); + pMeta->vgId, rsp.taskId); return code; } tqDebug( - "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " - "checkpointId:%" PRId64 ", transId:%d", - pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId); + "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64 + ", transId:%d", + pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId); - code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp); + code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp); streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1203,7 +1232,23 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { } int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - return streamProcessHeartbeatRsp(pMeta, pMsg->pCont); + SMStreamHbRspMsg rsp = {0}; + int32_t code = 0; + SDecoder decoder; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + + tDecoderInit(&decoder, (uint8_t*)msg, len); + code = tDecodeStreamHbRsp(&decoder, &rsp); + if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&decoder); + tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno)); + return terrno; + } + + tDecoderClear(&decoder); + return streamProcessHeartbeatRsp(pMeta, &rsp); } int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } @@ -1237,7 +1282,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SRestoreCheckpointInfo req = {0}; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { + if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) { tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 84dde6a579..d4702e2bdc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3465,11 +3465,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) goto _end; } - void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); - if (!pUpInfo) { - lino = __LINE__; - goto _end; - } SDecoder decoder = {0}; pDeCoder = &decoder; tDecoderInit(pDeCoder, buf, tlen); @@ -3478,14 +3473,21 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) goto _end; } - code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); - if (code == TSDB_CODE_SUCCESS) { - pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); - pInfo->pUpdateInfo = pUpInfo; - } else { - taosMemoryFree(pUpInfo); - lino = __LINE__; - goto _end; + if (pInfo->pUpdateInfo != NULL) { + void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + lino = __LINE__; + goto _end; + } + code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); + if (code == TSDB_CODE_SUCCESS) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); + pInfo->pUpdateInfo = pUpInfo; + } else { + taosMemoryFree(pUpInfo); + lino = __LINE__; + goto _end; + } } if (tDecodeIsEnd(pDeCoder)) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 863bc76c79..427733e9ec 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -192,7 +192,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); @@ -245,6 +244,9 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger); int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id); +// inject stream errors +void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7724d1c5ff..20f49216e7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -161,33 +161,52 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { - int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - void* pBuf = rpcMallocCont(size); - if (pBuf == NULL) { + int32_t ret = 0; + int32_t tlen = 0; + void* buf = NULL; + SEncoder encoder; + + SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .taskId = dstTaskId, + .rspCode = code}; + + if (code == TSDB_CODE_SUCCESS) { + req.checkpointId = pTask->chkInfo.pActiveInfo->activeId; + req.transId = pTask->chkInfo.pActiveInfo->transId; + } else { + req.checkpointId = -1; + req.transId = -1; + } + + tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret); + if (ret < 0) { + stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code)); + return ret; + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId); return terrno; } - SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); - - pRsp->streamId = pTask->id.streamId; - pRsp->upstreamTaskId = pTask->id.taskId; - pRsp->taskId = dstTaskId; - pRsp->rspCode = code; - - if (code == TSDB_CODE_SUCCESS) { - pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; - pRsp->transId = pTask->chkInfo.pActiveInfo->transId; - } else { - pRsp->checkpointId = -1; - pRsp->transId = -1; + tEncoderInit(&encoder, abuf, tlen); + if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code)); + return ret; } + tEncoderClear(&encoder); - SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); - return 0; + return ret; } int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { @@ -222,14 +241,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check stError("s-task:%s vgId:%d current checkpointId:%" PRId64 " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (pActiveInfo->failedId >= checkpointId) { stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 " discard the checkpoint-trigger block", id, vgId, checkpointId, transId, pActiveInfo->failedId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (pTask->chkInfo.checkpointId == checkpointId) { @@ -255,8 +274,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check "the interrupted checkpoint", id, vgId, pBlock->srcTaskId); - streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { @@ -264,14 +282,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 " discard", id, vgId, pActiveInfo->activeId, checkpointId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { stDebug( "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -283,17 +301,17 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check } if (p->upstreamTaskId == pBlock->srcTaskId) { - stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 + stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } } } } } - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { @@ -317,6 +335,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId); streamMutexUnlock(&pTask->lock); if (code) { + if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks + streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); + } streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -330,6 +351,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock pActiveInfo->activeId = checkpointId; pActiveInfo->transId = transId; + if (pTask->chkInfo.startTs == 0) { + pTask->chkInfo.startTs = taosGetTimestampMs(); + pTask->execInfo.checkpoint += 1; + } + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); @@ -373,6 +399,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } +#if 0 + chkptFailedByRetrieveReqToSource(pTask, checkpointId); +#endif + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure @@ -382,11 +412,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - if (pTask->chkInfo.startTs == 0) { - pTask->chkInfo.startTs = taosGetTimestampMs(); - pTask->execInfo.checkpoint += 1; - } - // todo: handle this // update the child Id for downstream tasks code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); @@ -562,7 +587,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } streamMutexUnlock(&pInfo->lock); - stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64, + stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64, pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); } @@ -682,15 +707,22 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV return TSDB_CODE_SUCCESS; } -void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { +void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) { struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - if (pInfo->activeId <= 0) { - stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr); + if (failedId <= 0) { + stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64 + " activeId:%" PRId64, + pTask->id.idStr, pInfo->failedId, pInfo->activeId); } else { - pInfo->failedId = pInfo->activeId; - stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId, - pInfo->transId); + if (failedId <= pInfo->failedId) { + stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pInfo->failedId, failedId); + } else { + stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64 + " prev failedId:%" PRId64, + pTask->id.idStr, failedId, pInfo->transId, pInfo->activeId, pInfo->failedId); + pInfo->failedId = failedId; + } } } @@ -698,7 +730,7 @@ void streamTaskSetCheckpointFailed(SStreamTask* pTask) { streamMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask).state; if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); + streamTaskSetFailedCheckpointId(pTask, pTask->chkInfo.pActiveInfo->activeId); } streamMutexUnlock(&pTask->lock); } @@ -876,8 +908,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); } } else { // clear the checkpoint info if failed + // set failed checkpoint id before clear the checkpoint info streamMutexLock(&pTask->lock); - streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info + streamTaskSetFailedCheckpointId(pTask, ckId); streamMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); @@ -1101,23 +1134,43 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { return TSDB_CODE_INVALID_PARA; } - SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq)); - if (pReq == NULL) { - code = terrno; - stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); + int32_t ret = 0; + int32_t tlen = 0; + void* buf = NULL; + SRpcMsg rpcMsg = {0}; + SEncoder encoder; + + SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId, + .downstreamTaskId = pTask->id.taskId, + .downstreamNodeId = vgId, + .upstreamTaskId = pUpstreamTask->taskId, + .upstreamNodeId = pUpstreamTask->nodeId, + .checkpointId = checkpointId}; + + tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); + if (ret < 0) { + stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code)); + } + + buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { + stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId); continue; } - pReq->head.vgId = htonl(pUpstreamTask->nodeId); - pReq->streamId = pTask->id.streamId; - pReq->downstreamTaskId = pTask->id.taskId; - pReq->downstreamNodeId = vgId; - pReq->upstreamTaskId = pUpstreamTask->taskId; - pReq->upstreamNodeId = pUpstreamTask->nodeId; - pReq->checkpointId = checkpointId; + ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SRpcMsg rpcMsg = {0}; - initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code)); + continue; + } + tEncoderClear(&encoder); + + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); if (code == TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c new file mode 100644 index 0000000000..515845ba2b --- /dev/null +++ b/source/libs/stream/src/streamErrorInjection.c @@ -0,0 +1,17 @@ +#include "streamInt.h" + +/** + * pre-request: checkpoint interval should be 60s + * @param pTask + * @param checkpointId + */ +void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) { + streamMutexLock(&pTask->lock); + + // set current checkpoint failed immediately, set failed checkpoint id before clear the checkpoint info + streamTaskSetFailedCheckpointId(pTask, checkpointId); + streamMutexUnlock(&pTask->lock); + + // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode + taosMsleep(65*1000); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 8c79abfd02..9e131fd526 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -83,13 +83,37 @@ int32_t streamTrySchedExec(SStreamTask* pTask) { } int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { + int32_t code = 0; + int32_t tlen = 0; + + SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType}; + + tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code); + if (code < 0) { + stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code)); + return code; + } + + void* buf = rpcMallocCont(tlen + sizeof(SMsgHead)); + if (buf == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, tstrerror(terrno)); return terrno; } + ((SMsgHead*)buf)->vgId = vgId; + char* bufx = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, (uint8_t*)bufx, tlen); + if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) { + rpcFreeCont(buf); + tEncoderClear(&encoder); + stError("s-task:0x%x vgId:%d encode run task msg failed, code:%s", taskId, vgId, tstrerror(code)); + return code; + } + tEncoderClear(&encoder); + if (streamId != 0) { stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType, streamTaskGetExecType(execType)); @@ -97,13 +121,8 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = execType; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); if (code) { stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a044859b80..f46228fd47 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -22,6 +22,7 @@ #include "tstream.h" #include "ttimer.h" #include "wal.h" +#include "streamMsg.h" static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); @@ -1246,13 +1247,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosMemoryFree(pInfo); } -//NOTE: clear the checkpoint id, and keep the failed id +// NOTE: clear the checkpoint id, and keep the failed id +// failedId for a task will increase as the checkpoint I.D. increases. void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; -// pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); @@ -1303,4 +1304,178 @@ void streamTaskFreeRefId(int64_t* pRefId) { } metaRefMgtRemove(pRefId); +} + + +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); + int32_t taskId = pTask->hTaskInfo.id.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); + taskId = pTask->streamTaskId.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); + + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); + TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); + TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); + + tEndEncode(pEncoder); +_exit: + return code; +} + +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->hTaskInfo.id.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->streamTaskId.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); + + int32_t epSz = -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); + + if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); + if (pInfo == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { + taosMemoryFreeClear(pInfo); + goto _exit; + } + if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); + } + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); + + tEndDecode(pDecoder); + +_exit: + return code; } \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0d8a85155a..00f72123dc 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -853,7 +853,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") diff --git a/tests/script/tsim/db/basic1.sim b/tests/script/tsim/db/basic1.sim index 8eb6dce759..f3239957d3 100644 --- a/tests/script/tsim/db/basic1.sim +++ b/tests/script/tsim/db/basic1.sim @@ -53,6 +53,8 @@ if $rows != 5 then return -1 endi +sleep 500 + print =============== show vgroups2 sql show d2.vgroups if $rows != 2 then @@ -126,13 +128,14 @@ if $data12 != d2 then endi if $data13 != leader then + print expect leader , actual $13 return -1 endi -print $data14 -print $data15 +print $data14 , $data15 if $data16 != 1 then + print expect 1, acutal $data16 return -1 endi