From 0d262b874745a409800856e976b719b40fe8fdce Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 18 Apr 2022 10:22:08 +0800 Subject: [PATCH] fix: definite lost in tlog.c found by valgrind --- include/os/osMemory.h | 12 +-- source/client/src/clientMain.c | 2 +- source/util/src/tlog.c | 163 +++++++++++++++++---------------- 3 files changed, 92 insertions(+), 85 deletions(-) diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 4efe69e204..92d9319d5c 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -36,12 +36,12 @@ void *taosMemoryStrDup(void *ptr); void taosMemoryFree(void *ptr); int32_t taosMemorySize(void *ptr); -#define taosMemoryFreeClear(ptr) \ - do { \ - if (ptr) { \ - taosMemoryFree((void*)ptr); \ - (ptr) = NULL; \ - } \ +#define taosMemoryFreeClear(ptr) \ + do { \ + if (ptr) { \ + taosMemoryFree((void *)ptr); \ + (ptr) = NULL; \ + } \ } while (0) #ifdef __cplusplus diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 73ac059276..2e6cd4ce17 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -69,9 +69,9 @@ void taos_cleanup(void) { rpcCleanup(); catalogDestroy(); schedulerDestroy(); - taosCloseLog(); tscInfo("all local resources released"); + taosCloseLog(); } setConfRet taos_set_config(const char *config) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index b37b1b2f69..3dce260b10 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -38,27 +38,26 @@ #define LOG_BUF_MUTEX(x) ((x)->buffMutex) typedef struct { - char *buffer; - int32_t buffStart; - int32_t buffEnd; - int32_t buffSize; - int32_t minBuffSize; - TdFilePtr pFile; - int32_t stop; - TdThread asyncThread; + char *buffer; + int32_t buffStart; + int32_t buffEnd; + int32_t buffSize; + int32_t minBuffSize; + TdFilePtr pFile; + int32_t stop; + TdThread asyncThread; TdThreadMutex buffMutex; - tsem_t buffNotEmpty; } SLogBuff; typedef struct { - int32_t fileNum; - int32_t maxLines; - int32_t lines; - int32_t flag; - int32_t openInProgress; - pid_t pid; - char logName[LOG_FILE_NAME_LEN]; - SLogBuff *logHandle; + int32_t fileNum; + int32_t maxLines; + int32_t lines; + int32_t flag; + int32_t openInProgress; + pid_t pid; + char logName[LOG_FILE_NAME_LEN]; + SLogBuff *logHandle; TdThreadMutex logMutex; } SLogObj; @@ -100,7 +99,7 @@ int64_t dbgBigWN = 0; int64_t dbgWSize = 0; static void *taosAsyncOutputLog(void *param); -static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t msgLen); +static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen); static SLogBuff *taosLogBuffNew(int32_t bufSize); static void taosCloseLogByFd(TdFilePtr pFile); static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum); @@ -136,16 +135,24 @@ static void taosStopLog() { } } +static void taosLogBuffDestroy() { + taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex); + taosCloseFile(&tsLogObj.logHandle->pFile); + taosMemoryFreeClear(tsLogObj.logHandle->buffer); + memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer)); + taosThreadMutexDestroy(&tsLogObj.logMutex); + taosMemoryFreeClear(tsLogObj.logHandle); + memset(&tsLogObj.logHandle, 0, sizeof(tsLogObj.logHandle)); + tsLogObj.logHandle = NULL; +} + void taosCloseLog() { taosStopLog(); if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL); } tsLogInited = 0; - // In case that other threads still use log resources causing invalid write in valgrind - // we comment two lines below. - // taosLogBuffDestroy(tsLogObj.logHandle); - // taosCloseLog(); + taosLogBuffDestroy(tsLogObj.logHandle); } static bool taosLockLogFile(TdFilePtr pFile) { @@ -231,7 +238,7 @@ static int32_t taosOpenNewLogFile() { tsLogObj.openInProgress = 1; uInfo("open new log file ......"); - TdThread thread; + TdThread thread; TdThreadAttr attr; taosThreadAttrInit(&attr); taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_DETACHED); @@ -506,45 +513,45 @@ static void taosCloseLogByFd(TdFilePtr pFile) { } static SLogBuff *taosLogBuffNew(int32_t bufSize) { - SLogBuff *tLogBuff = NULL; + SLogBuff *pLogBuf = NULL; - tLogBuff = taosMemoryCalloc(1, sizeof(SLogBuff)); - if (tLogBuff == NULL) return NULL; + pLogBuf = taosMemoryCalloc(1, sizeof(SLogBuff)); + if (pLogBuf == NULL) return NULL; - LOG_BUF_BUFFER(tLogBuff) = taosMemoryMalloc(bufSize); - if (LOG_BUF_BUFFER(tLogBuff) == NULL) goto _err; + LOG_BUF_BUFFER(pLogBuf) = taosMemoryMalloc(bufSize); + if (LOG_BUF_BUFFER(pLogBuf) == NULL) goto _err; - LOG_BUF_START(tLogBuff) = LOG_BUF_END(tLogBuff) = 0; - LOG_BUF_SIZE(tLogBuff) = bufSize; - tLogBuff->minBuffSize = bufSize / 10; - tLogBuff->stop = 0; + LOG_BUF_START(pLogBuf) = LOG_BUF_END(pLogBuf) = 0; + LOG_BUF_SIZE(pLogBuf) = bufSize; + pLogBuf->minBuffSize = bufSize / 10; + pLogBuf->stop = 0; - if (taosThreadMutexInit(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; - // tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); + if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err; + // tsem_init(&(pLogBuf->buffNotEmpty), 0, 0); - return tLogBuff; + return pLogBuf; _err: - taosMemoryFreeClear(LOG_BUF_BUFFER(tLogBuff)); - taosMemoryFreeClear(tLogBuff); + taosMemoryFreeClear(LOG_BUF_BUFFER(pLogBuf)); + taosMemoryFreeClear(pLogBuf); return NULL; } -static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, const char *msg, int32_t msgLen) { +static void taosCopyLogBuffer(SLogBuff *pLogBuf, int32_t start, int32_t end, const char *msg, int32_t msgLen) { if (start > end) { - memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen); + memcpy(LOG_BUF_BUFFER(pLogBuf) + end, msg, msgLen); } else { - if (LOG_BUF_SIZE(tLogBuff) - end < msgLen) { - memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, LOG_BUF_SIZE(tLogBuff) - end); - memcpy(LOG_BUF_BUFFER(tLogBuff), msg + LOG_BUF_SIZE(tLogBuff) - end, msgLen - LOG_BUF_SIZE(tLogBuff) + end); + if (LOG_BUF_SIZE(pLogBuf) - end < msgLen) { + memcpy(LOG_BUF_BUFFER(pLogBuf) + end, msg, LOG_BUF_SIZE(pLogBuf) - end); + memcpy(LOG_BUF_BUFFER(pLogBuf), msg + LOG_BUF_SIZE(pLogBuf) - end, msgLen - LOG_BUF_SIZE(pLogBuf) + end); } else { - memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen); + memcpy(LOG_BUF_BUFFER(pLogBuf) + end, msg, msgLen); } } - LOG_BUF_END(tLogBuff) = (LOG_BUF_END(tLogBuff) + msgLen) % LOG_BUF_SIZE(tLogBuff); + LOG_BUF_END(pLogBuf) = (LOG_BUF_END(pLogBuf) + msgLen) % LOG_BUF_SIZE(pLogBuf); } -static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t msgLen) { +static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen) { int32_t start = 0; int32_t end = 0; int32_t remainSize = 0; @@ -552,13 +559,13 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t ms char tmpBuf[40] = {0}; int32_t tmpBufLen = 0; - if (tLogBuff == NULL || tLogBuff->stop) return -1; + if (pLogBuf == NULL || pLogBuf->stop) return -1; - taosThreadMutexLock(&LOG_BUF_MUTEX(tLogBuff)); - start = LOG_BUF_START(tLogBuff); - end = LOG_BUF_END(tLogBuff); + taosThreadMutexLock(&LOG_BUF_MUTEX(pLogBuf)); + start = LOG_BUF_START(pLogBuf); + end = LOG_BUF_END(pLogBuf); - remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1); + remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(pLogBuf) - end - 1); if (lostLine > 0) { sprintf(tmpBuf, "...Lost %" PRId64 " lines here...\n", lostLine); @@ -568,47 +575,47 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t ms if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) { lostLine++; tsAsyncLogLostLines++; - taosThreadMutexUnlock(&LOG_BUF_MUTEX(tLogBuff)); + taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf)); return -1; } if (lostLine > 0) { - taosCopyLogBuffer(tLogBuff, start, end, tmpBuf, tmpBufLen); + taosCopyLogBuffer(pLogBuf, start, end, tmpBuf, tmpBufLen); lostLine = 0; } - taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); + taosCopyLogBuffer(pLogBuf, LOG_BUF_START(pLogBuf), LOG_BUF_END(pLogBuf), msg, msgLen); // int32_t w = atomic_sub_fetch_32(&waitLock, 1); /* - if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { - tsem_post(&(tLogBuff->buffNotEmpty)); + if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(pLogBuf) * 4 /5))) { + tsem_post(&(pLogBuf->buffNotEmpty)); dbgPostN++; } else { dbgNoPostN++; } */ - taosThreadMutexUnlock(&LOG_BUF_MUTEX(tLogBuff)); + taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf)); return 0; } -static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t end) { +static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t end) { int32_t rSize = end - start; - return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize; + return rSize >= 0 ? rSize : LOG_BUF_SIZE(pLogBuf) + rSize; } -static void taosWriteLog(SLogBuff *tLogBuff) { +static void taosWriteLog(SLogBuff *pLogBuf) { static int32_t lastDuration = 0; int32_t remainChecked = 0; int32_t start, end, pollSize; do { if (remainChecked == 0) { - start = LOG_BUF_START(tLogBuff); - end = LOG_BUF_END(tLogBuff); + start = LOG_BUF_START(pLogBuf); + end = LOG_BUF_END(pLogBuf); if (start == end) { dbgEmptyW++; @@ -616,8 +623,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) { return; } - pollSize = taosGetLogRemainSize(tLogBuff, start, end); - if (pollSize < tLogBuff->minBuffSize) { + pollSize = taosGetLogRemainSize(pLogBuf, start, end); + if (pollSize < pLogBuf->minBuffSize) { lastDuration += tsWriteInterval; if (lastDuration < LOG_MAX_WAIT_MSEC) { break; @@ -628,38 +635,38 @@ static void taosWriteLog(SLogBuff *tLogBuff) { } if (start < end) { - taosWriteFile(tLogBuff->pFile, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); + taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize); } else { - int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; - taosWriteFile(tLogBuff->pFile, LOG_BUF_BUFFER(tLogBuff) + start, tsize); + int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start; + taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize); - taosWriteFile(tLogBuff->pFile, LOG_BUF_BUFFER(tLogBuff), end); + taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end); } dbgWN++; dbgWSize += pollSize; - if (pollSize < tLogBuff->minBuffSize) { + if (pollSize < pLogBuf->minBuffSize) { dbgSmallWN++; if (tsWriteInterval < LOG_MAX_INTERVAL) { tsWriteInterval += LOG_INTERVAL_STEP; } - } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 3) { + } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) { dbgBigWN++; tsWriteInterval = LOG_MIN_INTERVAL; - } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 4) { + } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) { if (tsWriteInterval > LOG_MIN_INTERVAL) { tsWriteInterval -= LOG_INTERVAL_STEP; } } - LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff); + LOG_BUF_START(pLogBuf) = (LOG_BUF_START(pLogBuf) + pollSize) % LOG_BUF_SIZE(pLogBuf); - start = LOG_BUF_START(tLogBuff); - end = LOG_BUF_END(tLogBuff); + start = LOG_BUF_START(pLogBuf); + end = LOG_BUF_END(pLogBuf); - pollSize = taosGetLogRemainSize(tLogBuff, start, end); - if (pollSize < tLogBuff->minBuffSize) { + pollSize = taosGetLogRemainSize(pLogBuf, start, end); + if (pollSize < pLogBuf->minBuffSize) { break; } @@ -670,16 +677,16 @@ static void taosWriteLog(SLogBuff *tLogBuff) { } static void *taosAsyncOutputLog(void *param) { - SLogBuff *tLogBuff = (SLogBuff *)param; + SLogBuff *pLogBuf = (SLogBuff *)param; setThreadName("log"); while (1) { taosMsleep(tsWriteInterval); // Polling the buffer - taosWriteLog(tLogBuff); + taosWriteLog(pLogBuf); - if (tLogBuff->stop) break; + if (pLogBuf->stop) break; } return NULL;