diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c80f46f528..2996863e97 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1884,7 +1884,7 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset -static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { +static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update"); if (pTrans == NULL) { mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -2154,6 +2154,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } typedef struct SMStreamNodeCheckMsg { + int8_t holder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cdcd482442..37b4f45df6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -871,14 +871,14 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t code = -1; SArray* refs = taosArrayInit(16, sizeof(int64_t)); - SArray* pCf = taosArrayInit(16, POINTER_BYTES); rocksdb_column_family_handle_t** ppCf = NULL; char* pChkpDir = NULL; char* pChkpIdDir = NULL; if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { - goto _ERROR; + taosArrayDestroy(refs); + return code; } SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); @@ -886,6 +886,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { goto _ERROR; } + // Get all cf and acquire cfWrappter int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); @@ -901,7 +902,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } else { qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } - + // release all ref to cfWrapper; for (int i = 0; i < taosArrayGetSize(refs); i++) { int64_t id = *(int64_t*)taosArrayGet(refs, i); taosReleaseRef(streamBackendCfWrapperId, id);