feat:[TS-3718]add lock to avoid write slow at the same time

This commit is contained in:
wangmm0220 2024-08-08 17:29:18 +08:00
parent 3ef942bf4b
commit bce6f40abb
1 changed files with 28 additions and 9 deletions

View File

@ -61,6 +61,7 @@ typedef struct {
TdThreadMutex buffMutex; TdThreadMutex buffMutex;
int32_t writeInterval; int32_t writeInterval;
int32_t lastDuration; int32_t lastDuration;
int32_t lock;
} SLogBuff; } SLogBuff;
typedef struct { typedef struct {
@ -136,6 +137,8 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(TdFilePtr pFile); static void taosCloseLogByFd(TdFilePtr pFile);
static int32_t taosInitNormalLog(const char *fn, int32_t maxFileNum); static int32_t taosInitNormalLog(const char *fn, int32_t maxFileNum);
static void taosWriteLog(SLogBuff *pLogBuf); static void taosWriteLog(SLogBuff *pLogBuf);
static void taosWriteSlowLog(SLogBuff *pLogBuf);
static int32_t taosStartLog() { static int32_t taosStartLog() {
TdThreadAttr threadAttr; TdThreadAttr threadAttr;
(void)taosThreadAttrInit(&threadAttr); (void)taosThreadAttrInit(&threadAttr);
@ -387,14 +390,24 @@ static int32_t taosOpenNewLogFile() {
return 0; return 0;
} }
static void taosOpenNewSlowLogFile(int64_t today) { static void taosOpenNewSlowLogFile() {
(void)taosThreadMutexLock(&tsLogObj.logMutex); (void)taosThreadMutexLock(&tsLogObj.logMutex);
if (tsLogObj.timestampToday == today) { int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday;
if (delta >= 0 && delta < 86400) {
uInfo("timestampToday is already equal to today, no need to open new slow log file"); uInfo("timestampToday is already equal to today, no need to open new slow log file");
(void)taosThreadMutexUnlock(&tsLogObj.logMutex); (void)taosThreadMutexUnlock(&tsLogObj.logMutex);
return; return;
} }
for (int32_t i = 1; atomic_val_compare_exchange_32(&tsLogObj.slowHandle->lock, 0, 1) == 1; ++i) {
if (i % 1000 == 0) {
(void)sched_yield();
}
}
tsLogObj.slowHandle->lastDuration = LOG_MAX_WAIT_MSEC; // force write
taosWriteLog(tsLogObj.slowHandle); taosWriteLog(tsLogObj.slowHandle);
atomic_store_32(&tsLogObj.slowHandle->lock, 0);
char day[LOG_FILE_DAY_LEN] = {0}; char day[LOG_FILE_DAY_LEN] = {0};
getDay(day); getDay(day);
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
@ -410,7 +423,7 @@ static void taosOpenNewSlowLogFile(int64_t today) {
TdFilePtr pOldFile = tsLogObj.slowHandle->pFile; TdFilePtr pOldFile = tsLogObj.slowHandle->pFile;
tsLogObj.slowHandle->pFile = pFile; tsLogObj.slowHandle->pFile = pFile;
(void)taosCloseFile(&pOldFile); (void)taosCloseFile(&pOldFile);
tsLogObj.timestampToday = today; tsLogObj.timestampToday = getTimestampToday();
(void)taosThreadMutexUnlock(&tsLogObj.logMutex); (void)taosThreadMutexUnlock(&tsLogObj.logMutex);
} }
@ -654,9 +667,9 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
void taosPrintSlowLog(const char *format, ...) { void taosPrintSlowLog(const char *format, ...) {
if (!osLogSpaceSufficient()) return; if (!osLogSpaceSufficient()) return;
int64_t today = getTimestampToday(); int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday;
if (today != tsLogObj.timestampToday){ if (delta >= 86400 || delta < 0) {
taosOpenNewSlowLogFile(today); taosOpenNewSlowLogFile();
} }
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
@ -729,7 +742,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
pLogBuf->minBuffSize = bufSize / 10; pLogBuf->minBuffSize = bufSize / 10;
pLogBuf->stop = 0; pLogBuf->stop = 0;
pLogBuf->writeInterval = LOG_DEFAULT_INTERVAL; pLogBuf->writeInterval = LOG_DEFAULT_INTERVAL;
pLogBuf->lock = 0;
if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err; if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err;
// tsem_init(&(pLogBuf->buffNotEmpty), 0, 0); // tsem_init(&(pLogBuf->buffNotEmpty), 0, 0);
@ -811,6 +824,12 @@ static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t en
return rSize >= 0 ? rSize : LOG_BUF_SIZE(pLogBuf) + rSize; return rSize >= 0 ? rSize : LOG_BUF_SIZE(pLogBuf) + rSize;
} }
static void taosWriteSlowLog(SLogBuff *pLogBuf){
int32_t lock = atomic_val_compare_exchange_32(&pLogBuf->lock, 0, 1);
if (lock == 1) return;
taosWriteLog(pLogBuf);
atomic_store_32(&pLogBuf->lock, 0);
}
static void taosWriteLog(SLogBuff *pLogBuf) { static void taosWriteLog(SLogBuff *pLogBuf) {
int32_t start = LOG_BUF_START(pLogBuf); int32_t start = LOG_BUF_START(pLogBuf);
int32_t end = LOG_BUF_END(pLogBuf); int32_t end = LOG_BUF_END(pLogBuf);
@ -895,12 +914,12 @@ static void *taosAsyncOutputLog(void *param) {
// Polling the buffer // Polling the buffer
taosWriteLog(pLogBuf); taosWriteLog(pLogBuf);
if (pSlowBuf) taosWriteLog(pSlowBuf); if (pSlowBuf) taosWriteSlowLog(pSlowBuf);
if (pLogBuf->stop || (pSlowBuf && pSlowBuf->stop)) { if (pLogBuf->stop || (pSlowBuf && pSlowBuf->stop)) {
pLogBuf->lastDuration = LOG_MAX_WAIT_MSEC; pLogBuf->lastDuration = LOG_MAX_WAIT_MSEC;
taosWriteLog(pLogBuf); taosWriteLog(pLogBuf);
if (pSlowBuf) taosWriteLog(pSlowBuf); if (pSlowBuf) taosWriteSlowLog(pSlowBuf);
break; break;
} }
} }