Merge pull request #26678 from taosdata/enh/TD-29367-8
refactor return code
This commit is contained in:
commit
04f71c884f
|
@ -226,7 +226,7 @@ void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32
|
|||
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
|
||||
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem);
|
||||
int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
|
||||
int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
|
||||
int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize);
|
||||
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
|
||||
SArray **ppArray);
|
||||
// other
|
||||
|
@ -706,7 +706,6 @@ typedef TARRAY2(STsdbFSetPartition *) STsdbFSetPartList;
|
|||
|
||||
STsdbFSetPartList *tsdbFSetPartListCreate();
|
||||
void tsdbFSetPartListDestroy(STsdbFSetPartList **ppList);
|
||||
int32_t tSerializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList);
|
||||
int32_t tDeserializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList);
|
||||
int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSetRangeArray **ppRanges);
|
||||
|
||||
|
|
|
@ -757,10 +757,9 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// decode
|
||||
int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
|
||||
if (n < 0) {
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
int32_t n;
|
||||
code = tGetMapData(pReader->aBuf[0], mDataBlk, &n);
|
||||
if (code) goto _exit;
|
||||
ASSERT(n == size);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -81,7 +81,7 @@ static int32_t tsdbFTypeToFRangeType(tsdb_ftype_t ftype) {
|
|||
static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** ppSP) {
|
||||
STsdbFSetPartition* p = tsdbFSetPartitionCreate();
|
||||
if (p == NULL) {
|
||||
goto _err;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
p->fid = fset->fid;
|
||||
|
@ -134,10 +134,6 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition**
|
|||
}
|
||||
ppSP[0] = p;
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
tsdbFSetPartitionClear(&p);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// fset partition list
|
||||
|
@ -160,9 +156,11 @@ void tsdbFSetPartListDestroy(STsdbFSetPartList** ppList) {
|
|||
}
|
||||
|
||||
int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray** ppRanges) {
|
||||
int32_t code = 0;
|
||||
|
||||
TFileSetRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TFileSetRangeArray));
|
||||
if (pDiff == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
TARRAY2_INIT(pDiff);
|
||||
|
@ -171,7 +169,7 @@ int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray
|
|||
TARRAY2_FOREACH(pList, part) {
|
||||
STFileSetRange* r = taosMemoryCalloc(1, sizeof(STFileSetRange));
|
||||
if (r == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
int64_t maxVerValid = -1;
|
||||
|
@ -202,7 +200,7 @@ _err:
|
|||
if (pDiff) {
|
||||
tsdbTFileSetRangeArrayDestroy(&pDiff);
|
||||
}
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// serialization
|
||||
|
@ -235,96 +233,103 @@ int32_t tTsdbFSetPartListDataLenCalc(STsdbFSetPartList* pList) {
|
|||
return datLen;
|
||||
}
|
||||
|
||||
int32_t tSerializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) {
|
||||
static int32_t tSerializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList, int32_t* encodeSize) {
|
||||
SEncoder encoder = {0};
|
||||
int8_t reserved8 = 0;
|
||||
int16_t reserved16 = 0;
|
||||
int64_t reserved64 = 0;
|
||||
int8_t msgVer = TSDB_SNAP_MSG_VER;
|
||||
int32_t len = TARRAY2_SIZE(pList);
|
||||
int32_t code = 0;
|
||||
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
int8_t reserved8 = 0;
|
||||
int16_t reserved16 = 0;
|
||||
int64_t reserved64 = 0;
|
||||
|
||||
int8_t msgVer = TSDB_SNAP_MSG_VER;
|
||||
int32_t len = TARRAY2_SIZE(pList);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, msgVer) < 0) goto _err;
|
||||
if (tEncodeI32(&encoder, len) < 0) goto _err;
|
||||
if ((code = tStartEncode(&encoder))) goto _exit;
|
||||
if ((code = tEncodeI8(&encoder, msgVer))) goto _exit;
|
||||
if ((code = tEncodeI32(&encoder, len))) goto _exit;
|
||||
|
||||
for (int32_t u = 0; u < len; u++) {
|
||||
STsdbFSetPartition* p = TARRAY2_GET(pList, u);
|
||||
if (tEncodeI64(&encoder, p->fid) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, p->stat) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, reserved8) < 0) goto _err;
|
||||
if (tEncodeI16(&encoder, reserved16) < 0) goto _err;
|
||||
if ((code = tEncodeI64(&encoder, p->fid))) goto _exit;
|
||||
if ((code = tEncodeI8(&encoder, p->stat))) goto _exit;
|
||||
if ((code = tEncodeI8(&encoder, reserved8))) goto _exit;
|
||||
if ((code = tEncodeI16(&encoder, reserved16))) goto _exit;
|
||||
|
||||
int32_t typMax = TSDB_FSET_RANGE_TYP_MAX;
|
||||
if (tEncodeI32(&encoder, typMax) < 0) goto _err;
|
||||
if ((code = tEncodeI32(&encoder, typMax))) goto _exit;
|
||||
|
||||
for (int32_t i = 0; i < typMax; i++) {
|
||||
SVerRangeList* iList = &p->verRanges[i];
|
||||
int32_t iLen = TARRAY2_SIZE(iList);
|
||||
|
||||
if (tEncodeI32(&encoder, iLen) < 0) goto _err;
|
||||
if ((code = tEncodeI32(&encoder, iLen))) goto _exit;
|
||||
for (int32_t j = 0; j < iLen; j++) {
|
||||
SVersionRange r = TARRAY2_GET(iList, j);
|
||||
if (tEncodeI64(&encoder, r.minVer) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, r.maxVer) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, reserved64) < 0) goto _err;
|
||||
if ((code = tEncodeI64(&encoder, r.minVer))) goto _exit;
|
||||
if ((code = tEncodeI64(&encoder, r.maxVer))) goto _exit;
|
||||
if ((code = tEncodeI64(&encoder, reserved64))) goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
|
||||
_err:
|
||||
if (encodeSize) {
|
||||
encodeSize[0] = encoder.pos;
|
||||
}
|
||||
|
||||
_exit:
|
||||
tEncoderClear(&encoder);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDeserializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
int8_t reserved8 = 0;
|
||||
int16_t reserved16 = 0;
|
||||
int64_t reserved64 = 0;
|
||||
int8_t reserved8 = 0;
|
||||
int16_t reserved16 = 0;
|
||||
int64_t reserved64 = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
STsdbFSetPartition* p = NULL;
|
||||
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
int8_t msgVer = 0;
|
||||
int32_t len = 0;
|
||||
if (tStartDecode(&decoder) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err;
|
||||
if (msgVer != TSDB_SNAP_MSG_VER) goto _err;
|
||||
if (tDecodeI32(&decoder, &len) < 0) goto _err;
|
||||
if ((code = tStartDecode(&decoder))) goto _err;
|
||||
if ((code = tDecodeI8(&decoder, &msgVer))) goto _err;
|
||||
if (msgVer != TSDB_SNAP_MSG_VER) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
if ((code = tDecodeI32(&decoder, &len))) goto _err;
|
||||
|
||||
for (int32_t u = 0; u < len; u++) {
|
||||
p = tsdbFSetPartitionCreate();
|
||||
if (p == NULL) goto _err;
|
||||
if (tDecodeI64(&decoder, &p->fid) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &p->stat) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &reserved8) < 0) goto _err;
|
||||
if (tDecodeI16(&decoder, &reserved16) < 0) goto _err;
|
||||
if (p == NULL) {
|
||||
code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if ((code = tDecodeI64(&decoder, &p->fid))) goto _err;
|
||||
if ((code = tDecodeI8(&decoder, &p->stat))) goto _err;
|
||||
if ((code = tDecodeI8(&decoder, &reserved8))) goto _err;
|
||||
if ((code = tDecodeI16(&decoder, &reserved16))) goto _err;
|
||||
|
||||
int32_t typMax = 0;
|
||||
if (tDecodeI32(&decoder, &typMax) < 0) goto _err;
|
||||
if ((code = tDecodeI32(&decoder, &typMax))) goto _err;
|
||||
|
||||
for (int32_t i = 0; i < typMax; i++) {
|
||||
SVerRangeList* iList = &p->verRanges[i];
|
||||
int32_t iLen = 0;
|
||||
if (tDecodeI32(&decoder, &iLen) < 0) goto _err;
|
||||
if ((code = tDecodeI32(&decoder, &iLen))) goto _err;
|
||||
for (int32_t j = 0; j < iLen; j++) {
|
||||
SVersionRange r = {0};
|
||||
if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err;
|
||||
if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err;
|
||||
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
|
||||
if (TARRAY2_APPEND(iList, r)) goto _err;
|
||||
if ((code = tDecodeI64(&decoder, &r.minVer))) goto _err;
|
||||
if ((code = tDecodeI64(&decoder, &r.maxVer))) goto _err;
|
||||
if ((code = tDecodeI64(&decoder, &reserved64))) goto _err;
|
||||
if ((code = TARRAY2_APPEND(iList, r))) goto _err;
|
||||
}
|
||||
}
|
||||
if (TARRAY2_APPEND(pList, p)) goto _err;
|
||||
if ((code = TARRAY2_APPEND(pList, p))) goto _err;
|
||||
p = NULL;
|
||||
}
|
||||
|
||||
|
@ -337,7 +342,7 @@ _err:
|
|||
tsdbFSetPartitionClear(&p);
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// fs state
|
||||
|
@ -352,8 +357,9 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
|
|||
STFileSet* fset;
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
STsdbFSetPartition* pItem = NULL;
|
||||
if (tsdbTFileSetToFSetPartition(fset, &pItem) < 0) {
|
||||
code = -1;
|
||||
code = tsdbTFileSetToFSetPartition(fset, &pItem);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
break;
|
||||
}
|
||||
ASSERT(pItem != NULL);
|
||||
|
@ -404,7 +410,9 @@ static int32_t tsdbPartitionInfoInit(SVnode* pVnode, STsdbPartitionInfo* pInfo)
|
|||
for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) {
|
||||
STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, j);
|
||||
pInfo->pLists[j] = tsdbSnapGetFSetPartList(pTsdb->pFS);
|
||||
if (pInfo->pLists[j] == NULL) return -1;
|
||||
if (pInfo->pLists[j] == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -432,9 +440,10 @@ static int32_t tsdbPartitionInfoSerialize(STsdbPartitionInfo* pInfo, uint8_t* bu
|
|||
SSyncTLV* pSubHead = (void*)((char*)buf + offset);
|
||||
int32_t valOffset = offset + sizeof(*pSubHead);
|
||||
ASSERT(pSubHead->val == (char*)buf + valOffset);
|
||||
if ((tlen = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j])) < 0) {
|
||||
int32_t code = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j], &tlen);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, failed to serialize fset partition list of tsdb %d since %s", pInfo->vgId, j, terrstr());
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
pSubHead->typ = pInfo->subTyps[j];
|
||||
pSubHead->len = tlen;
|
||||
|
@ -460,17 +469,18 @@ static int32_t tTsdbRepOptsDataLenCalc(STsdbRepOpts* pInfo) {
|
|||
}
|
||||
|
||||
int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
|
||||
int32_t code = 0;
|
||||
SEncoder encoder = {0};
|
||||
int64_t reserved64 = 0;
|
||||
int8_t msgVer = TSDB_SNAP_MSG_VER;
|
||||
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
int64_t reserved64 = 0;
|
||||
int8_t msgVer = TSDB_SNAP_MSG_VER;
|
||||
|
||||
if (tStartEncode(&encoder) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, msgVer) < 0) goto _err;
|
||||
if ((code = tStartEncode(&encoder))) goto _err;
|
||||
if ((code = tEncodeI8(&encoder, msgVer))) goto _err;
|
||||
int16_t format = pOpts->format;
|
||||
if (tEncodeI16(&encoder, format) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, reserved64) < 0) goto _err;
|
||||
if ((code = tEncodeI16(&encoder, format))) goto _err;
|
||||
if ((code = tEncodeI64(&encoder, reserved64))) goto _err;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -479,23 +489,24 @@ int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
|
|||
|
||||
_err:
|
||||
tEncoderClear(&encoder);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
|
||||
int32_t code;
|
||||
SDecoder decoder = {0};
|
||||
int64_t reserved64 = 0;
|
||||
int8_t msgVer = 0;
|
||||
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
int64_t reserved64 = 0;
|
||||
int8_t msgVer = 0;
|
||||
|
||||
if (tStartDecode(&decoder) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err;
|
||||
if ((code = tStartDecode(&decoder))) goto _err;
|
||||
if ((code = tDecodeI8(&decoder, &msgVer))) goto _err;
|
||||
if (msgVer != TSDB_SNAP_MSG_VER) goto _err;
|
||||
int16_t format = 0;
|
||||
if (tDecodeI16(&decoder, &format) < 0) goto _err;
|
||||
if ((code = tDecodeI16(&decoder, &format))) goto _err;
|
||||
pOpts->format = format;
|
||||
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
|
||||
if ((code = tDecodeI64(&decoder, &reserved64))) goto _err;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -503,7 +514,7 @@ int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts)
|
|||
|
||||
_err:
|
||||
tDecoderClear(&decoder);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) {
|
||||
|
@ -518,7 +529,7 @@ static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufL
|
|||
int32_t offset = 0;
|
||||
int32_t tlen = 0;
|
||||
if ((tlen = tSerializeTsdbRepOpts(pSubHead->val, bufLen, pOpts)) < 0) {
|
||||
return -1;
|
||||
return tlen;
|
||||
}
|
||||
pSubHead->typ = SNAP_DATA_RAW;
|
||||
pSubHead->len = tlen;
|
||||
|
@ -528,8 +539,10 @@ static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufL
|
|||
|
||||
// snap info
|
||||
static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, STsdbRepOpts* pInfo) {
|
||||
if (!pSnap->data) return 0;
|
||||
int32_t code = -1;
|
||||
if (!pSnap->data) {
|
||||
return 0;
|
||||
}
|
||||
int32_t code = 0;
|
||||
|
||||
SSyncTLV* pHead = (void*)pSnap->data;
|
||||
int32_t offset = 0;
|
||||
|
@ -546,30 +559,30 @@ static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, ST
|
|||
case SNAP_DATA_RSMA2: {
|
||||
} break;
|
||||
case SNAP_DATA_RAW: {
|
||||
if (tDeserializeTsdbRepOpts(buf, bufLen, pInfo) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
code = tDeserializeTsdbRepOpts(buf, bufLen, pInfo);
|
||||
if (code < 0) {
|
||||
tsdbError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
|
||||
goto _out;
|
||||
return code;
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ);
|
||||
goto _out;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = 0;
|
||||
_out:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
|
||||
ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
|
||||
STsdbPartitionInfo partitionInfo = {0};
|
||||
int code = -1;
|
||||
int code = 0;
|
||||
STsdbPartitionInfo* pInfo = &partitionInfo;
|
||||
|
||||
if (tsdbPartitionInfoInit(pVnode, pInfo) != 0) {
|
||||
code = tsdbPartitionInfoInit(pVnode, pInfo);
|
||||
if (code) {
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
@ -577,7 +590,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
|
|||
STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW};
|
||||
if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||
STsdbRepOpts leaderOpts = {0};
|
||||
if (tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts) < 0) {
|
||||
if ((code = tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts)) < 0) {
|
||||
tsdbError("vgId:%d, failed to deal with snap info for reply since %s", TD_VID(pVnode), terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
@ -589,7 +602,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
|
|||
int32_t bufLen = headLen;
|
||||
bufLen += tsdbPartitionInfoEstSize(pInfo);
|
||||
bufLen += tsdbRepOptsEstSize(&opts);
|
||||
if (syncSnapInfoDataRealloc(pSnap, bufLen) != 0) {
|
||||
if ((code = syncSnapInfoDataRealloc(pSnap, bufLen)) != 0) {
|
||||
tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen);
|
||||
goto _out;
|
||||
}
|
||||
|
@ -599,7 +612,8 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
|
|||
int32_t offset = headLen;
|
||||
int32_t tlen = 0;
|
||||
|
||||
if ((tlen = tsdbPartitionInfoSerialize(pInfo, buf + offset, bufLen - offset)) < 0) {
|
||||
if ((tlen = tsdbPartitionInfoSerialize(pInfo, (uint8_t*)(buf + offset), bufLen - offset)) < 0) {
|
||||
code = tlen;
|
||||
tsdbError("vgId:%d, failed to serialize tsdb partition info since %s", TD_VID(pVnode), terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
@ -607,6 +621,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
|
|||
ASSERT(offset <= bufLen);
|
||||
|
||||
if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) {
|
||||
code = tlen;
|
||||
tsdbError("vgId:%d, failed to serialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
@ -620,7 +635,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
|
|||
|
||||
tsdbInfo("vgId:%d, tsdb snap info prepared. type:%s, val length:%d", TD_VID(pVnode), TMSG_INFO(pHead->typ),
|
||||
pHead->len);
|
||||
code = 0;
|
||||
|
||||
_out:
|
||||
tsdbPartitionInfoClear(pInfo);
|
||||
return code;
|
||||
|
|
|
@ -152,7 +152,7 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
|
||||
int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize) {
|
||||
int32_t n = 0;
|
||||
int32_t offset;
|
||||
|
||||
|
@ -160,7 +160,9 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
|
|||
|
||||
n += tGetI32v(p + n, &pMapData->nItem);
|
||||
if (pMapData->nItem) {
|
||||
if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1;
|
||||
if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t lOffset = 0;
|
||||
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
|
||||
|
@ -170,12 +172,18 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
|
|||
}
|
||||
|
||||
n += tGetI32v(p + n, &pMapData->nData);
|
||||
if (tRealloc(&pMapData->pData, pMapData->nData)) return -1;
|
||||
if (tRealloc(&pMapData->pData, pMapData->nData)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pMapData->pData, p + n, pMapData->nData);
|
||||
n += pMapData->nData;
|
||||
}
|
||||
|
||||
return n;
|
||||
if (decodedSize) {
|
||||
*decodedSize = n;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
|
@ -680,20 +688,17 @@ int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) {
|
|||
|
||||
// STSDBRowIter ======================================================
|
||||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
|
||||
int32_t code = 0;
|
||||
|
||||
pIter->pRow = pRow;
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
|
||||
if (code) goto _exit;
|
||||
int32_t code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
|
||||
if (code) return code;
|
||||
} else if (pRow->type == TSDBROW_COL_FMT) {
|
||||
pIter->iColData = 0;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbRowClose(STSDBRowIter *pIter) {
|
||||
|
@ -760,7 +765,9 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
|||
--iCol;
|
||||
continue;
|
||||
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||
if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -780,14 +787,15 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
|||
}
|
||||
|
||||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
|
||||
pTColumn = &pMerger->pTSchema->columns[iCol];
|
||||
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||
if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
pMerger->version = key.version;
|
||||
|
@ -962,8 +970,7 @@ _exit:
|
|||
*/
|
||||
|
||||
// delete skyline ======================================================
|
||||
static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
|
||||
int32_t code = 0;
|
||||
static void tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
|
||||
int32_t i1 = 0;
|
||||
int32_t n1 = taosArrayGetSize(pSkyline1);
|
||||
int32_t i2 = 0;
|
||||
|
@ -1017,7 +1024,6 @@ static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pS
|
|||
}
|
||||
|
||||
pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
|
||||
|
@ -1029,8 +1035,13 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx,
|
|||
if (sidx == eidx) {
|
||||
TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
|
||||
TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
|
||||
taosArrayPush(pSkyline, &pItem1);
|
||||
taosArrayPush(pSkyline, &pItem2);
|
||||
if (taosArrayPush(pSkyline, &pItem1) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (taosArrayPush(pSkyline, &pItem2) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
SArray *pSkyline1 = NULL;
|
||||
SArray *pSkyline2 = NULL;
|
||||
|
@ -1049,7 +1060,7 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx,
|
|||
code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
|
||||
if (code) goto _clear;
|
||||
|
||||
code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);
|
||||
tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);
|
||||
|
||||
_clear:
|
||||
taosArrayDestroy(pSkyline1);
|
||||
|
@ -1064,13 +1075,28 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
|||
int32_t code = 0;
|
||||
int32_t dataNum = eidx - sidx + 1;
|
||||
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
|
||||
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
|
||||
if (aTmpSkyline == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
|
||||
if (pSkyline == NULL) {
|
||||
taosArrayDestroy(aTmpSkyline);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayClear(aSkyline);
|
||||
for (int32_t i = sidx; i <= eidx; ++i) {
|
||||
pDelData = (SDelData *)taosArrayGet(aDelData, i);
|
||||
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
|
||||
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
|
||||
if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _clear;
|
||||
}
|
||||
|
||||
if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _clear;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline);
|
||||
|
@ -1079,7 +1105,10 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
|||
int32_t skylineNum = taosArrayGetSize(pSkyline);
|
||||
for (int32_t i = 0; i < skylineNum; ++i) {
|
||||
TSDBKEY *p = taosArrayGetP(pSkyline, i);
|
||||
taosArrayPush(aSkyline, p);
|
||||
if (taosArrayPush(aSkyline, p) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _clear;
|
||||
}
|
||||
}
|
||||
|
||||
_clear:
|
||||
|
@ -1519,8 +1548,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *
|
|||
SBlockCol blockCol;
|
||||
|
||||
code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg);
|
||||
if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg;
|
||||
code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
@ -1812,7 +1841,9 @@ int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
|
|||
if (set == NULL) return -1;
|
||||
|
||||
uint32_t *ret = taosHashGet(set, &colId, sizeof(colId));
|
||||
if (ret == NULL) return -1;
|
||||
if (ret == NULL) {
|
||||
return TSDB_CODE_NOT_FOUND;
|
||||
}
|
||||
|
||||
*alg = *ret;
|
||||
return 0;
|
||||
|
|
|
@ -43,17 +43,16 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
|
|||
|
||||
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
|
||||
uint32_t ipv4 = 0xFFFFFFFF;
|
||||
sDebug("vgId:%d, start to resolve sync addr fqdn in %d seconds, "
|
||||
"dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ",
|
||||
vgId, tsResolveFQDNRetryTime,
|
||||
pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
|
||||
for(int i = 0; i < tsResolveFQDNRetryTime; i++){
|
||||
sDebug(
|
||||
"vgId:%d, start to resolve sync addr fqdn in %d seconds, "
|
||||
"dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ",
|
||||
vgId, tsResolveFQDNRetryTime, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
|
||||
for (int i = 0; i < tsResolveFQDNRetryTime; i++) {
|
||||
ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
|
||||
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
|
||||
sError("failed to resolve ipv4 addr, fqdn:%s, wait one second", pInfo->nodeFqdn);
|
||||
taosSsleep(1);
|
||||
}
|
||||
else{
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -501,8 +500,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
|
|||
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
|
||||
void* data = taosMemoryRealloc(pSnap->data, size);
|
||||
if (data == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSnap->data = data;
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue