refactor: do some internal refactor.
This commit is contained in:
parent
75ff0ff649
commit
e79d8657bf
|
@ -165,69 +165,62 @@ void mndCleanupStream(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
SSdbRow *pRow = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSdbRow * pRow = NULL;
|
||||
SStreamObj *pStream = NULL;
|
||||
void *buf = NULL;
|
||||
void * buf = NULL;
|
||||
int8_t sver = 0;
|
||||
int32_t tlen;
|
||||
int32_t dataPos = 0;
|
||||
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
code = sdbGetRawSoftVer(pRaw, &sver);
|
||||
TSDB_CHECK_CODE(code, lino, _over);
|
||||
|
||||
if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
|
||||
terrno = 0;
|
||||
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
|
||||
goto STREAM_DECODE_OVER;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pRow = sdbAllocRow(sizeof(SStreamObj));
|
||||
if (pRow == NULL) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
|
||||
|
||||
pStream = sdbGetRowObj(pRow);
|
||||
if (pStream == NULL) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
|
||||
|
||||
int32_t tlen;
|
||||
int32_t dataPos = 0;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
|
||||
|
||||
buf = taosMemoryMalloc(tlen + 1);
|
||||
if (buf == NULL) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
|
||||
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, buf, tlen + 1);
|
||||
if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
tFreeStreamObj(pStream);
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
|
||||
code = tDecodeSStreamObj(&decoder, pStream, sver);
|
||||
tDecoderClear(&decoder);
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_DECODE_OVER:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
char *p = (pStream == NULL) ? "null" : pStream->name;
|
||||
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno));
|
||||
taosMemoryFreeClear(pRow);
|
||||
return NULL;
|
||||
if (code < 0) {
|
||||
tFreeStreamObj(pStream);
|
||||
}
|
||||
|
||||
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
|
||||
pStream->checkpointId);
|
||||
return pRow;
|
||||
_over:
|
||||
taosMemoryFreeClear(buf);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
char *p = (pStream == NULL) ? "null" : pStream->name;
|
||||
mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
|
||||
taosMemoryFreeClear(pRow);
|
||||
|
||||
terrno = code;
|
||||
return NULL;
|
||||
} else {
|
||||
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
|
||||
pStream->checkpointId);
|
||||
|
||||
terrno = 0;
|
||||
return pRow;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
|
||||
|
|
Loading…
Reference in New Issue