From 24844d2e26f9e54d083042ac1f34951504458d44 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 1 Feb 2021 10:41:27 +0800 Subject: [PATCH] fix bug --- src/util/src/tlog.c | 113 ++++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 45 deletions(-) diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index f2ea813a8e..494da588c3 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -30,6 +30,12 @@ #define LOG_FILE_NAME_LEN 300 #define TSDB_DEFAULT_LOG_BUF_SIZE (10 * 1024 * 1024) // 10MB +#define DEFAULT_LOG_INTERVAL 50000 +#define LOG_INTERVAL_STEP 5000 +#define MIN_LOG_INTERVAL 5000 +#define MAX_LOG_INTERVAL 50000 + + #define LOG_BUF_BUFFER(x) ((x)->buffer) #define LOG_BUF_START(x) ((x)->buffStart) #define LOG_BUF_END(x) ((x)->buffEnd) @@ -66,13 +72,15 @@ float tsTotalLogDirGB = 0; float tsAvailLogDirGB = 0; float tsMinimalLogDirGB = 1.0f; int64_t asyncLogLostLines = 0; +int32_t writeInterval = DEFAULT_LOG_INTERVAL; int64_t dbgPostN = 0; int64_t dbgNoPostN = 0; int64_t dbgEmptyW = 0; -int64_t dbgW = 0; -int64_t dbgSmallW = 0; +int64_t dbgWN = 0; +int64_t dbgSmallWN = 0; +int64_t dbgBigWN = 0; int64_t dbgWSize = 0; #ifdef _TD_POWER_ @@ -115,7 +123,8 @@ static void taosStopLog() { void taosCloseLog() { taosStopLog(); - tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); + //tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); + usleep(MAX_LOG_INTERVAL); if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { pthread_join(tsLogObj.logHandle->asyncThread, NULL); } @@ -513,7 +522,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { tLogBuff->stop = 0; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; - tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); + //tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); return tLogBuff; @@ -557,7 +566,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) if (tLogBuff == NULL || tLogBuff->stop) return -1; - atomic_add_fetch_32(&waitLock, 1); + //atomic_add_fetch_32(&waitLock, 1); pthread_mutex_lock(&LOG_BUF_MUTEX(tLogBuff)); start = LOG_BUF_START(tLogBuff); @@ -585,14 +594,15 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); - int32_t w = atomic_sub_fetch_32(&waitLock, 1); - + //int32_t w = atomic_sub_fetch_32(&waitLock, 1); + /* if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { tsem_post(&(tLogBuff->buffNotEmpty)); dbgPostN++; } else { dbgNoPostN++; } + */ pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); @@ -600,59 +610,72 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) return 0; } -static int32_t taosPollLogBuffer(SLogBuff *tLogBuff, char *buf, int32_t bufSize) { +static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff) { int32_t start = LOG_BUF_START(tLogBuff); int32_t end = LOG_BUF_END(tLogBuff); - int32_t pollSize = 0; + int32_t rSize = end - start; - if (start == end) { - return 0; - } else if (start < end) { - pollSize = MIN(end - start, bufSize); + return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize; +} - memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); - return pollSize; - } else { - pollSize = MIN(end + LOG_BUF_SIZE(tLogBuff) - start, bufSize); - if (pollSize > LOG_BUF_SIZE(tLogBuff) - start) { - int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; - memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, tsize); - memcpy(buf + tsize, LOG_BUF_BUFFER(tLogBuff), pollSize - tsize); +static void taosWriteLog(SLogBuff *tLogBuff) { + do { + int32_t start = LOG_BUF_START(tLogBuff); + int32_t end = LOG_BUF_END(tLogBuff); + int32_t pollSize = 0; + if (start == end) { + dbgEmptyW++; + writeInterval = MAX_LOG_INTERVAL; + return; + } else if (start < end) { + pollSize = end - start; + + taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); } else { - memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); + int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; + taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); + + taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); + + pollSize = tsize + end; } - return pollSize; - } + + dbgWN++; + + if (pollSize < 1048576) { + dbgSmallWN++; + if (writeInterval < MAX_LOG_INTERVAL) { + writeInterval += LOG_INTERVAL_STEP; + } + } else if (pollSize > 4 * 1048576) { + dbgBigWN++; + writeInterval = MIN_LOG_INTERVAL; + } + + dbgWSize+=pollSize; + + LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff); + + int32_t rsize = taosGetLogRemainSize(tLogBuff); + if (rsize < 1048576) { + break; + } + + writeInterval = MIN_LOG_INTERVAL; + }while (1); } static void *taosAsyncOutputLog(void *param) { SLogBuff *tLogBuff = (SLogBuff *)param; - int32_t log_size = 0; - - char *tempBuffer = malloc(TSDB_DEFAULT_LOG_BUF_SIZE); - - assert(tempBuffer); while (1) { - tsem_wait(&(tLogBuff->buffNotEmpty)); + //tsem_wait(&(tLogBuff->buffNotEmpty)); + + usleep(writeInterval); // Polling the buffer - while (1) { - log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_SIZE); - if (log_size) { - dbgW++; - if (log_size < 1048576) { - dbgSmallW++; - } - dbgWSize+=log_size; - taosWrite(tLogBuff->fd, tempBuffer, log_size); - LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff); - } else { - dbgEmptyW++; - break; - } - } + taosWriteLog(tLogBuff); if (tLogBuff->stop) break; }