stream change ver

This commit is contained in:
yihaoDeng 2023-08-31 11:42:12 +08:00
parent 05ca71d5de
commit 83a5e2be4c
3 changed files with 38 additions and 39 deletions

View File

@ -371,8 +371,7 @@ struct SStreamTask {
int32_t transferStateAlignCnt;
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
char reserve[256];
char reserve[256];
};
typedef struct SMetaHbInfo {

View File

@ -84,7 +84,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.50 ver = 3
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve)) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;

View File

@ -134,47 +134,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve)) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t ver;
int64_t skip64;
int8_t skip8;
int32_t skip32;
int16_t skip16;
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
@ -252,6 +217,41 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
return 0;
}
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t ver;
int64_t skip64;
int8_t skip8;
int32_t skip32;
int16_t skip16;
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p;
rpcFreeCont(pInfo->msg.pCont);