This commit is contained in:
dapan1121 2021-02-01 10:41:27 +08:00
parent c816365e53
commit 24844d2e26
1 changed files with 68 additions and 45 deletions

View File

@ -30,6 +30,12 @@
#define LOG_FILE_NAME_LEN 300 #define LOG_FILE_NAME_LEN 300
#define TSDB_DEFAULT_LOG_BUF_SIZE (10 * 1024 * 1024) // 10MB #define TSDB_DEFAULT_LOG_BUF_SIZE (10 * 1024 * 1024) // 10MB
#define DEFAULT_LOG_INTERVAL 50000
#define LOG_INTERVAL_STEP 5000
#define MIN_LOG_INTERVAL 5000
#define MAX_LOG_INTERVAL 50000
#define LOG_BUF_BUFFER(x) ((x)->buffer) #define LOG_BUF_BUFFER(x) ((x)->buffer)
#define LOG_BUF_START(x) ((x)->buffStart) #define LOG_BUF_START(x) ((x)->buffStart)
#define LOG_BUF_END(x) ((x)->buffEnd) #define LOG_BUF_END(x) ((x)->buffEnd)
@ -66,13 +72,15 @@ float tsTotalLogDirGB = 0;
float tsAvailLogDirGB = 0; float tsAvailLogDirGB = 0;
float tsMinimalLogDirGB = 1.0f; float tsMinimalLogDirGB = 1.0f;
int64_t asyncLogLostLines = 0; int64_t asyncLogLostLines = 0;
int32_t writeInterval = DEFAULT_LOG_INTERVAL;
int64_t dbgPostN = 0; int64_t dbgPostN = 0;
int64_t dbgNoPostN = 0; int64_t dbgNoPostN = 0;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgW = 0; int64_t dbgWN = 0;
int64_t dbgSmallW = 0; int64_t dbgSmallWN = 0;
int64_t dbgBigWN = 0;
int64_t dbgWSize = 0; int64_t dbgWSize = 0;
#ifdef _TD_POWER_ #ifdef _TD_POWER_
@ -115,7 +123,8 @@ static void taosStopLog() {
void taosCloseLog() { void taosCloseLog() {
taosStopLog(); taosStopLog();
tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); //tsem_post(&(tsLogObj.logHandle->buffNotEmpty));
usleep(MAX_LOG_INTERVAL);
if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
pthread_join(tsLogObj.logHandle->asyncThread, NULL); pthread_join(tsLogObj.logHandle->asyncThread, NULL);
} }
@ -513,7 +522,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
tLogBuff->stop = 0; tLogBuff->stop = 0;
if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err;
tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); //tsem_init(&(tLogBuff->buffNotEmpty), 0, 0);
return tLogBuff; return tLogBuff;
@ -557,7 +566,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
if (tLogBuff == NULL || tLogBuff->stop) return -1; if (tLogBuff == NULL || tLogBuff->stop) return -1;
atomic_add_fetch_32(&waitLock, 1); //atomic_add_fetch_32(&waitLock, 1);
pthread_mutex_lock(&LOG_BUF_MUTEX(tLogBuff)); pthread_mutex_lock(&LOG_BUF_MUTEX(tLogBuff));
start = LOG_BUF_START(tLogBuff); start = LOG_BUF_START(tLogBuff);
@ -585,14 +594,15 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen);
int32_t w = atomic_sub_fetch_32(&waitLock, 1); //int32_t w = atomic_sub_fetch_32(&waitLock, 1);
/*
if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) {
tsem_post(&(tLogBuff->buffNotEmpty)); tsem_post(&(tLogBuff->buffNotEmpty));
dbgPostN++; dbgPostN++;
} else { } else {
dbgNoPostN++; dbgNoPostN++;
} }
*/
pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff));
@ -600,59 +610,72 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
return 0; return 0;
} }
static int32_t taosPollLogBuffer(SLogBuff *tLogBuff, char *buf, int32_t bufSize) { static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff) {
int32_t start = LOG_BUF_START(tLogBuff); int32_t start = LOG_BUF_START(tLogBuff);
int32_t end = LOG_BUF_END(tLogBuff); int32_t end = LOG_BUF_END(tLogBuff);
int32_t pollSize = 0; int32_t rSize = end - start;
if (start == end) { return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize;
return 0; }
} else if (start < end) {
pollSize = MIN(end - start, bufSize);
memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); static void taosWriteLog(SLogBuff *tLogBuff) {
return pollSize; do {
} else { int32_t start = LOG_BUF_START(tLogBuff);
pollSize = MIN(end + LOG_BUF_SIZE(tLogBuff) - start, bufSize); int32_t end = LOG_BUF_END(tLogBuff);
if (pollSize > LOG_BUF_SIZE(tLogBuff) - start) { int32_t pollSize = 0;
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
memcpy(buf + tsize, LOG_BUF_BUFFER(tLogBuff), pollSize - tsize);
if (start == end) {
dbgEmptyW++;
writeInterval = MAX_LOG_INTERVAL;
return;
} else if (start < end) {
pollSize = end - start;
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
} else { } else {
memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end);
pollSize = tsize + end;
} }
return pollSize;
} dbgWN++;
if (pollSize < 1048576) {
dbgSmallWN++;
if (writeInterval < MAX_LOG_INTERVAL) {
writeInterval += LOG_INTERVAL_STEP;
}
} else if (pollSize > 4 * 1048576) {
dbgBigWN++;
writeInterval = MIN_LOG_INTERVAL;
}
dbgWSize+=pollSize;
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff);
int32_t rsize = taosGetLogRemainSize(tLogBuff);
if (rsize < 1048576) {
break;
}
writeInterval = MIN_LOG_INTERVAL;
}while (1);
} }
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
SLogBuff *tLogBuff = (SLogBuff *)param; SLogBuff *tLogBuff = (SLogBuff *)param;
int32_t log_size = 0;
char *tempBuffer = malloc(TSDB_DEFAULT_LOG_BUF_SIZE);
assert(tempBuffer);
while (1) { while (1) {
tsem_wait(&(tLogBuff->buffNotEmpty)); //tsem_wait(&(tLogBuff->buffNotEmpty));
usleep(writeInterval);
// Polling the buffer // Polling the buffer
while (1) { taosWriteLog(tLogBuff);
log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_SIZE);
if (log_size) {
dbgW++;
if (log_size < 1048576) {
dbgSmallW++;
}
dbgWSize+=log_size;
taosWrite(tLogBuff->fd, tempBuffer, log_size);
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff);
} else {
dbgEmptyW++;
break;
}
}
if (tLogBuff->stop) break; if (tLogBuff->stop) break;
} }