stream change ver
This commit is contained in:
parent
68e4a72021
commit
05ca71d5de
|
@ -31,7 +31,7 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
|
|
||||||
#define SSTREAM_TASK_VER 1
|
#define SSTREAM_TASK_VER 2
|
||||||
enum {
|
enum {
|
||||||
STREAM_STATUS__NORMAL = 0,
|
STREAM_STATUS__NORMAL = 0,
|
||||||
STREAM_STATUS__STOP,
|
STREAM_STATUS__STOP,
|
||||||
|
@ -371,6 +371,8 @@ struct SStreamTask {
|
||||||
int32_t transferStateAlignCnt;
|
int32_t transferStateAlignCnt;
|
||||||
struct SStreamMeta* pMeta;
|
struct SStreamMeta* pMeta;
|
||||||
SSHashObj* pNameMap;
|
SSHashObj* pNameMap;
|
||||||
|
|
||||||
|
char reserve[256];
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SMetaHbInfo {
|
typedef struct SMetaHbInfo {
|
||||||
|
|
|
@ -615,25 +615,25 @@ void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
|
||||||
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
|
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
|
||||||
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
|
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
|
||||||
|
|
||||||
//typedef struct {
|
// typedef struct {
|
||||||
// int32_t epoch;
|
// int32_t epoch;
|
||||||
// SArray* consumers; // SArray<SMqConsumerEp*>
|
// SArray* consumers; // SArray<SMqConsumerEp*>
|
||||||
//} SMqSubActionLogEntry;
|
// } SMqSubActionLogEntry;
|
||||||
|
|
||||||
//SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
|
// SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
|
||||||
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
|
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
|
||||||
//int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
|
// int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
|
||||||
//void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
|
// void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
|
||||||
//
|
//
|
||||||
//typedef struct {
|
// typedef struct {
|
||||||
// char key[TSDB_SUBSCRIBE_KEY_LEN];
|
// char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
// SArray* logs; // SArray<SMqSubActionLogEntry*>
|
// SArray* logs; // SArray<SMqSubActionLogEntry*>
|
||||||
//} SMqSubActionLogObj;
|
// } SMqSubActionLogObj;
|
||||||
//
|
//
|
||||||
//SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
|
// SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
|
||||||
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
|
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
|
||||||
//int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
|
// int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
|
||||||
//void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
|
// void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t oldConsumerNum;
|
int32_t oldConsumerNum;
|
||||||
|
@ -652,7 +652,7 @@ typedef struct {
|
||||||
SArray* removedConsumers; // SArray<int64_t>
|
SArray* removedConsumers; // SArray<int64_t>
|
||||||
SArray* modifyConsumers; // SArray<int64_t>
|
SArray* modifyConsumers; // SArray<int64_t>
|
||||||
SMqSubscribeObj* pSub;
|
SMqSubscribeObj* pSub;
|
||||||
// SMqSubActionLogEntry* pLogEntry;
|
// SMqSubActionLogEntry* pLogEntry;
|
||||||
} SMqRebOutputObj;
|
} SMqRebOutputObj;
|
||||||
|
|
||||||
typedef struct SStreamConf {
|
typedef struct SStreamConf {
|
||||||
|
@ -709,6 +709,8 @@ typedef struct {
|
||||||
|
|
||||||
// 3.0.5.
|
// 3.0.5.
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
|
char reserve[256];
|
||||||
|
|
||||||
} SStreamObj;
|
} SStreamObj;
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
||||||
|
|
|
@ -84,6 +84,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
// 3.0.50 ver = 3
|
// 3.0.50 ver = 3
|
||||||
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve)) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
@ -157,6 +159,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
|
||||||
if (sver >= 3) {
|
if (sver >= 3) {
|
||||||
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
|
#include "audit.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
@ -28,9 +29,8 @@
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "audit.h"
|
|
||||||
|
|
||||||
#define MND_STREAM_VER_NUMBER 3
|
#define MND_STREAM_VER_NUMBER 4
|
||||||
#define MND_STREAM_RESERVE_SIZE 64
|
#define MND_STREAM_RESERVE_SIZE 64
|
||||||
#define MND_STREAM_MAX_NUM 60
|
#define MND_STREAM_MAX_NUM 60
|
||||||
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
||||||
|
@ -874,15 +874,18 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
char detail[2000] = {0};
|
char detail[2000] = {0};
|
||||||
sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 ", "
|
sprintf(detail,
|
||||||
|
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
|
||||||
|
", "
|
||||||
"fillHistory:%d, igExists:%d, "
|
"fillHistory:%d, igExists:%d, "
|
||||||
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", "
|
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64
|
||||||
"maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, "
|
", "
|
||||||
|
"maxDelay:%" PRId64
|
||||||
|
", numOfTags:%d, sourceDB:%s, "
|
||||||
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
|
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
|
||||||
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
|
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
|
||||||
createStreamReq.fillHistory, createStreamReq.igExists,
|
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
|
||||||
createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs,
|
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
|
||||||
createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
|
|
||||||
createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark);
|
createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail);
|
auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail);
|
||||||
|
@ -2301,12 +2304,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
doExtractTasksFromStream(pMnode);
|
doExtractTasksFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < req.numOfTasks; ++i) {
|
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||||
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
|
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||||
int64_t k[2] = {p->streamId, p->taskId};
|
int64_t k[2] = {p->streamId, p->taskId};
|
||||||
int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
|
int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
|
||||||
|
|
||||||
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
|
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
|
||||||
pStatusEntry->status = p->status;
|
pStatusEntry->status = p->status;
|
||||||
if (p->status != TASK_STATUS__NORMAL) {
|
if (p->status != TASK_STATUS__NORMAL) {
|
||||||
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
|
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
|
||||||
|
|
|
@ -134,6 +134,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
||||||
|
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve)) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
|
@ -245,6 +246,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -483,7 +485,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
||||||
pTask->status.taskStatus = TASK_STATUS__STOP;
|
pTask->status.taskStatus = TASK_STATUS__STOP;
|
||||||
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */!streamTaskIsIdle(pTask)) {
|
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
|
||||||
qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
|
qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue