Merge pull request #26652 from taosdata/enc/TD-30987-6
enh: check return code
This commit is contained in:
commit
6f4fc0a41b
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TSDB_ERROR_LOG(vid, lino, code) \
|
#define TSDB_ERROR_LOG(vid, lino, code) \
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code))
|
tsdbError("vgId:%d %s failed at %s:%d since %s", vid, __func__, __FILE__, lino, tstrerror(code))
|
||||||
|
|
||||||
typedef struct SFDataPtr {
|
typedef struct SFDataPtr {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
@ -34,13 +34,13 @@ typedef struct SFDataPtr {
|
||||||
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn);
|
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn);
|
||||||
extern void tsdbCloseFile(STsdbFD **ppFD);
|
extern void tsdbCloseFile(STsdbFD **ppFD);
|
||||||
|
|
||||||
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size,
|
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
|
||||||
int32_t encryptAlgorithm, char* encryptKey);
|
char *encryptKey);
|
||||||
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
|
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
|
||||||
int32_t encryptAlgorithm, char* encryptKey);
|
int32_t encryptAlgorithm, char *encryptKey);
|
||||||
extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
|
extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
|
||||||
int32_t encryptAlgorithm, char* encryptKey);
|
int32_t encryptAlgorithm, char *encryptKey);
|
||||||
extern int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char* encryptKey);
|
extern int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey);
|
||||||
|
|
||||||
typedef struct SColCompressInfo SColCompressInfo;
|
typedef struct SColCompressInfo SColCompressInfo;
|
||||||
struct SColCompressInfo {
|
struct SColCompressInfo {
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
||||||
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
lvl[0]->level = level;
|
lvl[0]->level = level;
|
||||||
TARRAY2_INIT(lvl[0]->fobjArr);
|
TARRAY2_INIT(lvl[0]->fobjArr);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -81,14 +83,14 @@ static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
|
TAOS_CHECK_RETURN(TARRAY2_APPEND(lvl[0]->fobjArr, fobj));
|
||||||
} else {
|
} else {
|
||||||
STFileOp op = {
|
STFileOp op = {
|
||||||
.optype = TSDB_FOP_REMOVE,
|
.optype = TSDB_FOP_REMOVE,
|
||||||
.fid = fobj1->f->fid,
|
.fid = fobj1->f->fid,
|
||||||
.of = fobj1->f[0],
|
.of = fobj1->f[0],
|
||||||
};
|
};
|
||||||
TARRAY2_APPEND(fopArr, op);
|
TAOS_CHECK_RETURN(TARRAY2_APPEND(fopArr, op));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -370,7 +370,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
||||||
pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
|
pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
|
||||||
if (pTbData == NULL) {
|
if (pTbData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _exit;
|
||||||
}
|
}
|
||||||
pTbData->suid = suid;
|
pTbData->suid = suid;
|
||||||
pTbData->uid = uid;
|
pTbData->uid = uid;
|
||||||
|
@ -401,7 +401,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
||||||
code = tsdbMemTableRehash(pMemTable);
|
code = tsdbMemTableRehash(pMemTable);
|
||||||
if (code) {
|
if (code) {
|
||||||
taosWUnLockLatch(&pMemTable->latch);
|
taosWUnLockLatch(&pMemTable->latch);
|
||||||
goto _err;
|
goto _exit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,11 +415,11 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
||||||
taosWUnLockLatch(&pMemTable->latch);
|
taosWUnLockLatch(&pMemTable->latch);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
*ppTbData = pTbData;
|
if (code) {
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
*ppTbData = NULL;
|
*ppTbData = NULL;
|
||||||
|
} else {
|
||||||
|
*ppTbData = pTbData;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,8 +38,7 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
|
||||||
return TARRAY2_APPEND(&rtner->fopArr, op);
|
return TARRAY2_APPEND(&rtner->fopArr, op);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
|
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
|
||||||
int64_t total = 0;
|
|
||||||
int64_t interval = 1000; // 1s
|
int64_t interval = 1000; // 1s
|
||||||
int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
|
int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
|
@ -49,10 +48,9 @@ static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_
|
||||||
int64_t n;
|
int64_t n;
|
||||||
int64_t last = taosGetTimestampMs();
|
int64_t last = taosGetTimestampMs();
|
||||||
if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
|
if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
|
||||||
return -1;
|
TAOS_CHECK_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
total += n;
|
|
||||||
remain -= n;
|
remain -= n;
|
||||||
|
|
||||||
if (remain > 0) {
|
if (remain > 0) {
|
||||||
|
@ -63,7 +61,7 @@ static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return total;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
|
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
|
||||||
|
@ -77,32 +75,32 @@ static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFi
|
||||||
tsdbTFileLastChunkName(rtner->tsdb, to, fname_to);
|
tsdbTFileLastChunkName(rtner->tsdb, to, fname_to);
|
||||||
|
|
||||||
fdFrom = taosOpenFile(fname_from, TD_FILE_READ);
|
fdFrom = taosOpenFile(fname_from, TD_FILE_READ);
|
||||||
if (fdFrom == NULL) code = terrno;
|
if (fdFrom == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname_to, from->f->size);
|
tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname_to, from->f->size);
|
||||||
|
|
||||||
fdTo = taosOpenFile(fname_to, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
fdTo = taosOpenFile(fname_to, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||||
if (fdTo == NULL) code = terrno;
|
if (fdTo == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
|
SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
|
||||||
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
|
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
|
||||||
int64_t lc_size = tsdbLogicToFileSize(to->size, rtner->szPage) - chunksize * (to->lcn - 1);
|
int64_t lc_size = tsdbLogicToFileSize(to->size, rtner->szPage) - chunksize * (to->lcn - 1);
|
||||||
int64_t n = taosFSendFile(fdTo, fdFrom, 0, lc_size);
|
|
||||||
if (n < 0) {
|
if (taosFSendFile(fdTo, fdFrom, 0, lc_size) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
taosCloseFile(&fdFrom);
|
|
||||||
taosCloseFile(&fdTo);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
if (fdFrom) taosCloseFile(&fdFrom);
|
tstrerror(code));
|
||||||
if (fdTo) taosCloseFile(&fdTo);
|
|
||||||
}
|
}
|
||||||
|
taosCloseFile(&fdFrom);
|
||||||
|
taosCloseFile(&fdTo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,30 +115,29 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile
|
||||||
tsdbTFileName(rtner->tsdb, to, fname);
|
tsdbTFileName(rtner->tsdb, to, fname);
|
||||||
|
|
||||||
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
|
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
|
||||||
if (fdFrom == NULL) code = terrno;
|
if (fdFrom == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);
|
tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);
|
||||||
|
|
||||||
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||||
if (fdTo == NULL) code = terrno;
|
if (fdTo == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
|
TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
|
||||||
tsRetentionSpeedLimitMB);
|
tsRetentionSpeedLimitMB),
|
||||||
if (n < 0) {
|
&lino, _exit);
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
taosCloseFile(&fdFrom);
|
|
||||||
taosCloseFile(&fdTo);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
taosCloseFile(&fdFrom);
|
taosCloseFile(&fdFrom);
|
||||||
taosCloseFile(&fdTo);
|
taosCloseFile(&fdTo);
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,8 +154,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
|
||||||
.of = fobj->f[0],
|
.of = fobj->f[0],
|
||||||
};
|
};
|
||||||
|
|
||||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// create new
|
// create new
|
||||||
op = (STFileOp){
|
op = (STFileOp){
|
||||||
|
@ -181,22 +177,20 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// do copy the file
|
// do copy the file
|
||||||
|
|
||||||
if (lcn < 1) {
|
if (lcn < 1) {
|
||||||
code = tsdbDoCopyFile(rtner, fobj, &op.nf);
|
TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &op.nf), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
} else {
|
} else {
|
||||||
code = tsdbDoCopyFileLC(rtner, fobj, &op.nf);
|
TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &op.nf), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -213,8 +207,7 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
|
if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
|
||||||
code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION);
|
TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
taosThreadMutexLock(&rtner->tsdb->mutex);
|
taosThreadMutexLock(&rtner->tsdb->mutex);
|
||||||
|
|
||||||
|
@ -231,7 +224,8 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
|
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
|
||||||
}
|
}
|
||||||
|
@ -248,26 +242,19 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
|
||||||
if (expLevel < 0) { // remove the fileset
|
if (expLevel < 0) { // remove the fileset
|
||||||
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
|
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
|
||||||
if (fobj == NULL) continue;
|
if (fobj == NULL) continue;
|
||||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
TAOS_CHECK_GOTO(tsdbDoRemoveFileObject(rtner, fobj), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSttLvl *lvl;
|
SSttLvl *lvl;
|
||||||
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
TARRAY2_FOREACH(lvl->fobjArr, fobj) { TAOS_CHECK_GOTO(tsdbDoRemoveFileObject(rtner, fobj), &lino, _exit); }
|
||||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else if (expLevel == 0) { // only migrate to upper level
|
} else if (expLevel == 0) { // only migrate to upper level
|
||||||
return 0;
|
return 0;
|
||||||
} else { // migrate
|
} else { // migrate
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
|
|
||||||
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
TAOS_CHECK_GOTO(tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did), &lino, _exit);
|
||||||
code = terrno;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
|
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
|
||||||
|
|
||||||
// data
|
// data
|
||||||
|
@ -284,25 +271,26 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
|
||||||
tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
|
tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
|
||||||
did.level);
|
did.level);
|
||||||
|
|
||||||
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
TAOS_CHECK_GOTO(tsdbDoMigrateFileObj(rtner, fobj, &did), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// stt
|
// stt
|
||||||
SSttLvl *lvl;
|
SSttLvl *lvl;
|
||||||
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||||
if (fobj->f->did.level == did.level) continue;
|
if (fobj->f->did.level == did.level) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
TAOS_CHECK_GOTO(tsdbDoMigrateFileObj(rtner, fobj, &did), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -338,15 +326,12 @@ static int32_t tsdbRetention(void *arg) {
|
||||||
// do retention
|
// do retention
|
||||||
if (rtner.fset) {
|
if (rtner.fset) {
|
||||||
if (rtnArg->s3Migrate) {
|
if (rtnArg->s3Migrate) {
|
||||||
code = tsdbDoS3Migrate(&rtner);
|
TAOS_CHECK_GOTO(tsdbDoS3Migrate(&rtner), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
} else {
|
} else {
|
||||||
code = tsdbDoRetention(&rtner);
|
TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbDoRetentionEnd(&rtner);
|
TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -361,7 +346,7 @@ _exit:
|
||||||
TARRAY2_DESTROY(&rtner.fopArr, NULL);
|
TARRAY2_DESTROY(&rtner.fopArr, NULL);
|
||||||
taosMemoryFree(arg);
|
taosMemoryFree(arg);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(pTsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -374,12 +359,11 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
|
||||||
|
|
||||||
if (!tsdb->bgTaskDisabled) {
|
if (!tsdb->bgTaskDisabled) {
|
||||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||||
code = tsdbTFileSetOpenChannel(fset);
|
TAOS_CHECK_GOTO(tsdbTFileSetOpenChannel(fset), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
||||||
if (arg == NULL) {
|
if (arg == NULL) {
|
||||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
arg->tsdb = tsdb;
|
arg->tsdb = tsdb;
|
||||||
|
@ -396,7 +380,7 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(tsdb->pVnode), __func__, code, lino);
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -446,20 +430,19 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
|
||||||
tsdbTFileName(rtner->tsdb, to, fname);
|
tsdbTFileName(rtner->tsdb, to, fname);
|
||||||
|
|
||||||
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
|
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
|
||||||
if (fdFrom == NULL) code = terrno;
|
if (fdFrom == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
char *object_name = taosDirEntryBaseName(fname);
|
char *object_name = taosDirEntryBaseName(fname);
|
||||||
code = s3PutObjectFromFile2(from->fname, object_name, 1);
|
TAOS_CHECK_GOTO(s3PutObjectFromFile2(from->fname, object_name, 1), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
taosCloseFile(&fdFrom);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
taosCloseFile(&fdFrom);
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
|
taosCloseFile(&fdFrom);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,8 +460,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
||||||
.of = fobj->f[0],
|
.of = fobj->f[0],
|
||||||
};
|
};
|
||||||
|
|
||||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// create new
|
// create new
|
||||||
op = (STFileOp){
|
op = (STFileOp){
|
||||||
|
@ -501,8 +483,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
tsdbTFileName(rtner->tsdb, &op.nf, fname);
|
tsdbTFileName(rtner->tsdb, &op.nf, fname);
|
||||||
|
@ -514,13 +495,13 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
||||||
char *dot = strrchr(object_name_prefix, '.');
|
char *dot = strrchr(object_name_prefix, '.');
|
||||||
if (!dot) {
|
if (!dot) {
|
||||||
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
||||||
return -1;
|
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
char *dot2 = strchr(object_name, '.');
|
char *dot2 = strchr(object_name, '.');
|
||||||
if (!dot) {
|
if (!dot) {
|
||||||
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
||||||
return -1;
|
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
|
||||||
}
|
}
|
||||||
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
|
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
|
||||||
|
|
||||||
|
@ -529,8 +510,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
||||||
int64_t c_offset = chunksize * (cn - fobj->f->lcn);
|
int64_t c_offset = chunksize * (cn - fobj->f->lcn);
|
||||||
|
|
||||||
code = s3PutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize);
|
TAOS_CHECK_GOTO(s3PutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy last chunk
|
// copy last chunk
|
||||||
|
@ -540,30 +520,30 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
||||||
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
|
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
|
||||||
|
|
||||||
fdFrom = taosOpenFile(fname, TD_FILE_READ);
|
fdFrom = taosOpenFile(fname, TD_FILE_READ);
|
||||||
if (fdFrom == NULL) code = terrno;
|
if (fdFrom == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId:%d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, lc_size);
|
tsdbInfo("vgId:%d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, lc_size);
|
||||||
|
|
||||||
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", lcn);
|
snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", lcn);
|
||||||
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||||
if (fdTo == NULL) code = terrno;
|
if (fdTo == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
|
int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
taosCloseFile(&fdFrom);
|
|
||||||
taosCloseFile(&fdTo);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
if (fdFrom) taosCloseFile(&fdFrom);
|
tstrerror(code));
|
||||||
if (fdTo) taosCloseFile(&fdTo);
|
|
||||||
}
|
}
|
||||||
|
taosCloseFile(&fdFrom);
|
||||||
|
taosCloseFile(&fdTo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -581,8 +561,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
||||||
.of = fobj->f[0],
|
.of = fobj->f[0],
|
||||||
};
|
};
|
||||||
|
|
||||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// create new
|
// create new
|
||||||
op = (STFileOp){
|
op = (STFileOp){
|
||||||
|
@ -605,8 +584,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
tsdbTFileName(rtner->tsdb, &op.nf, fname);
|
tsdbTFileName(rtner->tsdb, &op.nf, fname);
|
||||||
|
@ -618,7 +596,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
||||||
char *dot = strrchr(object_name_prefix, '.');
|
char *dot = strrchr(object_name_prefix, '.');
|
||||||
if (!dot) {
|
if (!dot) {
|
||||||
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
||||||
return -1;
|
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// do copy the file
|
// do copy the file
|
||||||
|
@ -626,8 +604,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
||||||
int64_t c_offset = chunksize * (cn - 1);
|
int64_t c_offset = chunksize * (cn - 1);
|
||||||
|
|
||||||
code = s3PutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize);
|
TAOS_CHECK_GOTO(s3PutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy last chunk
|
// copy last chunk
|
||||||
|
@ -637,34 +614,34 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
||||||
dot = strchr(object_name, '.');
|
dot = strchr(object_name, '.');
|
||||||
if (!dot) {
|
if (!dot) {
|
||||||
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
||||||
return -1;
|
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
|
||||||
}
|
}
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name), "%d.data", lcn);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name), "%d.data", lcn);
|
||||||
|
|
||||||
fdFrom = taosOpenFile(fobj->fname, TD_FILE_READ);
|
fdFrom = taosOpenFile(fobj->fname, TD_FILE_READ);
|
||||||
if (fdFrom == NULL) code = terrno;
|
if (fdFrom == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId: %d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, fobj->f->size);
|
tsdbInfo("vgId: %d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, fobj->f->size);
|
||||||
|
|
||||||
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||||
if (fdTo == NULL) code = terrno;
|
if (fdTo == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
|
int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
taosCloseFile(&fdFrom);
|
|
||||||
taosCloseFile(&fdTo);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
taosCloseFile(&fdFrom);
|
taosCloseFile(&fdFrom);
|
||||||
taosCloseFile(&fdTo);
|
taosCloseFile(&fdTo);
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -674,15 +651,21 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
|
||||||
|
|
||||||
STFileSet *fset = rtner->fset;
|
STFileSet *fset = rtner->fset;
|
||||||
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
|
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
|
||||||
if (!fobj) return code;
|
if (!fobj) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
|
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
|
||||||
if (expLevel < 0) return code; // expired
|
if (expLevel < 0) { // expired
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
|
SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
|
||||||
int32_t s3KeepLocal = pCfg->s3KeepLocal;
|
int32_t s3KeepLocal = pCfg->s3KeepLocal;
|
||||||
int32_t s3ExpLevel = tsdbS3FidLevel(fset->fid, &rtner->tsdb->keepCfg, s3KeepLocal, rtner->now);
|
int32_t s3ExpLevel = tsdbS3FidLevel(fset->fid, &rtner->tsdb->keepCfg, s3KeepLocal, rtner->now);
|
||||||
if (s3ExpLevel < 1) return code; // keep on local storage
|
if (s3ExpLevel < 1) { // keep on local storage
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
|
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
|
||||||
int32_t lcn = fobj->f->lcn;
|
int32_t lcn = fobj->f->lcn;
|
||||||
|
@ -698,19 +681,18 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
|
||||||
STimeWindow win = {0};
|
STimeWindow win = {0};
|
||||||
tsdbFidKeyRange(fset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
|
tsdbFidKeyRange(fset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||||
|
|
||||||
tsdbInfo("vgId:%d, compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
|
tsdbInfo("vgId:%d, async compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
|
||||||
tsdbAsyncCompact(rtner->tsdb, &win, pCfg->sttTrigger == 1);
|
code = tsdbAsyncCompact(rtner->tsdb, &win, pCfg->sttTrigger == 1);
|
||||||
tsdbInfo("vgId:%d, compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
|
tsdbInfo("vgId:%d, async compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
|
||||||
|
goto _exit;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbMigrateDataFileS3(rtner, fobj, size, chunksize);
|
TAOS_CHECK_GOTO(tsdbMigrateDataFileS3(rtner, fobj, size, chunksize), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (lcn <= 1) {
|
if (lcn <= 1) {
|
||||||
tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
|
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _exit);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
char fname1[TSDB_FILENAME_LEN];
|
char fname1[TSDB_FILENAME_LEN];
|
||||||
tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);
|
tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);
|
||||||
|
@ -720,8 +702,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
|
||||||
int64_t size = 0;
|
int64_t size = 0;
|
||||||
taosStatFile(fname1, &size, &mtime, NULL);
|
taosStatFile(fname1, &size, &mtime, NULL);
|
||||||
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
|
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
|
||||||
code = tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize);
|
TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbError("vgId:%d, file: %s not found, %s at line %d", TD_VID(rtner->tsdb->pVnode), fname1, __func__, lino);
|
tsdbError("vgId:%d, file: %s not found, %s at line %d", TD_VID(rtner->tsdb->pVnode), fname1, __func__, lino);
|
||||||
|
@ -731,7 +712,8 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -750,11 +732,15 @@ int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tsS3Enabled) {
|
if (!tsS3Enabled) {
|
||||||
return code;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&tsdb->mutex);
|
taosThreadMutexLock(&tsdb->mutex);
|
||||||
code = tsdbAsyncRetentionImpl(tsdb, now, true);
|
code = tsdbAsyncRetentionImpl(tsdb, now, true);
|
||||||
taosThreadMutexUnlock(&tsdb->mutex);
|
taosThreadMutexUnlock(&tsdb->mutex);
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,17 +58,9 @@ struct STsdbSnapReader {
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
|
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
|
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
|
||||||
tsdbDataFileReaderClose(&reader->dataReader);
|
tsdbDataFileReaderClose(&reader->dataReader);
|
||||||
|
return 0;
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
|
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
|
||||||
|
@ -255,7 +247,6 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
|
||||||
SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
|
SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
|
||||||
code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
|
code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
|
@ -448,8 +439,8 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type,
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
|
tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
|
||||||
__func__, lino, tstrerror(code), sver, ever, type);
|
__func__, __FILE__, lino, tstrerror(code), sver, ever, type);
|
||||||
tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
|
tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
|
||||||
taosMemoryFree(reader[0]);
|
taosMemoryFree(reader[0]);
|
||||||
reader[0] = NULL;
|
reader[0] = NULL;
|
||||||
|
@ -461,10 +452,11 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
|
||||||
if (reader[0] == NULL) return 0;
|
if (reader[0] == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb* tsdb = reader[0]->tsdb;
|
STsdb* tsdb = reader[0]->tsdb;
|
||||||
|
|
||||||
|
@ -488,12 +480,6 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
|
||||||
taosMemoryFree(reader[0]);
|
taosMemoryFree(reader[0]);
|
||||||
reader[0] = NULL;
|
reader[0] = NULL;
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
|
||||||
} else {
|
|
||||||
tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
|
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -148,30 +148,14 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader) {
|
static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader) {
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
TARRAY2_CLEAR(reader->dataReaderArr, tsdbDataFileRAWReaderClose);
|
TARRAY2_CLEAR(reader->dataReaderArr, tsdbDataFileRAWReaderClose);
|
||||||
|
return 0;
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) {
|
static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) {
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
reader->dataIter->count = TARRAY2_SIZE(reader->dataReaderArr);
|
reader->dataIter->count = TARRAY2_SIZE(reader->dataReaderArr);
|
||||||
reader->dataIter->idx = 0;
|
reader->dataIter->idx = 0;
|
||||||
|
return 0;
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
|
static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
|
||||||
|
@ -392,17 +376,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapRAWWriteFileSetOpenIter(STsdbSnapRAWWriter* writer) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapRAWWriteFileSetCloseIter(STsdbSnapRAWWriter* writer) { return 0; }
|
static int32_t tsdbSnapRAWWriteFileSetCloseIter(STsdbSnapRAWWriter* writer) { return 0; }
|
||||||
|
|
||||||
static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) {
|
static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) {
|
||||||
|
@ -445,10 +418,8 @@ static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t
|
||||||
writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
|
writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
|
||||||
|
|
||||||
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
||||||
if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) {
|
code = tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did);
|
||||||
code = TSDB_CODE_NO_AVAIL_DISK;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
|
||||||
tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
|
tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
|
||||||
|
|
||||||
code = tsdbSnapRAWWriteFileSetOpenWriter(writer);
|
code = tsdbSnapRAWWriteFileSetOpenWriter(writer);
|
||||||
|
|
Loading…
Reference in New Issue