From ab9490101f363bf00c9de2a741a7eefe4f47fa6a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 22 Aug 2023 18:04:04 +0800 Subject: [PATCH] Merge branch 'enh/3.0' into enh/triggerCheckPoint2 --- source/libs/stream/src/streamMeta.c | 6 +++--- source/libs/stream/src/streamTask.c | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ac2f7e02df..71ec7617e3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -94,7 +94,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); - pMeta->chkpCap = 2; + pMeta->chkpCap = 8; taosInitRWLatch(&pMeta->chkpDirLock); int64_t chkpId = streamGetLatestCheckpointId(pMeta); @@ -574,7 +574,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; - for(int32_t i = 0; i < pReq->numOfTasks; ++i) { + for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1; @@ -590,7 +590,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); - for(int32_t i = 0; i < pReq->numOfTasks; ++i) { + for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry hb = {0}; if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 029a71ed7b..50cf74bcf0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,6 +165,7 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) if (tDecodeI32(pDecoder, &skip32) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1; if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;