This commit is contained in:
dapan1121 2021-02-05 15:47:15 +08:00
parent 947913b168
commit 98036d544f
1 changed files with 36 additions and 18 deletions

View File

@ -34,7 +34,7 @@
#define LOG_INTERVAL_STEP 5000 #define LOG_INTERVAL_STEP 5000
#define MIN_LOG_INTERVAL 5000 #define MIN_LOG_INTERVAL 5000
#define MAX_LOG_INTERVAL 50000 #define MAX_LOG_INTERVAL 50000
#define LOG_MAX_WAIT_USEC 1000000
#define LOG_BUF_BUFFER(x) ((x)->buffer) #define LOG_BUF_BUFFER(x) ((x)->buffer)
#define LOG_BUF_START(x) ((x)->buffStart) #define LOG_BUF_START(x) ((x)->buffStart)
@ -47,6 +47,7 @@ typedef struct {
int32_t buffStart; int32_t buffStart;
int32_t buffEnd; int32_t buffEnd;
int32_t buffSize; int32_t buffSize;
int32_t minBuffSize;
int32_t fd; int32_t fd;
int32_t stop; int32_t stop;
pthread_t asyncThread; pthread_t asyncThread;
@ -516,6 +517,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
LOG_BUF_START(tLogBuff) = LOG_BUF_END(tLogBuff) = 0; LOG_BUF_START(tLogBuff) = LOG_BUF_END(tLogBuff) = 0;
LOG_BUF_SIZE(tLogBuff) = bufSize; LOG_BUF_SIZE(tLogBuff) = bufSize;
tLogBuff->minBuffSize = bufSize / 10;
tLogBuff->stop = 0; tLogBuff->stop = 0;
if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err;
@ -603,41 +605,52 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
return 0; return 0;
} }
static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff) { static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t end) {
int32_t start = LOG_BUF_START(tLogBuff);
int32_t end = LOG_BUF_END(tLogBuff);
int32_t rSize = end - start; int32_t rSize = end - start;
return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize; return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize;
} }
static void taosWriteLog(SLogBuff *tLogBuff) { static void taosWriteLog(SLogBuff *tLogBuff) {
static int32_t lastDuration = 0;
int32_t remainChecked = 0;
int32_t start, end, pollSize;
do { do {
int32_t start = LOG_BUF_START(tLogBuff); if (remainChecked == 0) {
int32_t end = LOG_BUF_END(tLogBuff); start = LOG_BUF_START(tLogBuff);
int32_t pollSize = 0; end = LOG_BUF_END(tLogBuff);
if (start == end) { if (start == end) {
dbgEmptyW++; dbgEmptyW++;
writeInterval = MAX_LOG_INTERVAL; writeInterval = MAX_LOG_INTERVAL;
return; return;
} else if (start < end) { }
pollSize = end - start;
pollSize = taosGetLogRemainSize(tLogBuff, start, end);
if (pollSize < tLogBuff->minBuffSize) {
lastDuration += writeInterval;
if (lastDuration < LOG_MAX_WAIT_USEC) {
break;
}
}
lastDuration = 0;
}
if (start < end) {
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
} else { } else {
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end);
pollSize = tsize + end;
} }
dbgWN++; dbgWN++;
dbgWSize+=pollSize; dbgWSize+=pollSize;
if (pollSize < LOG_BUF_SIZE(tLogBuff)/10) { if (pollSize < tLogBuff->minBuffSize) {
dbgSmallWN++; dbgSmallWN++;
if (writeInterval < MAX_LOG_INTERVAL) { if (writeInterval < MAX_LOG_INTERVAL) {
writeInterval += LOG_INTERVAL_STEP; writeInterval += LOG_INTERVAL_STEP;
@ -651,12 +664,17 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff); LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff);
int32_t rsize = taosGetLogRemainSize(tLogBuff); start = LOG_BUF_START(tLogBuff);
if (rsize < 1048576) { end = LOG_BUF_END(tLogBuff);
int32_t pollSize = taosGetLogRemainSize(tLogBuff, start, end);
if (pollSize < tLogBuff->minBuffSize) {
break; break;
} }
writeInterval = MIN_LOG_INTERVAL; writeInterval = MIN_LOG_INTERVAL;
remainChecked = 1;
}while (1); }while (1);
} }