diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index f5cdfeadad..b7e92d2e65 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1411,7 +1411,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("smlBuildOutput failed"); - return code; + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1496,7 +1496,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("smlBuildOutput failed"); - return code; + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); diff --git a/source/dnode/vnode/src/tq/tqHandleSnapshot.c b/source/dnode/vnode/src/tq/tqHandleSnapshot.c index 7d3e2f7837..38f9dcc57a 100644 --- a/source/dnode/vnode/src/tq/tqHandleSnapshot.c +++ b/source/dnode/vnode/src/tq/tqHandleSnapshot.c @@ -178,17 +178,14 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tDecodeSTqHandle(pDecoder, &handle); - if (code) goto _err; + if (code) goto end; taosWLockLatch(&pTq->lock); code = tqMetaSaveHandle(pTq, handle.subKey, &handle); taosWUnLockLatch(&pTq->lock); - if (code < 0) goto _err; - tDecoderClear(pDecoder); - return code; - -_err: +end: tDecoderClear(pDecoder); - tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqDestroyTqHandle(&handle); + tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code); return code; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index bea63fccb9..4c403dc18f 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -198,58 +198,51 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t code; int32_t vlen; + void* buf = NULL; + SEncoder encoder; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); if (code < 0) { - return -1; + goto end; } tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); - void* buf = taosMemoryCalloc(1, vlen); + buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; } - SEncoder encoder; + tEncoderInit(&encoder, buf, vlen); - if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - return -1; + code = tEncodeSTqHandle(&encoder, pHandle); + if (code < 0) { + goto end; } - TXN* txn; - - if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - return -1; + TXN* txn = NULL; + code = tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code < 0) { + goto end; } - if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - return -1; + code = tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn); + if (code < 0) { + goto end; } - if (tdbCommit(pTq->pMetaDB, txn) < 0) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - return -1; - } - - if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - return -1; + code = tdbCommit(pTq->pMetaDB, txn); + if (code < 0) { + goto end; } + code = tdbPostCommit(pTq->pMetaDB, txn); +end: tEncoderClear(&encoder); taosMemoryFree(buf); - return 0; + return code; } int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { @@ -349,15 +342,18 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ int32_t vgId = TD_VID(pTq->pVnode); SDecoder decoder; + int32_t code = 0; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, handle); - tDecoderClear(&decoder); - - if(buildHandle(pTq, handle) < 0){ - return -1; - } + code = tDecodeSTqHandle(&decoder, handle); + if (code) goto end; + code = buildHandle(pTq, handle); + if (code) goto end; tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); - return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); + code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); + +end: + tDecoderClear(&decoder); + return code; } int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 652ef7cde7..70371c4add 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -238,6 +238,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { return -1; } } + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) {