Merge pull request #28779 from taosdata/fix/3_liaohj
fix(stream): update the msg encoder.
This commit is contained in:
commit
686c20cd9f
|
@ -17,12 +17,23 @@
|
|||
#define TDENGINE_STREAMMSG_H
|
||||
|
||||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
//#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
|
||||
typedef struct SStreamDispatchReq SStreamDispatchReq;
|
||||
typedef struct STokenBucket STokenBucket;
|
||||
typedef struct SMetaHbInfo SMetaHbInfo;
|
||||
|
||||
typedef struct SNodeUpdateInfo {
|
||||
int32_t nodeId;
|
||||
SEpSet prevEp;
|
||||
SEpSet newEp;
|
||||
} SNodeUpdateInfo;
|
||||
|
||||
typedef struct SStreamUpstreamEpInfo {
|
||||
int32_t nodeId;
|
||||
int32_t childId;
|
||||
|
@ -170,8 +181,8 @@ typedef struct SStreamHbMsg {
|
|||
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
|
||||
} SStreamHbMsg;
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
||||
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq);
|
||||
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq);
|
||||
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
|
||||
|
||||
typedef struct {
|
||||
|
@ -179,6 +190,9 @@ typedef struct {
|
|||
int32_t msgId;
|
||||
} SMStreamHbRspMsg;
|
||||
|
||||
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);
|
||||
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp);
|
||||
|
||||
typedef struct SRetrieveChkptTriggerReq {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
|
@ -189,6 +203,9 @@ typedef struct SRetrieveChkptTriggerReq {
|
|||
int64_t downstreamTaskId;
|
||||
} SRetrieveChkptTriggerReq;
|
||||
|
||||
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq);
|
||||
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq);
|
||||
|
||||
typedef struct SCheckpointTriggerRsp {
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
|
@ -198,6 +215,9 @@ typedef struct SCheckpointTriggerRsp {
|
|||
int32_t rspCode;
|
||||
} SCheckpointTriggerRsp;
|
||||
|
||||
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp);
|
||||
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp);
|
||||
|
||||
typedef struct SCheckpointReport {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
|
@ -222,7 +242,7 @@ typedef struct SRestoreCheckpointInfo {
|
|||
int32_t nodeId;
|
||||
} SRestoreCheckpointInfo;
|
||||
|
||||
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
|
||||
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
|
||||
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
|
||||
|
||||
typedef struct {
|
||||
|
@ -232,10 +252,8 @@ typedef struct {
|
|||
int32_t reqType;
|
||||
} SStreamTaskRunReq;
|
||||
|
||||
typedef struct SCheckpointConsensusEntry {
|
||||
SRestoreCheckpointInfo req;
|
||||
int64_t ts;
|
||||
} SCheckpointConsensusEntry;
|
||||
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq);
|
||||
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
|
@ -3798,7 +3798,14 @@ typedef struct {
|
|||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
|
||||
} SVPauseStreamTaskReq;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int64_t chkptId;
|
||||
} SVResetStreamTaskReq;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
|
|
|
@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo {
|
|||
SInterval interval;
|
||||
} SSTaskBasicInfo;
|
||||
|
||||
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
|
||||
typedef struct SStreamDispatchReq SStreamDispatchReq;
|
||||
typedef struct STokenBucket STokenBucket;
|
||||
typedef struct SMetaHbInfo SMetaHbInfo;
|
||||
|
||||
typedef struct SDispatchMsgInfo {
|
||||
SStreamDispatchReq* pData; // current dispatch data
|
||||
|
||||
|
@ -626,11 +621,11 @@ typedef struct STaskStatusEntry {
|
|||
STaskCkptInfo checkpointInfo;
|
||||
} STaskStatusEntry;
|
||||
|
||||
typedef struct SNodeUpdateInfo {
|
||||
int32_t nodeId;
|
||||
SEpSet prevEp;
|
||||
SEpSet newEp;
|
||||
} SNodeUpdateInfo;
|
||||
//typedef struct SNodeUpdateInfo {
|
||||
// int32_t nodeId;
|
||||
// SEpSet prevEp;
|
||||
// SEpSet newEp;
|
||||
//} SNodeUpdateInfo;
|
||||
|
||||
typedef struct SStreamTaskState {
|
||||
ETaskStatus state;
|
||||
|
@ -643,6 +638,11 @@ typedef struct SCheckpointConsensusInfo {
|
|||
int64_t streamId;
|
||||
} SCheckpointConsensusInfo;
|
||||
|
||||
typedef struct SCheckpointConsensusEntry {
|
||||
SRestoreCheckpointInfo req;
|
||||
int64_t ts;
|
||||
} SCheckpointConsensusEntry;
|
||||
|
||||
void streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||
|
||||
// dispatch related
|
||||
|
@ -718,6 +718,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
|
|||
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
|
||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||
SRpcHandleInfo* pInfo, int32_t code);
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId);
|
||||
|
||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);
|
||||
|
|
|
@ -1011,6 +1011,7 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
|
||||
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
|
||||
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
|
||||
#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109)
|
||||
|
||||
// TDLite
|
||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
aux_source_directory(src COMMON_SRC)
|
||||
aux_source_directory(src/msg COMMON_MSG_SRC)
|
||||
|
||||
LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC})
|
||||
|
||||
if(TD_ENTERPRISE)
|
||||
LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c)
|
||||
|
|
|
@ -15,8 +15,48 @@
|
|||
|
||||
#include "streamMsg.h"
|
||||
#include "os.h"
|
||||
#include "tstream.h"
|
||||
#include "streamInt.h"
|
||||
#include "tcommon.h"
|
||||
|
||||
typedef struct STaskId {
|
||||
int64_t streamId;
|
||||
int64_t taskId;
|
||||
} STaskId;
|
||||
|
||||
typedef struct STaskCkptInfo {
|
||||
int64_t latestId; // saved checkpoint id
|
||||
int64_t latestVer; // saved checkpoint ver
|
||||
int64_t latestTime; // latest checkpoint time
|
||||
int64_t latestSize; // latest checkpoint size
|
||||
int8_t remoteBackup; // latest checkpoint backup done
|
||||
int64_t activeId; // current active checkpoint id
|
||||
int32_t activeTransId; // checkpoint trans id
|
||||
int8_t failed; // denote if the checkpoint is failed or not
|
||||
int8_t consensusChkptId; // required the consensus-checkpointId
|
||||
int64_t consensusTs; //
|
||||
} STaskCkptInfo;
|
||||
|
||||
typedef struct STaskStatusEntry {
|
||||
STaskId id;
|
||||
int32_t status;
|
||||
int32_t statusLastDuration; // to record the last duration of current status
|
||||
int64_t stage;
|
||||
int32_t nodeId;
|
||||
SVersionRange verRange; // start/end version in WAL, only valid for source task
|
||||
int64_t processedVer; // only valid for source task
|
||||
double inputQUsed; // in MiB
|
||||
double inputRate;
|
||||
double procsThroughput; // duration between one element put into input queue and being processed.
|
||||
double procsTotal; // duration between one element put into input queue and being processed.
|
||||
double outputThroughput; // the size of dispatched result blocks in bytes
|
||||
double outputTotal; // the size of dispatched result blocks in bytes
|
||||
double sinkQuota; // existed quota size for sink task
|
||||
double sinkDataSize; // sink to dst data size
|
||||
int64_t startTime;
|
||||
int64_t startCheckpointId;
|
||||
int64_t startCheckpointVer;
|
||||
int64_t hTaskId;
|
||||
STaskCkptInfo checkpointInfo;
|
||||
} STaskStatusEntry;
|
||||
|
||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
|
||||
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
|
||||
|
@ -289,7 +329,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
|||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
|
||||
|
||||
if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
|
||||
stError("invalid dispatch req msg");
|
||||
uError("invalid dispatch req msg");
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
|
@ -605,173 +645,92 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
|
|||
pMsg->numOfTasks = -1;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
|
||||
int32_t taskId = pTask->hTaskInfo.id.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
|
||||
taskId = pTask->streamTaskId.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
|
||||
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId));
|
||||
tEndEncode(pEncoder);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||
int32_t taskId = 0;
|
||||
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
|
||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId));
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
|
||||
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId));
|
||||
tEndEncode(pEncoder);
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->hTaskInfo.id.taskId = taskId;
|
||||
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->streamTaskId.taskId = taskId;
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId));
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t epSz = -1;
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
|
||||
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pInfo == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
goto _exit;
|
||||
}
|
||||
if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
}
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode));
|
||||
tEndEncode(pEncoder);
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
|
||||
}
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
|
||||
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode));
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
_exit:
|
||||
|
@ -830,11 +789,7 @@ int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpoin
|
|||
tEndEncode(pEncoder);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
return code;
|
||||
} else {
|
||||
return pEncoder->pos;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
|
||||
|
@ -853,3 +808,31 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo*
|
|||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
|
||||
tEndEncode(pEncoder);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
|
@ -46,7 +46,7 @@ if (${TD_LINUX})
|
|||
target_sources(tmsgTest
|
||||
PRIVATE
|
||||
"tmsgTest.cpp"
|
||||
"../src/tmsg.c"
|
||||
"../src/msg/tmsg.c"
|
||||
)
|
||||
target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/")
|
||||
target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main)
|
||||
|
|
|
@ -56,6 +56,7 @@ typedef struct SStreamTransMgmt {
|
|||
typedef struct SStreamTaskResetMsg {
|
||||
int64_t streamId;
|
||||
int32_t transId;
|
||||
int64_t checkpointId;
|
||||
} SStreamTaskResetMsg;
|
||||
|
||||
typedef struct SChkptReportInfo {
|
||||
|
@ -142,9 +143,9 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt
|
|||
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId);
|
||||
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
|
||||
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts);
|
||||
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream);
|
||||
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
|
||||
|
|
|
@ -2434,7 +2434,12 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
|
|||
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
|
||||
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
||||
|
||||
memcpy(p, pReport, sizeof(STaskChkptInfo));
|
||||
// update the checkpoint report info
|
||||
p->checkpointId = pReport->checkpointId;
|
||||
p->ts = pReport->checkpointTs;
|
||||
p->version = pReport->checkpointVer;
|
||||
p->transId = pReport->transId;
|
||||
p->dropHTask = pReport->dropHTask;
|
||||
} else {
|
||||
mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
#include "mndTrans.h"
|
||||
|
||||
uint32_t seed = 0;
|
||||
|
||||
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
||||
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
||||
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
|
||||
if (rpcMsg.pCont == NULL) {
|
||||
return rpcMsg;
|
||||
}
|
||||
|
||||
rpcMsg.info.traceId.rootId = traceId;
|
||||
rpcMsg.info.notFreeAhandle = 1;
|
||||
|
||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||
return rpcMsg;
|
||||
}
|
||||
|
||||
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
|
||||
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
|
||||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
|
||||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
|
||||
if (seed == 0) {
|
||||
seed = taosGetTimestampSec();
|
||||
}
|
||||
|
||||
uint32_t v = taosRandR(&seed);
|
||||
int32_t choseItem = v % 5;
|
||||
|
||||
if (choseItem == 0) {
|
||||
// 1. one of update-checkpoint not send, restart and send it again
|
||||
taosMsleep(5000);
|
||||
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
|
||||
"rollback***");
|
||||
exit(-1);
|
||||
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
|
||||
"not started***");
|
||||
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
|
||||
"started after restart***");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (choseItem == 1) {
|
||||
// 2. repeat send update chkpt msg
|
||||
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
|
||||
|
||||
mError("***repeat 1***");
|
||||
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
|
||||
|
||||
mError("***repeat 2***");
|
||||
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
|
||||
|
||||
mError("***repeat 3***");
|
||||
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
|
||||
} else if (choseItem == 2) {
|
||||
// 3. sleep 40s and then send msg
|
||||
mError("***idle for 30s, and then send msg***");
|
||||
taosMsleep(30000);
|
||||
} else {
|
||||
// do nothing
|
||||
// mInfo("no error triggered");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,7 +24,7 @@ typedef struct SFailedCheckpointInfo {
|
|||
|
||||
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
|
||||
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
|
||||
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
|
||||
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
|
||||
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
|
||||
static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
|
||||
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
||||
|
@ -68,7 +68,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
|
||||
STrans *pTrans = NULL;
|
||||
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
|
||||
" reset from failed checkpoint", &pTrans);
|
||||
|
@ -84,7 +84,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
|
||||
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
|
||||
if (code) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -115,7 +115,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) {
|
||||
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
|
||||
int32_t size = sizeof(SStreamTaskResetMsg);
|
||||
|
||||
int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
|
||||
|
@ -135,8 +135,9 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
|
|||
taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail
|
||||
}
|
||||
|
||||
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId};
|
||||
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
|
||||
|
||||
// let's remember that this trans had been killed already
|
||||
void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p);
|
||||
if (px == NULL) {
|
||||
mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1);
|
||||
|
@ -150,6 +151,7 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
|
|||
|
||||
pReq->streamId = streamId;
|
||||
pReq->transId = transId;
|
||||
pReq->checkpointId = checkpointId;
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
|
||||
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
|
@ -234,7 +236,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
|
|||
} else {
|
||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
||||
pStream->uid, pMsg->transId);
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -379,9 +381,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
|
||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
|
||||
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
// return directly and after the vnode to continue to send the next HbMsg.
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
@ -495,10 +498,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||
pInfo->checkpointId, pInfo->transId);
|
||||
mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64
|
||||
" transId:%d failed issue task-reset trans to reset all tasks status",
|
||||
pInfo->streamUid, pInfo->checkpointId, pInfo->transId);
|
||||
|
||||
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId);
|
||||
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
|
||||
if (code) {
|
||||
mError("failed to create reset task trans, code:%s", tstrerror(code));
|
||||
}
|
||||
|
@ -549,12 +553,37 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr
|
|||
}
|
||||
|
||||
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
|
||||
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)};
|
||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||
int32_t ret = 0;
|
||||
int32_t tlen = 0;
|
||||
void *buf = NULL;
|
||||
|
||||
SMStreamHbRspMsg *pMsg = rsp.pCont;
|
||||
pMsg->head.vgId = htonl(vgId);
|
||||
pMsg->msgId = msgId;
|
||||
const SMStreamHbRspMsg msg = {.msgId = msgId};
|
||||
|
||||
tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
|
||||
if (ret < 0) {
|
||||
mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
|
||||
if (buf == NULL) {
|
||||
mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
|
||||
((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
tEncoderClear(&encoder);
|
||||
mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
|
||||
return;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf};
|
||||
|
||||
tmsgSendRsp(&rsp);
|
||||
pRpcInfo->handle = NULL; // disable auto rsp
|
||||
|
|
|
@ -295,7 +295,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
|
||||
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
|
||||
|
@ -306,6 +306,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
|||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->chkptId = chkptId;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
|
@ -544,7 +545,7 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
|
||||
SStreamTaskIter *pIter = NULL;
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
|
@ -564,7 +565,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
|||
return code;
|
||||
}
|
||||
|
||||
code = doSetResetAction(pMnode, pTrans, pTask);
|
||||
code = doSetResetAction(pMnode, pTrans, pTask, chkptId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyStreamTaskIter(pIter);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
@ -606,7 +607,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p
|
|||
tEncoderInit(&encoder, abuf, tlen);
|
||||
code = tEncodeRestoreCheckpointInfo(&encoder, &req);
|
||||
tEncoderClear(&encoder);
|
||||
if (code == -1) {
|
||||
if (code < 0) {
|
||||
taosMemoryFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1521,74 +1521,4 @@ int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
|||
mError("snode not existed when trying to create stream in db with multiple replica");
|
||||
return TSDB_CODE_SNODE_NOT_DEPLOYED;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t seed = 0;
|
||||
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
||||
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
||||
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
|
||||
if (rpcMsg.pCont == NULL) {
|
||||
return rpcMsg;
|
||||
}
|
||||
|
||||
rpcMsg.info.traceId.rootId = traceId;
|
||||
rpcMsg.info.notFreeAhandle = 1;
|
||||
|
||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||
return rpcMsg;
|
||||
}
|
||||
|
||||
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
|
||||
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
|
||||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
|
||||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
|
||||
if (seed == 0) {
|
||||
seed = taosGetTimestampSec();
|
||||
}
|
||||
|
||||
uint32_t v = taosRandR(&seed);
|
||||
int32_t choseItem = v % 5;
|
||||
|
||||
if (choseItem == 0) {
|
||||
// 1. one of update-checkpoint not send, restart and send it again
|
||||
taosMsleep(5000);
|
||||
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
|
||||
"rollback***");
|
||||
exit(-1);
|
||||
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
|
||||
"not started***");
|
||||
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
|
||||
"started after restart***");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (choseItem == 1) {
|
||||
// 2. repeat send update chkpt msg
|
||||
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
|
||||
|
||||
mError("***repeat 1***");
|
||||
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
|
||||
|
||||
mError("***repeat 2***");
|
||||
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
|
||||
|
||||
mError("***repeat 3***");
|
||||
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
|
||||
} else if (choseItem == 2) {
|
||||
// 3. sleep 40s and then send msg
|
||||
mError("***idle for 30s, and then send msg***");
|
||||
taosMsleep(30000);
|
||||
} else {
|
||||
// do nothing
|
||||
// mInfo("no error triggered");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -246,7 +246,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) {
|
|||
px = taosArrayPush(pStream->tasks, &pLevel);
|
||||
ASSERT(px != NULL);
|
||||
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream, 1);
|
||||
ASSERT(code != 0);
|
||||
|
||||
tFreeStreamObj(pStream);
|
||||
|
|
|
@ -1009,21 +1009,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SDecoder decoder;
|
||||
|
||||
SStreamTaskRunReq req = {0};
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
|
||||
tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
tDecoderClear(&decoder);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
// extracted submit data from wal files for all tasks
|
||||
if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
|
||||
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
|
||||
return tqScanWal(pTq);
|
||||
}
|
||||
|
||||
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// let's continue scan data in the wal files
|
||||
if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) {
|
||||
if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
|
||||
code = tqScanWalAsync(pTq, false); // it's ok to failed
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
|
@ -1297,7 +1310,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
|
||||
SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
|
||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
|
||||
(int32_t)pReq->downstreamTaskId);
|
||||
|
@ -1318,10 +1331,23 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
|
||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
|
||||
(int32_t)pReq->downstreamTaskId);
|
||||
SRetrieveChkptTriggerReq req = {0};
|
||||
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SDecoder decoder = {0};
|
||||
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId,
|
||||
req.downstreamTaskId);
|
||||
return TSDB_CODE_STREAM_NOT_LEADER;
|
||||
}
|
||||
|
||||
|
|
|
@ -828,14 +828,25 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SDecoder decoder;
|
||||
|
||||
int32_t type = pReq->reqType;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
SStreamTaskRunReq req = {0};
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
|
||||
tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
|
||||
tDecoderClear(&decoder);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t type = req.reqType;
|
||||
if (type == STREAM_EXEC_T_START_ONE_TASK) {
|
||||
code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
|
||||
code = streamMetaStartAllTasks(pMeta);
|
||||
|
@ -847,11 +858,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
code = streamMetaStopAllTasks(pMeta);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||
code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
|
||||
if (pTask != NULL && (code == 0)) {
|
||||
char* pStatus = NULL;
|
||||
|
@ -873,7 +884,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
}
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
|
||||
char* p = NULL;
|
||||
if (streamTaskReadyToRun(pTask, &p)) {
|
||||
|
@ -890,7 +901,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
return 0;
|
||||
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
||||
// todo add one function to handle this
|
||||
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
|
||||
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -939,7 +950,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
|
||||
SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
|
@ -954,17 +965,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
|||
streamMutexLock(&pTask->lock);
|
||||
streamTaskClearCheckInfo(pTask, true);
|
||||
|
||||
streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
|
||||
|
||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state == TASK_STATUS__CK) {
|
||||
int32_t tranId = 0;
|
||||
int64_t activeChkId = 0;
|
||||
streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId);
|
||||
|
||||
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
|
||||
pTask->id.idStr, activeChkId, tranId);
|
||||
|
||||
streamTaskSetStatusReady(pTask);
|
||||
tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
|
||||
} else if (pState.state == TASK_STATUS__UNINIT) {
|
||||
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
|
||||
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||
|
@ -980,25 +987,36 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
|
||||
SRetrieveChkptTriggerReq req = {0};
|
||||
SStreamTask* pTask = NULL;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SDecoder decoder = {0};
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
|
||||
if (pTask == NULL || (code != 0)) {
|
||||
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
|
||||
tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
|
||||
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
|
||||
pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
|
||||
pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId);
|
||||
req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);
|
||||
|
||||
if (pTask->status.downstreamReady != 1) {
|
||||
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
|
||||
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
|
||||
pTask->id.idStr, (int32_t)req.downstreamTaskId);
|
||||
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
|
||||
TSDB_CODE_STREAM_TASK_IVLD_STATUS);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
|
@ -1010,19 +1028,19 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
int64_t checkpointId = 0;
|
||||
|
||||
streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
|
||||
if (checkpointId != pReq->checkpointId) {
|
||||
if (checkpointId != req.checkpointId) {
|
||||
tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
|
||||
" req:%" PRId64,
|
||||
pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId);
|
||||
pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
|
||||
if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
|
||||
// re-send the lost checkpoint-trigger msg to downstream task
|
||||
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
|
||||
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
|
||||
(int32_t)req.downstreamTaskId, checkpointId, transId);
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
|
||||
TSDB_CODE_SUCCESS);
|
||||
} else { // not send checkpoint-trigger yet, wait
|
||||
int32_t recv = 0, total = 0;
|
||||
|
@ -1036,7 +1054,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
"sending checkpoint-source/trigger",
|
||||
pTask->id.idStr, recv, total);
|
||||
}
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
|
||||
TSDB_CODE_ACTION_IN_PROGRESS);
|
||||
}
|
||||
} else { // upstream not recv the checkpoint-source/trigger till now
|
||||
|
@ -1048,7 +1066,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
|
||||
"upstream sending checkpoint-source/trigger",
|
||||
pTask->id.idStr);
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
|
||||
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
|
||||
TSDB_CODE_ACTION_IN_PROGRESS);
|
||||
}
|
||||
|
||||
|
@ -1057,23 +1075,34 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
SCheckpointTriggerRsp rsp = {0};
|
||||
SStreamTask* pTask = NULL;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SDecoder decoder = {0};
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask);
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
|
||||
if (pTask == NULL || (code != 0)) {
|
||||
tqError(
|
||||
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, pRsp->taskId);
|
||||
pMeta->vgId, rsp.taskId);
|
||||
return code;
|
||||
}
|
||||
|
||||
tqDebug(
|
||||
"s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, "
|
||||
"checkpointId:%" PRId64 ", transId:%d",
|
||||
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
|
||||
"s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
|
||||
", transId:%d",
|
||||
pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);
|
||||
|
||||
code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
|
||||
code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
@ -1203,7 +1232,23 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
return streamProcessHeartbeatRsp(pMeta, pMsg->pCont);
|
||||
SMStreamHbRspMsg rsp = {0};
|
||||
int32_t code = 0;
|
||||
SDecoder decoder;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
code = tDecodeStreamHbRsp(&decoder, &rsp);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return streamProcessHeartbeatRsp(pMeta, &rsp);
|
||||
}
|
||||
|
||||
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
@ -1237,7 +1282,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
SRestoreCheckpointInfo req = {0};
|
||||
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
|
||||
if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
|
||||
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
|
||||
tDecoderClear(&decoder);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -3465,11 +3465,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
goto _end;
|
||||
}
|
||||
|
||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (!pUpInfo) {
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
SDecoder decoder = {0};
|
||||
pDeCoder = &decoder;
|
||||
tDecoderInit(pDeCoder, buf, tlen);
|
||||
|
@ -3478,14 +3473,21 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
goto _end;
|
||||
}
|
||||
|
||||
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||
pInfo->pUpdateInfo = pUpInfo;
|
||||
} else {
|
||||
taosMemoryFree(pUpInfo);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
if (pInfo->pUpdateInfo != NULL) {
|
||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (!pUpInfo) {
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||
pInfo->pUpdateInfo = pUpInfo;
|
||||
} else {
|
||||
taosMemoryFree(pUpInfo);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
if (tDecodeIsEnd(pDeCoder)) {
|
||||
|
|
|
@ -192,7 +192,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
|||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask);
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||
|
@ -245,6 +244,9 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger);
|
|||
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
|
||||
STimeWindow* pLatestWindow, const char* id);
|
||||
|
||||
// inject stream errors
|
||||
void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -161,33 +161,52 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
|
|||
|
||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||
SRpcHandleInfo* pRpcInfo, int32_t code) {
|
||||
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
|
||||
void* pBuf = rpcMallocCont(size);
|
||||
if (pBuf == NULL) {
|
||||
int32_t ret = 0;
|
||||
int32_t tlen = 0;
|
||||
void* buf = NULL;
|
||||
SEncoder encoder;
|
||||
|
||||
SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
|
||||
.upstreamTaskId = pTask->id.taskId,
|
||||
.taskId = dstTaskId,
|
||||
.rspCode = code};
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
|
||||
req.transId = pTask->chkInfo.pActiveInfo->transId;
|
||||
} else {
|
||||
req.checkpointId = -1;
|
||||
req.transId = -1;
|
||||
}
|
||||
|
||||
tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
|
||||
if (ret < 0) {
|
||||
stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return ret;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
|
||||
if (buf == NULL) {
|
||||
stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
||||
((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
|
||||
|
||||
pRsp->streamId = pTask->id.streamId;
|
||||
pRsp->upstreamTaskId = pTask->id.taskId;
|
||||
pRsp->taskId = dstTaskId;
|
||||
pRsp->rspCode = code;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
|
||||
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
|
||||
} else {
|
||||
pRsp->checkpointId = -1;
|
||||
pRsp->transId = -1;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
tEncoderClear(&encoder);
|
||||
stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
|
||||
return ret;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
|
||||
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
|
||||
tmsgSendRsp(&rspMsg);
|
||||
|
||||
return 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
||||
|
@ -222,14 +241,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
|||
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
|
||||
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
|
||||
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
|
||||
return code;
|
||||
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||
}
|
||||
|
||||
if (pActiveInfo->failedId >= checkpointId) {
|
||||
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
|
||||
" discard the checkpoint-trigger block",
|
||||
id, vgId, checkpointId, transId, pActiveInfo->failedId);
|
||||
return code;
|
||||
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||
}
|
||||
|
||||
if (pTask->chkInfo.checkpointId == checkpointId) {
|
||||
|
@ -255,8 +274,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
|||
"the interrupted checkpoint",
|
||||
id, vgId, pBlock->srcTaskId);
|
||||
|
||||
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
||||
return code;
|
||||
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||
}
|
||||
|
||||
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
||||
|
@ -264,14 +282,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
|||
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
|
||||
" discard",
|
||||
id, vgId, pActiveInfo->activeId, checkpointId);
|
||||
return code;
|
||||
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||
} else { // checkpointId == pActiveInfo->activeId
|
||||
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
|
||||
stDebug(
|
||||
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
|
||||
"checkpointId:%" PRId64 " transId:%d",
|
||||
id, vgId, checkpointId, transId);
|
||||
return code;
|
||||
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||
}
|
||||
|
||||
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||
|
@ -283,17 +301,17 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
|||
}
|
||||
|
||||
if (p->upstreamTaskId == pBlock->srcTaskId) {
|
||||
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
||||
stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
||||
", prev recvTs:%" PRId64 " discard",
|
||||
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
||||
return code;
|
||||
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
|
@ -317,6 +335,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
if (code) {
|
||||
if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks
|
||||
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
||||
}
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
@ -330,6 +351,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
pActiveInfo->activeId = checkpointId;
|
||||
pActiveInfo->transId = transId;
|
||||
|
||||
if (pTask->chkInfo.startTs == 0) {
|
||||
pTask->chkInfo.startTs = taosGetTimestampMs();
|
||||
pTask->execInfo.checkpoint += 1;
|
||||
}
|
||||
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
|
||||
|
@ -373,6 +399,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
chkptFailedByRetrieveReqToSource(pTask, checkpointId);
|
||||
#endif
|
||||
|
||||
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
|
||||
|
@ -382,11 +412,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
}
|
||||
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||
if (pTask->chkInfo.startTs == 0) {
|
||||
pTask->chkInfo.startTs = taosGetTimestampMs();
|
||||
pTask->execInfo.checkpoint += 1;
|
||||
}
|
||||
|
||||
// todo: handle this
|
||||
// update the child Id for downstream tasks
|
||||
code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
|
||||
|
@ -562,7 +587,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
|
|||
}
|
||||
streamMutexUnlock(&pInfo->lock);
|
||||
|
||||
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64,
|
||||
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
|
||||
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
|
||||
}
|
||||
|
||||
|
@ -682,15 +707,22 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
|
||||
struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
if (pInfo->activeId <= 0) {
|
||||
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
|
||||
if (failedId <= 0) {
|
||||
stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
|
||||
" activeId:%" PRId64,
|
||||
pTask->id.idStr, pInfo->failedId, pInfo->activeId);
|
||||
} else {
|
||||
pInfo->failedId = pInfo->activeId;
|
||||
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId,
|
||||
pInfo->transId);
|
||||
if (failedId <= pInfo->failedId) {
|
||||
stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pInfo->failedId, failedId);
|
||||
} else {
|
||||
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
|
||||
" prev failedId:%" PRId64,
|
||||
pTask->id.idStr, failedId, pInfo->transId, pInfo->activeId, pInfo->failedId);
|
||||
pInfo->failedId = failedId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -698,7 +730,7 @@ void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
|
|||
streamMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
streamTaskSetFailedCheckpointId(pTask, pTask->chkInfo.pActiveInfo->activeId);
|
||||
}
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
@ -876,8 +908,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
|
||||
}
|
||||
} else { // clear the checkpoint info if failed
|
||||
// set failed checkpoint id before clear the checkpoint info
|
||||
streamMutexLock(&pTask->lock);
|
||||
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
|
||||
streamTaskSetFailedCheckpointId(pTask, ckId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||
|
@ -1101,23 +1134,43 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
|
||||
if (pReq == NULL) {
|
||||
code = terrno;
|
||||
stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId);
|
||||
int32_t ret = 0;
|
||||
int32_t tlen = 0;
|
||||
void* buf = NULL;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
SEncoder encoder;
|
||||
|
||||
SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
|
||||
.downstreamTaskId = pTask->id.taskId,
|
||||
.downstreamNodeId = vgId,
|
||||
.upstreamTaskId = pUpstreamTask->taskId,
|
||||
.upstreamNodeId = pUpstreamTask->nodeId,
|
||||
.checkpointId = checkpointId};
|
||||
|
||||
tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
|
||||
if (ret < 0) {
|
||||
stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
|
||||
if (buf == NULL) {
|
||||
stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
|
||||
continue;
|
||||
}
|
||||
|
||||
pReq->head.vgId = htonl(pUpstreamTask->nodeId);
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->downstreamTaskId = pTask->id.taskId;
|
||||
pReq->downstreamNodeId = vgId;
|
||||
pReq->upstreamTaskId = pUpstreamTask->taskId;
|
||||
pReq->upstreamNodeId = pUpstreamTask->nodeId;
|
||||
pReq->checkpointId = checkpointId;
|
||||
((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
tEncoderClear(&encoder);
|
||||
stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));
|
||||
|
||||
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
#include "streamInt.h"
|
||||
|
||||
/**
|
||||
* pre-request: checkpoint interval should be 60s
|
||||
* @param pTask
|
||||
* @param checkpointId
|
||||
*/
|
||||
void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) {
|
||||
streamMutexLock(&pTask->lock);
|
||||
|
||||
// set current checkpoint failed immediately, set failed checkpoint id before clear the checkpoint info
|
||||
streamTaskSetFailedCheckpointId(pTask, checkpointId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
// the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode
|
||||
taosMsleep(65*1000);
|
||||
}
|
|
@ -83,13 +83,37 @@ int32_t streamTrySchedExec(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
|
||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||
if (pRunReq == NULL) {
|
||||
int32_t code = 0;
|
||||
int32_t tlen = 0;
|
||||
|
||||
SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType};
|
||||
|
||||
tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code);
|
||||
if (code < 0) {
|
||||
stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
void* buf = rpcMallocCont(tlen + sizeof(SMsgHead));
|
||||
if (buf == NULL) {
|
||||
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
|
||||
tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = vgId;
|
||||
char* bufx = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, (uint8_t*)bufx, tlen);
|
||||
if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
tEncoderClear(&encoder);
|
||||
stError("s-task:0x%x vgId:%d encode run task msg failed, code:%s", taskId, vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (streamId != 0) {
|
||||
stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType,
|
||||
streamTaskGetExecType(execType));
|
||||
|
@ -97,13 +121,8 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
|||
stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
|
||||
}
|
||||
|
||||
pRunReq->head.vgId = vgId;
|
||||
pRunReq->streamId = streamId;
|
||||
pRunReq->taskId = taskId;
|
||||
pRunReq->reqType = execType;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||
int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
|
||||
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "tstream.h"
|
||||
#include "ttimer.h"
|
||||
#include "wal.h"
|
||||
#include "streamMsg.h"
|
||||
|
||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
|
||||
|
@ -1246,13 +1247,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
|
|||
taosMemoryFree(pInfo);
|
||||
}
|
||||
|
||||
//NOTE: clear the checkpoint id, and keep the failed id
|
||||
// NOTE: clear the checkpoint id, and keep the failed id
|
||||
// failedId for a task will increase as the checkpoint I.D. increases.
|
||||
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
|
||||
pInfo->activeId = 0;
|
||||
pInfo->transId = 0;
|
||||
pInfo->allUpstreamTriggerRecv = 0;
|
||||
pInfo->dispatchTrigger = false;
|
||||
// pInfo->failedId = 0;
|
||||
|
||||
taosArrayClear(pInfo->pDispatchTriggerList);
|
||||
taosArrayClear(pInfo->pCheckpointReadyRecvList);
|
||||
|
@ -1303,4 +1304,178 @@ void streamTaskFreeRefId(int64_t* pRefId) {
|
|||
}
|
||||
|
||||
metaRefMgtRemove(pRefId);
|
||||
}
|
||||
|
||||
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
|
||||
int32_t taskId = pTask->hTaskInfo.id.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
|
||||
taskId = pTask->streamTaskId.taskId;
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
|
||||
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||
int32_t taskId = 0;
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
|
||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
|
||||
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->hTaskInfo.id.taskId = taskId;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
|
||||
pTask->streamTaskId.taskId = taskId;
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
|
||||
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
|
||||
|
||||
int32_t epSz = -1;
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
|
||||
|
||||
if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pInfo == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
goto _exit;
|
||||
}
|
||||
if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
|
||||
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
|
@ -853,7 +853,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg")
|
||||
|
||||
// TDLite
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||
|
|
|
@ -53,6 +53,8 @@ if $rows != 5 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
|
||||
print =============== show vgroups2
|
||||
sql show d2.vgroups
|
||||
if $rows != 2 then
|
||||
|
@ -126,13 +128,14 @@ if $data12 != d2 then
|
|||
endi
|
||||
|
||||
if $data13 != leader then
|
||||
print expect leader , actual $13
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data14
|
||||
print $data15
|
||||
print $data14 , $data15
|
||||
|
||||
if $data16 != 1 then
|
||||
print expect 1, acutal $data16
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue