fix compile error

This commit is contained in:
Liu Jicong 2021-12-20 18:55:56 +08:00
parent 66b6e1473c
commit 4a24d4bfda
2 changed files with 68 additions and 64 deletions

View File

@ -56,6 +56,9 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
} }
return pTq; return pTq;
} }
void tqClose(STQ* pTq) {
// TODO
}
static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; } static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; }

View File

@ -14,11 +14,11 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "ulog.h"
#include "tlog.h" #include "tlog.h"
#include "os.h"
#include "tnote.h" #include "tnote.h"
#include "tutil.h" #include "tutil.h"
#include "ulog.h"
#include "zlib.h" #include "zlib.h"
#define MAX_LOGLINE_SIZE (1000) #define MAX_LOGLINE_SIZE (1000)
@ -44,7 +44,7 @@
#define LOG_BUF_MUTEX(x) ((x)->buffMutex) #define LOG_BUF_MUTEX(x) ((x)->buffMutex)
typedef struct { typedef struct {
char * buffer; char *buffer;
int32_t buffStart; int32_t buffStart;
int32_t buffEnd; int32_t buffEnd;
int32_t buffSize; int32_t buffSize;
@ -64,7 +64,7 @@ typedef struct {
int32_t openInProgress; int32_t openInProgress;
pid_t pid; pid_t pid;
char logName[LOG_FILE_NAME_LEN]; char logName[LOG_FILE_NAME_LEN];
SLogBuff * logHandle; SLogBuff *logHandle;
pthread_mutex_t logMutex; pthread_mutex_t logMutex;
} SLogObj; } SLogObj;
@ -93,19 +93,19 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135; int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135; int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131; int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int32_t ctgDebugFlag = 131; int32_t ctgDebugFlag = 131;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgWN = 0; int64_t dbgWN = 0;
int64_t dbgSmallWN = 0; int64_t dbgSmallWN = 0;
int64_t dbgBigWN = 0; int64_t dbgBigWN = 0;
int64_t dbgWSize = 0; int64_t dbgWSize = 0;
static SLogObj tsLogObj = { .fileNum = 1 }; static SLogObj tsLogObj = {.fileNum = 1};
static void * taosAsyncOutputLog(void *param); static void *taosAsyncOutputLog(void *param);
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen); static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen);
static SLogBuff *taosLogBuffNew(int32_t bufSize); static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(int32_t oldFd); static void taosCloseLogByFd(int32_t oldFd);
@ -139,8 +139,8 @@ static void taosStopLog() {
void taosCloseLog() { void taosCloseLog() {
taosStopLog(); taosStopLog();
//tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); // tsem_post(&(tsLogObj.logHandle->buffNotEmpty));
taosMsleep(MAX_LOG_INTERVAL/1000); taosMsleep(MAX_LOG_INTERVAL / 1000);
if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
pthread_join(tsLogObj.logHandle->asyncThread, NULL); pthread_join(tsLogObj.logHandle->asyncThread, NULL);
} }
@ -381,13 +381,14 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) {
printf("server disk:%s space remain %.3f GB, total %.1f GB, stop print log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); printf("server disk:%s space remain %.3f GB, total %.1f GB, stop print log.\n", tsLogDir, tsAvailLogDirGB,
tsTotalLogDirGB);
fflush(stdout); fflush(stdout);
return; return;
} }
va_list argpointer; va_list argpointer;
char buffer[MAX_LOGLINE_BUFFER_SIZE] = { 0 }; char buffer[MAX_LOGLINE_BUFFER_SIZE] = {0};
int32_t len; int32_t len;
struct tm Tm, *ptm; struct tm Tm, *ptm;
struct timeval timeSecs; struct timeval timeSecs;
@ -434,14 +435,14 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
} }
} }
if (dflag & DEBUG_SCREEN) if (dflag & DEBUG_SCREEN) taosWriteFile(1, buffer, (uint32_t)len);
taosWriteFile(1, buffer, (uint32_t)len);
if (dflag == 255) nInfo(buffer, len); if (dflag == 255) nInfo(buffer, len);
} }
void taosDumpData(unsigned char *msg, int32_t len) { void taosDumpData(unsigned char *msg, int32_t len) {
if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) {
printf("server disk:%s space remain %.3f GB, total %.1f GB, stop dump log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); printf("server disk:%s space remain %.3f GB, total %.1f GB, stop dump log.\n", tsLogDir, tsAvailLogDirGB,
tsTotalLogDirGB);
fflush(stdout); fflush(stdout);
return; return;
} }
@ -468,7 +469,8 @@ void taosDumpData(unsigned char *msg, int32_t len) {
void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) { void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) {
if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) {
printf("server disk:%s space remain %.3f GB, total %.1f GB, stop write log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); printf("server disk:%s space remain %.3f GB, total %.1f GB, stop write log.\n", tsLogDir, tsAvailLogDirGB,
tsTotalLogDirGB);
fflush(stdout); fflush(stdout);
return; return;
} }
@ -542,7 +544,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
tLogBuff->stop = 0; tLogBuff->stop = 0;
if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err;
//tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); // tsem_init(&(tLogBuff->buffNotEmpty), 0, 0);
return tLogBuff; return tLogBuff;
@ -592,7 +594,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1); remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1);
if (lostLine > 0) { if (lostLine > 0) {
sprintf(tmpBuf, "...Lost %"PRId64" lines here...\n", lostLine); sprintf(tmpBuf, "...Lost %" PRId64 " lines here...\n", lostLine);
tmpBufLen = (int32_t)strlen(tmpBuf); tmpBufLen = (int32_t)strlen(tmpBuf);
} }
@ -610,7 +612,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen);
//int32_t w = atomic_sub_fetch_32(&waitLock, 1); // int32_t w = atomic_sub_fetch_32(&waitLock, 1);
/* /*
if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) {
tsem_post(&(tLogBuff->buffNotEmpty)); tsem_post(&(tLogBuff->buffNotEmpty));
@ -622,7 +624,6 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff));
return 0; return 0;
} }
@ -669,17 +670,17 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
} }
dbgWN++; dbgWN++;
dbgWSize+=pollSize; dbgWSize += pollSize;
if (pollSize < tLogBuff->minBuffSize) { if (pollSize < tLogBuff->minBuffSize) {
dbgSmallWN++; dbgSmallWN++;
if (writeInterval < MAX_LOG_INTERVAL) { if (writeInterval < MAX_LOG_INTERVAL) {
writeInterval += LOG_INTERVAL_STEP; writeInterval += LOG_INTERVAL_STEP;
} }
} else if (pollSize > LOG_BUF_SIZE(tLogBuff)/3) { } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 3) {
dbgBigWN++; dbgBigWN++;
writeInterval = MIN_LOG_INTERVAL; writeInterval = MIN_LOG_INTERVAL;
} else if (pollSize > LOG_BUF_SIZE(tLogBuff)/4) { } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 4) {
if (writeInterval > MIN_LOG_INTERVAL) { if (writeInterval > MIN_LOG_INTERVAL) {
writeInterval -= LOG_INTERVAL_STEP; writeInterval -= LOG_INTERVAL_STEP;
} }
@ -698,7 +699,7 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
writeInterval = MIN_LOG_INTERVAL; writeInterval = MIN_LOG_INTERVAL;
remainChecked = 1; remainChecked = 1;
}while (1); } while (1);
} }
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
@ -721,8 +722,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
int32_t compressSize = 163840; int32_t compressSize = 163840;
int32_t ret = 0; int32_t ret = 0;
int32_t len = 0; int32_t len = 0;
char * data = malloc(compressSize); char *data = malloc(compressSize);
FILE * srcFp = NULL; FILE *srcFp = NULL;
gzFile dstFp = NULL; gzFile dstFp = NULL;
srcFp = fopen(srcFileName, "r"); srcFp = fopen(srcFileName, "r");