From a157caa38db8b8a4c39908d647c9b02a8e222fdc Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 29 Nov 2024 18:57:09 +0800 Subject: [PATCH] enh: process log rotate every 30 minutes --- include/common/cos_cp.h | 6 +- include/os/osFile.h | 4 +- source/common/src/cos.c | 8 +- source/common/src/cos_cp.c | 4 +- source/dnode/vnode/src/tsdb/tsdbRetention.c | 4 +- source/libs/wal/src/walMeta.c | 2 +- source/os/src/osDir.c | 6 +- source/os/src/osFile.c | 14 +-- source/util/src/tlog.c | 115 +++++++++++++++++++- 9 files changed, 133 insertions(+), 30 deletions(-) diff --git a/include/common/cos_cp.h b/include/common/cos_cp.h index 29532c0265..e54941f5d3 100644 --- a/include/common/cos_cp.h +++ b/include/common/cos_cp.h @@ -44,7 +44,7 @@ typedef struct { char file_path[TSDB_FILENAME_LEN]; // local file path int64_t file_size; // local file size, for upload - int32_t file_last_modified; // local file last modified time, for upload + int64_t file_last_modified; // local file last modified time, for upload char file_md5[64]; // md5 of the local file content, for upload, reserved char object_name[128]; // object name @@ -67,9 +67,9 @@ int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint); int32_t cos_cp_dump(SCheckpoint* checkpoint); void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes); void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64); -void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime, +void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int64_t mtime, char const* upload_id, int64_t part_size); -bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime); +bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int64_t mtime); void cos_cp_build_download(SCheckpoint* checkpoint, char const* filepath, char const* object_name, int64_t object_size, char const* object_lmtime, char const* object_etag, int64_t part_size); diff --git a/include/os/osFile.h b/include/os/osFile.h index 536dee268a..1c397f3042 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -82,9 +82,9 @@ int32_t taosUnLockFile(TdFilePtr pFile); int32_t taosUmaskFile(int32_t maskVal); -int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *atime); +int32_t taosStatFile(const char *path, int64_t *size, int64_t *mtime, int64_t *atime); int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno); -int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime); +int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int64_t *mtime); bool taosCheckExistFile(const char *pathname); int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence); diff --git a/source/common/src/cos.c b/source/common/src/cos.c index a7e69ddc4c..de4f65f2ad 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -775,7 +775,7 @@ _exit: TAOS_RETURN(code); } -static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime, +static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int64_t lmtime, char const *object_name, int64_t contentLength, S3PutProperties *put_prop, put_object_callback_data *data) { int32_t code = 0, lino = 0; @@ -963,7 +963,7 @@ _exit: int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) { int32_t code = 0; - int32_t lmtime = 0; + int64_t lmtime = 0; const char *filename = 0; uint64_t contentLength = 0; const char *cacheControl = 0, *contentType = 0, *md5 = 0; @@ -1040,7 +1040,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size, int8_t epIndex) { int32_t code = 0; - int32_t lmtime = 0; + int64_t lmtime = 0; const char *filename = 0; uint64_t contentLength = 0; const char *cacheControl = 0, *contentType = 0, *md5 = 0; @@ -1847,7 +1847,7 @@ _exit: typedef struct { int64_t size; - int32_t atime; + int64_t atime; char name[TSDB_FILENAME_LEN]; } SEvictFile; diff --git a/source/common/src/cos_cp.c b/source/common/src/cos_cp.c index 078b14c9e8..3469e8ecec 100644 --- a/source/common/src/cos_cp.c +++ b/source/common/src/cos_cp.c @@ -350,7 +350,7 @@ void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag checkpoint->parts[part_index].crc64 = crc64; } -void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime, +void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int64_t mtime, char const* upload_id, int64_t part_size) { int i = 0; @@ -375,7 +375,7 @@ void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t static bool cos_cp_verify_md5(SCheckpoint* cp) { return true; } -bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) { +bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int64_t mtime) { if (cos_cp_verify_md5(checkpoint) && checkpoint->file_size == size && checkpoint->file_last_modified == mtime) { return true; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 0072fd5e7f..c86a07584d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -660,7 +660,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) { int32_t lcn = fobj->f->lcn; if (/*lcn < 1 && */ taosCheckExistFile(fobj->fname)) { - int32_t mtime = 0; + int64_t mtime = 0; int64_t size = 0; int32_t r = taosStatFile(fobj->fname, &size, &mtime, NULL); if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) { @@ -687,7 +687,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) { tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1); if (taosCheckExistFile(fname1)) { - int32_t mtime = 0; + int64_t mtime = 0; int64_t size = 0; if (taosStatFile(fname1, &size, &mtime, NULL) != 0) { tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index b40a9eeefe..78f13a58ab 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -326,7 +326,7 @@ static int32_t walRepairLogFileTs(SWal* pWal, bool* updateMeta) { } walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); - int32_t mtime = 0; + int64_t mtime = 0; if (taosStatFile(fnameStr, NULL, &mtime, NULL) < 0) { wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index a6b0941577..194e69c09c 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -301,14 +301,14 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; char filename[1024]; - (void)snprintf(filename, sizeof(filename), "%s/%s", dirname, taosGetDirEntryName(de)); + (void)snprintf(filename, sizeof(filename), "%s%s%s", dirname, TD_DIRSEP, taosGetDirEntryName(de)); if (taosDirEntryIsDir(de)) { continue; } else { int32_t len = (int32_t)strlen(filename); if (len > 3 && strcmp(filename + len - 3, ".gz") == 0) { len -= 3; - }else{ + } else { continue; } @@ -324,7 +324,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { int32_t days = (int32_t)(TABS(sec - fileSec) / 86400 + 1); if (days > keepDays) { TAOS_UNUSED(taosRemoveFile(filename)); - uInfo("file:%s is removed, days:%d keepDays:%d, sed:%"PRId64, filename, days, keepDays, fileSec); + uInfo("file:%s is removed, days:%d keepDays:%d, sed:%" PRId64, filename, days, keepDays, fileSec); } else { // printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays); } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index c2484860ad..8ee0e74d4f 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -273,7 +273,7 @@ int32_t taosRenameFile(const char *oldName, const char *newName) { #endif } -int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *atime) { +int32_t taosStatFile(const char *path, int64_t *size, int64_t *mtime, int64_t *atime) { OS_PARAM_CHECK(path); #ifdef WINDOWS struct _stati64 fileStat; @@ -292,11 +292,11 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a } if (mtime != NULL) { - *mtime = fileStat.st_mtime; + *mtime = fileStat.st_mtim.tv_sec; } if (atime != NULL) { - *atime = fileStat.st_atime; + *atime = fileStat.st_atim.tv_sec; } return 0; @@ -544,7 +544,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { return liOffset.QuadPart; } -int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { +int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int64_t *mtime) { if (pFile == NULL || pFile->hFile == NULL) { terrno = TSDB_CODE_INVALID_PARA; return terrno; @@ -571,7 +571,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { ULARGE_INTEGER ull; ull.LowPart = lastWriteTime.dwLowDateTime; ull.HighPart = lastWriteTime.dwHighDateTime; - *mtime = (int32_t)((ull.QuadPart - 116444736000000000ULL) / 10000000ULL); + *mtime = (int64_t)((ull.QuadPart - 116444736000000000ULL) / 10000000ULL); } return 0; } @@ -937,7 +937,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { return ret; } -int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { +int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int64_t *mtime) { if (pFile == NULL) { terrno = TSDB_CODE_INVALID_PARA; return terrno; @@ -960,7 +960,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { } if (mtime != NULL) { - *mtime = fileStat.st_mtime; + *mtime = fileStat.st_mtim.tv_sec; } return 0; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 4f2cce5869..e364719cb1 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -94,6 +94,7 @@ static int8_t tsLogInited = 0; static SLogObj tsLogObj = {.fileNum = 1, .slowHandle = NULL}; static int64_t tsAsyncLogLostLines = 0; static int32_t tsDaylightActive; /* Currently in daylight saving time. */ +static SRWLatch tsLogRotateLatch = 0; bool tsLogEmbedded = 0; bool tsAsyncLog = true; @@ -408,10 +409,6 @@ static void taosKeepOldLog(char *oldName) { } } } - - if (tsLogKeepDays > 0) { - taosRemoveOldFiles(tsLogDir, tsLogKeepDays); - } } typedef struct { TdFilePtr pOldFile; @@ -460,11 +457,16 @@ static OldFileKeeper *taosOpenNewFile() { static void *taosThreadToCloseOldFile(void *param) { if (!param) return NULL; + taosWLockLatch(&tsLogRotateLatch); OldFileKeeper *oldFileKeeper = (OldFileKeeper *)param; taosSsleep(20); taosCloseLogByFd(oldFileKeeper->pOldFile); taosKeepOldLog(oldFileKeeper->keepName); taosMemoryFree(oldFileKeeper); + if (tsLogKeepDays > 0) { + taosRemoveOldFiles(tsLogDir, tsLogKeepDays); + } + taosWUnLockLatch(&tsLogRotateLatch); return NULL; } @@ -600,8 +602,8 @@ static void decideLogFileName(const char *fn, int32_t maxFileNum) { static void decideLogFileNameFlag() { char name[PATH_MAX + 50] = "\0"; - int32_t logstat0_mtime = 0; - int32_t logstat1_mtime = 0; + int64_t logstat0_mtime = 0; + int64_t logstat1_mtime = 0; bool log0Exist = false; bool log1Exist = false; @@ -1076,6 +1078,87 @@ static void taosWriteLog(SLogBuff *pLogBuf) { pLogBuf->writeInterval = 0; } +#define LOG_ROTATE_INTERVAL 30 +#ifndef LOG_ROTATE_BOOT +#define LOG_ROTATE_BOOT 3 +#endif +static void *taosLogRotateFunc(void *param) { + setThreadName("logRotate"); + int32_t code = 0; + taosWLockLatch(&tsLogRotateLatch); + // compress or remove the old log files + TdDirPtr pDir = taosOpenDir(tsLogDir); + if (!pDir) goto _exit; + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir))) { + if (taosDirEntryIsDir(de)) { + continue; + } + char *fname = taosGetDirEntryName(de); + if (!fname) { + continue; + } + + char *pSec = strrchr(fname, '.'); + if (!pSec) { + continue; + } + char *pIter = pSec; + bool isSec = true; + while (*(++pIter)) { + if (!isdigit(*pIter)) { + isSec = false; + break; + } + } + if (!isSec) { + continue; + } + + int64_t fileSec = 0; + if ((code = taosStr2int64(pSec + 1, NULL)) != 0) { + uWarn("%s:%d failed to convert %s to int64 since %s", __func__, __LINE__, pSec + 1, tstrerror(code)); + continue; + } + if (fileSec <= 100) { + continue; + } + + int64_t mtime = 0; + if ((code = taosStatFile(fname, NULL, &mtime, NULL)) != 0) { + uWarn("%s:%d failed to stat file %s since %s", __func__, __LINE__, fname, tstrerror(code)); + continue; + } + + int64_t elapseSec = taosGetTimestampMs() / 1000 - mtime; + + if (elapseSec < 86400) { + continue; + } + + char fullName[PATH_MAX] = {0}; + snprintf(fullName, sizeof(fullName), "%s%s%s", pDir, TD_DIRSEP, fname); + + int32_t days = elapseSec / 86400 + 1; + if (tsLogKeepDays > 0 && days > tsLogKeepDays) { + TAOS_UNUSED(taosRemoveFile(fullName)); + uInfo("file:%s is removed, days:%d keepDays:%d, sed:%" PRId64, fullName, days, tsLogKeepDays, fileSec); + } else { + taosKeepOldLog(fullName); // compress + } + } + if ((code = taosCloseDir(&pDir)) != 0) { + uWarn("%s:%d failed to close dir:%s since %s\n", __func__, __LINE__, tsLogDir, tstrerror(code)); + } + + if (tsLogKeepDays > 0) { + taosRemoveOldFiles(tsLogDir, tsLogKeepDays); + } +_exit: + taosWUnLockLatch(&tsLogRotateLatch); + return NULL; +} + static void *taosAsyncOutputLog(void *param) { SLogBuff *pLogBuf = (SLogBuff *)tsLogObj.logHandle; SLogBuff *pSlowBuf = (SLogBuff *)tsLogObj.slowHandle; @@ -1084,6 +1167,7 @@ static void *taosAsyncOutputLog(void *param) { int32_t count = 0; int32_t updateCron = 0; int32_t writeInterval = 0; + int32_t lastCheckMin = taosGetTimestampMs() / 60000 - (LOG_ROTATE_INTERVAL - LOG_ROTATE_BOOT); while (1) { if (pSlowBuf) { @@ -1109,6 +1193,25 @@ static void *taosAsyncOutputLog(void *param) { if (pSlowBuf) taosWriteSlowLog(pSlowBuf); break; } + + // process the log rotation every LOG_ROTATE_INTERVAL minutes + int32_t curMin = taosGetTimestampMs() / 60000; + if (curMin >= lastCheckMin) { + if ((curMin - lastCheckMin) >= LOG_ROTATE_INTERVAL) { + TdThread thread; + TdThreadAttr attr; + (void)taosThreadAttrInit(&attr); + (void)taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_DETACHED); + if (taosThreadCreate(&thread, &attr, taosLogRotateFunc, tsLogObj.logHandle) == 0) { + lastCheckMin = curMin; + } else { + uWarn("failed to create thread to process log buffer"); + } + (void)taosThreadAttrDestroy(&attr); + } + } else if (curMin < lastCheckMin) { + lastCheckMin = curMin; + } } return NULL;