fix: compatility after update stream
This commit is contained in:
parent
3424b184d3
commit
ffb1d52e55
|
@ -642,7 +642,7 @@ typedef struct {
|
||||||
} SStreamObj;
|
} SStreamObj;
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
||||||
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
|
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
|
||||||
void tFreeStreamObj(SStreamObj* pObj);
|
void tFreeStreamObj(SStreamObj* pObj);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -83,7 +83,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||||
|
|
||||||
|
@ -143,8 +143,9 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||||
|
|
||||||
// 3.0.20
|
// 3.0.20
|
||||||
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
|
if (sver >= 2) {
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
|
||||||
|
}
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define MND_STREAM_VER_NUMBER 1
|
#define MND_STREAM_VER_NUMBER 2
|
||||||
#define MND_STREAM_RESERVE_SIZE 64
|
#define MND_STREAM_RESERVE_SIZE 64
|
||||||
|
|
||||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
||||||
|
@ -131,7 +131,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
|
||||||
|
|
||||||
if (sver != MND_STREAM_VER_NUMBER) {
|
if (sver != 1 && sver != 2) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto STREAM_DECODE_OVER;
|
goto STREAM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -152,7 +152,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, buf, tlen + 1);
|
tDecoderInit(&decoder, buf, tlen + 1);
|
||||||
if (tDecodeSStreamObj(&decoder, pStream) < 0) {
|
if (tDecodeSStreamObj(&decoder, pStream, 2) < 0) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
goto STREAM_DECODE_OVER;
|
goto STREAM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue