Review notes (text before the first "diff --git" header is ignored by git-apply/patch):
  * Line breaks were reconstructed from a newline-collapsed copy; indentation and blank
    context lines are best-effort (apply with --ignore-whitespace if context does not match).
  * "index" blob ids are those of the original patch, not of the reviewed "+" content.
  * Review fixes: PID column in remove*.sh, stack free in mndInitConfigObj, zero-initialized
    SConfigObj at call sites, PREPARE state for the crash-log hand-off, signal names in dWarn.

diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md
index e589e36c9a..e085d2fd53 100644
--- a/docs/zh/10-third-party/01-collection/12-flink.md
+++ b/docs/zh/10-third-party/01-collection/12-flink.md
@@ -13,7 +13,7 @@ Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批
 ## 前置条件
 
 准备以下环境:
-- TDengine 集群已部署并正常运行(企业及社区版均可)
+- TDengine 服务已部署并正常运行(企业及社区版均可)
 - taosAdapter 能够正常运行。详细参考 [taosAdapter 使用手册](../../../reference/components/taosadapter)
 - Apache Flink v1.19.0 或以上版本已安装。安装 Apache Flink 请参考 [官方文档](https://flink.apache.org/)
 
diff --git a/include/util/tlog.h b/include/util/tlog.h
index d0e42e3660..f573d61e73 100644
--- a/include/util/tlog.h
+++ b/include/util/tlog.h
@@ -119,6 +119,15 @@ void taosLogCrashInfo(char *nodeType, char *pMsg, int64_t msgLen, int signum, vo
 void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr *pFd);
 void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile);
 
+// Called by the crash report thread before its loop; records the caller as the report thread. Returns 0 on success.
+int32_t initCrashLogWriter();
+// Called by the report thread on every loop iteration; writes the pending crash request, if there is one.
+void checkAndPrepareCrashInfo();
+// Returns true when no crash request is pending or in progress, i.e. the report thread may leave its loop.
+bool reportThreadSetQuit();
+// Called from the crash signal handlers; hands the crash over to the report thread and waits until it is written.
+void writeCrashLogToFile(int signum, void *sigInfo, char *nodeType, int64_t clusterId, int64_t startTime);
+
 // clang-format off
 #define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
 #define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh
index c3f459ca9c..5bbfd2a0de 100755
--- a/packaging/tools/remove.sh
+++ b/packaging/tools/remove.sh
@@ -90,7 +90,7 @@ fi
 
 kill_service_of() {
   _service=$1
-  pid=$(ps -ef | grep $_service | grep -v grep | grep -v $uninstallScript | awk '{print $2}')
+  pid=$(ps -C $_service -o pid= -o args= | grep -v $uninstallScript | awk '{print $1}')
   if [ -n "$pid" ]; then
     ${csudo}kill -9 $pid || :
   fi
diff --git a/packaging/tools/remove_client.sh b/packaging/tools/remove_client.sh
index 31b1053a42..7798bbf16a 100755
--- a/packaging/tools/remove_client.sh
+++ b/packaging/tools/remove_client.sh
@@ -40,7 +40,7 @@ if command -v sudo > /dev/null; then
 fi
 
 function kill_client() {
-  pid=$(ps -ef | grep ${clientName2} | grep -v grep | grep -v $uninstallScript2 | awk '{print $2}')
+  pid=$(ps -C ${clientName2} -o pid= -o args= | grep -v $uninstallScript2 | awk '{print $1}')
   if [ -n "$pid" ]; then
     ${csudo}kill -9 $pid || :
   fi
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index df93920303..b69585a356 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -43,7 +43,7 @@
 #endif
 
 #ifndef CUS_PROMPT
-#define CUS_PROMPT "tao"
+#define CUS_PROMPT "taos"
 #endif
 
 #define TSC_VAR_NOT_RELEASE 1
@@ -831,9 +831,17 @@ static void *tscCrashReportThreadFp(void *param) {
     return NULL;
   }
 
+  code = initCrashLogWriter();
+  if (code) {
+    tscError("failed to init crash log writer, code:%s", tstrerror(code));
+    return NULL;
+  }
+
   while (1) {
-    if (clientStop > 0) break;
+    checkAndPrepareCrashInfo();
+    if (clientStop > 0 && reportThreadSetQuit()) break;
     if (loopTimes++ < reportPeriodNum) {
+      if (loopTimes < 0) loopTimes = reportPeriodNum;
       taosMsleep(sleepTime);
       continue;
     }
@@ -921,21 +929,7 @@ void tscStopCrashReport() {
 }
 
 void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
-  char       *pMsg = NULL;
-  const char *flags = "UTL FATAL ";
-  ELogLevel   level = DEBUG_FATAL;
-  int32_t     dflag = 255;
-  int64_t     msgLen = -1;
-
-  if (tsEnableCrashReport) {
-    if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
-      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
-    } else {
-      msgLen = strlen(pMsg);
-    }
-  }
-
-  taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
+  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT, lastClusterId, appInfo.startTime);
 }
 
 void taos_init_imp(void) {
@@ -969,7 +963,7 @@ void taos_init_imp(void) {
   }
   taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
 
-  const char *logName = CUS_PROMPT "slog";
+  const char *logName = CUS_PROMPT "log";
   ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
   if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
     (void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(errno), configDir);
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index d556158332..f4426fc94a 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -2606,6 +2606,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
   }
   code = tmq_subscribe(tmq, lst);
   tmq_list_destroy(lst);
+  tmqClearUnhandleMsg(tmq);
   if(code != 0){
     goto END;
   }
diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c
index 144a1542cb..a966513629 100644
--- a/source/common/src/tmisce.c
+++ b/source/common/src/tmisce.c
@@ -231,6 +231,7 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t
   TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "crashSig", signum), NULL, _exit);
   TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()), NULL, _exit);
 
+#if 0
 #ifdef _TD_DARWIN_64
   taosLogTraceToBuf(tmp, sizeof(tmp), 4);
 #elif !defined(WINDOWS)
@@ -240,7 +241,7 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t
 #endif
 
   TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "stackInfo", tmp), NULL, _exit);
-
+#endif
   char* pCont = tjsonToString(pJson);
   if (pCont == NULL) {
     code = terrno;
diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c
index ddaf1d3c13..b5eeb78b5e 100644
--- a/source/dnode/mgmt/exe/dmMain.c
+++ b/source/dnode/mgmt/exe/dmMain.c
@@ -131,25 +131,7 @@ void dmLogCrash(int signum, void *sigInfo, void *context) {
   if (taosIgnSignal(SIGSEGV) != 0) {
     dWarn("failed to ignore signal SIGABRT");
   }
-
-  char       *pMsg = NULL;
-  const char *flags = "UTL FATAL ";
-  ELogLevel   level = DEBUG_FATAL;
-  int32_t     dflag = 255;
-  int64_t     msgLen = -1;
-
-  if (tsEnableCrashReport) {
-    if (taosGenCrashJsonMsg(signum, &pMsg, dmGetClusterId(), global.startTime)) {
-      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
-      goto _return;
-    } else {
-      msgLen = strlen(pMsg);
-    }
-  }
-
-_return:
-
-  taosLogCrashInfo(CUS_PROMPT "d", pMsg, msgLen, signum, sigInfo);
+  writeCrashLogToFile(signum, sigInfo, CUS_PROMPT "d", dmGetClusterId(), global.startTime);
 
 #ifdef _TD_DARWIN_64
   exit(signum);
@@ -177,6 +159,15 @@ static void dmSetSignalHandle() {
   if (taosSetSignal(SIGBREAK, dmStopDnode) != 0) {
     dWarn("failed to set signal SIGUSR1");
   }
+  if (taosSetSignal(SIGABRT, dmLogCrash) != 0) {
+    dWarn("failed to set signal SIGABRT");
+  }
+  if (taosSetSignal(SIGFPE, dmLogCrash) != 0) {
+    dWarn("failed to set signal SIGFPE");
+  }
+  if (taosSetSignal(SIGSEGV, dmLogCrash) != 0) {
+    dWarn("failed to set signal SIGSEGV");
+  }
 #ifndef WINDOWS
   if (taosSetSignal(SIGTSTP, dmStopDnode) != 0) {
     dWarn("failed to set signal SIGUSR1");
@@ -184,6 +175,9 @@ static void dmSetSignalHandle() {
   if (taosSetSignal(SIGQUIT, dmStopDnode) != 0) {
     dWarn("failed to set signal SIGUSR1");
   }
+  if (taosSetSignal(SIGBUS, dmLogCrash) != 0) {
+    dWarn("failed to set signal SIGBUS");
+  }
 #endif
 }
 
diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
index ef4e76031d..b2cb8e2f2e 100644
--- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
@@ -274,14 +274,22 @@ static void *dmCrashReportThreadFp(void *param) {
     dError("failed to init telemetry since %s", tstrerror(code));
     return NULL;
   }
+  code = initCrashLogWriter();
+  if (code != 0) {
+    dError("failed to init crash log writer since %s", tstrerror(code));
+    return NULL;
+  }
 
   while (1) {
-    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
+    checkAndPrepareCrashInfo();
+    if ((pMgmt->pData->dropped || pMgmt->pData->stopped) && reportThreadSetQuit()) {
+      break;
+    }
     if (loopTimes++ < reportPeriodNum) {
       taosMsleep(sleepTime);
+      if(loopTimes < 0) loopTimes = reportPeriodNum;
       continue;
     }
-
     taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
     if (pMsg && msgLen > 0) {
       if (taosSendTelemReport(&mgt, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index b32274e0a7..9bed10ce99 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -336,12 +336,12 @@ typedef struct {
   };
 } SConfigObj;
 
-int32_t     tEncodeSConfigObj(SEncoder* pEncoder, const SConfigObj* pObj);
-int32_t     tDecodeSConfigObj(SDecoder* pDecoder, SConfigObj* pObj);
-SConfigObj* mndInitConfigObj(SConfigItem* pItem);
-SConfigObj* mndInitConfigVersion();
-int32_t     mndUpdateObj(SConfigObj* pObj, const char* name, char* value);
-void        tFreeSConfigObj(SConfigObj* obj);
+int32_t    tEncodeSConfigObj(SEncoder* pEncoder, const SConfigObj* pObj);
+int32_t    tDecodeSConfigObj(SDecoder* pDecoder, SConfigObj* pObj);
+int32_t    mndInitConfigObj(SConfigItem* pItem, SConfigObj* pObj);
+SConfigObj mndInitConfigVersion();
+int32_t    mndUpdateObj(SConfigObj* pObj, const char* name, char* value);
+void       tFreeSConfigObj(SConfigObj* obj);
 
 typedef struct {
   int32_t maxUsers;
diff --git a/source/dnode/mnode/impl/src/mndConfig.c b/source/dnode/mnode/impl/src/mndConfig.c
index 0729b2a26e..0d4265f8e7 100644
--- a/source/dnode/mnode/impl/src/mndConfig.c
+++ b/source/dnode/mnode/impl/src/mndConfig.c
@@ -308,32 +308,27 @@ int32_t mndInitWriteCfg(SMnode *pMnode) {
   }
 
   // encode mnd config version
-  SConfigObj *versionObj = mndInitConfigVersion();
-  if ((code = mndSetCreateConfigCommitLogs(pTrans, versionObj)) != 0) {
+  SConfigObj versionObj = mndInitConfigVersion();
+  if ((code = mndSetCreateConfigCommitLogs(pTrans, &versionObj)) != 0) {
     mError("failed to init mnd config version, since %s", tstrerror(code));
-    tFreeSConfigObj(versionObj);
-    taosMemoryFree(versionObj);
+    tFreeSConfigObj(&versionObj);
     goto _OVER;
   }
-  tFreeSConfigObj(versionObj);
-  taosMemoryFree(versionObj);
+  tFreeSConfigObj(&versionObj);
   sz = taosArrayGetSize(taosGetGlobalCfg(tsCfg));
 
   for (int i = 0; i < sz; ++i) {
     SConfigItem *item = taosArrayGet(taosGetGlobalCfg(tsCfg), i);
-    SConfigObj  *obj = mndInitConfigObj(item);
-    if (obj == NULL) {
-      code = terrno;
+    SConfigObj   obj = {0};
+    if ((code = mndInitConfigObj(item, &obj)) != 0) {
       goto _OVER;
     }
-    if ((code = mndSetCreateConfigCommitLogs(pTrans, obj)) != 0) {
+    if ((code = mndSetCreateConfigCommitLogs(pTrans, &obj)) != 0) {
       mError("failed to init mnd config:%s, since %s", item->name, tstrerror(code));
-      tFreeSConfigObj(obj);
-      taosMemoryFree(obj);
+      tFreeSConfigObj(&obj);
       goto _OVER;
     }
-    tFreeSConfigObj(obj);
-    taosMemoryFree(obj);
+    tFreeSConfigObj(&obj);
   }
   if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
 
@@ -372,11 +367,11 @@ static int32_t mndTryRebuildConfigSdb(SRpcMsg *pReq) {
   if (!mndIsLeader(pMnode)) {
     return TSDB_CODE_SUCCESS;
   }
-  int32_t   code = 0;
-  int32_t   sz = -1;
-  STrans   *pTrans = NULL;
-  SAcctObj *vObj = NULL, *obj = NULL;
-  SArray   *addArray = NULL;
+  int32_t     code = 0;
+  int32_t     sz = -1;
+  STrans     *pTrans = NULL;
+  SConfigObj *vObj = NULL;
+  SArray     *addArray = NULL;
 
   vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
   if (vObj == NULL) {
@@ -387,14 +382,12 @@ static int32_t mndTryRebuildConfigSdb(SRpcMsg *pReq) {
   addArray = taosArrayInit(4, sizeof(SConfigObj));
   for (int i = 0; i < sz; ++i) {
     SConfigItem *item = taosArrayGet(taosGetGlobalCfg(tsCfg), i);
-    obj = sdbAcquire(pMnode->pSdb, SDB_CFG, item->name);
+    SConfigObj  *obj = sdbAcquire(pMnode->pSdb, SDB_CFG, item->name);
     if (obj == NULL) {
-      SConfigObj *newObj = mndInitConfigObj(item);
-      if (newObj == NULL) {
-        code = terrno;
-        goto _exit;
-      }
-      if (NULL == taosArrayPush(addArray, newObj)) {
+      mInfo("config:%s, not exist in sdb, try to add it", item->name);
+      SConfigObj newObj = {0};
+      if ((code = mndInitConfigObj(item, &newObj)) != 0) goto _exit;
+      if (NULL == taosArrayPush(addArray, &newObj)) {
         code = terrno;
         goto _exit;
       }
@@ -422,7 +415,6 @@ _exit:
     mError("failed to try rebuild config in sdb, since %s", tstrerror(code));
   }
   sdbRelease(pMnode->pSdb, vObj);
-  sdbRelease(pMnode->pSdb, obj);
   cfgObjArrayCleanUp(addArray);
   mndTransDrop(pTrans);
   TAOS_RETURN(code);
diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c
index 92ad4eb5b8..a6602b392b 100644
--- a/source/dnode/mnode/impl/src/mndDef.c
+++ b/source/dnode/mnode/impl/src/mndDef.c
@@ -730,11 +730,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
   return (void *)buf;
 }
 
-SConfigObj *mndInitConfigObj(SConfigItem *pItem) {
-  SConfigObj *pObj = taosMemoryCalloc(1, sizeof(SConfigObj));
-  if (pObj == NULL) {
-    return NULL;
-  }
+int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
   tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
   pObj->dtype = pItem->dtype;
   switch (pItem->dtype) {
@@ -761,11 +757,10 @@ SConfigObj *mndInitConfigObj(SConfigItem *pItem) {
       pObj->str = taosStrdup(pItem->str);
       if (pObj->str == NULL) {
-        taosMemoryFree(pObj);
-        return NULL;
+        return TSDB_CODE_OUT_OF_MEMORY;
       }
       break;
   }
-  return pObj;
+  return TSDB_CODE_SUCCESS;
 }
 
 int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
@@ -822,15 +817,14 @@ int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
   return code;
 }
 
-SConfigObj *mndInitConfigVersion() {
-  SConfigObj *pObj = taosMemoryCalloc(1, sizeof(SConfigObj));
-  if (pObj == NULL) {
-    return NULL;
-  }
-  tstrncpy(pObj->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
-  pObj->dtype = CFG_DTYPE_INT32;
-  pObj->i32 = 0;
-  return pObj;
+SConfigObj mndInitConfigVersion() {
+  SConfigObj obj;
+  memset(&obj, 0, sizeof(SConfigObj));
+
+  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
+  obj.dtype = CFG_DTYPE_INT32;
+  obj.i32 = 0;
+  return obj;
 }
 
 int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c
index 90753ae7e8..4f5ca8d789 100644
--- a/source/util/src/tlog.c
+++ b/source/util/src/tlog.c
@@ -21,6 +21,7 @@
 #include "tjson.h"
 #include "ttime.h"
 #include "tutil.h"
+#include "tcommon.h"
 
 #define LOG_MAX_LINE_SIZE (10024)
 #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
@@ -1264,6 +1265,8 @@ _return:
 
   taosPrintLog(flags, level, dflag, "crash signal is %d", signum);
 
+// print the stack trace
+#if 0
 #ifdef _TD_DARWIN_64
   taosPrintTrace(flags, level, dflag, 4);
 #elif !defined(WINDOWS)
@@ -1273,10 +1276,118 @@ _return:
 #else
   taosPrintTrace(flags, level, dflag, 8);
 #endif
-
+#endif
   taosMemoryFree(pMsg);
 }
 
+// Hand-off states between a crashing thread (signal handler) and the crash report thread.
+typedef enum {
+  CRASH_LOG_WRITER_UNKNOWN = 0,  // initCrashLogWriter() has not been called
+  CRASH_LOG_WRITER_INIT = 1,     // idle, a crash request can be accepted
+  CRASH_LOG_WRITER_PREPARE,      // a crashing thread owns the slot and is filling in the request
+  CRASH_LOG_WRITER_WAIT,         // request is complete and waits for the report thread
+  CRASH_LOG_WRITER_RUNNING,      // report thread is writing the crash log
+  CRASH_LOG_WRITER_QUIT          // report thread is leaving its loop, no further request is accepted
+} CrashStatus;
+typedef struct crashBasicInfo {
+  int8_t  status;
+  int64_t clusterId;
+  int64_t startTime;
+  char   *nodeType;
+  int     signum;
+  void   *sigInfo;
+  tsem_t  sem;
+  int64_t reportThread;
+} crashBasicInfo;
+
+crashBasicInfo gCrashBasicInfo = {0};
+
+void setCrashWriterStatus(int8_t status) { atomic_store_8(&gCrashBasicInfo.status, status); }
+// INIT -> QUIT; returns true only when no crash request is pending or in progress.
+bool reportThreadSetQuit() {
+  CrashStatus status =
+      atomic_val_compare_exchange_8(&gCrashBasicInfo.status, CRASH_LOG_WRITER_INIT, CRASH_LOG_WRITER_QUIT);
+  if (status == CRASH_LOG_WRITER_INIT) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// INIT -> PREPARE; returns true when the calling thread now exclusively owns the request slot.
+bool setReportThreadPrepare() {
+  CrashStatus status =
+      atomic_val_compare_exchange_8(&gCrashBasicInfo.status, CRASH_LOG_WRITER_INIT, CRASH_LOG_WRITER_PREPARE);
+  if (status == CRASH_LOG_WRITER_INIT) {
+    return true;
+  } else {
+    return false;
+  }
+}
+// WAIT -> RUNNING; returns true when a complete crash request is ready to be written.
+bool setReportThreadRunning() {
+  CrashStatus status =
+      atomic_val_compare_exchange_8(&gCrashBasicInfo.status, CRASH_LOG_WRITER_WAIT, CRASH_LOG_WRITER_RUNNING);
+  if (status == CRASH_LOG_WRITER_WAIT) {
+    return true;
+  } else {
+    return false;
+  }
+}
+// Runs on the report thread: writes the pending crash request (if any), then wakes the crashing thread.
+static void checkWriteCrashLogToFileInNewThead() {
+  if (setReportThreadRunning()) {
+    char       *pMsg = NULL;
+    const char *flags = "UTL FATAL ";
+    ELogLevel   level = DEBUG_FATAL;
+    int32_t     dflag = 255;
+    int64_t     msgLen = -1;
+
+    if (tsEnableCrashReport) {
+      if (taosGenCrashJsonMsg(gCrashBasicInfo.signum, &pMsg, gCrashBasicInfo.clusterId, gCrashBasicInfo.startTime)) {
+        taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
+      } else {
+        msgLen = strlen(pMsg);
+      }
+    }
+    taosLogCrashInfo(gCrashBasicInfo.nodeType, pMsg, msgLen, gCrashBasicInfo.signum, gCrashBasicInfo.sigInfo);
+    setCrashWriterStatus(CRASH_LOG_WRITER_INIT);
+    tsem_post(&gCrashBasicInfo.sem);
+  }
+}
+
+void checkAndPrepareCrashInfo() {
+  checkWriteCrashLogToFileInNewThead();
+}
+
+int32_t initCrashLogWriter() {
+  int32_t code = tsem_init(&gCrashBasicInfo.sem, 0, 0);
+  if (code != 0) {
+    uError("failed to init sem for crashLogWriter, code:%d", code);
+    return code;
+  }
+  gCrashBasicInfo.reportThread = taosGetSelfPthreadId();
+  setCrashWriterStatus(CRASH_LOG_WRITER_INIT);
+  return code;
+}
+
+// Called from a crash signal handler; does nothing on the report thread itself or when the writer is not idle.
+void writeCrashLogToFile(int signum, void *sigInfo, char *nodeType, int64_t clusterId, int64_t startTime) {
+  if (gCrashBasicInfo.reportThread == taosGetSelfPthreadId()) {
+    return;
+  }
+  if (setReportThreadPrepare()) {
+    gCrashBasicInfo.clusterId = clusterId;
+    gCrashBasicInfo.startTime = startTime;
+    gCrashBasicInfo.nodeType = nodeType;
+    gCrashBasicInfo.signum = signum;
+    gCrashBasicInfo.sigInfo = sigInfo;
+    // Publish WAIT only after all fields are stored (assumes atomic_store_8 is at least a release store - confirm).
+    setCrashWriterStatus(CRASH_LOG_WRITER_WAIT);
+    tsem_wait(&gCrashBasicInfo.sem);
+  }
+}
+
 void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr *pFd) {
   const char *flags = "UTL FATAL ";
   ELogLevel   level = DEBUG_FATAL;
diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py
index 6a78a051ab..d10e8e8ced 100644
--- a/tests/system-test/0-others/compatibility.py
+++ b/tests/system-test/0-others/compatibility.py
@@ -450,6 +450,11 @@ class TDTestCase:
         tdsql.checkData(0,2,180)
         tdsql.checkData(0,3,0.53)
 
+        # check alter config
+        tdsql.execute('alter all dnodes "debugFlag 131"')
+        tdsql.execute('alter dnode 1 "debugFlag 143"')
+        tdsql.execute('alter local "debugFlag 131"')
+
         # check tmq
         conn = taos.connect()
 
diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c
index fc6ba0f7d8..1f6f8fe3df 100644
--- a/tools/shell/src/shellMain.c
+++ b/tools/shell/src/shellMain.c
@@ -49,14 +49,12 @@ int main(int argc, char *argv[]) {
   shell.args.local = false;
 #endif
 
-#if 0
 #if !defined(WINDOWS)
   taosSetSignal(SIGBUS, shellCrashHandler);
 #endif
   taosSetSignal(SIGABRT, shellCrashHandler);
   taosSetSignal(SIGFPE, shellCrashHandler);
   taosSetSignal(SIGSEGV, shellCrashHandler);
-#endif
 
   if (shellCheckIntSize() != 0) {
     return -1;