Merge pull request #28184 from taosdata/fix/syntax

fix(stream): fix memory leak.
This commit is contained in:
Haojun Liao 2024-09-29 16:52:05 +08:00 committed by GitHub
commit 47b50fa5ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 34 additions and 40 deletions

View File

@ -165,68 +165,62 @@ void mndCleanupStream(SMnode *pMnode) {
} }
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY; SSdbRow * pRow = NULL;
SSdbRow *pRow = NULL;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *buf = NULL; void * buf = NULL;
int8_t sver = 0; int8_t sver = 0;
int32_t tlen;
int32_t dataPos = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) { code = sdbGetRawSoftVer(pRaw, &sver);
goto STREAM_DECODE_OVER; TSDB_CHECK_CODE(code, lino, _over);
}
if (sver < 1 || sver > MND_STREAM_VER_NUMBER) { if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
terrno = 0;
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
goto STREAM_DECODE_OVER; goto _over;
} }
pRow = sdbAllocRow(sizeof(SStreamObj)); pRow = sdbAllocRow(sizeof(SStreamObj));
if (pRow == NULL) { TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
goto STREAM_DECODE_OVER;
}
pStream = sdbGetRowObj(pRow); pStream = sdbGetRowObj(pRow);
if (pStream == NULL) { TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
goto STREAM_DECODE_OVER;
}
int32_t tlen; SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
buf = taosMemoryMalloc(tlen + 1); buf = taosMemoryMalloc(tlen + 1);
if (buf == NULL) { TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
goto STREAM_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1); tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) { code = tDecodeSStreamObj(&decoder, pStream, sver);
tDecoderClear(&decoder);
goto STREAM_DECODE_OVER;
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
terrno = TSDB_CODE_SUCCESS; if (code < 0) {
tFreeStreamObj(pStream);
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
char *p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno));
taosMemoryFreeClear(pRow);
return NULL;
} }
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream, _over:
pStream->checkpointId); taosMemoryFreeClear(buf);
return pRow;
if (code != TSDB_CODE_SUCCESS) {
char *p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
taosMemoryFreeClear(pRow);
terrno = code;
return NULL;
} else {
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
pStream->checkpointId);
terrno = 0;
return pRow;
}
} }
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) { static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {