Merge pull request #5278 from taosdata/hotfix/TD-2545
[TD-2545]fix async log issue
This commit is contained in:
commit
404c8d86d9
|
@ -28,10 +28,13 @@
|
|||
#define MAX_LOGLINE_DUMP_CONTENT_SIZE (MAX_LOGLINE_DUMP_SIZE - 100)
|
||||
|
||||
#define LOG_FILE_NAME_LEN 300
|
||||
#define TSDB_DEFAULT_LOG_BUF_SIZE (512 * 1024) // 512K
|
||||
#define TSDB_MIN_LOG_BUF_SIZE 1024 // 1K
|
||||
#define TSDB_MAX_LOG_BUF_SIZE (1024 * 1024) // 1M
|
||||
#define TSDB_DEFAULT_LOG_BUF_UNIT 1024 // 1K
|
||||
#define TSDB_DEFAULT_LOG_BUF_SIZE (20 * 1024 * 1024) // 20MB
|
||||
|
||||
#define DEFAULT_LOG_INTERVAL 25
|
||||
#define LOG_INTERVAL_STEP 5
|
||||
#define MIN_LOG_INTERVAL 5
|
||||
#define MAX_LOG_INTERVAL 25
|
||||
#define LOG_MAX_WAIT_MSEC 1000
|
||||
|
||||
#define LOG_BUF_BUFFER(x) ((x)->buffer)
|
||||
#define LOG_BUF_START(x) ((x)->buffStart)
|
||||
|
@ -44,6 +47,7 @@ typedef struct {
|
|||
int32_t buffStart;
|
||||
int32_t buffEnd;
|
||||
int32_t buffSize;
|
||||
int32_t minBuffSize;
|
||||
int32_t fd;
|
||||
int32_t stop;
|
||||
pthread_t asyncThread;
|
||||
|
@ -68,6 +72,15 @@ int8_t tsAsyncLog = 1;
|
|||
float tsTotalLogDirGB = 0;
|
||||
float tsAvailLogDirGB = 0;
|
||||
float tsMinimalLogDirGB = 1.0f;
|
||||
int64_t asyncLogLostLines = 0;
|
||||
int32_t writeInterval = DEFAULT_LOG_INTERVAL;
|
||||
|
||||
int64_t dbgEmptyW = 0;
|
||||
int64_t dbgWN = 0;
|
||||
int64_t dbgSmallWN = 0;
|
||||
int64_t dbgBigWN = 0;
|
||||
int64_t dbgWSize = 0;
|
||||
|
||||
#ifdef _TD_POWER_
|
||||
char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/power";
|
||||
#else
|
||||
|
@ -108,7 +121,8 @@ static void taosStopLog() {
|
|||
|
||||
void taosCloseLog() {
|
||||
taosStopLog();
|
||||
tsem_post(&(tsLogObj.logHandle->buffNotEmpty));
|
||||
//tsem_post(&(tsLogObj.logHandle->buffNotEmpty));
|
||||
taosMsleep(MAX_LOG_INTERVAL/1000);
|
||||
if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
|
||||
pthread_join(tsLogObj.logHandle->asyncThread, NULL);
|
||||
}
|
||||
|
@ -497,8 +511,6 @@ static void taosCloseLogByFd(int32_t fd) {
|
|||
static SLogBuff *taosLogBuffNew(int32_t bufSize) {
|
||||
SLogBuff *tLogBuff = NULL;
|
||||
|
||||
if (bufSize < TSDB_MIN_LOG_BUF_SIZE || bufSize > TSDB_MAX_LOG_BUF_SIZE) return NULL;
|
||||
|
||||
tLogBuff = calloc(1, sizeof(SLogBuff));
|
||||
if (tLogBuff == NULL) return NULL;
|
||||
|
||||
|
@ -507,10 +519,11 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
|
|||
|
||||
LOG_BUF_START(tLogBuff) = LOG_BUF_END(tLogBuff) = 0;
|
||||
LOG_BUF_SIZE(tLogBuff) = bufSize;
|
||||
tLogBuff->minBuffSize = bufSize / 10;
|
||||
tLogBuff->stop = 0;
|
||||
|
||||
if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err;
|
||||
tsem_init(&(tLogBuff->buffNotEmpty), 0, 0);
|
||||
//tsem_init(&(tLogBuff->buffNotEmpty), 0, 0);
|
||||
|
||||
return tLogBuff;
|
||||
|
||||
|
@ -529,24 +542,7 @@ static void taosLogBuffDestroy(SLogBuff *tLogBuff) {
|
|||
}
|
||||
#endif
|
||||
|
||||
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) {
|
||||
int32_t start = 0;
|
||||
int32_t end = 0;
|
||||
int32_t remainSize = 0;
|
||||
|
||||
if (tLogBuff == NULL || tLogBuff->stop) return -1;
|
||||
|
||||
pthread_mutex_lock(&LOG_BUF_MUTEX(tLogBuff));
|
||||
start = LOG_BUF_START(tLogBuff);
|
||||
end = LOG_BUF_END(tLogBuff);
|
||||
|
||||
remainSize = (start > end) ? (end - start - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1);
|
||||
|
||||
if (remainSize <= msgLen) {
|
||||
pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff));
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, char *msg, int32_t msgLen) {
|
||||
if (start > end) {
|
||||
memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen);
|
||||
} else {
|
||||
|
@ -558,61 +554,144 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
|
|||
}
|
||||
}
|
||||
LOG_BUF_END(tLogBuff) = (LOG_BUF_END(tLogBuff) + msgLen) % LOG_BUF_SIZE(tLogBuff);
|
||||
}
|
||||
|
||||
// TODO : put string in the buffer
|
||||
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) {
|
||||
int32_t start = 0;
|
||||
int32_t end = 0;
|
||||
int32_t remainSize = 0;
|
||||
static int64_t lostLine = 0;
|
||||
char tmpBuf[40] = {0};
|
||||
int32_t tmpBufLen = 0;
|
||||
|
||||
tsem_post(&(tLogBuff->buffNotEmpty));
|
||||
if (tLogBuff == NULL || tLogBuff->stop) return -1;
|
||||
|
||||
pthread_mutex_lock(&LOG_BUF_MUTEX(tLogBuff));
|
||||
start = LOG_BUF_START(tLogBuff);
|
||||
end = LOG_BUF_END(tLogBuff);
|
||||
|
||||
remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1);
|
||||
|
||||
if (lostLine > 0) {
|
||||
sprintf(tmpBuf, "...Lost %"PRId64" lines here...\n", lostLine);
|
||||
tmpBufLen = (int32_t)strlen(tmpBuf);
|
||||
}
|
||||
|
||||
if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) {
|
||||
lostLine++;
|
||||
asyncLogLostLines++;
|
||||
pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (lostLine > 0) {
|
||||
taosCopyLogBuffer(tLogBuff, start, end, tmpBuf, tmpBufLen);
|
||||
lostLine = 0;
|
||||
}
|
||||
|
||||
taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen);
|
||||
|
||||
//int32_t w = atomic_sub_fetch_32(&waitLock, 1);
|
||||
/*
|
||||
if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) {
|
||||
tsem_post(&(tLogBuff->buffNotEmpty));
|
||||
dbgPostN++;
|
||||
} else {
|
||||
dbgNoPostN++;
|
||||
}
|
||||
*/
|
||||
|
||||
pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff));
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t taosPollLogBuffer(SLogBuff *tLogBuff, char *buf, int32_t bufSize) {
|
||||
int32_t start = LOG_BUF_START(tLogBuff);
|
||||
int32_t end = LOG_BUF_END(tLogBuff);
|
||||
int32_t pollSize = 0;
|
||||
static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t end) {
|
||||
int32_t rSize = end - start;
|
||||
|
||||
if (start == end) {
|
||||
return 0;
|
||||
} else if (start < end) {
|
||||
pollSize = MIN(end - start, bufSize);
|
||||
return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize;
|
||||
}
|
||||
|
||||
memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
|
||||
return pollSize;
|
||||
} else {
|
||||
pollSize = MIN(end + LOG_BUF_SIZE(tLogBuff) - start, bufSize);
|
||||
if (pollSize > LOG_BUF_SIZE(tLogBuff) - start) {
|
||||
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
|
||||
memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
|
||||
memcpy(buf + tsize, LOG_BUF_BUFFER(tLogBuff), pollSize - tsize);
|
||||
static void taosWriteLog(SLogBuff *tLogBuff) {
|
||||
static int32_t lastDuration = 0;
|
||||
int32_t remainChecked = 0;
|
||||
int32_t start, end, pollSize;
|
||||
|
||||
do {
|
||||
if (remainChecked == 0) {
|
||||
start = LOG_BUF_START(tLogBuff);
|
||||
end = LOG_BUF_END(tLogBuff);
|
||||
|
||||
} else {
|
||||
memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
|
||||
if (start == end) {
|
||||
dbgEmptyW++;
|
||||
writeInterval = MAX_LOG_INTERVAL;
|
||||
return;
|
||||
}
|
||||
|
||||
pollSize = taosGetLogRemainSize(tLogBuff, start, end);
|
||||
if (pollSize < tLogBuff->minBuffSize) {
|
||||
lastDuration += writeInterval;
|
||||
if (lastDuration < LOG_MAX_WAIT_MSEC) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
lastDuration = 0;
|
||||
}
|
||||
return pollSize;
|
||||
}
|
||||
|
||||
if (start < end) {
|
||||
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
|
||||
} else {
|
||||
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
|
||||
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
|
||||
|
||||
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end);
|
||||
}
|
||||
|
||||
dbgWN++;
|
||||
dbgWSize+=pollSize;
|
||||
|
||||
if (pollSize < tLogBuff->minBuffSize) {
|
||||
dbgSmallWN++;
|
||||
if (writeInterval < MAX_LOG_INTERVAL) {
|
||||
writeInterval += LOG_INTERVAL_STEP;
|
||||
}
|
||||
} else if (pollSize > LOG_BUF_SIZE(tLogBuff)/3) {
|
||||
dbgBigWN++;
|
||||
writeInterval = MIN_LOG_INTERVAL;
|
||||
} else if (pollSize > LOG_BUF_SIZE(tLogBuff)/4) {
|
||||
if (writeInterval > MIN_LOG_INTERVAL) {
|
||||
writeInterval -= LOG_INTERVAL_STEP;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff);
|
||||
|
||||
start = LOG_BUF_START(tLogBuff);
|
||||
end = LOG_BUF_END(tLogBuff);
|
||||
|
||||
pollSize = taosGetLogRemainSize(tLogBuff, start, end);
|
||||
if (pollSize < tLogBuff->minBuffSize) {
|
||||
break;
|
||||
}
|
||||
|
||||
writeInterval = MIN_LOG_INTERVAL;
|
||||
|
||||
remainChecked = 1;
|
||||
}while (1);
|
||||
}
|
||||
|
||||
static void *taosAsyncOutputLog(void *param) {
|
||||
SLogBuff *tLogBuff = (SLogBuff *)param;
|
||||
int32_t log_size = 0;
|
||||
|
||||
char tempBuffer[TSDB_DEFAULT_LOG_BUF_UNIT];
|
||||
|
||||
|
||||
while (1) {
|
||||
tsem_wait(&(tLogBuff->buffNotEmpty));
|
||||
//tsem_wait(&(tLogBuff->buffNotEmpty));
|
||||
|
||||
taosMsleep(writeInterval);
|
||||
|
||||
// Polling the buffer
|
||||
while (1) {
|
||||
log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT);
|
||||
if (log_size) {
|
||||
taosWrite(tLogBuff->fd, tempBuffer, log_size);
|
||||
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
taosWriteLog(tLogBuff);
|
||||
|
||||
if (tLogBuff->stop) break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue