diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3ec99f6e44..69d3de25fc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -165,68 +165,62 @@ void mndCleanupStream(SMnode *pMnode) { } SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { - int32_t code = 0; - int32_t lino = 0; - terrno = TSDB_CODE_OUT_OF_MEMORY; - - SSdbRow *pRow = NULL; + int32_t code = 0; + int32_t lino = 0; + SSdbRow * pRow = NULL; SStreamObj *pStream = NULL; - void *buf = NULL; + void * buf = NULL; int8_t sver = 0; + int32_t tlen; + int32_t dataPos = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) { - goto STREAM_DECODE_OVER; - } + code = sdbGetRawSoftVer(pRaw, &sver); + TSDB_CHECK_CODE(code, lino, _over); if (sver < 1 || sver > MND_STREAM_VER_NUMBER) { - terrno = 0; mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); - goto STREAM_DECODE_OVER; + goto _over; } pRow = sdbAllocRow(sizeof(SStreamObj)); - if (pRow == NULL) { - goto STREAM_DECODE_OVER; - } + TSDB_CHECK_NULL(pRow, code, lino, _over, terrno); pStream = sdbGetRowObj(pRow); - if (pStream == NULL) { - goto STREAM_DECODE_OVER; - } + TSDB_CHECK_NULL(pStream, code, lino, _over, terrno); - int32_t tlen; - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &tlen, _over); buf = taosMemoryMalloc(tlen + 1); - if (buf == NULL) { - goto STREAM_DECODE_OVER; - } + TSDB_CHECK_NULL(buf, code, lino, _over, terrno); - SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over); SDecoder decoder; tDecoderInit(&decoder, buf, tlen + 1); - if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) { - tDecoderClear(&decoder); - goto STREAM_DECODE_OVER; - } + code = tDecodeSStreamObj(&decoder, pStream, sver); tDecoderClear(&decoder); - terrno = TSDB_CODE_SUCCESS; - -STREAM_DECODE_OVER: - taosMemoryFreeClear(buf); - if (terrno != TSDB_CODE_SUCCESS) { - char *p = (pStream == NULL) ? "null" : pStream->name; - mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno)); - taosMemoryFreeClear(pRow); - return NULL; + if (code < 0) { + tFreeStreamObj(pStream); } - mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream, - pStream->checkpointId); - return pRow; +_over: + taosMemoryFreeClear(buf); + + if (code != TSDB_CODE_SUCCESS) { + char *p = (pStream == NULL) ? "null" : pStream->name; + mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino); + taosMemoryFreeClear(pRow); + + terrno = code; + return NULL; + } else { + mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream, + pStream->checkpointId); + + terrno = 0; + return pRow; + } } static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {