enh: process log rotate every 30 minutes

This commit is contained in:
kailixu 2024-11-29 18:57:09 +08:00
parent d256fb1c27
commit a157caa38d
9 changed files with 133 additions and 30 deletions

View File

@ -44,7 +44,7 @@ typedef struct {
char file_path[TSDB_FILENAME_LEN]; // local file path
int64_t file_size; // local file size, for upload
int32_t file_last_modified; // local file last modified time, for upload
int64_t file_last_modified; // local file last modified time, for upload
char file_md5[64]; // md5 of the local file content, for upload, reserved
char object_name[128]; // object name
@ -67,9 +67,9 @@ int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint);
int32_t cos_cp_dump(SCheckpoint* checkpoint);
void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes);
void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64);
void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int64_t mtime,
char const* upload_id, int64_t part_size);
bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime);
bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int64_t mtime);
void cos_cp_build_download(SCheckpoint* checkpoint, char const* filepath, char const* object_name, int64_t object_size,
char const* object_lmtime, char const* object_etag, int64_t part_size);

View File

@ -82,9 +82,9 @@ int32_t taosUnLockFile(TdFilePtr pFile);
int32_t taosUmaskFile(int32_t maskVal);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *atime);
int32_t taosStatFile(const char *path, int64_t *size, int64_t *mtime, int64_t *atime);
int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno);
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime);
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int64_t *mtime);
bool taosCheckExistFile(const char *pathname);
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence);

View File

@ -775,7 +775,7 @@ _exit:
TAOS_RETURN(code);
}
static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime,
static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int64_t lmtime,
char const *object_name, int64_t contentLength, S3PutProperties *put_prop,
put_object_callback_data *data) {
int32_t code = 0, lino = 0;
@ -963,7 +963,7 @@ _exit:
int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) {
int32_t code = 0;
int32_t lmtime = 0;
int64_t lmtime = 0;
const char *filename = 0;
uint64_t contentLength = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
@ -1040,7 +1040,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size,
int8_t epIndex) {
int32_t code = 0;
int32_t lmtime = 0;
int64_t lmtime = 0;
const char *filename = 0;
uint64_t contentLength = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
@ -1847,7 +1847,7 @@ _exit:
typedef struct {
int64_t size;
int32_t atime;
int64_t atime;
char name[TSDB_FILENAME_LEN];
} SEvictFile;

View File

@ -350,7 +350,7 @@ void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag
checkpoint->parts[part_index].crc64 = crc64;
}
void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int64_t mtime,
char const* upload_id, int64_t part_size) {
int i = 0;
@ -375,7 +375,7 @@ void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t
static bool cos_cp_verify_md5(SCheckpoint* cp) { return true; }
bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) {
bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int64_t mtime) {
if (cos_cp_verify_md5(checkpoint) && checkpoint->file_size == size && checkpoint->file_last_modified == mtime) {
return true;
}

View File

@ -660,7 +660,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
int32_t lcn = fobj->f->lcn;
if (/*lcn < 1 && */ taosCheckExistFile(fobj->fname)) {
int32_t mtime = 0;
int64_t mtime = 0;
int64_t size = 0;
int32_t r = taosStatFile(fobj->fname, &size, &mtime, NULL);
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
@ -687,7 +687,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);
if (taosCheckExistFile(fname1)) {
int32_t mtime = 0;
int64_t mtime = 0;
int64_t size = 0;
if (taosStatFile(fname1, &size, &mtime, NULL) != 0) {
tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);

View File

@ -326,7 +326,7 @@ static int32_t walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
}
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int32_t mtime = 0;
int64_t mtime = 0;
if (taosStatFile(fnameStr, NULL, &mtime, NULL) < 0) {
wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);

View File

@ -301,14 +301,14 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
char filename[1024];
(void)snprintf(filename, sizeof(filename), "%s/%s", dirname, taosGetDirEntryName(de));
(void)snprintf(filename, sizeof(filename), "%s%s%s", dirname, TD_DIRSEP, taosGetDirEntryName(de));
if (taosDirEntryIsDir(de)) {
continue;
} else {
int32_t len = (int32_t)strlen(filename);
if (len > 3 && strcmp(filename + len - 3, ".gz") == 0) {
len -= 3;
}else{
} else {
continue;
}
@ -324,7 +324,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
int32_t days = (int32_t)(TABS(sec - fileSec) / 86400 + 1);
if (days > keepDays) {
TAOS_UNUSED(taosRemoveFile(filename));
uInfo("file:%s is removed, days:%d keepDays:%d, sed:%"PRId64, filename, days, keepDays, fileSec);
uInfo("file:%s is removed, days:%d keepDays:%d, sed:%" PRId64, filename, days, keepDays, fileSec);
} else {
// printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays);
}

View File

@ -273,7 +273,7 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
#endif
}
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *atime) {
int32_t taosStatFile(const char *path, int64_t *size, int64_t *mtime, int64_t *atime) {
OS_PARAM_CHECK(path);
#ifdef WINDOWS
struct _stati64 fileStat;
@ -292,11 +292,11 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a
}
if (mtime != NULL) {
*mtime = fileStat.st_mtime;
*mtime = fileStat.st_mtim.tv_sec;
}
if (atime != NULL) {
*atime = fileStat.st_atime;
*atime = fileStat.st_atim.tv_sec;
}
return 0;
@ -544,7 +544,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
return liOffset.QuadPart;
}
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int64_t *mtime) {
if (pFile == NULL || pFile->hFile == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
@ -571,7 +571,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
ULARGE_INTEGER ull;
ull.LowPart = lastWriteTime.dwLowDateTime;
ull.HighPart = lastWriteTime.dwHighDateTime;
*mtime = (int32_t)((ull.QuadPart - 116444736000000000ULL) / 10000000ULL);
*mtime = (int64_t)((ull.QuadPart - 116444736000000000ULL) / 10000000ULL);
}
return 0;
}
@ -937,7 +937,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
return ret;
}
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int64_t *mtime) {
if (pFile == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
@ -960,7 +960,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
}
if (mtime != NULL) {
*mtime = fileStat.st_mtime;
*mtime = fileStat.st_mtim.tv_sec;
}
return 0;

View File

@ -94,6 +94,7 @@ static int8_t tsLogInited = 0;
static SLogObj tsLogObj = {.fileNum = 1, .slowHandle = NULL};
static int64_t tsAsyncLogLostLines = 0;
static int32_t tsDaylightActive; /* Currently in daylight saving time. */
static SRWLatch tsLogRotateLatch = 0;
bool tsLogEmbedded = 0;
bool tsAsyncLog = true;
@ -408,10 +409,6 @@ static void taosKeepOldLog(char *oldName) {
}
}
}
if (tsLogKeepDays > 0) {
taosRemoveOldFiles(tsLogDir, tsLogKeepDays);
}
}
typedef struct {
TdFilePtr pOldFile;
@ -460,11 +457,16 @@ static OldFileKeeper *taosOpenNewFile() {
static void *taosThreadToCloseOldFile(void *param) {
if (!param) return NULL;
taosWLockLatch(&tsLogRotateLatch);
OldFileKeeper *oldFileKeeper = (OldFileKeeper *)param;
taosSsleep(20);
taosCloseLogByFd(oldFileKeeper->pOldFile);
taosKeepOldLog(oldFileKeeper->keepName);
taosMemoryFree(oldFileKeeper);
if (tsLogKeepDays > 0) {
taosRemoveOldFiles(tsLogDir, tsLogKeepDays);
}
taosWUnLockLatch(&tsLogRotateLatch);
return NULL;
}
@ -600,8 +602,8 @@ static void decideLogFileName(const char *fn, int32_t maxFileNum) {
static void decideLogFileNameFlag() {
char name[PATH_MAX + 50] = "\0";
int32_t logstat0_mtime = 0;
int32_t logstat1_mtime = 0;
int64_t logstat0_mtime = 0;
int64_t logstat1_mtime = 0;
bool log0Exist = false;
bool log1Exist = false;
@ -1076,6 +1078,87 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
pLogBuf->writeInterval = 0;
}
#define LOG_ROTATE_INTERVAL 30
#ifndef LOG_ROTATE_BOOT
#define LOG_ROTATE_BOOT 3
#endif
static void *taosLogRotateFunc(void *param) {
setThreadName("logRotate");
int32_t code = 0;
taosWLockLatch(&tsLogRotateLatch);
// compress or remove the old log files
TdDirPtr pDir = taosOpenDir(tsLogDir);
if (!pDir) goto _exit;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir))) {
if (taosDirEntryIsDir(de)) {
continue;
}
char *fname = taosGetDirEntryName(de);
if (!fname) {
continue;
}
char *pSec = strrchr(fname, '.');
if (!pSec) {
continue;
}
char *pIter = pSec;
bool isSec = true;
while (*(++pIter)) {
if (!isdigit(*pIter)) {
isSec = false;
break;
}
}
if (!isSec) {
continue;
}
int64_t fileSec = 0;
if ((code = taosStr2int64(pSec + 1, NULL)) != 0) {
uWarn("%s:%d failed to convert %s to int64 since %s", __func__, __LINE__, pSec + 1, tstrerror(code));
continue;
}
if (fileSec <= 100) {
continue;
}
int64_t mtime = 0;
if ((code = taosStatFile(fname, NULL, &mtime, NULL)) != 0) {
uWarn("%s:%d failed to stat file %s since %s", __func__, __LINE__, fname, tstrerror(code));
continue;
}
int64_t elapseSec = taosGetTimestampMs() / 1000 - mtime;
if (elapseSec < 86400) {
continue;
}
char fullName[PATH_MAX] = {0};
snprintf(fullName, sizeof(fullName), "%s%s%s", pDir, TD_DIRSEP, fname);
int32_t days = elapseSec / 86400 + 1;
if (tsLogKeepDays > 0 && days > tsLogKeepDays) {
TAOS_UNUSED(taosRemoveFile(fullName));
uInfo("file:%s is removed, days:%d keepDays:%d, sed:%" PRId64, fullName, days, tsLogKeepDays, fileSec);
} else {
taosKeepOldLog(fullName); // compress
}
}
if ((code = taosCloseDir(&pDir)) != 0) {
uWarn("%s:%d failed to close dir:%s since %s\n", __func__, __LINE__, tsLogDir, tstrerror(code));
}
if (tsLogKeepDays > 0) {
taosRemoveOldFiles(tsLogDir, tsLogKeepDays);
}
_exit:
taosWUnLockLatch(&tsLogRotateLatch);
return NULL;
}
static void *taosAsyncOutputLog(void *param) {
SLogBuff *pLogBuf = (SLogBuff *)tsLogObj.logHandle;
SLogBuff *pSlowBuf = (SLogBuff *)tsLogObj.slowHandle;
@ -1084,6 +1167,7 @@ static void *taosAsyncOutputLog(void *param) {
int32_t count = 0;
int32_t updateCron = 0;
int32_t writeInterval = 0;
int32_t lastCheckMin = taosGetTimestampMs() / 60000 - (LOG_ROTATE_INTERVAL - LOG_ROTATE_BOOT);
while (1) {
if (pSlowBuf) {
@ -1109,6 +1193,25 @@ static void *taosAsyncOutputLog(void *param) {
if (pSlowBuf) taosWriteSlowLog(pSlowBuf);
break;
}
// process the log rotation every LOG_ROTATE_INTERVAL minutes
int32_t curMin = taosGetTimestampMs() / 60000;
if (curMin >= lastCheckMin) {
if ((curMin - lastCheckMin) >= LOG_ROTATE_INTERVAL) {
TdThread thread;
TdThreadAttr attr;
(void)taosThreadAttrInit(&attr);
(void)taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_DETACHED);
if (taosThreadCreate(&thread, &attr, taosLogRotateFunc, tsLogObj.logHandle) == 0) {
lastCheckMin = curMin;
} else {
uWarn("failed to create thread to process log buffer");
}
(void)taosThreadAttrDestroy(&attr);
}
} else if (curMin < lastCheckMin) {
lastCheckMin = curMin;
}
}
return NULL;