refactor checkpoint

This commit is contained in:
yihaoDeng 2023-06-16 17:28:50 +08:00
parent bd0168e562
commit b0a3a8d619
3 changed files with 188 additions and 171 deletions

View File

@ -137,48 +137,48 @@ typedef enum {
} EDndReason; } EDndReason;
typedef enum { typedef enum {
CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic
CONSUMER_UPDATE__ADD, CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST, CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER, CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnConflct conflict; ETrnConflct conflict;
ETrnExec exec; ETrnExec exec;
EOperType oper; EOperType oper;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
int32_t redoActionPos; int32_t redoActionPos;
SArray* prepareActions; SArray* prepareActions;
SArray* redoActions; SArray* redoActions;
SArray* undoActions; SArray* undoActions;
SArray* commitActions; SArray* commitActions;
int64_t createdTime; int64_t createdTime;
int64_t lastExecTime; int64_t lastExecTime;
int32_t lastAction; int32_t lastAction;
int32_t lastErrorNo; int32_t lastErrorNo;
SEpSet lastEpset; SEpSet lastEpset;
tmsg_t lastMsgType; tmsg_t lastMsgType;
tmsg_t originRpcType; tmsg_t originRpcType;
char dbname[TSDB_TABLE_FNAME_LEN]; char dbname[TSDB_TABLE_FNAME_LEN];
char stbname[TSDB_TABLE_FNAME_LEN]; char stbname[TSDB_TABLE_FNAME_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;
void* param; void* param;
char opername[TSDB_TRANS_OPER_LEN]; char opername[TSDB_TRANS_OPER_LEN];
SArray* pRpcArray; SArray* pRpcArray;
SRWLatch lockRpcArray; SRWLatch lockRpcArray;
int64_t mTraceId; int64_t mTraceId;
TdThreadMutex mutex; TdThreadMutex mutex;
} STrans; } STrans;
typedef struct { typedef struct {
@ -445,20 +445,20 @@ typedef struct {
} SStbObj; } SStbObj;
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int64_t createdTime; int64_t createdTime;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
int8_t align; int8_t align;
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
int64_t signature; int64_t signature;
int32_t commentSize; int32_t commentSize;
int32_t codeSize; int32_t codeSize;
char* pComment; char* pComment;
char* pCode; char* pCode;
int32_t funcVersion; int32_t funcVersion;
SRWLatch lock; SRWLatch lock;
} SFuncObj; } SFuncObj;
@ -552,11 +552,11 @@ typedef struct {
int64_t subscribeTime; int64_t subscribeTime;
int64_t rebalanceTime; int64_t rebalanceTime;
int8_t withTbName; int8_t withTbName;
int8_t useSnapshot; int8_t useSnapshot;
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
} SMqConsumerObj; } SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
@ -566,8 +566,8 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
// char* qmsg; // SubPlanToString // char* qmsg; // SubPlanToString
SEpSet epSet; SEpSet epSet;
} SMqVgEp; } SMqVgEp;
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
@ -581,10 +581,10 @@ typedef struct {
SArray* offsetRows; // SArray<OffsetRows*> SArray* offsetRows; // SArray<OffsetRows*>
} SMqConsumerEp; } SMqConsumerEp;
//SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); // SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
//void tDeleteSMqConsumerEp(void* pEp); // void tDeleteSMqConsumerEp(void* pEp);
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp, int8_t sver); void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp, int8_t sver);
typedef struct { typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
@ -598,7 +598,7 @@ typedef struct {
SArray* unassignedVgs; // SArray<SMqVgEp*> SArray* unassignedVgs; // SArray<SMqVgEp*>
SArray* offsetRows; SArray* offsetRows;
char dbName[TSDB_DB_FNAME_LEN]; char dbName[TSDB_DB_FNAME_LEN];
char* qmsg; // SubPlanToString char* qmsg; // SubPlanToString
} SMqSubscribeObj; } SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
@ -675,7 +675,7 @@ typedef struct {
int64_t targetStbUid; int64_t targetStbUid;
// fixedSinkVg is not applicable for encode and decode // fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg; SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle int32_t fixedSinkVgId; // 0 for shuffle
// transformation // transformation
@ -691,18 +691,21 @@ typedef struct {
int64_t currentTick; // do not serialize int64_t currentTick; // do not serialize
int64_t deleteMark; int64_t deleteMark;
int8_t igCheckUpdate; int8_t igCheckUpdate;
// 3.0.5.
int64_t checkpointId;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
void tFreeStreamObj(SStreamObj* pObj); void tFreeStreamObj(SStreamObj* pObj);
//typedef struct { // typedef struct {
// char streamName[TSDB_STREAM_FNAME_LEN]; // char streamName[TSDB_STREAM_FNAME_LEN];
// int64_t uid; // int64_t uid;
// int64_t streamUid; // int64_t streamUid;
// SArray* childInfo; // SArray<SStreamChildEpInfo> // SArray* childInfo; // SArray<SStreamChildEpInfo>
//} SStreamCheckpointObj; // } SStreamCheckpointObj;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -80,6 +80,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1; if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1; if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;
// 3.0.50
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
@ -150,6 +153,9 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
} }
} }
if (sver >= 3) {
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
}
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
@ -187,14 +193,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL; if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId; pVgEpNew->vgId = pVgEp->vgId;
// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); // pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
pVgEpNew->epSet = pVgEp->epSet; pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew; return pVgEpNew;
} }
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp) { if (pVgEp) {
// taosMemoryFreeClear(pVgEp->qmsg); // taosMemoryFreeClear(pVgEp->qmsg);
taosMemoryFree(pVgEp); taosMemoryFree(pVgEp);
} }
} }
@ -202,14 +208,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
// tlen += taosEncodeString(buf, pVgEp->qmsg); // tlen += taosEncodeString(buf, pVgEp->qmsg);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
if(sver == 1){ if (sver == 1) {
uint64_t size = 0; uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size); buf = taosDecodeVariantU64(buf, &size);
buf = POINTER_SHIFT(buf, size); buf = POINTER_SHIFT(buf, size);
@ -384,7 +390,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
taosArrayPush(pConsumer->assignedTopics, &topic); taosArrayPush(pConsumer->assignedTopics, &topic);
} }
if(sver > 1){ if (sver > 1) {
buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot); buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot);
buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
@ -394,18 +400,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
return (void *)buf; return (void *)buf;
} }
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { // SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); // SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL; // if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; // pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); // pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
// return pConsumerEpNew; // return pConsumerEpNew;
//} // }
// //
//void tDeleteSMqConsumerEp(void *data) { // void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; // SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroy(pConsumerEp->vgs); // taosArrayDestroy(pConsumerEp->vgs);
//} // }
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
@ -413,7 +419,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows); int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI64(buf, offRows->rows);
@ -427,28 +433,28 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
// do nothing // do nothing
} }
} }
//#if 0 // #if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs); // int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz); // tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) { // for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); // SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp); // tlen += tEncodeSMqVgEp(buf, pVgEp);
// } // }
//#endif // #endif
return tlen; return tlen;
} }
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
if (sver > 1){ if (sver > 1) {
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if(szVgs > 0){ if (szVgs > 0) {
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pConsumerEp->offsetRows) return NULL; if (NULL == pConsumerEp->offsetRows) return NULL;
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type); buf = taosDecodeFixedI8(buf, &offRows->offset.type);
@ -463,21 +469,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
} }
} }
} }
//#if 0 // #if 0
// int32_t sz; // int32_t sz;
// buf = taosDecodeFixedI32(buf, &sz); // buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); // pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
// for (int32_t i = 0; i < sz; i++) { // for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); // SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
// buf = tDecodeSMqVgEp(buf, pVgEp); // buf = tDecodeSMqVgEp(buf, pVgEp);
// taosArrayPush(pConsumerEp->vgs, &pVgEp); // taosArrayPush(pConsumerEp->vgs, &pVgEp);
// } // }
//#endif // #endif
return (void *)buf; return (void *)buf;
} }
SMqSubscribeObj *tNewSubscribeObj(const char* key) { SMqSubscribeObj *tNewSubscribeObj(const char *key) {
SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubObj == NULL) { if (pSubObj == NULL) {
return NULL; return NULL;
@ -570,7 +576,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t szVgs = taosArrayGetSize(pSub->offsetRows); int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j); OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI64(buf, offRows->rows);
@ -610,14 +616,14 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
buf = taosDecodeStringTo(buf, pSub->dbName); buf = taosDecodeStringTo(buf, pSub->dbName);
if (sver > 1){ if (sver > 1) {
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if(szVgs > 0){ if (szVgs > 0) {
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pSub->offsetRows) return NULL; if (NULL == pSub->offsetRows) return NULL;
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1); OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type); buf = taosDecodeFixedI8(buf, &offRows->offset.type);
@ -636,65 +642,65 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
return (void *)buf; return (void *)buf;
} }
//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { // SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); // SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL; // if (pEntryNew == NULL) return NULL;
// pEntryNew->epoch = pEntry->epoch; // pEntryNew->epoch = pEntry->epoch;
// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); // pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pEntryNew; // return pEntryNew;
//} // }
// //
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { // void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); // taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
//} // }
//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { // int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
// int32_t tlen = 0; // int32_t tlen = 0;
// tlen += taosEncodeFixedI32(buf, pEntry->epoch); // tlen += taosEncodeFixedI32(buf, pEntry->epoch);
// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); // tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen; // return tlen;
//} // }
// //
//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { // void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
// buf = taosDecodeFixedI32(buf, &pEntry->epoch); // buf = taosDecodeFixedI32(buf, &pEntry->epoch);
// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); // buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf; // return (void *)buf;
//} // }
//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { // SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); // SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
// if (pLogNew == NULL) return pLogNew; // if (pLogNew == NULL) return pLogNew;
// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); // memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); // pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pLogNew; // return pLogNew;
//} // }
// //
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { // void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); // taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
//} // }
//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { // int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
// int32_t tlen = 0; // int32_t tlen = 0;
// tlen += taosEncodeString(buf, pLog->key); // tlen += taosEncodeString(buf, pLog->key);
// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); // tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen; // return tlen;
//} // }
// //
//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { // void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
// buf = taosDecodeStringTo(buf, pLog->key); // buf = taosDecodeStringTo(buf, pLog->key);
// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); // buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf; // return (void *)buf;
//} // }
// //
//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { // int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
// int32_t tlen = 0; // int32_t tlen = 0;
// tlen += taosEncodeString(buf, pOffset->key); // tlen += taosEncodeString(buf, pOffset->key);
// tlen += taosEncodeFixedI64(buf, pOffset->offset); // tlen += taosEncodeFixedI64(buf, pOffset->offset);
// return tlen; // return tlen;
//} // }
// //
//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { // void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
// buf = taosDecodeStringTo(buf, pOffset->key); // buf = taosDecodeStringTo(buf, pOffset->key);
// buf = taosDecodeFixedI64(buf, &pOffset->offset); // buf = taosDecodeFixedI64(buf, &pOffset->offset);
// return buf; // return buf;
//} // }

View File

@ -28,7 +28,7 @@
#include "parser.h" #include "parser.h"
#include "tname.h" #include "tname.h"
#define MND_STREAM_VER_NUMBER 2 #define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60 #define MND_STREAM_MAX_NUM 60
@ -147,7 +147,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
if (sver != 1 && sver != 2) { if (sver != 1 && sver != 2 && sver != 3) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
@ -946,6 +946,9 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
} }
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds, static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds,
int64_t checkpointId) { int64_t checkpointId) {
if (checkpointId == pStream->checkpointId) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
@ -955,7 +958,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
taosRLockLatch(&pStream->lock); atomic_store_64(&pStream->currentTick, 1);
taosWLockLatch(&pStream->lock);
// 1. redo action: broadcast checkpoint source msg for all source vg // 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks); int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) { for (int32_t i = 0; i < totLevel; i++) {
@ -1003,10 +1007,11 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
} }
// 2. reset tick // 2. reset tick
pStream->checkpointFreq = checkpointId; pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId;
atomic_store_64(&pStream->currentTick, 0); atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info // 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosRUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
// // code condtion // // code condtion
@ -1051,6 +1056,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break; if (pIter == NULL) break;
code = mndProcessStreamCheckpointTrans(pMnode, pStream, vgIds, checkpointId); code = mndProcessStreamCheckpointTrans(pMnode, pStream, vgIds, checkpointId);
if (code == -1) {
mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name);
}
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
taosHashCleanup(vgIds); taosHashCleanup(vgIds);