From bce6f40abbc7390d34048556411e114d66a65af2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 8 Aug 2024 17:29:18 +0800 Subject: [PATCH] feat:[TS-3718]add lock to avoid write slow at the same time --- source/util/src/tlog.c | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 26e5a0039d..411e251706 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -61,6 +61,7 @@ typedef struct { TdThreadMutex buffMutex; int32_t writeInterval; int32_t lastDuration; + int32_t lock; } SLogBuff; typedef struct { @@ -136,6 +137,8 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize); static void taosCloseLogByFd(TdFilePtr pFile); static int32_t taosInitNormalLog(const char *fn, int32_t maxFileNum); static void taosWriteLog(SLogBuff *pLogBuf); +static void taosWriteSlowLog(SLogBuff *pLogBuf); + static int32_t taosStartLog() { TdThreadAttr threadAttr; (void)taosThreadAttrInit(&threadAttr); @@ -387,14 +390,24 @@ static int32_t taosOpenNewLogFile() { return 0; } -static void taosOpenNewSlowLogFile(int64_t today) { +static void taosOpenNewSlowLogFile() { (void)taosThreadMutexLock(&tsLogObj.logMutex); - if (tsLogObj.timestampToday == today) { + int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday; + if (delta >= 0 && delta < 86400) { uInfo("timestampToday is already equal to today, no need to open new slow log file"); (void)taosThreadMutexUnlock(&tsLogObj.logMutex); return; } + + for (int32_t i = 1; atomic_val_compare_exchange_32(&tsLogObj.slowHandle->lock, 0, 1) == 1; ++i) { + if (i % 1000 == 0) { + (void)sched_yield(); + } + } + tsLogObj.slowHandle->lastDuration = LOG_MAX_WAIT_MSEC; // force write taosWriteLog(tsLogObj.slowHandle); + atomic_store_32(&tsLogObj.slowHandle->lock, 0); + char day[LOG_FILE_DAY_LEN] = {0}; getDay(day); TdFilePtr pFile = NULL; @@ -410,7 +423,7 @@ static void taosOpenNewSlowLogFile(int64_t today) { TdFilePtr pOldFile = tsLogObj.slowHandle->pFile; tsLogObj.slowHandle->pFile = pFile; (void)taosCloseFile(&pOldFile); - tsLogObj.timestampToday = today; + tsLogObj.timestampToday = getTimestampToday(); (void)taosThreadMutexUnlock(&tsLogObj.logMutex); } @@ -654,9 +667,9 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons void taosPrintSlowLog(const char *format, ...) { if (!osLogSpaceSufficient()) return; - int64_t today = getTimestampToday(); - if (today != tsLogObj.timestampToday){ - taosOpenNewSlowLogFile(today); + int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday; + if (delta >= 86400 || delta < 0) { + taosOpenNewSlowLogFile(); } char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); @@ -729,7 +742,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { pLogBuf->minBuffSize = bufSize / 10; pLogBuf->stop = 0; pLogBuf->writeInterval = LOG_DEFAULT_INTERVAL; - + pLogBuf->lock = 0; if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err; // tsem_init(&(pLogBuf->buffNotEmpty), 0, 0); @@ -811,6 +824,12 @@ static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t en return rSize >= 0 ? rSize : LOG_BUF_SIZE(pLogBuf) + rSize; } +static void taosWriteSlowLog(SLogBuff *pLogBuf){ + int32_t lock = atomic_val_compare_exchange_32(&pLogBuf->lock, 0, 1); + if (lock == 1) return; + taosWriteLog(pLogBuf); + atomic_store_32(&pLogBuf->lock, 0); +} static void taosWriteLog(SLogBuff *pLogBuf) { int32_t start = LOG_BUF_START(pLogBuf); int32_t end = LOG_BUF_END(pLogBuf); @@ -895,12 +914,12 @@ static void *taosAsyncOutputLog(void *param) { // Polling the buffer taosWriteLog(pLogBuf); - if (pSlowBuf) taosWriteLog(pSlowBuf); + if (pSlowBuf) taosWriteSlowLog(pSlowBuf); if (pLogBuf->stop || (pSlowBuf && pSlowBuf->stop)) { pLogBuf->lastDuration = LOG_MAX_WAIT_MSEC; taosWriteLog(pLogBuf); - if (pSlowBuf) taosWriteLog(pSlowBuf); + if (pSlowBuf) taosWriteSlowLog(pSlowBuf); break; } }