diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 115e2fc674..89d1066a43 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 4776778 + GIT_TAG 4efbc10 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/tglobal.h b/include/common/tglobal.h index d445fc26e8..cd9667843b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -82,6 +82,10 @@ extern bool tsEnableTelem; extern int32_t tsTelemInterval; extern char tsTelemServer[]; extern uint16_t tsTelemPort; +extern bool tsEnableCrashReport; +extern char* tsTelemUri; +extern char* tsClientCrashReportUri; +extern char* tsSvrCrashReportUri; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index 7d8c588bfc..9a6aee4187 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -24,7 +24,7 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/include/os/osSystem.h b/include/os/osSystem.h index 58f34d26f0..5154c56e4b 100644 --- a/include/os/osSystem.h +++ b/include/os/osSystem.h @@ -46,27 +46,73 @@ void taosSetTerminalMode(); int32_t taosGetOldTerminalMode(); void taosResetTerminalMode(); +#define STACKSIZE 100 + #if !defined(WINDOWS) -#define taosPrintTrace(flags, level, dflag) \ - { \ - void* array[100]; \ - int32_t size = backtrace(array, 100); \ - char** strings = backtrace_symbols(array, size); \ - if (strings != NULL) { \ - taosPrintLog(flags, level, dflag, "obtained %d stack frames", size); \ - for (int32_t i = 0; i < size; i++) { \ - taosPrintLog(flags, level, dflag, "frame:%d, %s", i, strings[i]); \ - } \ - } \ - \ - taosMemoryFree(strings); \ +#define taosLogTraceToBuf(buf, bufSize, ignoreNum) { \ + void* array[STACKSIZE]; \ + int32_t size = backtrace(array, STACKSIZE); \ + char** strings = backtrace_symbols(array, size); \ + int32_t offset = 0; \ + if (strings != NULL) { \ + offset = snprintf(buf, bufSize - 1, "obtained %d stack frames\n", (ignoreNum > 0) ? size - ignoreNum : size); \ + for (int32_t i = (ignoreNum > 0) ? ignoreNum : 0; i < size; i++) { \ + offset += snprintf(buf + offset, bufSize - 1 - offset, "frame:%d, %s\n", (ignoreNum > 0) ? i - ignoreNum : i, strings[i]); \ + } \ + } \ + \ + taosMemoryFree(strings); \ +} + +#define taosPrintTrace(flags, level, dflag, ignoreNum) \ + { \ + void* array[STACKSIZE]; \ + int32_t size = backtrace(array, STACKSIZE); \ + char** strings = backtrace_symbols(array, size); \ + if (strings != NULL) { \ + taosPrintLog(flags, level, dflag, "obtained %d stack frames", (ignoreNum > 0) ? size - ignoreNum : size); \ + for (int32_t i = (ignoreNum > 0) ? ignoreNum : 0; i < size; i++) { \ + taosPrintLog(flags, level, dflag, "frame:%d, %s", (ignoreNum > 0) ? i - ignoreNum : i, strings[i]); \ + } \ + } \ + \ + taosMemoryFree(strings); \ } #else + #include #include -#define STACKSIZE 64 -#define taosPrintTrace(flags, level, dflag) \ +#define taosLogTraceToBuf(buf, bufSize, ignoreNum) { \ + unsigned int i; \ + void* stack[STACKSIZE]; \ + unsigned short frames; \ + SYMBOL_INFO* symbol; \ + HANDLE process; \ + int32_t offset = 0; \ + \ + process = GetCurrentProcess(); \ + \ + SymInitialize(process, NULL, TRUE); \ + \ + frames = CaptureStackBackTrace(0, STACKSIZE, stack, NULL); \ + symbol = (SYMBOL_INFO*)calloc(sizeof(SYMBOL_INFO) + 256 * sizeof(char), 1); \ + if (symbol != NULL) { \ + symbol->MaxNameLen = 255; \ + symbol->SizeOfStruct = sizeof(SYMBOL_INFO); \ + \ + if (frames > 0) { \ + offset = snprintf(buf, bufSize - 1, "obtained %d stack frames\n", (ignoreNum > 0) ? frames - ignoreNum : frames); \ + for (i = (ignoreNum > 0) ? ignoreNum : 0; i < frames; i++) { \ + SymFromAddr(process, (DWORD64)(stack[i]), 0, symbol); \ + offset += snprintf(buf + offset, bufSize - 1 - offset, "frame:%i, %s - 0x%0X\n", (ignoreNum > 0) ? i - ignoreNum : i, symbol->Name, symbol->Address); \ + } \ + } \ + free(symbol); \ + } \ + } + +#define taosPrintTrace(flags, level, dflag, ignoreNum) \ { \ unsigned int i; \ void* stack[STACKSIZE]; \ @@ -85,10 +131,10 @@ void taosResetTerminalMode(); symbol->SizeOfStruct = sizeof(SYMBOL_INFO); \ \ if (frames > 0) { \ - taosPrintLog(flags, level, dflag, "obtained %d stack frames", frames); \ - for (i = 0; i < frames; i++) { \ + taosPrintLog(flags, level, dflag, "obtained %d stack frames\n", (ignoreNum > 0) ? frames - ignoreNum : frames); \ + for (i = (ignoreNum > 0) ? ignoreNum : 0; i < frames; i++) { \ SymFromAddr(process, (DWORD64)(stack[i]), 0, symbol); \ - taosPrintLog(flags, level, dflag, "frame:%i: %s - 0x%0X", frames - i - 1, symbol->Name, symbol->Address); \ + taosPrintLog(flags, level, dflag, "frame:%i, %s - 0x%0X\n", (ignoreNum > 0) ? i - ignoreNum : i, symbol->Name, symbol->Address); \ } \ } \ free(symbol); \ diff --git a/include/util/tlog.h b/include/util/tlog.h index 6e9b304e1d..808377fa77 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -99,6 +99,11 @@ bool taosAssertRelease(bool condition); #endif #endif +void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, void *sigInfo); +void taosReadCrashInfo(char* filepath, char** pMsg, int64_t* pMsgLen, TdFilePtr* pFd); +void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile); +int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime); + // clang-format off #define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index e22aa85c97..3d3dfc8e73 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -43,6 +43,9 @@ # Switch for allowing TDengine to collect and report service usage information # telemetryReporting 1 +# Switch for allowing TDengine to collect and report crash information +# crashReporting 1 + # The maximum number of vnodes supported by this dnode # supportVnodes 0 diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index ea76f726ea..903a6a22ca 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -313,6 +313,8 @@ extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; extern int32_t timestampDeltaLimit; +extern int64_t lastClusterId; + __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); @@ -340,6 +342,7 @@ void resetConnectDB(STscObj* pTscObj); int taos_options_imp(TSDB_OPTION option, const char* str); void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); +void tscStopCrashReport(); typedef struct AsyncArg { SRpcMsg msg; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 64e1fd908a..2ecade58f9 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -28,13 +28,16 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "thttp.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 SAppInfo appInfo; +int64_t lastClusterId = 0; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; +int32_t clientStop = 0; int32_t timestampDeltaLimit = 900; // s @@ -385,6 +388,146 @@ void destroyRequest(SRequestObj *pRequest) { removeRequest(pRequest->self); } +void taosClientCrash(int signum, void *sigInfo, void *context) { + taosIgnSignal(SIGTERM); + taosIgnSignal(SIGHUP); + taosIgnSignal(SIGINT); + taosIgnSignal(SIGBREAK); + +#if !defined(WINDOWS) + taosIgnSignal(SIGBUS); +#endif + taosIgnSignal(SIGABRT); + taosIgnSignal(SIGFPE); + taosIgnSignal(SIGSEGV); + + char *pMsg = NULL; + const char *flags = "UTL FATAL "; + ELogLevel level = DEBUG_FATAL; + int32_t dflag = 255; + int64_t msgLen= -1; + + if (tsEnableCrashReport) { + if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) { + taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); + goto _return; + } else { + msgLen = strlen(pMsg); + } + } + +_return: + + taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo); + + exit(signum); +} + +void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } + +static void *tscCrashReportThreadFp(void *param) { + setThreadName("client-crashReport"); + char filepath[PATH_MAX] = {0}; + snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP); + char *pMsg = NULL; + int64_t msgLen = 0; + TdFilePtr pFile = NULL; + bool truncateFile = false; + int32_t sleepTime = 200; + int32_t reportPeriodNum = 3600 * 1000 / sleepTime; + int32_t loopTimes = reportPeriodNum; + +#ifdef WINDOWS + if (taosCheckCurrentInDll()) { + atexit(crashReportThreadFuncUnexpectedStopped); + } +#endif + + while (1) { + if (clientStop) break; + if (loopTimes++ < reportPeriodNum) { + taosMsleep(sleepTime); + continue; + } + + taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile); + if (pMsg && msgLen > 0) { + if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { + tscError("failed to send crash report"); + if (pFile) { + taosReleaseCrashLogFile(pFile, false); + continue; + } + } else { + tscInfo("succeed to send crash report"); + truncateFile = true; + } + } else { + tscDebug("no crash info"); + } + + taosMemoryFree(pMsg); + + if (pMsg && msgLen > 0) { + pMsg = NULL; + continue; + } + + if (pFile) { + taosReleaseCrashLogFile(pFile, truncateFile); + truncateFile = false; + } + + taosMsleep(sleepTime); + loopTimes = 0; + } + + clientStop = -1; + return NULL; +} + +int32_t tscCrashReportInit() { + if (!tsEnableCrashReport) { + return 0; + } + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + TdThread crashReportThread; + if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) { + tscError("failed to create crashReport thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + return 0; +} + +void tscStopCrashReport() { + if (!tsEnableCrashReport) { + return; + } + + if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) { + tscDebug("hb thread already stopped"); + return; + } + + while (atomic_load_32(&clientStop) > 0) { + taosMsleep(100); + } +} + +static void tscSetSignalHandle() { +#if !defined(WINDOWS) + taosSetSignal(SIGBUS, taosClientCrash); +#endif + taosSetSignal(SIGABRT, taosClientCrash); + taosSetSignal(SIGFPE, taosClientCrash); + taosSetSignal(SIGSEGV, taosClientCrash); +} + void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. @@ -392,6 +535,10 @@ void taos_init_imp(void) { errno = TSDB_CODE_SUCCESS; taosSeedRand(taosGetTimestampSec()); + appInfo.pid = taosGetPId(); + appInfo.startTime = taosGetTimestampMs(); + appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + deltaToUtcInitOnce(); if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) { @@ -404,6 +551,8 @@ void taos_init_imp(void) { return; } + tscSetSignalHandle(); + initQueryModuleMsgHandle(); if (taosConvInit() != 0) { @@ -433,9 +582,8 @@ void taos_init_imp(void) { taosGetAppName(appInfo.appName, NULL); taosThreadMutexInit(&appInfo.mutex, NULL); - appInfo.pid = taosGetPId(); - appInfo.startTime = taosGetTimestampMs(); - appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + tscCrashReportInit(); + tscDebug("client is initialized successfully"); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f36036fd0a..53acafeeaa 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1253,7 +1253,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); - + tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7f79323c4c..15c1d65162 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -55,6 +55,8 @@ void taos_cleanup(void) { return; } + tscStopCrashReport(); + int32_t id = clientReqRefPool; clientReqRefPool = -1; taosCloseRef(id); @@ -106,7 +108,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha if (pass == NULL) { pass = TSDB_DEFAULT_PASS; } - + STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); if (pObj) { int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 85027ff371..f414c7e92f 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -119,6 +119,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; + lastClusterId = connectRsp.clusterId; pTscObj->connType = connectRsp.connType; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index deefa65595..f670ce5aaf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -73,6 +73,11 @@ bool tsEnableTelem = true; int32_t tsTelemInterval = 43200; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; +char* tsTelemUri = "/report"; + +bool tsEnableCrashReport = true; +char* tsClientCrashReportUri = "/ccrashreport"; +char* tsSvrCrashReportUri = "/dcrashreport"; // schemaless char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; @@ -202,7 +207,9 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg); #endif -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -314,6 +321,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; + if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); @@ -377,7 +385,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; - tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4); + tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS); if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; tsNumOfCommitThreads = tsNumOfCores / 2; @@ -434,6 +442,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, 0) != 0) return -1; + if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, 0) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, 0) != 0) return -1; if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; @@ -665,6 +674,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval; + tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; return 0; @@ -715,7 +725,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; - tsSIMDBuiltins = (bool) cfgGetItem(pCfg, "SIMD-builtins")->bval; + tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "SIMD-builtins")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; @@ -726,6 +736,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; + tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; @@ -795,6 +806,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32; } else if (strcasecmp("cDebugFlag", name) == 0) { cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32; + } else if (strcasecmp("crashReporting", name) == 0) { + tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; } break; } diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index d308d3e618..711280ea58 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -44,6 +44,7 @@ static struct { char apolloUrl[PATH_MAX]; const char **envCmd; SArray *pArgs; // SConfigPair + int64_t startTime; } global = {0}; static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143, true); } @@ -67,23 +68,67 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) { dmStop(); } +void dmLogCrash(int signum, void *sigInfo, void *context) { + taosIgnSignal(SIGTERM); + taosIgnSignal(SIGHUP); + taosIgnSignal(SIGINT); + taosIgnSignal(SIGBREAK); + +#ifndef WINDOWS + taosIgnSignal(SIGBUS); +#endif + taosIgnSignal(SIGABRT); + taosIgnSignal(SIGFPE); + taosIgnSignal(SIGSEGV); + + char *pMsg = NULL; + const char *flags = "UTL FATAL "; + ELogLevel level = DEBUG_FATAL; + int32_t dflag = 255; + int64_t msgLen= -1; + + if (tsEnableCrashReport) { + if (taosGenCrashJsonMsg(signum, &pMsg, dmGetClusterId(), global.startTime)) { + taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); + goto _return; + } else { + msgLen = strlen(pMsg); + } + } + +_return: + + taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo); + + exit(signum); +} + static void dmSetSignalHandle() { taosSetSignal(SIGUSR1, dmSetDebugFlag); taosSetSignal(SIGUSR2, dmSetAssert); taosSetSignal(SIGTERM, dmStopDnode); taosSetSignal(SIGHUP, dmStopDnode); taosSetSignal(SIGINT, dmStopDnode); - taosSetSignal(SIGABRT, dmStopDnode); taosSetSignal(SIGBREAK, dmStopDnode); #ifndef WINDOWS taosSetSignal(SIGTSTP, dmStopDnode); taosSetSignal(SIGQUIT, dmStopDnode); #endif + +#ifndef WINDOWS + taosSetSignal(SIGBUS, dmLogCrash); +#endif + taosSetSignal(SIGABRT, dmLogCrash); + taosSetSignal(SIGFPE, dmLogCrash); + taosSetSignal(SIGSEGV, dmLogCrash); } static int32_t dmParseArgs(int32_t argc, char const *argv[]) { + global.startTime = taosGetTimestampMs(); + int32_t cmdEnvIndex = 0; if (argc < 2) return 0; + global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *)); memset(global.envCmd, 0, (argc - 1) * sizeof(char *)); for (int32_t i = 1; i < argc; ++i) { diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index c776beb3f0..ff32cbcb08 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -29,6 +29,7 @@ typedef struct SDnodeMgmt { const char *name; TdThread statusThread; TdThread monitorThread; + TdThread crashReportThread; SSingleWorker mgmtWorker; ProcessCreateNodeFp processCreateNodeFp; ProcessDropNodeFp processDropNodeFp; @@ -55,6 +56,8 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt); void dmStopStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt); +int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); +void dmStopCrashReportThread(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index d2db1a4a62..51df293ba7 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -23,6 +23,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if (dmStartMonitorThread(pMgmt) != 0) { return -1; } + if (dmStartCrashReportThread(pMgmt) != 0) { + return -1; + } return 0; } @@ -30,6 +33,7 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) { pMgmt->pData->stopped = true; dmStopMonitorThread(pMgmt); dmStopStatusThread(pMgmt); + dmStopCrashReportThread(pMgmt); } static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 80c040a5e8..76c8e09b70 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" +#include "thttp.h" static void *dmStatusThreadFp(void *param) { SDnodeMgmt *pMgmt = param; @@ -63,6 +64,63 @@ static void *dmMonitorThreadFp(void *param) { return NULL; } +static void *dmCrashReportThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-crashReport"); + char filepath[PATH_MAX] = {0}; + snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP); + char *pMsg = NULL; + int64_t msgLen = 0; + TdFilePtr pFile = NULL; + bool truncateFile = false; + int32_t sleepTime = 200; + int32_t reportPeriodNum = 3600 * 1000 / sleepTime;; + int32_t loopTimes = reportPeriodNum; + + while (1) { + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + if (loopTimes++ < reportPeriodNum) { + taosMsleep(sleepTime); + continue; + } + + taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile); + if (pMsg && msgLen > 0) { + if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) { + dError("failed to send crash report"); + if (pFile) { + taosReleaseCrashLogFile(pFile, false); + continue; + } + } else { + dInfo("succeed to send crash report"); + truncateFile = true; + } + } else { + dDebug("no crash info"); + } + + taosMemoryFree(pMsg); + + if (pMsg && msgLen > 0) { + pMsg = NULL; + continue; + } + + if (pFile) { + taosReleaseCrashLogFile(pFile, truncateFile); + truncateFile = false; + } + + taosMsleep(sleepTime); + loopTimes = 0; + } + + return NULL; +} + + int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -105,6 +163,36 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { } } +int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) { + if (!tsEnableCrashReport) { + return 0; + } + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) { + dError("failed to create crashReport thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-crashReport", "initialized"); + return 0; +} + +void dmStopCrashReportThread(SDnodeMgmt *pMgmt) { + if (!tsEnableCrashReport) { + return; + } + + if (taosCheckPthreadValid(pMgmt->crashReportThread)) { + taosThreadJoin(pMgmt->crashReportThread, NULL); + taosThreadClear(&pMgmt->crashReportThread); + } +} + + static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index f736ffd0c8..dd05fe673a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +#include "tjson.h" int32_t mmReadFile(const char *path, SMnodeOpt *pOption) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; @@ -130,56 +131,70 @@ _OVER: return code; } +static int32_t mmEncodeOption(SJson *pJson, const SMnodeOpt *pOption) { + if (pOption->deploy && pOption->numOfReplicas > 0) { + if (tjsonAddDoubleToObject(pJson, "selfIndex", pOption->selfIndex) < 0) return -1; + + SJson *replicas = tjsonCreateArray(); + if (replicas == NULL) return -1; + if (tjsonAddItemToObject(pJson, "replicas", replicas) < 0) return -1; + + for (int32_t i = 0; i < pOption->numOfReplicas; ++i) { + SJson *replica = tjsonCreateObject(); + if (replica == NULL) return -1; + + const SReplica *pReplica = pOption->replicas + i; + if (tjsonAddDoubleToObject(replica, "id", pReplica->id) < 0) return -1; + if (tjsonAddStringToObject(replica, "fqdn", pReplica->fqdn) < 0) return -1; + if (tjsonAddDoubleToObject(replica, "port", pReplica->port) < 0) return -1; + if (tjsonAddItemToArray(replicas, replica) < 0) return -1; + } + } + + if (tjsonAddDoubleToObject(pJson, "deployed", pOption->deploy) < 0) return -1; + + return 0; +} + int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) { - char file[PATH_MAX] = {0}; - char realfile[PATH_MAX] = {0}; + int32_t code = -1; + char *buffer = NULL; + SJson *pJson = NULL; + TdFilePtr pFile = NULL; + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write %s since %s", file, terrstr()); - return -1; - } + terrno = TSDB_CODE_OUT_OF_MEMORY; + pJson = tjsonCreateObject(); + if (pJson == NULL) goto _OVER; + if (mmEncodeOption(pJson, pOption) != 0) goto _OVER; + buffer = tjsonToString(pJson); + if (buffer == NULL) goto _OVER; + terrno = 0; - int32_t len = 0; - int32_t maxLen = 4096; - char *content = taosMemoryCalloc(1, maxLen + 1); + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) goto _OVER; - len += snprintf(content + len, maxLen - len, "{\n"); - if (pOption->deploy && pOption->numOfReplicas > 0) { - len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pOption->selfIndex); - len += snprintf(content + len, maxLen - len, " \"replicas\": [{\n"); + int32_t len = strlen(buffer); + if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; + if (taosFsyncFile(pFile) < 0) goto _OVER; - for (int32_t i = 0; i < pOption->numOfReplicas; ++i) { - const SReplica *pReplica = pOption->replicas + i; - if (pReplica != NULL && pReplica->id > 0) { - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - } - if (i < pOption->numOfReplicas - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }],\n"); - } - } - } - len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", pOption->deploy); - len += snprintf(content + len, maxLen - len, "}\n"); - - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); taosCloseFile(&pFile); - taosMemoryFree(content); + if (taosRenameFile(file, realfile) != 0) goto _OVER; - if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to rename %s since %s", file, terrstr()); - return -1; + code = 0; + dInfo("succeed to write mnode file:%s, deloyed:%d", realfile, pOption->deploy); + +_OVER: + if (pJson != NULL) tjsonDelete(pJson); + if (buffer != NULL) taosMemoryFree(buffer); + if (pFile != NULL) taosCloseFile(&pFile); + + if (code != 0) { + if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to write mnode file:%s since %s, deloyed:%d", realfile, terrstr(), pOption->deploy); } - - dDebug("succeed to write %s, deployed:%d", realfile, pOption->deploy); - return 0; + return code; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index dc32054fd7..8337fb5d10 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +#include "tjson.h" #define MAX_CONTENT_LEN 2 * 1024 * 1024 @@ -144,65 +145,66 @@ _OVER: return code; } -int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { - int32_t code = 0; - char file[PATH_MAX] = {0}; - char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); - snprintf(realfile, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); +static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t numOfVnodes) { + SJson *vnodes = tjsonCreateArray(); + if (vnodes == NULL) return -1; + if (tjsonAddItemToObject(pJson, "vnodes", vnodes) < 0) return -1; - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write %s since %s", file, terrstr()); - return -1; - } - - int32_t numOfVnodes = 0; - SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); - if (ppVnodes == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; - dError("failed to write %s while get vnodelist", file); - goto _OVER; - } - - int32_t len = 0; - int32_t maxLen = MAX_CONTENT_LEN; - char *content = taosMemoryCalloc(1, maxLen + 1); - if (content == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; - dError("failed to write %s while malloc content", file); - goto _OVER; - } - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = ppVnodes[i]; if (pVnode == NULL) continue; - len += snprintf(content + len, maxLen - len, " {\n"); - len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped); - len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d\n", pVnode->vgVersion); - if (i < numOfVnodes - 1) { - len += snprintf(content + len, maxLen - len, " },\n"); - } else { - len += snprintf(content + len, maxLen - len, " }\n"); - } + SJson *vnode = tjsonCreateObject(); + if (vnode == NULL) return -1; + if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1; + if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1; + if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1; + if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1; } - len += snprintf(content + len, maxLen - len, " ]\n"); - len += snprintf(content + len, maxLen - len, "}\n"); + + return 0; +} + +int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { + int32_t code = -1; + char *buffer = NULL; + SJson *pJson = NULL; + TdFilePtr pFile = NULL; + SVnodeObj **ppVnodes = NULL; + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); + + int32_t numOfVnodes = 0; + ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + if (ppVnodes == NULL) goto _OVER; + + terrno = TSDB_CODE_OUT_OF_MEMORY; + pJson = tjsonCreateObject(); + if (pJson == NULL) goto _OVER; + if (vmEncodeVnodeList(pJson, ppVnodes, numOfVnodes) != 0) goto _OVER; + buffer = tjsonToString(pJson); + if (buffer == NULL) goto _OVER; terrno = 0; -_OVER: - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); - taosMemoryFree(content); + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) goto _OVER; + int32_t len = strlen(buffer); + if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; + if (taosFsyncFile(pFile) < 0) goto _OVER; + + taosCloseFile(&pFile); + if (taosRenameFile(file, realfile) != 0) goto _OVER; + + code = 0; + dInfo("succeed to write vnodes file:%s, vnodes:%d", realfile, numOfVnodes); + +_OVER: + if (pJson != NULL) tjsonDelete(pJson); + if (buffer != NULL) taosMemoryFree(buffer); + if (pFile != NULL) taosCloseFile(&pFile); if (ppVnodes != NULL) { for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = ppVnodes[i]; @@ -213,14 +215,9 @@ _OVER: taosMemoryFree(ppVnodes); } - if (code != 0) return -1; - - dInfo("succeed to write %s, numOfVnodes:%d", realfile, numOfVnodes); - code = taosRenameFile(file, realfile); - if (code != 0) { - dError("failed to rename %s to %s", file, realfile); + if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes); } - return code; } \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 7e85e6b722..02cd678433 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -85,6 +85,7 @@ typedef struct SDnode { // dmEnv.c SDnode *dmInstance(); void dmReportStartup(const char *pName, const char *pDesc); +int64_t dmGetClusterId(); // dmMgmt.c int32_t dmInitDnode(SDnode *pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index e3bda5a3f0..1d0236c0c5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -268,3 +268,8 @@ void dmReportStartup(const char *pName, const char *pDesc) { tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); dDebug("step:%s, %s", pStartup->name, pStartup->desc); } + +int64_t dmGetClusterId() { + return global.data.clusterId; +} + diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index 981797834a..08330e025f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -111,6 +111,7 @@ static int32_t dmStartNodes(SDnode *pDnode) { dInfo("TDengine initialized successfully"); dmReportStartup("TDengine", "initialized successfully"); + return 0; } diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index a7a63fbaca..3e2d8b53aa 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmUtil.h" +#include "tjson.h" #include "tmisce.h" static void dmPrintEps(SDnodeData *pData); @@ -181,81 +182,73 @@ _OVER: return code; } -int32_t dmWriteEps(SDnodeData *pData) { - int32_t code = -1; - char *content = NULL; - TdFilePtr pFile = NULL; +static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) { + if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1; + if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1; - char file[PATH_MAX] = {0}; - char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); - - pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - dError("failed to open %s since %s", file, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _OVER; - } - - int32_t len = 0; - int32_t maxLen = 256 * 1024; - content = taosMemoryCalloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId); - len += snprintf(content + len, maxLen - len, " \"dnodeVer\": \"%" PRId64 "\",\n", pData->dnodeVer); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped); - len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); + SJson *dnodes = tjsonCreateArray(); + if (dnodes == NULL) return -1; + if (tjsonAddItemToObject(pJson, "dnodes", dnodes) < 0) return -1; int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps); for (int32_t i = 0; i < numOfEps; ++i) { SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i); - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); - len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); - if (i < numOfEps - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); + SJson *dnode = tjsonCreateObject(); + if (dnode == NULL) return -1; - if (taosWriteFile(pFile, content, len) != len) { - dError("failed to write %s since %s", file, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _OVER; + if (tjsonAddDoubleToObject(dnode, "id", pDnodeEp->id) < 0) return -1; + if (tjsonAddStringToObject(dnode, "fqdn", pDnodeEp->ep.fqdn) < 0) return -1; + if (tjsonAddDoubleToObject(dnode, "port", pDnodeEp->ep.port) < 0) return -1; + if (tjsonAddDoubleToObject(dnode, "isMnode", pDnodeEp->isMnode) < 0) return -1; + if (tjsonAddItemToArray(dnodes, dnode) < 0) return -1; } - if (taosFsyncFile(pFile) < 0) { - dError("failed to fsync %s since %s", file, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _OVER; - } + return 0; +} + +int32_t dmWriteEps(SDnodeData *pData) { + int32_t code = -1; + char *buffer = NULL; + SJson *pJson = NULL; + TdFilePtr pFile = NULL; + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); + + terrno = TSDB_CODE_OUT_OF_MEMORY; + pJson = tjsonCreateObject(); + if (pJson == NULL) goto _OVER; + if (dmEncodeEps(pJson, pData) != 0) goto _OVER; + buffer = tjsonToString(pJson); + if (buffer == NULL) goto _OVER; + terrno = 0; + + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) goto _OVER; + + int32_t len = strlen(buffer); + if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; + if (taosFsyncFile(pFile) < 0) goto _OVER; taosCloseFile(&pFile); - taosMemoryFreeClear(content); - - if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to rename %s since %s", file, terrstr()); - goto _OVER; - } + if (taosRenameFile(file, realfile) != 0) goto _OVER; code = 0; pData->updateTime = taosGetTimestampMs(); - dInfo("succeed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer); + dInfo("succeed to write dnode file:%s, dnodeVer:%" PRId64, realfile, pData->dnodeVer); _OVER: - if (content != NULL) taosMemoryFreeClear(content); + if (pJson != NULL) tjsonDelete(pJson); + if (buffer != NULL) taosMemoryFree(buffer); if (pFile != NULL) taosCloseFile(&pFile); - if (code != 0) { - dError("failed to write file %s since %s", realfile, terrstr()); - } + if (code != 0) { + if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); + dInfo("succeed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, terrstr(), pData->dnodeVer); + } return code; } diff --git a/source/dnode/mgmt/node_util/src/dmFile.c b/source/dnode/mgmt/node_util/src/dmFile.c index 2eb1462efc..4dcc962a20 100644 --- a/source/dnode/mgmt/node_util/src/dmFile.c +++ b/source/dnode/mgmt/node_util/src/dmFile.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmUtil.h" +#include "tjson.h" #define MAXLEN 1024 @@ -63,56 +64,51 @@ _OVER: return code; } +static int32_t dmEncodeFile(SJson *pJson, bool deployed) { + if (tjsonAddDoubleToObject(pJson, "deployed", deployed) < 0) return -1; + return 0; +} + int32_t dmWriteFile(const char *path, const char *name, bool deployed) { int32_t code = -1; - int32_t len = 0; - char content[MAXLEN + 1] = {0}; + char *buffer = NULL; + SJson *pJson = NULL; + TdFilePtr pFile = NULL; char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name); snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name); + terrno = TSDB_CODE_OUT_OF_MEMORY; + pJson = tjsonCreateObject(); + if (pJson == NULL) goto _OVER; + if (dmEncodeFile(pJson, deployed) != 0) goto _OVER; + buffer = tjsonToString(pJson); + if (buffer == NULL) goto _OVER; + terrno = 0; + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write %s since %s", file, terrstr()); - goto _OVER; - } + if (pFile == NULL) goto _OVER; - len += snprintf(content + len, MAXLEN - len, "{\n"); - len += snprintf(content + len, MAXLEN - len, " \"deployed\": %d\n", deployed); - len += snprintf(content + len, MAXLEN - len, "}\n"); - - if (taosWriteFile(pFile, content, len) != len) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write file:%s since %s", file, terrstr()); - goto _OVER; - } - - if (taosFsyncFile(pFile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to fsync file:%s since %s", file, terrstr()); - goto _OVER; - } + int32_t len = strlen(buffer); + if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; + if (taosFsyncFile(pFile) < 0) goto _OVER; taosCloseFile(&pFile); + if (taosRenameFile(file, realfile) != 0) goto _OVER; - if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to rename %s since %s", file, terrstr()); - return -1; - } - - dInfo("succeed to write %s, deployed:%d", realfile, deployed); code = 0; + dInfo("succeed to write file:%s, deloyed:%d", realfile, deployed); _OVER: - if (pFile != NULL) { - taosCloseFile(&pFile); - } + if (pJson != NULL) tjsonDelete(pJson); + if (buffer != NULL) taosMemoryFree(buffer); + if (pFile != NULL) taosCloseFile(&pFile); + if (code != 0) { + if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to write file:%s since %s, deloyed:%d", realfile, terrstr(), deployed); + } return code; } diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index cf7deba397..8943ba703e 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -34,6 +34,8 @@ SHashObj *mndDupDbHash(SHashObj *pOld); SHashObj *mndDupTopicHash(SHashObj *pOld); int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen); +int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); +int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 87b5a5c42d..7e5c29d56f 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1051,17 +1051,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; - - SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser); - if (pUser != NULL) { - pUser->authVersion++; - SSdbRaw *pCommitRaw = mndUserActionEncode(pUser); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - goto _OVER; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - } + if (mndUserRemoveDb(pMnode, pTrans, pDb->name) != 0) goto _OVER; int32_t rspLen = 0; void *pRsp = NULL; diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index b0b49b42dc..679fafa28d 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -131,7 +131,7 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { taosThreadMutexUnlock(&pMgmt->lock); if (pCont != NULL) { - if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { + if (taosSendHttpReport(tsTelemServer, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { mError("failed to send telemetry report"); } else { mInfo("succeed to send telemetry report"); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bf3827c090..48c35f3f07 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -604,22 +604,19 @@ _OVER: } static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) { + int32_t code = -1; + if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER; + SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + code = 0; +_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { @@ -890,6 +887,7 @@ int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) { return 0; } +#if 0 int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; @@ -917,3 +915,4 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return code; } +#endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 85a92c7aef..b965e13316 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -285,14 +285,35 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { return 0; } -static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { - mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser); +static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) { + memcpy(pNew, pUser, sizeof(SUserObj)); + pNew->authVersion++; + pNew->updateTime = taosGetTimestampMs(); + + taosRLockLatch(&pUser->lock); + pNew->readDbs = mndDupDbHash(pUser->readDbs); + pNew->writeDbs = mndDupDbHash(pUser->writeDbs); + pNew->topics = mndDupTopicHash(pUser->topics); + taosRUnLockLatch(&pUser->lock); + + if (pNew->readDbs == NULL || pNew->writeDbs == NULL || pNew->topics == NULL) { + return -1; + } + return 0; +} + +static void mndUserFreeObj(SUserObj *pUser) { taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->writeDbs); taosHashCleanup(pUser->topics); pUser->readDbs = NULL; pUser->writeDbs = NULL; pUser->topics = NULL; +} + +static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { + mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser); + mndUserFreeObj(pUser); return 0; } @@ -516,19 +537,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { goto _OVER; } - memcpy(&newUser, pUser, sizeof(SUserObj)); - newUser.authVersion++; - newUser.updateTime = taosGetTimestampMs(); - - taosRLockLatch(&pUser->lock); - newUser.readDbs = mndDupDbHash(pUser->readDbs); - newUser.writeDbs = mndDupDbHash(pUser->writeDbs); - newUser.topics = mndDupTopicHash(pUser->topics); - taosRUnLockLatch(&pUser->lock); - - if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) { - goto _OVER; - } + if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER; if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; @@ -654,9 +663,7 @@ _OVER: mndReleaseUser(pMnode, pOperUser); mndReleaseUser(pMnode, pUser); - taosHashCleanup(newUser.writeDbs); - taosHashCleanup(newUser.readDbs); - taosHashCleanup(newUser.topics); + mndUserFreeObj(&newUser); return code; } @@ -1007,3 +1014,74 @@ _OVER: tFreeSUserAuthBatchRsp(&batchRsp); return code; } + +int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + int32_t len = strlen(db) + 1; + void *pIter = NULL; + SUserObj *pUser = NULL; + SUserObj newUser = {0}; + + while (1) { + pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser); + if (pIter == NULL) break; + + code = -1; + if (mndUserDupObj(pUser, &newUser) != 0) break; + + bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL); + bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL); + if (inRead || inWrite) { + (void)taosHashRemove(newUser.readDbs, db, len); + (void)taosHashRemove(newUser.writeDbs, db, len); + + SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break; + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + } + + mndUserFreeObj(&newUser); + sdbRelease(pSdb, pUser); + code = 0; + } + + if (pUser != NULL) sdbRelease(pSdb, pUser); + if (pIter != NULL) sdbCancelFetch(pSdb, pIter); + mndUserFreeObj(&newUser); + return code; +} + +int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + int32_t len = strlen(topic) + 1; + void *pIter = NULL; + SUserObj *pUser = NULL; + SUserObj newUser = {0}; + + while (1) { + pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser); + if (pIter == NULL) break; + + code = -1; + if (mndUserDupObj(pUser, &newUser) != 0) break; + + bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL); + if (inTopic) { + (void)taosHashRemove(newUser.topics, topic, len); + SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break; + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + } + + mndUserFreeObj(&newUser); + sdbRelease(pSdb, pUser); + code = 0; + } + + if (pUser != NULL) sdbRelease(pSdb, pUser); + if (pIter != NULL) sdbCancelFetch(pSdb, pIter); + mndUserFreeObj(&newUser); + return code; +} diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index f43b6bdb25..2a63e3faf3 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -636,15 +636,20 @@ int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) { } int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) { - int32_t code = 0; + int32_t code = -1; if (!isApply) { mInfo("sdbiter:%p, not apply to sdb", pIter); - sdbCloseIter(pIter); - return 0; + code = 0; + goto _OVER; + } + + if (taosFsyncFile(pIter->file) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, terrstr()); + goto _OVER; } - taosFsyncFile(pIter->file); taosCloseFile(&pIter->file); pIter->file = NULL; @@ -653,14 +658,12 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i if (taosRenameFile(pIter->name, datafile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr()); - sdbCloseIter(pIter); - return -1; + goto _OVER; } if (sdbReadFile(pSdb) != 0) { mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr()); - sdbCloseIter(pIter); - return -1; + goto _OVER; } if (config > 0) { @@ -674,8 +677,11 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i } mInfo("sdbiter:%p, success applyed to sdb", pIter); + code = 0; + +_OVER: sdbCloseIter(pIter); - return 0; + return code; } int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) { diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 9748963722..20db35e5b5 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -395,6 +395,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { int32_t code = 0; SVnode *pVnode = pSma->pVnode; + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + goto _exit; + } #if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 7b5020d395..08d52554c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -192,6 +192,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) { int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.suid = pIter->bData.suid; pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); goto _out; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index cc22668b29..e75dc24329 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -406,8 +406,10 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_ snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path); } - SVnode *pVnode = pWriter->pVnode; + SVnodeStats vndStats = pWriter->info.config.vndStats; + SVnode *pVnode = pWriter->pVnode; pWriter->info.config = pVnode->config; + pWriter->info.config.vndStats = vndStats; vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId); if (vnodeSaveInfo(dir, &pWriter->info) < 0) { code = terrno; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index b8590c9255..438128203e 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -483,7 +483,7 @@ int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* tas _return: CTG_UNLOCK(CTG_WRITE, &pJob->taskLock); - + return code; } @@ -905,6 +905,14 @@ int32_t ctgCallUserCb(void* param) { return TSDB_CODE_SUCCESS; } +void ctgUpdateJobErrCode(SCtgJob* pJob, int32_t errCode) { + if (!NEED_CLIENT_REFRESH_VG_ERROR(errCode) || errCode == TSDB_CODE_SUCCESS) return; + + atomic_store_32(&pJob->jobResCode, errCode); + qDebug("QID:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode)); + return; +} + int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { SCtgJob* pJob = pTask->pJob; int32_t code = 0; @@ -924,6 +932,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { if (taskDone < taosArrayGetSize(pJob->pTasks)) { qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, (int32_t)taosArrayGetSize(pJob->pTasks)); + + ctgUpdateJobErrCode(pJob, rspCode); return TSDB_CODE_SUCCESS; } @@ -931,7 +941,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { _return: - pJob->jobResCode = code; + ctgUpdateJobErrCode(pJob, rspCode); + // pJob->jobResCode = code; // taosSsleep(2); // qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); @@ -1098,7 +1109,8 @@ _return: } if (code) { - ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); } if (pTask->res || code) { ctgHandleTaskEnd(pTask, code); @@ -1286,7 +1298,8 @@ _return: TSWAP(pTask->res, ctx->pResList); taskDone = true; } - ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); } if (pTask->res && taskDone) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 06ef36664a..03be1ee6f2 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -108,7 +108,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } -void tsortClearOrderdSource(SArray *pOrderedSource) { +void tsortClearOrderdSource(SArray* pOrderedSource) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { SSortSource** pSource = taosArrayGet(pOrderedSource, i); if (NULL == *pSource) { @@ -121,6 +121,12 @@ void tsortClearOrderdSource(SArray *pOrderedSource) { if ((*pSource)->param && !(*pSource)->onlyRef) { taosMemoryFree((*pSource)->param); } + + if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) { + blockDataDestroy((*pSource)->src.pBlock); + (*pSource)->src.pBlock = NULL; + } + taosMemoryFreeClear(*pSource); } @@ -629,9 +635,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (pHandle->type == SORT_SINGLESOURCE_SORT) { SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - SSortSource* source = *pSource; + SSortSource* source = *pSource; *pSource = NULL; - + tsortClearOrderdSource(pHandle->pOrderedSource); while (1) { @@ -659,6 +665,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (source->param && !source->onlyRef) { taosMemoryFree(source->param); } + if (!source->onlyRef && source->src.pBlock) { + blockDataDestroy(source->src.pBlock); + source->src.pBlock = NULL; + } taosMemoryFree(source); return code; } @@ -672,6 +682,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (source->param && !source->onlyRef) { taosMemoryFree(source->param); } + if (!source->onlyRef && source->src.pBlock) { + blockDataDestroy(source->src.pBlock); + source->src.pBlock = NULL; + } taosMemoryFree(source); return code; } @@ -849,8 +863,8 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo info = {0}; if (pHandle == NULL) { - info.sortMethod = SORT_QSORT_T; // by default - info.sortBuffer = 2 * 1048576; // 2mb by default + info.sortMethod = SORT_QSORT_T; // by default + info.sortBuffer = 2 * 1048576; // 2mb by default } else { info.sortBuffer = pHandle->pageSize * pHandle->numOfPages; info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T; diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 873dc46a08..80159460f5 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -73,10 +73,10 @@ void tMemBucketDestroy(tMemBucket *pBucket); int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); -double getPercentile(tMemBucket *pMemBucket, double percent); +int32_t getPercentile(tMemBucket *pMemBucket, double percent, double *result); #endif // TDENGINE_TPERCENTILE_H #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 8fde27e046..cd224b716e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1670,15 +1670,14 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { tMemBucket* pMemBucket = ppInfo->pMemBucket; if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null - SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v)); + int32_t code = getPercentile(pMemBucket, v, &ppInfo->result); + if (code != TSDB_CODE_SUCCESS) { + tMemBucketDestroy(pMemBucket); + return code; + } } tMemBucketDestroy(pMemBucket); - - if (ppInfo->result < 0) { - return TSDB_CODE_NO_AVAIL_DISK; - } - return functionFinalize(pCtx, pBlock); } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 04472c42ec..acadb9de1b 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -90,7 +90,7 @@ static void resetPosInfo(SSlotInfo *pInfo) { pInfo->data = NULL; } -double findOnlyResult(tMemBucket *pMemBucket) { +int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) { ASSERT(pMemBucket->total == 1); for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { @@ -108,17 +108,17 @@ double findOnlyResult(tMemBucket *pMemBucket) { int32_t *pageId = taosArrayGet(list, 0); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); if (pPage == NULL) { - return -1; + return TSDB_CODE_NO_AVAIL_DISK; } ASSERT(pPage->num == 1); - double v = 0; - GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data); - return v; + GET_TYPED_DATA(*result, double, pMemBucket->type, pPage->data); + return TSDB_CODE_SUCCESS; } } - return 0; + *result = 0.0; + return TSDB_CODE_SUCCESS; } int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { @@ -440,7 +440,7 @@ static double getIdenticalDataVal(tMemBucket *pMemBucket, int32_t slotIndex) { return finalResult; } -double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) { +int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction, double *result) { int32_t num = 0; for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { @@ -473,15 +473,15 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) ASSERT(minOfNextSlot > maxOfThisSlot); - double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; - return val; + *result = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; + return TSDB_CODE_SUCCESS; } if (pSlot->info.size <= pMemBucket->maxCapacity) { // data in buffer and file are merged together to be processed. SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); if (buffer == NULL) { - return -1; + return TSDB_CODE_NO_AVAIL_DISK; } int32_t currentIdx = count - num; @@ -492,13 +492,14 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) GET_TYPED_DATA(td, double, pMemBucket->type, thisVal); GET_TYPED_DATA(nd, double, pMemBucket->type, nextVal); - double val = (1 - fraction) * td + fraction * nd; + *result = (1 - fraction) * td + fraction * nd; taosMemoryFreeClear(buffer); - return val; + return TSDB_CODE_SUCCESS; } else { // incur a second round bucket split if (isIdenticalData(pMemBucket, i)) { - return getIdenticalDataVal(pMemBucket, i); + *result = getIdenticalDataVal(pMemBucket, i); + return TSDB_CODE_SUCCESS; } // try next round @@ -518,37 +519,37 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) int32_t *pageId = taosArrayGet(list, f); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); if (pg == NULL) { - return -1; + return TSDB_CODE_NO_AVAIL_DISK; } int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); if (code != TSDB_CODE_SUCCESS) { - return -1; + return code; } setBufPageDirty(pg, true); releaseBufPage(pMemBucket->pBuffer, pg); } - return getPercentileImpl(pMemBucket, count - num, fraction); + return getPercentileImpl(pMemBucket, count - num, fraction, result); } } else { num += pSlot->info.size; } } - return 0; + *result = 0; + return TSDB_CODE_SUCCESS; } -double getPercentile(tMemBucket *pMemBucket, double percent) { +int32_t getPercentile(tMemBucket *pMemBucket, double percent, double *result) { if (pMemBucket->total == 0) { - return 0.0; + *result = 0.0; + return TSDB_CODE_SUCCESS; } // if only one elements exists, return it if (pMemBucket->total == 1) { - if (findOnlyResult(pMemBucket) < 0) { - return -1; - } + return findOnlyResult(pMemBucket, result); } percent = fabs(percent); @@ -558,21 +559,21 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { MinMaxEntry *pRange = &pMemBucket->range; if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) { - double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal); - return v; + *result = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal); } else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) { - double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->u64MaxVal : pRange->u64MinVal); - return v; + *result = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->u64MaxVal : pRange->u64MinVal); } else { - return fabs(percent - 100) < DBL_EPSILON ? pRange->dMaxVal : pRange->dMinVal; + *result = fabs(percent - 100) < DBL_EPSILON ? pRange->dMaxVal : pRange->dMinVal; } + + return TSDB_CODE_SUCCESS; } double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); // do put data by using buckets int32_t orderIdx = (int32_t)percentVal; - return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx); + return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx, result); } /* diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index b3ca0fa452..b23a36d4df 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -20,6 +20,7 @@ #include "ttime.h" static SMonitor tsMonitor = {0}; +static char* tsMonUri = "/report"; void monRecordLog(int64_t ts, ELogLevel level, const char *content) { taosThreadMutexLock(&tsMonitor.lock); @@ -550,7 +551,7 @@ void monSendReport() { // uDebugL("report cont:%s\n", pCont); if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + if (taosSendHttpReport(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { uError("failed to send monitor msg"); } taosMemoryFree(pCont); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 55e8dc7b49..084d99cae5 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1334,6 +1334,7 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO } if (TSDB_CODE_SUCCESS == code) { + pSetOp->precision = pSetOperator->precision; *pLogicNode = (SLogicNode*)pSetOp; } else { nodesDestroyNode((SNode*)pSetOp); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 974a8f968e..5277e7818f 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -57,7 +57,6 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); -int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { @@ -82,7 +81,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); // on message int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5b9fb7c59c..91a05e78b0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1247,7 +1247,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { #if 0 if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId, @@ -1270,7 +1270,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { void syncNodePostClose(SSyncNode* pSyncNode) { if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId, @@ -1325,7 +1325,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver); @@ -1855,7 +1855,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // close receiver if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } // stop elect timer diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 86ea1f48cc..806949c81e 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -71,31 +71,23 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s.bak", realfile); - pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, realfile, terrstr()); - goto _OVER; - } - terrno = TSDB_CODE_OUT_OF_MEMORY; pJson = tjsonCreateObject(); if (pJson == NULL) goto _OVER; if (tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg) < 0) goto _OVER; - buffer = tjsonToString(pJson); if (buffer == NULL) goto _OVER; + terrno = 0; + + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) goto _OVER; int32_t len = strlen(buffer); if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; if (taosFsyncFile(pFile) < 0) goto _OVER; - taosCloseFile(&pFile); - if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - sError("vgId:%d, failed to rename sync cfg file:%s to %s since %s", pNode->vgId, file, realfile, terrstr()); - goto _OVER; - } + taosCloseFile(&pFile); + if (taosRenameFile(file, realfile) != 0) goto _OVER; code = 0; sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d", pNode->vgId, realfile, len); @@ -106,6 +98,7 @@ _OVER: if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { + if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, terrstr()); } return code; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index b68b735f46..defb7402f4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -150,7 +150,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated -int32_t snapshotSend(SSyncSnapshotSender *pSender) { +static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // free memory last time (current seq - 1) if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); @@ -342,23 +342,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } -// force stop -void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { - sRInfo(pReceiver, "snapshot receiver force stop, writer:%p"); - - // force close, abandon incomplete data - if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &pReceiver->snapshot); - if (ret != 0) { - sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr()); - } - pReceiver->pWriter = NULL; - } - - pReceiver->start = false; -} - static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { if (pReceiver->pWriter != NULL) { sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null"); @@ -590,7 +573,7 @@ _START_RECEIVER: if (snapshotReceiverIsStart(pReceiver)) { sRInfo(pReceiver, "snapshot receiver already start and force stop pre one"); - snapshotReceiverForceStop(pReceiver); + snapshotReceiverStop(pReceiver); } snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender @@ -842,7 +825,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { // force close, no response syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); - snapshotReceiverForceStop(pReceiver); + snapshotReceiverStop(pReceiver); } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); code = syncNodeOnSnapshotReceive(pSyncNode, pMsg); @@ -989,6 +972,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg); } + if (pSender->pReader == NULL || pSender->finish) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid"); + sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish); + terrno = pMsg->code; + goto _ERROR; + } + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bf9a6c0051..5f964f6b1a 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -100,14 +100,7 @@ typedef void* queue[2]; #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 -#define TRANS_MAGIC_NUM 0x5f375a86 - -#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) - -#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 - -#define TRANS_MAGIC_NUM 0x5f375a86 - +#define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) typedef SRpcMsg STransMsg; diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 00854b5ee5..cd508f6fe9 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -35,6 +35,7 @@ typedef struct SHttpModule { typedef struct SHttpMsg { queue q; char* server; + char* uri; int32_t port; char* cont; int32_t len; @@ -63,26 +64,26 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(); -static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, +static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); -static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, +static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { if (flag == HTTP_FLAT) { return snprintf(pHead, headLen, - "POST /report HTTP/1.1\n" + "POST %s HTTP/1.1\n" "Host: %s\n" "Content-Type: application/json\n" "Content-Length: %d\n\n", - server, contLen); + uri, server, contLen); } else if (flag == HTTP_GZIP) { return snprintf(pHead, headLen, - "POST /report HTTP/1.1\n" + "POST %s HTTP/1.1\n" "Host: %s\n" "Content-Type: application/json\n" "Content-Encoding: gzip\n" "Content-Length: %d\n\n", - server, contLen); + uri, server, contLen); } else { terrno = TSDB_CODE_INVALID_CFG; return -1; @@ -181,6 +182,7 @@ static void httpDestroyMsg(SHttpMsg* msg) { if (msg == NULL) return; taosMemoryFree(msg->server); + taosMemoryFree(msg->uri); taosMemoryFree(msg->cont); taosMemoryFree(msg); } @@ -293,10 +295,11 @@ int32_t httpSendQuit() { return 0; } -static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, +static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); msg->server = strdup(server); + msg->uri = strdup(uri); msg->port = port; msg->cont = taosMemoryMalloc(contLen); memcpy(msg->cont, pCont, contLen); @@ -309,12 +312,10 @@ static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* p httpDestroyMsg(msg); tError("http-report already released"); return -1; - } else { - msg->http = load; - transAsyncSend(load->asyncPool, &(msg->q)); } - - return 0; + + msg->http = load; + return transAsyncSend(load->asyncPool, &(msg->q)); } static void httpDestroyClientCb(uv_handle_t* handle) { @@ -360,7 +361,7 @@ static void httpHandleReq(SHttpMsg* msg) { int32_t len = 2048; char* header = taosMemoryCalloc(1, len); - int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag); + int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag); if (headLen < 0) { taosMemoryFree(header); goto END; @@ -380,6 +381,7 @@ static void httpHandleReq(SHttpMsg* msg) { cli->port = msg->port; cli->dest = dest; + taosMemoryFree(msg->uri); taosMemoryFree(msg); uv_tcp_init(http->loop, &cli->tcp); @@ -406,9 +408,9 @@ END: httpDestroyMsg(msg); } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImpl(server, port, pCont, contLen, flag); + return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); } static void transHttpEnvInit() { diff --git a/source/os/test/osTests.cpp b/source/os/test/osTests.cpp index f831f457f9..2e24bb0526 100644 --- a/source/os/test/osTests.cpp +++ b/source/os/test/osTests.cpp @@ -33,7 +33,7 @@ TEST(osTest, osSystem) { const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag - taosPrintTrace(flags, level, dflag); + taosPrintTrace(flags, level, dflag, 0); } void fileOperateOnFree(void *param) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 53d0cad5ea..d9cbde5714 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -18,6 +18,8 @@ #include "os.h" #include "tconfig.h" #include "tutil.h" +#include "tjson.h" +#include "tglobal.h" #define LOG_MAX_LINE_SIZE (1024) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) @@ -808,7 +810,7 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char taosPrintLogImp(1, 255, buffer, len); taosPrintLog(flags, level, dflag, "tAssert at file %s:%d exit:%d", file, line, tsAssert); - taosPrintTrace(flags, level, dflag); + taosPrintTrace(flags, level, dflag, -1); if (tsAssert) { // taosCloseLog(); @@ -824,6 +826,216 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char return true; } +int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { + SJson* pJson = tjsonCreateObject(); + if (pJson == NULL) return -1; + char tmp[4096] = {0}; + + tjsonAddDoubleToObject(pJson, "reportVersion", 1); + + tjsonAddIntegerToObject(pJson, "clusterId", clusterId); + tjsonAddIntegerToObject(pJson, "startTime", startTime); + + taosGetFqdn(tmp); + tjsonAddStringToObject(pJson, "fqdn", tmp); + + tjsonAddIntegerToObject(pJson, "pid", taosGetPId()); + + taosGetAppName(tmp, NULL); + tjsonAddStringToObject(pJson, "appName", tmp); + + if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) { + tjsonAddStringToObject(pJson, "os", tmp); + } + + float numOfCores = 0; + if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) { + tjsonAddStringToObject(pJson, "cpuModel", tmp); + tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores); + } else { + tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores); + } + + snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB); + tjsonAddStringToObject(pJson, "memory", tmp); + + tjsonAddStringToObject(pJson, "version", version); + tjsonAddStringToObject(pJson, "buildInfo", buildinfo); + tjsonAddStringToObject(pJson, "gitInfo", gitinfo); + + tjsonAddIntegerToObject(pJson, "crashSig", signum); + tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()); + +#ifdef _TD_DARWIN_64 + taosLogTraceToBuf(tmp, sizeof(tmp), 4); +#elif !defined(WINDOWS) + taosLogTraceToBuf(tmp, sizeof(tmp), 3); +#else + taosLogTraceToBuf(tmp, sizeof(tmp), 8); +#endif + + tjsonAddStringToObject(pJson, "stackInfo", tmp); + + char* pCont = tjsonToString(pJson); + tjsonDelete(pJson); + + *pMsg = pCont; + + return TSDB_CODE_SUCCESS; +} + + +void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, void *sigInfo) { + const char *flags = "UTL FATAL "; + ELogLevel level = DEBUG_FATAL; + int32_t dflag = 255; + char filepath[PATH_MAX] = {0}; + TdFilePtr pFile = NULL; + + if (pMsg && msgLen > 0) { + snprintf(filepath, sizeof(filepath), "%s%s.%sCrashLog", tsLogDir, TD_DIRSEP, nodeType); + + pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pFile == NULL) { + taosPrintLog(flags, level, dflag, "failed to open file:%s since %s", filepath, terrstr()); + goto _return; + } + + taosLockFile(pFile); + + int64_t writeSize = taosWriteFile(pFile, &msgLen, sizeof(msgLen)); + if (sizeof(msgLen) != writeSize) { + taosUnLockFile(pFile); + taosPrintLog(flags, level, dflag, "failed to write len to file:%s,%p wlen:%" PRId64 " tlen:%lu since %s", + filepath, pFile, writeSize, sizeof(msgLen), terrstr()); + goto _return; + } + + writeSize = taosWriteFile(pFile, pMsg, msgLen); + if (msgLen != writeSize) { + taosUnLockFile(pFile); + taosPrintLog(flags, level, dflag, "failed to write file:%s,%p wlen:%" PRId64 " tlen:%" PRId64 " since %s", + filepath, pFile, writeSize, msgLen, terrstr()); + goto _return; + } + + taosUnLockFile(pFile); + } + +_return: + + if (pFile) taosCloseFile(&pFile); + + terrno = TAOS_SYSTEM_ERROR(errno); + taosPrintLog(flags, level, dflag, "crash signal is %d", signum); + +#ifdef _TD_DARWIN_64 + taosPrintTrace(flags, level, dflag, 4); +#elif !defined(WINDOWS) + taosPrintLog(flags, level, dflag, "sender PID:%d cmdline:%s", ((siginfo_t *)sigInfo)->si_pid, + taosGetCmdlineByPID(((siginfo_t *)sigInfo)->si_pid)); + taosPrintTrace(flags, level, dflag, 3); +#else + taosPrintTrace(flags, level, dflag, 8); +#endif + + taosMemoryFree(pMsg); +} + +void taosReadCrashInfo(char* filepath, char** pMsg, int64_t* pMsgLen, TdFilePtr* pFd) { + const char *flags = "UTL FATAL "; + ELogLevel level = DEBUG_FATAL; + int32_t dflag = 255; + TdFilePtr pFile = NULL; + bool truncateFile = false; + char* buf = NULL; + + if (NULL == *pFd) { + int64_t filesize = 0; + if (taosStatFile(filepath, &filesize, NULL) < 0) { + if (ENOENT == errno) { + return; + } + + terrno = TAOS_SYSTEM_ERROR(errno); + taosPrintLog(flags, level, dflag, "failed to stat file:%s since %s", filepath, terrstr()); + return; + } + + if (filesize <= 0) { + return; + } + + pFile = taosOpenFile(filepath, TD_FILE_READ|TD_FILE_WRITE); + if (pFile == NULL) { + if (ENOENT == errno) { + return; + } + + terrno = TAOS_SYSTEM_ERROR(errno); + taosPrintLog(flags, level, dflag, "failed to open file:%s since %s", filepath, terrstr()); + return; + } + + taosLockFile(pFile); + } else { + pFile = *pFd; + } + + int64_t msgLen = 0; + int64_t readSize = taosReadFile(pFile, &msgLen, sizeof(msgLen)); + if (sizeof(msgLen) != readSize) { + truncateFile = true; + if (readSize < 0) { + taosPrintLog(flags, level, dflag, "failed to read len from file:%s,%p wlen:%" PRId64 " tlen:%lu since %s", + filepath, pFile, readSize, sizeof(msgLen), terrstr()); + } + goto _return; + } + + buf = taosMemoryMalloc(msgLen); + if (NULL == buf) { + taosPrintLog(flags, level, dflag, "failed to malloc buf, size:%" PRId64, msgLen); + goto _return; + } + + readSize = taosReadFile(pFile, buf, msgLen); + if (msgLen != readSize) { + truncateFile = true; + taosPrintLog(flags, level, dflag, "failed to read file:%s,%p wlen:%" PRId64 " tlen:%" PRId64 " since %s", + filepath, pFile, readSize, msgLen, terrstr()); + goto _return; + } + + *pMsg = buf; + *pMsgLen = msgLen; + *pFd = pFile; + + return; + +_return: + + if (truncateFile) { + taosFtruncateFile(pFile, 0); + } + taosUnLockFile(pFile); + taosCloseFile(&pFile); + taosMemoryFree(buf); + + *pMsg = NULL; + *pMsgLen = 0; + *pFd = NULL; +} + +void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile) { + if (truncateFile) { + taosFtruncateFile(pFile, 0); + } + + taosUnLockFile(pFile); + taosCloseFile(&pFile); +} + #ifdef NDEBUG bool taosAssertRelease(bool condition) { if (condition) return false; @@ -842,4 +1054,4 @@ bool taosAssertRelease(bool condition) { return true; } -#endif \ No newline at end of file +#endif