enh: check return code

This commit is contained in:
Hongze Cheng 2024-07-18 11:32:13 +08:00
parent 597d239c4a
commit 69e79966b6
6 changed files with 144 additions and 199 deletions

View File

@ -24,7 +24,7 @@ extern "C" {
#endif
#define TSDB_ERROR_LOG(vid, lino, code) \
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code))
tsdbError("vgId:%d %s failed at %s:%d since %s", vid, __func__, __FILE__, lino, tstrerror(code))
typedef struct SFDataPtr {
int64_t offset;
@ -34,13 +34,13 @@ typedef struct SFDataPtr {
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn);
extern void tsdbCloseFile(STsdbFD **ppFD);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size,
int32_t encryptAlgorithm, char* encryptKey);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
char *encryptKey);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
int32_t encryptAlgorithm, char* encryptKey);
int32_t encryptAlgorithm, char *encryptKey);
extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
int32_t encryptAlgorithm, char* encryptKey);
extern int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char* encryptKey);
int32_t encryptAlgorithm, char *encryptKey);
extern int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey);
typedef struct SColCompressInfo SColCompressInfo;
struct SColCompressInfo {

View File

@ -17,7 +17,9 @@
#include "vnd.h"
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
lvl[0]->level = level;
TARRAY2_INIT(lvl[0]->fobjArr);
return 0;
@ -81,14 +83,14 @@ static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64
return code;
}
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
TAOS_CHECK_RETURN(TARRAY2_APPEND(lvl[0]->fobjArr, fobj));
} else {
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj1->f->fid,
.of = fobj1->f[0],
};
TARRAY2_APPEND(fopArr, op);
TAOS_CHECK_RETURN(TARRAY2_APPEND(fopArr, op));
}
}
return 0;

View File

@ -370,7 +370,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
if (pTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
goto _exit;
}
pTbData->suid = suid;
pTbData->uid = uid;
@ -415,11 +415,11 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
taosWUnLockLatch(&pMemTable->latch);
_exit:
*ppTbData = pTbData;
return code;
_err:
*ppTbData = NULL;
if (code) {
*ppTbData = NULL;
} else {
*ppTbData = pTbData;
}
return code;
}

View File

@ -38,8 +38,7 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
return TARRAY2_APPEND(&rtner->fopArr, op);
}
static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
int64_t total = 0;
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
int64_t interval = 1000; // 1s
int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
int64_t offset = 0;
@ -49,10 +48,9 @@ static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_
int64_t n;
int64_t last = taosGetTimestampMs();
if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
return -1;
TAOS_CHECK_RETURN(TAOS_SYSTEM_ERROR(errno));
}
total += n;
remain -= n;
if (remain > 0) {
@ -63,7 +61,7 @@ static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_
}
}
return total;
return 0;
}
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
@ -77,32 +75,32 @@ static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFi
tsdbTFileLastChunkName(rtner->tsdb, to, fname_to);
fdFrom = taosOpenFile(fname_from, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdFrom == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname_to, from->f->size);
fdTo = taosOpenFile(fname_to, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (fdTo == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdTo == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
int64_t lc_size = tsdbLogicToFileSize(to->size, rtner->szPage) - chunksize * (to->lcn - 1);
int64_t n = taosFSendFile(fdTo, fdFrom, 0, lc_size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosFSendFile(fdTo, fdFrom, 0, lc_size) < 0) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
if (fdFrom) taosCloseFile(&fdFrom);
if (fdTo) taosCloseFile(&fdTo);
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
return code;
}
@ -117,30 +115,29 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile
tsdbTFileName(rtner->tsdb, to, fname);
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdFrom == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (fdTo == NULL) code = terrno;
if (fdTo == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
TSDB_CHECK_CODE(code, lino, _exit);
int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
tsRetentionSpeedLimitMB);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
tsRetentionSpeedLimitMB),
&lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
return code;
}
@ -157,8 +154,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
.of = fobj->f[0],
};
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
// create new
op = (STFileOp){
@ -181,22 +177,20 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
},
};
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
// do copy the file
if (lcn < 1) {
code = tsdbDoCopyFile(rtner, fobj, &op.nf);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &op.nf), &lino, _exit);
} else {
code = tsdbDoCopyFileLC(rtner, fobj, &op.nf);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &op.nf), &lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
return code;
}
@ -213,8 +207,7 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
int32_t lino = 0;
if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit);
taosThreadMutexLock(&rtner->tsdb->mutex);
@ -231,7 +224,8 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
} else {
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
}
@ -248,26 +242,19 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
if (expLevel < 0) { // remove the fileset
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoRemoveFileObject(rtner, fobj), &lino, _exit);
}
SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
TARRAY2_FOREACH(lvl->fobjArr, fobj) { TAOS_CHECK_GOTO(tsdbDoRemoveFileObject(rtner, fobj), &lino, _exit); }
}
} else if (expLevel == 0) { // only migrate to upper level
return 0;
} else { // migrate
SDiskID did;
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
TAOS_CHECK_GOTO(tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did), &lino, _exit);
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
// data
@ -284,25 +271,26 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
did.level);
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoMigrateFileObj(rtner, fobj, &did), &lino, _exit);
}
// stt
SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (fobj->f->did.level == did.level) continue;
if (fobj->f->did.level == did.level) {
continue;
}
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoMigrateFileObj(rtner, fobj, &did), &lino, _exit);
}
}
}
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
return code;
}
@ -338,15 +326,12 @@ static int32_t tsdbRetention(void *arg) {
// do retention
if (rtner.fset) {
if (rtnArg->s3Migrate) {
code = tsdbDoS3Migrate(&rtner);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoS3Migrate(&rtner), &lino, _exit);
} else {
code = tsdbDoRetention(&rtner);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
}
code = tsdbDoRetentionEnd(&rtner);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner), &lino, _exit);
}
_exit:
@ -361,7 +346,7 @@ _exit:
TARRAY2_DESTROY(&rtner.fopArr, NULL);
taosMemoryFree(arg);
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(pTsdb->pVnode), __func__, code, lino);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
@ -374,12 +359,11 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
if (!tsdb->bgTaskDisabled) {
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
code = tsdbTFileSetOpenChannel(fset);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbTFileSetOpenChannel(fset), &lino, _exit);
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
arg->tsdb = tsdb;
@ -396,7 +380,7 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(tsdb->pVnode), __func__, code, lino);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
@ -446,20 +430,19 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
tsdbTFileName(rtner->tsdb, to, fname);
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdFrom == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
char *object_name = taosDirEntryBaseName(fname);
code = s3PutObjectFromFile2(from->fname, object_name, 1);
TSDB_CHECK_CODE(code, lino, _exit);
taosCloseFile(&fdFrom);
TAOS_CHECK_GOTO(s3PutObjectFromFile2(from->fname, object_name, 1), &lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
taosCloseFile(&fdFrom);
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
taosCloseFile(&fdFrom);
return code;
}
@ -477,8 +460,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
.of = fobj->f[0],
};
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
// create new
op = (STFileOp){
@ -501,8 +483,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
},
};
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(rtner->tsdb, &op.nf, fname);
@ -514,13 +495,13 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
char *dot = strrchr(object_name_prefix, '.');
if (!dot) {
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
return -1;
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
}
char *dot2 = strchr(object_name, '.');
if (!dot) {
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
return -1;
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
}
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
@ -529,8 +510,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
int64_t c_offset = chunksize * (cn - fobj->f->lcn);
code = s3PutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(s3PutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
}
// copy last chunk
@ -540,30 +520,30 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
fdFrom = taosOpenFile(fname, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdFrom == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
tsdbInfo("vgId:%d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, lc_size);
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", lcn);
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (fdTo == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdTo == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
if (fdFrom) taosCloseFile(&fdFrom);
if (fdTo) taosCloseFile(&fdTo);
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
return code;
}
@ -581,8 +561,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
.of = fobj->f[0],
};
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
// create new
op = (STFileOp){
@ -605,8 +584,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
},
};
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(rtner->tsdb, &op.nf, fname);
@ -618,7 +596,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
char *dot = strrchr(object_name_prefix, '.');
if (!dot) {
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
return -1;
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
}
// do copy the file
@ -626,8 +604,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
int64_t c_offset = chunksize * (cn - 1);
code = s3PutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(s3PutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
}
// copy last chunk
@ -637,34 +614,34 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
dot = strchr(object_name, '.');
if (!dot) {
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
return -1;
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
}
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name), "%d.data", lcn);
fdFrom = taosOpenFile(fobj->fname, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdFrom == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
tsdbInfo("vgId: %d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, fobj->f->size);
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (fdTo == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (fdTo == NULL) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
return code;
}
@ -674,15 +651,21 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
STFileSet *fset = rtner->fset;
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
if (!fobj) return code;
if (!fobj) {
return 0;
}
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
if (expLevel < 0) return code; // expired
if (expLevel < 0) { // expired
return 0;
}
SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
int32_t s3KeepLocal = pCfg->s3KeepLocal;
int32_t s3ExpLevel = tsdbS3FidLevel(fset->fid, &rtner->tsdb->keepCfg, s3KeepLocal, rtner->now);
if (s3ExpLevel < 1) return code; // keep on local storage
if (s3ExpLevel < 1) { // keep on local storage
return 0;
}
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
int32_t lcn = fobj->f->lcn;
@ -698,19 +681,18 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
STimeWindow win = {0};
tsdbFidKeyRange(fset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
tsdbInfo("vgId:%d, compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
tsdbAsyncCompact(rtner->tsdb, &win, pCfg->sttTrigger == 1);
tsdbInfo("vgId:%d, compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
tsdbInfo("vgId:%d, async compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
code = tsdbAsyncCompact(rtner->tsdb, &win, pCfg->sttTrigger == 1);
tsdbInfo("vgId:%d, async compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
goto _exit;
return code;
}
code = tsdbMigrateDataFileS3(rtner, fobj, size, chunksize);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbMigrateDataFileS3(rtner, fobj, size, chunksize), &lino, _exit);
}
} else {
if (lcn <= 1) {
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
return code;
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _exit);
}
char fname1[TSDB_FILENAME_LEN];
tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);
@ -720,8 +702,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
int64_t size = 0;
taosStatFile(fname1, &size, &mtime, NULL);
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
code = tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize);
TSDB_CHECK_CODE(code, lino, _exit);
TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
}
} else {
tsdbError("vgId:%d, file: %s not found, %s at line %d", TD_VID(rtner->tsdb->pVnode), fname1, __func__, lino);
@ -731,7 +712,8 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code));
}
return code;
}
@ -750,11 +732,15 @@ int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
}
if (!tsS3Enabled) {
return code;
return 0;
}
taosThreadMutexLock(&tsdb->mutex);
code = tsdbAsyncRetentionImpl(tsdb, now, true);
taosThreadMutexUnlock(&tsdb->mutex);
if (code) {
tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
}
return code;
}

View File

@ -58,17 +58,9 @@ struct STsdbSnapReader {
};
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
tsdbDataFileReaderClose(&reader->dataReader);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
return 0;
}
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
@ -255,7 +247,6 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
TSDB_CHECK_CODE(code, lino, _exit);
// TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0;
for (int i = 0; i < 4; i++) {
@ -448,8 +439,8 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type,
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
__func__, lino, tstrerror(code), sver, ever, type);
tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
__func__, __FILE__, lino, tstrerror(code), sver, ever, type);
tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
taosMemoryFree(reader[0]);
reader[0] = NULL;
@ -461,10 +452,11 @@ _exit:
}
int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
if (reader[0] == NULL) return 0;
if (reader[0] == NULL) {
return 0;
}
int32_t code = 0;
int32_t lino = 0;
STsdb* tsdb = reader[0]->tsdb;
@ -488,12 +480,6 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
taosMemoryFree(reader[0]);
reader[0] = NULL;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
}
return code;
}

View File

@ -148,30 +148,14 @@ _exit:
}
static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader) {
int32_t code = 0;
int32_t lino = 0;
TARRAY2_CLEAR(reader->dataReaderArr, tsdbDataFileRAWReaderClose);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
return 0;
}
static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) {
int32_t code = 0;
int32_t lino = 0;
reader->dataIter->count = TARRAY2_SIZE(reader->dataReaderArr);
reader->dataIter->idx = 0;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
return 0;
}
static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
@ -392,17 +376,6 @@ _exit:
return code;
}
static int32_t tsdbSnapRAWWriteFileSetOpenIter(STsdbSnapRAWWriter* writer) {
int32_t code = 0;
int32_t lino = 0;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapRAWWriteFileSetCloseIter(STsdbSnapRAWWriter* writer) { return 0; }
static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) {
@ -445,10 +418,8 @@ static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t
writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) {
code = TSDB_CODE_NO_AVAIL_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did);
TSDB_CHECK_CODE(code, lino, _exit);
tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
code = tsdbSnapRAWWriteFileSetOpenWriter(writer);