refactor return code

This commit is contained in:
Hongze Cheng 2024-07-19 13:19:01 +08:00
parent 90e4d16fcc
commit ffe2973ade
5 changed files with 175 additions and 134 deletions

View File

@ -226,7 +226,7 @@ void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); int32_t (*tItemCmprFn)(const void *, const void *), void *pItem);
int32_t tPutMapData(uint8_t *p, SMapData *pMapData); int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
int32_t tGetMapData(uint8_t *p, SMapData *pMapData); int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize);
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
SArray **ppArray); SArray **ppArray);
// other // other
@ -706,7 +706,6 @@ typedef TARRAY2(STsdbFSetPartition *) STsdbFSetPartList;
STsdbFSetPartList *tsdbFSetPartListCreate(); STsdbFSetPartList *tsdbFSetPartListCreate();
void tsdbFSetPartListDestroy(STsdbFSetPartList **ppList); void tsdbFSetPartListDestroy(STsdbFSetPartList **ppList);
int32_t tSerializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList);
int32_t tDeserializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList); int32_t tDeserializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList);
int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSetRangeArray **ppRanges); int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSetRangeArray **ppRanges);

View File

@ -803,11 +803,9 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m
if (code) goto _err; if (code) goto _err;
// decode // decode
int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk); int32_t n;
if (n < 0) { code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
code = TSDB_CODE_OUT_OF_MEMORY; if (code) goto _err;
goto _err;
}
ASSERT(n == size); ASSERT(n == size);
return code; return code;

View File

@ -81,7 +81,7 @@ static int32_t tsdbFTypeToFRangeType(tsdb_ftype_t ftype) {
static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** ppSP) { static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** ppSP) {
STsdbFSetPartition* p = tsdbFSetPartitionCreate(); STsdbFSetPartition* p = tsdbFSetPartitionCreate();
if (p == NULL) { if (p == NULL) {
goto _err; return terrno;
} }
p->fid = fset->fid; p->fid = fset->fid;
@ -134,10 +134,6 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition**
} }
ppSP[0] = p; ppSP[0] = p;
return 0; return 0;
_err:
tsdbFSetPartitionClear(&p);
return -1;
} }
// fset partition list // fset partition list
@ -160,9 +156,11 @@ void tsdbFSetPartListDestroy(STsdbFSetPartList** ppList) {
} }
int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray** ppRanges) { int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray** ppRanges) {
int32_t code = 0;
TFileSetRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TFileSetRangeArray)); TFileSetRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TFileSetRangeArray));
if (pDiff == NULL) { if (pDiff == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
TARRAY2_INIT(pDiff); TARRAY2_INIT(pDiff);
@ -171,7 +169,7 @@ int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray
TARRAY2_FOREACH(pList, part) { TARRAY2_FOREACH(pList, part) {
STFileSetRange* r = taosMemoryCalloc(1, sizeof(STFileSetRange)); STFileSetRange* r = taosMemoryCalloc(1, sizeof(STFileSetRange));
if (r == NULL) { if (r == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
int64_t maxVerValid = -1; int64_t maxVerValid = -1;
@ -202,7 +200,7 @@ _err:
if (pDiff) { if (pDiff) {
tsdbTFileSetRangeArrayDestroy(&pDiff); tsdbTFileSetRangeArrayDestroy(&pDiff);
} }
return -1; return code;
} }
// serialization // serialization
@ -235,96 +233,103 @@ int32_t tTsdbFSetPartListDataLenCalc(STsdbFSetPartList* pList) {
return datLen; return datLen;
} }
int32_t tSerializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) { static int32_t tSerializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList, int32_t* encodeSize) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
int8_t reserved8 = 0; int8_t reserved8 = 0;
int16_t reserved16 = 0; int16_t reserved16 = 0;
int64_t reserved64 = 0; int64_t reserved64 = 0;
int8_t msgVer = TSDB_SNAP_MSG_VER; int8_t msgVer = TSDB_SNAP_MSG_VER;
int32_t len = TARRAY2_SIZE(pList); int32_t len = TARRAY2_SIZE(pList);
int32_t code = 0;
if (tStartEncode(&encoder) < 0) goto _err; tEncoderInit(&encoder, buf, bufLen);
if (tEncodeI8(&encoder, msgVer) < 0) goto _err; if ((code = tStartEncode(&encoder))) goto _exit;
if (tEncodeI32(&encoder, len) < 0) goto _err; if ((code = tEncodeI8(&encoder, msgVer))) goto _exit;
if ((code = tEncodeI32(&encoder, len))) goto _exit;
for (int32_t u = 0; u < len; u++) { for (int32_t u = 0; u < len; u++) {
STsdbFSetPartition* p = TARRAY2_GET(pList, u); STsdbFSetPartition* p = TARRAY2_GET(pList, u);
if (tEncodeI64(&encoder, p->fid) < 0) goto _err; if ((code = tEncodeI64(&encoder, p->fid))) goto _exit;
if (tEncodeI8(&encoder, p->stat) < 0) goto _err; if ((code = tEncodeI8(&encoder, p->stat))) goto _exit;
if (tEncodeI8(&encoder, reserved8) < 0) goto _err; if ((code = tEncodeI8(&encoder, reserved8))) goto _exit;
if (tEncodeI16(&encoder, reserved16) < 0) goto _err; if ((code = tEncodeI16(&encoder, reserved16))) goto _exit;
int32_t typMax = TSDB_FSET_RANGE_TYP_MAX; int32_t typMax = TSDB_FSET_RANGE_TYP_MAX;
if (tEncodeI32(&encoder, typMax) < 0) goto _err; if ((code = tEncodeI32(&encoder, typMax))) goto _exit;
for (int32_t i = 0; i < typMax; i++) { for (int32_t i = 0; i < typMax; i++) {
SVerRangeList* iList = &p->verRanges[i]; SVerRangeList* iList = &p->verRanges[i];
int32_t iLen = TARRAY2_SIZE(iList); int32_t iLen = TARRAY2_SIZE(iList);
if (tEncodeI32(&encoder, iLen) < 0) goto _err; if ((code = tEncodeI32(&encoder, iLen))) goto _exit;
for (int32_t j = 0; j < iLen; j++) { for (int32_t j = 0; j < iLen; j++) {
SVersionRange r = TARRAY2_GET(iList, j); SVersionRange r = TARRAY2_GET(iList, j);
if (tEncodeI64(&encoder, r.minVer) < 0) goto _err; if ((code = tEncodeI64(&encoder, r.minVer))) goto _exit;
if (tEncodeI64(&encoder, r.maxVer) < 0) goto _err; if ((code = tEncodeI64(&encoder, r.maxVer))) goto _exit;
if (tEncodeI64(&encoder, reserved64) < 0) goto _err; if ((code = tEncodeI64(&encoder, reserved64))) goto _exit;
} }
} }
} }
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
_err: if (encodeSize) {
encodeSize[0] = encoder.pos;
}
_exit:
tEncoderClear(&encoder); tEncoderClear(&encoder);
return -1; return code;
} }
int32_t tDeserializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) { int32_t tDeserializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
int8_t reserved8 = 0; int8_t reserved8 = 0;
int16_t reserved16 = 0; int16_t reserved16 = 0;
int64_t reserved64 = 0; int64_t reserved64 = 0;
int32_t code = 0;
STsdbFSetPartition* p = NULL; STsdbFSetPartition* p = NULL;
tDecoderInit(&decoder, buf, bufLen);
int8_t msgVer = 0; int8_t msgVer = 0;
int32_t len = 0; int32_t len = 0;
if (tStartDecode(&decoder) < 0) goto _err; if ((code = tStartDecode(&decoder))) goto _err;
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err; if ((code = tDecodeI8(&decoder, &msgVer))) goto _err;
if (msgVer != TSDB_SNAP_MSG_VER) goto _err; if (msgVer != TSDB_SNAP_MSG_VER) {
if (tDecodeI32(&decoder, &len) < 0) goto _err; code = TSDB_CODE_INVALID_MSG;
goto _err;
}
if ((code = tDecodeI32(&decoder, &len))) goto _err;
for (int32_t u = 0; u < len; u++) { for (int32_t u = 0; u < len; u++) {
p = tsdbFSetPartitionCreate(); p = tsdbFSetPartitionCreate();
if (p == NULL) goto _err; if (p == NULL) {
if (tDecodeI64(&decoder, &p->fid) < 0) goto _err; code = terrno;
if (tDecodeI8(&decoder, &p->stat) < 0) goto _err; goto _err;
if (tDecodeI8(&decoder, &reserved8) < 0) goto _err; }
if (tDecodeI16(&decoder, &reserved16) < 0) goto _err;
if ((code = tDecodeI64(&decoder, &p->fid))) goto _err;
if ((code = tDecodeI8(&decoder, &p->stat))) goto _err;
if ((code = tDecodeI8(&decoder, &reserved8))) goto _err;
if ((code = tDecodeI16(&decoder, &reserved16))) goto _err;
int32_t typMax = 0; int32_t typMax = 0;
if (tDecodeI32(&decoder, &typMax) < 0) goto _err; if ((code = tDecodeI32(&decoder, &typMax))) goto _err;
for (int32_t i = 0; i < typMax; i++) { for (int32_t i = 0; i < typMax; i++) {
SVerRangeList* iList = &p->verRanges[i]; SVerRangeList* iList = &p->verRanges[i];
int32_t iLen = 0; int32_t iLen = 0;
if (tDecodeI32(&decoder, &iLen) < 0) goto _err; if ((code = tDecodeI32(&decoder, &iLen))) goto _err;
for (int32_t j = 0; j < iLen; j++) { for (int32_t j = 0; j < iLen; j++) {
SVersionRange r = {0}; SVersionRange r = {0};
if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err; if ((code = tDecodeI64(&decoder, &r.minVer))) goto _err;
if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err; if ((code = tDecodeI64(&decoder, &r.maxVer))) goto _err;
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; if ((code = tDecodeI64(&decoder, &reserved64))) goto _err;
if (TARRAY2_APPEND(iList, r)) goto _err; if ((code = TARRAY2_APPEND(iList, r))) goto _err;
} }
} }
if (TARRAY2_APPEND(pList, p)) goto _err; if ((code = TARRAY2_APPEND(pList, p))) goto _err;
p = NULL; p = NULL;
} }
@ -337,7 +342,7 @@ _err:
tsdbFSetPartitionClear(&p); tsdbFSetPartitionClear(&p);
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
return -1; return code;
} }
// fs state // fs state
@ -352,8 +357,9 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
STFileSet* fset; STFileSet* fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
STsdbFSetPartition* pItem = NULL; STsdbFSetPartition* pItem = NULL;
if (tsdbTFileSetToFSetPartition(fset, &pItem) < 0) { code = tsdbTFileSetToFSetPartition(fset, &pItem);
code = -1; if (code) {
terrno = code;
break; break;
} }
ASSERT(pItem != NULL); ASSERT(pItem != NULL);
@ -404,7 +410,9 @@ static int32_t tsdbPartitionInfoInit(SVnode* pVnode, STsdbPartitionInfo* pInfo)
for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) { for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) {
STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, j); STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, j);
pInfo->pLists[j] = tsdbSnapGetFSetPartList(pTsdb->pFS); pInfo->pLists[j] = tsdbSnapGetFSetPartList(pTsdb->pFS);
if (pInfo->pLists[j] == NULL) return -1; if (pInfo->pLists[j] == NULL) {
return terrno;
}
} }
return 0; return 0;
} }
@ -432,9 +440,10 @@ static int32_t tsdbPartitionInfoSerialize(STsdbPartitionInfo* pInfo, uint8_t* bu
SSyncTLV* pSubHead = (void*)((char*)buf + offset); SSyncTLV* pSubHead = (void*)((char*)buf + offset);
int32_t valOffset = offset + sizeof(*pSubHead); int32_t valOffset = offset + sizeof(*pSubHead);
ASSERT(pSubHead->val == (char*)buf + valOffset); ASSERT(pSubHead->val == (char*)buf + valOffset);
if ((tlen = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j])) < 0) { int32_t code = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j], &tlen);
if (code) {
tsdbError("vgId:%d, failed to serialize fset partition list of tsdb %d since %s", pInfo->vgId, j, terrstr()); tsdbError("vgId:%d, failed to serialize fset partition list of tsdb %d since %s", pInfo->vgId, j, terrstr());
return -1; return code;
} }
pSubHead->typ = pInfo->subTyps[j]; pSubHead->typ = pInfo->subTyps[j];
pSubHead->len = tlen; pSubHead->len = tlen;
@ -460,17 +469,18 @@ static int32_t tTsdbRepOptsDataLenCalc(STsdbRepOpts* pInfo) {
} }
int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) { int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
int32_t code = 0;
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
int64_t reserved64 = 0; int64_t reserved64 = 0;
int8_t msgVer = TSDB_SNAP_MSG_VER; int8_t msgVer = TSDB_SNAP_MSG_VER;
if (tStartEncode(&encoder) < 0) goto _err; tEncoderInit(&encoder, buf, bufLen);
if (tEncodeI8(&encoder, msgVer) < 0) goto _err;
if ((code = tStartEncode(&encoder))) goto _err;
if ((code = tEncodeI8(&encoder, msgVer))) goto _err;
int16_t format = pOpts->format; int16_t format = pOpts->format;
if (tEncodeI16(&encoder, format) < 0) goto _err; if ((code = tEncodeI16(&encoder, format))) goto _err;
if (tEncodeI64(&encoder, reserved64) < 0) goto _err; if ((code = tEncodeI64(&encoder, reserved64))) goto _err;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -479,23 +489,24 @@ int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
_err: _err:
tEncoderClear(&encoder); tEncoderClear(&encoder);
return -1; return code;
} }
int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) { int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
int32_t code;
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
int64_t reserved64 = 0; int64_t reserved64 = 0;
int8_t msgVer = 0; int8_t msgVer = 0;
if (tStartDecode(&decoder) < 0) goto _err; tDecoderInit(&decoder, buf, bufLen);
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err;
if ((code = tStartDecode(&decoder))) goto _err;
if ((code = tDecodeI8(&decoder, &msgVer))) goto _err;
if (msgVer != TSDB_SNAP_MSG_VER) goto _err; if (msgVer != TSDB_SNAP_MSG_VER) goto _err;
int16_t format = 0; int16_t format = 0;
if (tDecodeI16(&decoder, &format) < 0) goto _err; if ((code = tDecodeI16(&decoder, &format))) goto _err;
pOpts->format = format; pOpts->format = format;
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; if ((code = tDecodeI64(&decoder, &reserved64))) goto _err;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -503,7 +514,7 @@ int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts)
_err: _err:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return -1; return code;
} }
static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) { static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) {
@ -518,7 +529,7 @@ static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufL
int32_t offset = 0; int32_t offset = 0;
int32_t tlen = 0; int32_t tlen = 0;
if ((tlen = tSerializeTsdbRepOpts(pSubHead->val, bufLen, pOpts)) < 0) { if ((tlen = tSerializeTsdbRepOpts(pSubHead->val, bufLen, pOpts)) < 0) {
return -1; return tlen;
} }
pSubHead->typ = SNAP_DATA_RAW; pSubHead->typ = SNAP_DATA_RAW;
pSubHead->len = tlen; pSubHead->len = tlen;
@ -528,8 +539,10 @@ static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufL
// snap info // snap info
static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, STsdbRepOpts* pInfo) { static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, STsdbRepOpts* pInfo) {
if (!pSnap->data) return 0; if (!pSnap->data) {
int32_t code = -1; return 0;
}
int32_t code = 0;
SSyncTLV* pHead = (void*)pSnap->data; SSyncTLV* pHead = (void*)pSnap->data;
int32_t offset = 0; int32_t offset = 0;
@ -546,30 +559,30 @@ static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, ST
case SNAP_DATA_RSMA2: { case SNAP_DATA_RSMA2: {
} break; } break;
case SNAP_DATA_RAW: { case SNAP_DATA_RAW: {
if (tDeserializeTsdbRepOpts(buf, bufLen, pInfo) < 0) { code = tDeserializeTsdbRepOpts(buf, bufLen, pInfo);
terrno = TSDB_CODE_INVALID_DATA_FMT; if (code < 0) {
tsdbError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); tsdbError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out; return code;
} }
} break; } break;
default: default:
code = TSDB_CODE_INVALID_MSG;
tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ); tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ);
goto _out; return code;
} }
} }
code = 0;
_out:
return code; return code;
} }
int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY); ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
STsdbPartitionInfo partitionInfo = {0}; STsdbPartitionInfo partitionInfo = {0};
int code = -1; int code = 0;
STsdbPartitionInfo* pInfo = &partitionInfo; STsdbPartitionInfo* pInfo = &partitionInfo;
if (tsdbPartitionInfoInit(pVnode, pInfo) != 0) { code = tsdbPartitionInfoInit(pVnode, pInfo);
if (code) {
goto _out; goto _out;
} }
@ -577,7 +590,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW};
if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
STsdbRepOpts leaderOpts = {0}; STsdbRepOpts leaderOpts = {0};
if (tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts) < 0) { if ((code = tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts)) < 0) {
tsdbError("vgId:%d, failed to deal with snap info for reply since %s", TD_VID(pVnode), terrstr()); tsdbError("vgId:%d, failed to deal with snap info for reply since %s", TD_VID(pVnode), terrstr());
goto _out; goto _out;
} }
@ -589,7 +602,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
int32_t bufLen = headLen; int32_t bufLen = headLen;
bufLen += tsdbPartitionInfoEstSize(pInfo); bufLen += tsdbPartitionInfoEstSize(pInfo);
bufLen += tsdbRepOptsEstSize(&opts); bufLen += tsdbRepOptsEstSize(&opts);
if (syncSnapInfoDataRealloc(pSnap, bufLen) != 0) { if ((code = syncSnapInfoDataRealloc(pSnap, bufLen)) != 0) {
tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen); tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen);
goto _out; goto _out;
} }
@ -599,7 +612,8 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
int32_t offset = headLen; int32_t offset = headLen;
int32_t tlen = 0; int32_t tlen = 0;
if ((tlen = tsdbPartitionInfoSerialize(pInfo, buf + offset, bufLen - offset)) < 0) { if ((tlen = tsdbPartitionInfoSerialize(pInfo, (uint8_t*)(buf + offset), bufLen - offset)) < 0) {
code = tlen;
tsdbError("vgId:%d, failed to serialize tsdb partition info since %s", TD_VID(pVnode), terrstr()); tsdbError("vgId:%d, failed to serialize tsdb partition info since %s", TD_VID(pVnode), terrstr());
goto _out; goto _out;
} }
@ -607,6 +621,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
ASSERT(offset <= bufLen); ASSERT(offset <= bufLen);
if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) { if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) {
code = tlen;
tsdbError("vgId:%d, failed to serialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); tsdbError("vgId:%d, failed to serialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out; goto _out;
} }
@ -620,7 +635,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
tsdbInfo("vgId:%d, tsdb snap info prepared. type:%s, val length:%d", TD_VID(pVnode), TMSG_INFO(pHead->typ), tsdbInfo("vgId:%d, tsdb snap info prepared. type:%s, val length:%d", TD_VID(pVnode), TMSG_INFO(pHead->typ),
pHead->len); pHead->len);
code = 0;
_out: _out:
tsdbPartitionInfoClear(pInfo); tsdbPartitionInfoClear(pInfo);
return code; return code;

View File

@ -152,7 +152,7 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
} }
#endif #endif
int32_t tGetMapData(uint8_t *p, SMapData *pMapData) { int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize) {
int32_t n = 0; int32_t n = 0;
int32_t offset; int32_t offset;
@ -160,7 +160,9 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
n += tGetI32v(p + n, &pMapData->nItem); n += tGetI32v(p + n, &pMapData->nItem);
if (pMapData->nItem) { if (pMapData->nItem) {
if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1; if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t lOffset = 0; int32_t lOffset = 0;
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
@ -170,12 +172,18 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
} }
n += tGetI32v(p + n, &pMapData->nData); n += tGetI32v(p + n, &pMapData->nData);
if (tRealloc(&pMapData->pData, pMapData->nData)) return -1; if (tRealloc(&pMapData->pData, pMapData->nData)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pMapData->pData, p + n, pMapData->nData); memcpy(pMapData->pData, p + n, pMapData->nData);
n += pMapData->nData; n += pMapData->nData;
} }
return n; if (decodedSize) {
*decodedSize = n;
}
return 0;
} }
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
@ -680,20 +688,17 @@ int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) {
// STSDBRowIter ====================================================== // STSDBRowIter ======================================================
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
pIter->pRow = pRow; pIter->pRow = pRow;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter); int32_t code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
if (code) goto _exit; if (code) return code;
} else if (pRow->type == TSDBROW_COL_FMT) { } else if (pRow->type == TSDBROW_COL_FMT) {
pIter->iColData = 0; pIter->iColData = 0;
} else { } else {
ASSERT(0); ASSERT(0);
} }
_exit: return 0;
return code;
} }
void tsdbRowClose(STSDBRowIter *pIter) { void tsdbRowClose(STSDBRowIter *pIter) {
@ -760,7 +765,9 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
--iCol; --iCol;
continue; continue;
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) { } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
continue; continue;
} }
@ -780,14 +787,15 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
} }
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
} }
for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) { for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol]; pTColumn = &pMerger->pTSchema->columns[iCol];
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
pMerger->version = key.version; pMerger->version = key.version;
@ -962,8 +970,7 @@ _exit:
*/ */
// delete skyline ====================================================== // delete skyline ======================================================
static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) { static void tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
int32_t code = 0;
int32_t i1 = 0; int32_t i1 = 0;
int32_t n1 = taosArrayGetSize(pSkyline1); int32_t n1 = taosArrayGetSize(pSkyline1);
int32_t i2 = 0; int32_t i2 = 0;
@ -1017,7 +1024,6 @@ static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pS
} }
pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem); pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem);
return code;
} }
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) { int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
@ -1029,8 +1035,13 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx,
if (sidx == eidx) { if (sidx == eidx) {
TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2); TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1); TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
taosArrayPush(pSkyline, &pItem1); if (taosArrayPush(pSkyline, &pItem1) == NULL) {
taosArrayPush(pSkyline, &pItem2); return TSDB_CODE_OUT_OF_MEMORY;
}
if (taosArrayPush(pSkyline, &pItem2) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else { } else {
SArray *pSkyline1 = NULL; SArray *pSkyline1 = NULL;
SArray *pSkyline2 = NULL; SArray *pSkyline2 = NULL;
@ -1049,7 +1060,7 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx,
code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2); code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
if (code) goto _clear; if (code) goto _clear;
code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline); tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);
_clear: _clear:
taosArrayDestroy(pSkyline1); taosArrayDestroy(pSkyline1);
@ -1064,13 +1075,28 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
int32_t code = 0; int32_t code = 0;
int32_t dataNum = eidx - sidx + 1; int32_t dataNum = eidx - sidx + 1;
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
if (aTmpSkyline == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
if (pSkyline == NULL) {
taosArrayDestroy(aTmpSkyline);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayClear(aSkyline); taosArrayClear(aSkyline);
for (int32_t i = sidx; i <= eidx; ++i) { for (int32_t i = sidx; i <= eidx; ++i) {
pDelData = (SDelData *)taosArrayGet(aDelData, i); pDelData = (SDelData *)taosArrayGet(aDelData, i);
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}); if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}) == NULL) {
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}); code = TSDB_CODE_OUT_OF_MEMORY;
goto _clear;
}
if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _clear;
}
} }
code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline); code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline);
@ -1079,7 +1105,10 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
int32_t skylineNum = taosArrayGetSize(pSkyline); int32_t skylineNum = taosArrayGetSize(pSkyline);
for (int32_t i = 0; i < skylineNum; ++i) { for (int32_t i = 0; i < skylineNum; ++i) {
TSDBKEY *p = taosArrayGetP(pSkyline, i); TSDBKEY *p = taosArrayGetP(pSkyline, i);
taosArrayPush(aSkyline, p); if (taosArrayPush(aSkyline, p) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _clear;
}
} }
_clear: _clear:
@ -1519,8 +1548,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *
SBlockCol blockCol; SBlockCol blockCol;
code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg); code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg);
if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg;
code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -1812,7 +1841,9 @@ int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
if (set == NULL) return -1; if (set == NULL) return -1;
uint32_t *ret = taosHashGet(set, &colId, sizeof(colId)); uint32_t *ret = taosHashGet(set, &colId, sizeof(colId));
if (ret == NULL) return -1; if (ret == NULL) {
return TSDB_CODE_NOT_FOUND;
}
*alg = *ret; *alg = *ret;
return 0; return 0;

View File

@ -43,17 +43,16 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) { bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = 0xFFFFFFFF; uint32_t ipv4 = 0xFFFFFFFF;
sDebug("vgId:%d, start to resolve sync addr fqdn in %d seconds, " sDebug(
"vgId:%d, start to resolve sync addr fqdn in %d seconds, "
"dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ", "dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ",
vgId, tsResolveFQDNRetryTime, vgId, tsResolveFQDNRetryTime, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort); for (int i = 0; i < tsResolveFQDNRetryTime; i++) {
for(int i = 0; i < tsResolveFQDNRetryTime; i++){
ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn); ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) { if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr, fqdn:%s, wait one second", pInfo->nodeFqdn); sError("failed to resolve ipv4 addr, fqdn:%s, wait one second", pInfo->nodeFqdn);
taosSsleep(1); taosSsleep(1);
} } else {
else{
break; break;
} }
} }
@ -501,8 +500,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) { int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
void* data = taosMemoryRealloc(pSnap->data, size); void* data = taosMemoryRealloc(pSnap->data, size);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
pSnap->data = data; pSnap->data = data;
return 0; return 0;