From 9ef69dd455e2df379bd7043cd37e97f5cf6241a0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Aug 2023 10:27:47 +0800 Subject: [PATCH 1/2] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cdcd482442..37b4f45df6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -871,14 +871,14 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t code = -1; SArray* refs = taosArrayInit(16, sizeof(int64_t)); - SArray* pCf = taosArrayInit(16, POINTER_BYTES); rocksdb_column_family_handle_t** ppCf = NULL; char* pChkpDir = NULL; char* pChkpIdDir = NULL; if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { - goto _ERROR; + taosArrayDestroy(refs); + return code; } SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); @@ -886,6 +886,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { goto _ERROR; } + // Get all cf and acquire cfWrappter int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); @@ -901,7 +902,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } else { qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } - + // release all ref to cfWrapper; for (int i = 0; i < taosArrayGetSize(refs); i++) { int64_t id = *(int64_t*)taosArrayGet(refs, i); taosReleaseRef(streamBackendCfWrapperId, id); From d934d91fb178cffcba248a905416eb4cf9d93c16 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Aug 2023 10:34:08 +0800 Subject: [PATCH 2/2] refactor checkpoint --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7eb0c7b9aa..188acabcda 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1830,7 +1830,7 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset -static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { +static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update"); if (pTrans == NULL) { mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -2100,6 +2100,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } typedef struct SMStreamNodeCheckMsg { + int8_t holder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {