From 05ca71d5dead0dbd8bddb5a542dfa4c74fe48a8d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 09:28:43 +0800 Subject: [PATCH] stream change ver --- include/libs/stream/tstream.h | 10 ++-- source/dnode/mnode/impl/inc/mndDef.h | 62 +++++++++++++------------ source/dnode/mnode/impl/src/mndDef.c | 4 ++ source/dnode/mnode/impl/src/mndStream.c | 29 ++++++------ source/libs/stream/src/streamTask.c | 4 +- 5 files changed, 61 insertions(+), 48 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b3960bdba..ab9f606fe3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,7 +31,7 @@ extern "C" { typedef struct SStreamTask SStreamTask; -#define SSTREAM_TASK_VER 1 +#define SSTREAM_TASK_VER 2 enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -321,7 +321,7 @@ typedef struct { struct SStreamTask { int64_t ver; - SStreamTaskId id; + SStreamTaskId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; @@ -329,8 +329,8 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SHistDataRange dataRange; - SStreamTaskId historyTaskId; - SStreamTaskId streamTaskId; + SStreamTaskId historyTaskId; + SStreamTaskId streamTaskId; int32_t nextCheckId; SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; @@ -371,6 +371,8 @@ struct SStreamTask { int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; + + char reserve[256]; }; typedef struct SMetaHbInfo { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c4c0ea238d..1bf13c8fb5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -615,25 +615,25 @@ void tDeleteSubscribeObj(SMqSubscribeObj* pSub); int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub); void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver); -//typedef struct { -// int32_t epoch; -// SArray* consumers; // SArray -//} SMqSubActionLogEntry; +// typedef struct { +// int32_t epoch; +// SArray* consumers; // SArray +// } SMqSubActionLogEntry; -//SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); -//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); -//int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry); -//void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry); +// SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); +// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); +// int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry); +// void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry); // -//typedef struct { -// char key[TSDB_SUBSCRIBE_KEY_LEN]; -// SArray* logs; // SArray -//} SMqSubActionLogObj; +// typedef struct { +// char key[TSDB_SUBSCRIBE_KEY_LEN]; +// SArray* logs; // SArray +// } SMqSubActionLogObj; // -//SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog); -//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); -//int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); -//void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); +// SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog); +// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); +// int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); +// void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); typedef struct { int32_t oldConsumerNum; @@ -647,12 +647,12 @@ typedef struct { } SMqRebOutputVg; typedef struct { - SArray* rebVgs; // SArray - SArray* newConsumers; // SArray - SArray* removedConsumers; // SArray - SArray* modifyConsumers; // SArray - SMqSubscribeObj* pSub; -// SMqSubActionLogEntry* pLogEntry; + SArray* rebVgs; // SArray + SArray* newConsumers; // SArray + SArray* removedConsumers; // SArray + SArray* modifyConsumers; // SArray + SMqSubscribeObj* pSub; + // SMqSubActionLogEntry* pLogEntry; } SMqRebOutputObj; typedef struct SStreamConf { @@ -674,8 +674,8 @@ typedef struct { int32_t totalLevel; int64_t smaId; // 0 for unused // info - int64_t uid; - int8_t status; + int64_t uid; + int8_t status; SStreamConf conf; // source and target int64_t sourceDbUid; @@ -690,13 +690,13 @@ typedef struct { int32_t fixedSinkVgId; // 0 for shuffle // transformation - char* sql; - char* ast; - char* physicalPlan; - SArray* tasks; // SArray> + char* sql; + char* ast; + char* physicalPlan; + SArray* tasks; // SArray> - SArray* pHTasksList; // generate the results for already stored ts data - int64_t hTaskUid; // stream task for history ts data + SArray* pHTasksList; // generate the results for already stored ts data + int64_t hTaskUid; // stream task for history ts data SSchemaWrapper outputSchema; SSchemaWrapper tagSchema; @@ -709,6 +709,8 @@ typedef struct { // 3.0.5. int64_t checkpointId; + char reserve[256]; + } SStreamObj; int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6bf4015852..a5c768a018 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -84,6 +84,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { // 3.0.50 ver = 3 if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve)) < 0) return -1; + tEndEncode(pEncoder); return pEncoder->pos; } @@ -157,6 +159,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { if (sver >= 3) { if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1; } + if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1; + tEndDecode(pDecoder); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f4d52556e..46eb0d9957 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -14,6 +14,7 @@ */ #include "mndStream.h" +#include "audit.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -28,9 +29,8 @@ #include "parser.h" #include "tmisce.h" #include "tname.h" -#include "audit.h" -#define MND_STREAM_VER_NUMBER 3 +#define MND_STREAM_VER_NUMBER 4 #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" @@ -874,15 +874,18 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[2000] = {0}; - sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 ", " + sprintf(detail, + "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 + ", " "fillHistory:%d, igExists:%d, " - "igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", " - "maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, " + "igExpired:%d, igUpdate:%d, lastTs:%" PRId64 + ", " + "maxDelay:%" PRId64 + ", numOfTags:%d, sourceDB:%s, " "targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, - createStreamReq.fillHistory, createStreamReq.igExists, - createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs, - createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, + createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, + createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark); auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail); @@ -2301,12 +2304,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } - for(int32_t i = 0; i < req.numOfTasks; ++i) { - STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + for (int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01dcb435c0..dc8c509f1e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -134,6 +134,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve)) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -245,6 +246,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; tEndDecode(pDecoder); return 0; @@ -483,7 +485,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { pTask->status.taskStatus = TASK_STATUS__STOP; qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */!streamTaskIsIdle(pTask)) { + while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) { qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); taosMsleep(100); }