diff --git a/README-CN.md b/README-CN.md index 437d671bb9..a5d239a532 100644 --- a/README-CN.md +++ b/README-CN.md @@ -60,7 +60,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev 为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件: ```bash -sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config +sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config ``` ### CentOS 7.9 @@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel ``` -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` #### CentOS 8/Rocky Linux @@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco sudo yum install -y epel-release sudo yum install -y dnf-plugins-core sudo yum config-manager --set-enabled powertools -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` 注意:由于 snappy 缺乏 pkg-config 支持(参考 [链接](https://github.com/google/snappy/pull/86)),会导致 cmake 提示无法发现 libsnappy,实际上工作正常。 diff --git a/README.md b/README.md index 6aec756ec7..05c1c075f0 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed. ```bash -sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config +sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config ``` ### CentOS 7.9 @@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel #### CentOS 7.9 ``` -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` #### CentOS 8/Rocky Linux @@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco sudo yum install -y epel-release sudo yum install -y dnf-plugins-core sudo yum config-manager --set-enabled powertools -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/google/snappy/pull/86)), it leads a cmake prompt libsnappy not found. But snappy still works well. diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 2d7bcf6592..4abd9e86a6 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG e62c5ea + GIT_TAG ac69142 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 470b7bba7e..bb1addf1b6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,6 +70,11 @@ static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL); } + +static inline bool syncUtilUserCommit(tmsg_t msgType) { + return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; +} + /* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type #define TSDB_IE_TYPE_SEC 1 diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index f94e0c98dc..412054b13e 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -234,7 +234,6 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, - QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, @@ -265,7 +264,8 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, QUERY_NODE_PHYSICAL_PLAN_DELETE, QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN + QUERY_NODE_PHYSICAL_PLAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN } ENodeType; /** diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 900bc88c41..4cf4800472 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -47,6 +47,7 @@ extern "C" { #define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 +#define SYNC_SNAP_RESEND_MS 1000 * 60 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/include/os/os.h b/include/os/os.h index 0688eeb9ad..b27fa84406 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -27,6 +27,7 @@ extern "C" { #if !defined(WINDOWS) #include +#include #include #include #include diff --git a/include/os/osDef.h b/include/os/osDef.h index 0bf9c6184e..c18728c9a7 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -120,12 +120,6 @@ void syslog(int unused, const char *format, ...); #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) #define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2)) -#ifndef NDEBUG -#define ASSERT(x) assert(x) -#else -#define ASSERT(x) -#endif - #ifndef UNUSED #define UNUSED(x) ((void)(x)) #endif diff --git a/include/os/osString.h b/include/os/osString.h index 4c3fbe46c3..b609c9d351 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -62,7 +62,7 @@ typedef int32_t TdUcs4; int32_t taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); -void taosConvInit(void); +int32_t taosConvInit(void); void taosConvDestroy(); int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs); bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len); diff --git a/include/os/osSystem.h b/include/os/osSystem.h index eca984c41a..31ec513fca 100644 --- a/include/os/osSystem.h +++ b/include/os/osSystem.h @@ -46,6 +46,26 @@ void taosSetTerminalMode(); int32_t taosGetOldTerminalMode(); void taosResetTerminalMode(); +#if !defined(WINDOWS) +#define taosPrintTrace(flags, level, dflag) \ + { \ + void* array[100]; \ + int32_t size = backtrace(array, 100); \ + char** strings = backtrace_symbols(array, size); \ + if (strings != NULL) { \ + taosPrintLog(flags, level, dflag, "obtained %d stack frames", size); \ + for (int32_t i = 0; i < size; i++) { \ + taosPrintLog(flags, level, dflag, "frame:%d, %s", i, strings[i]); \ + } \ + } \ + \ + taosMemoryFree(strings); \ + } +#else +#define taosPrintTrace(flags, level, dflag) \ + {} +#endif + #ifdef __cplusplus } #endif diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 38eb0d8fc6..b1bd09e123 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -17,6 +17,7 @@ #define _TD_UTIL_CODING_H_ #include "os.h" +#include "tlog.h" #ifdef __cplusplus extern "C" { diff --git a/include/util/tlog.h b/include/util/tlog.h index 68b004cda7..b6a389b6d9 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -38,6 +38,7 @@ typedef void (*LogFp)(int64_t ts, ELogLevel level, const char *content); extern bool tsLogEmbedded; extern bool tsAsyncLog; +extern bool tsAssert; extern int32_t tsNumOfLogLines; extern int32_t tsLogKeepDays; extern LogFp tsLogFp; @@ -82,6 +83,10 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons #endif ; +bool taosAssertLog(bool condition, const char *file, int32_t line, const char *format, ...); +#define ASSERTS(condition, ...) taosAssertLog(condition, __FILE__, __LINE__, __VA_ARGS__) +#define ASSERT(condition) ASSERTS(condition, "assert info not provided") + // clang-format off #define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 7564d91330..c694a6c1a0 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -407,7 +407,9 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); - taosConvInit(); + if (taosConvInit() != 0) { + ASSERTS(0, "failed to init conv"); + } rpcInit(); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9f3c78aba2..f2dec7217f 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -796,9 +796,10 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { + pWrapper->pCatalogReq->forceUpdate = false; code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d1a3f38143..7e9b28939b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -333,6 +333,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddTimezone(pCfg, "timezone", tsTimezoneStr) != 0) return -1; if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1; if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1; + if (cfgAddBool(pCfg, "assert", 1, 1) != 0) return -1; if (cfgAddBool(pCfg, "enableCoreFile", 1, 1) != 0) return -1; if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1; @@ -693,6 +694,8 @@ static void taosSetSystemCfg(SConfig *pCfg) { bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval; taosSetCoreDump(enableCore); + tsAssert = cfgGetItem(pCfg, "assert")->bval; + // todo tsVersion = 30000000; } @@ -788,6 +791,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { case 'a': { if (strcasecmp("asyncLog", name) == 0) { tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval; + } else if (strcasecmp("assert", name) == 0) { + tsAssert = cfgGetItem(pCfg, "assert")->bval; } break; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 00929a1836..95625e8d93 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -28,6 +28,8 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" +#include "tlog.h" + int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 52ebd7f879..ca2c056743 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "trow.h" +#include "tlog.h" const uint8_t tdVTypeByte[2][3] = {{ // 2 bits diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index a106a09a69..ec05ef4c44 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -23,6 +23,8 @@ #define _DEFAULT_SOURCE #include "ttime.h" +#include "tlog.h" + /* * mktime64 - Converts date to seconds. * Converts Gregorian date to seconds since 1970-01-01 00:00:00. diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 188677656a..6963803df0 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -201,7 +201,12 @@ int mainWindows(int argc, char **argv) { return -1; } - taosConvInit(); + if (taosConvInit() != 0) { + dError("failed to init conv"); + taosCloseLog(); + taosCleanupArgs(); + return -1; + } if (global.dumpConfig) { dmDumpCfg(); diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 2d2a121795..2e66aeae37 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -139,7 +139,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SSnode *pSnode = pMgmt->pSnode; if (pSnode == NULL) { - dError("snode: msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pMsg, terrstr(), + dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(), TMSG_INFO(pMsg->msgType), qtype); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); @@ -161,7 +161,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { smPutNodeMsgToWriteQueue(pMgmt, pMsg); break; default: - ASSERT(0); + ASSERTS(0, "msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(), + TMSG_INFO(pMsg->msgType), qtype); } return 0; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 2383a09343..99f8bd002a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -301,6 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; + rpcInit.compressSize = tsCompressMsgSize; pTrans->serverRpc = rpcOpen(&rpcInit); if (pTrans->serverRpc == NULL) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 895d7fedc4..7cba69ed50 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -119,7 +119,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta } int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta); + int32_t code = 0; + if (!syncUtilUserCommit(pMsg->msgType)) { + goto _out; + } + code = mndProcessWriteMsg(pFsm, pMsg, pMeta); + +_out: rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return code; diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 6a704d0425..0ed68dce95 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -454,7 +454,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { - memcpy(pBuf + sizeof(suid), pNode->data, keyLen); + memcpy(&pBuf[1], pNode->data, keyLen); // check whether it is existed in LRU cache, and remove it from linked list if not. LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 906e3b2638..8ec59ea959 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -53,6 +53,7 @@ typedef struct { // -------------- TSKEY nextKey; // reset by each table commit int32_t commitFid; + int32_t expLevel; TSKEY minKey; TSKEY maxKey; // commit file data @@ -503,6 +504,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // memory pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); + pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, &pCommitter->maxKey); #if 0 @@ -556,7 +558,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } } else { SDiskID did = {0}; - tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); + if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); wSet.diskId = did; wSet.nSttF = 1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d8c8a3e1b2..0fc42f3744 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -190,9 +190,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp version); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); + ASSERT(pVnode->state.applied + 1 == version); + pVnode->state.applied = version; pVnode->state.applyTerm = pMsg->info.conn.applyTerm; + if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; + // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index ba99906589..c435c46d46 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -547,6 +547,14 @@ typedef struct SCtgOperation { #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) + + #define CTG_LOCK_DEBUG(...) \ do { \ if (gCTGDebug.lockEnable) { \ diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 7138e0ce16..5b01ac1fb9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1094,6 +1094,9 @@ _return: ctgReleaseVgInfoToCache(pCtg, dbCache); } + if (code) { + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); + } if (pTask->res || code) { ctgHandleTaskEnd(pTask, code); } @@ -1124,7 +1127,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu SVgroupInfo vgInfo = {0}; CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); + ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); *vgId = vgInfo.vgId; CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); @@ -1144,7 +1147,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu SVgroupInfo vgInfo = {0}; CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); + ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); *vgId = vgInfo.vgId; CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); @@ -1162,7 +1165,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu return TSDB_CODE_SUCCESS; } - ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); + ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); ctgRemoveTbMetaFromCache(pCtg, pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); @@ -1180,7 +1183,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; if (CTG_IS_META_NULL(pOut->metaType)) { - ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); + ctgTaskError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); ctgRemoveTbMetaFromCache(pCtg, pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -1190,7 +1193,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu } if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) { - ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); + ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); taosMemoryFreeClear(pOut->tbMeta); @@ -1207,11 +1210,11 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu STableMeta* stbMeta = NULL; (void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta); if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) { - ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName)); + ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName)); exist = 1; taosMemoryFreeClear(stbMeta); } else { - ctgDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName)); + ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName)); taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(stbMeta); } @@ -1225,7 +1228,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu break; } default: - ctgError("invalid reqType %d", reqType); + ctgTaskError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); } @@ -1280,6 +1283,7 @@ _return: TSWAP(pTask->res, ctx->pResList); taskDone = true; } + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); } if (pTask->res && taskDone) { diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 2374d80bbf..072a96fa83 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -95,6 +95,8 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo // TODO: optimize to ignore null values for linear interpolation. if (!pLinearInfo->isStartSet) { if (!colDataIsNull_s(pColInfoData, rowIndex)) { + ASSERT(IS_MATHABLE_TYPE(pColInfoData->info.type)); + pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); } diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 4204692514..d97f81c994 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -17,6 +17,7 @@ #include "taoserror.h" #include "tdef.h" #include "tpagedbuf.h" +#include "tlog.h" #define LHASH_CAP_RATIO 0.85 diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index cf8e239190..9c3e1f5604 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -22,6 +22,7 @@ #include "tpagedbuf.h" #include "tpercentile.h" #include "ttypes.h" +#include "tlog.h" #define DEFAULT_NUM_OF_SLOT 1024 diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index df7a3953f2..59545d8fbc 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -44,6 +44,7 @@ typedef struct SInsertParseContext { SParsedDataColInfo tags; // for stmt bool missCache; bool usingDuplicateTable; + bool forceUpdate; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); @@ -829,6 +830,11 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo } static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { + if (pCxt->forceUpdate) { + pCxt->missCache = true; + return TSDB_CODE_SUCCESS; + } + int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); @@ -844,6 +850,11 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpSt } static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { + if (pCxt->forceUpdate) { + pCxt->missCache = true; + return TSDB_CODE_SUCCESS; + } + int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache); @@ -1909,6 +1920,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, .missCache = false, .usingDuplicateTable = false, + .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) }; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 1f9675a3cd..ee83636192 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -44,6 +44,7 @@ typedef struct SSyncSnapshotSender { SyncTerm term; int64_t startTime; int64_t endTime; + int64_t lastSendTime; bool finish; // init when create diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index f198f3809d..be14ef91a4 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -79,7 +79,6 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len); void syncUtilMsgHtoN(void* msg); void syncUtilMsgNtoH(void* msg); bool syncUtilUserPreCommit(tmsg_t msgType); -bool syncUtilUserCommit(tmsg_t msgType); bool syncUtilUserRollback(tmsg_t msgType); void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 07b1101256..5fdcbeb91c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -84,7 +84,7 @@ void syncOneReplicaAdvance(SSyncNode* pSyncNode) { } void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { - ASSERT(false && "deprecated"); + ASSERTS(false, "deprecated"); if (pSyncNode == NULL) { sError("pSyncNode is NULL"); return; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 59e5e968e7..6ed9fa66ea 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -791,9 +791,9 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { } int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { - ASSERT(pNode->pLogStore != NULL && "log store not created"); - ASSERT(pNode->pFsm != NULL && "pFsm not registered"); - ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); + ASSERTS(pNode->pLogStore != NULL, "log store not created"); + ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); + ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); SSnapshot snapshot; if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); @@ -1144,8 +1144,8 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { } int32_t syncNodeRestore(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->pLogStore != NULL && "log store not created"); - ASSERT(pSyncNode->pLogBuf != NULL && "ring log buffer not created"); + ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); + ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); @@ -2663,7 +2663,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn int32_t code = syncNodeAppend(ths, pEntry); if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) { - ASSERT(false && "failed to append blocking msg"); + ASSERTS(false, "failed to append blocking msg"); } return code; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5d6905f99e..7bd8d75dd1 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -50,7 +50,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt // initial log buffer with at least one item, e.g. commitIndex SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - ASSERT(pMatch != NULL && "no matched log entry"); + ASSERTS(pMatch != NULL, "no matched log entry"); ASSERT(pMatch->index + 1 == index); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; @@ -86,14 +86,14 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S if (prevIndex >= pBuf->startIndex) { pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL && "no log entry found"); + ASSERTS(pEntry != NULL, "no log entry found"); prevLogTerm = pEntry->term; return prevLogTerm; } if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; - ASSERT(timeMs != 0 && "no log entry found"); + ASSERTS(timeMs != 0, "no log entry found"); prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; ASSERT(prevIndex == 0 || prevLogTerm != 0); return prevLogTerm; @@ -141,9 +141,9 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex } int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - ASSERT(pNode->pLogStore != NULL && "log store not created"); - ASSERT(pNode->pFsm != NULL && "pFsm not registered"); - ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); + ASSERTS(pNode->pLogStore != NULL, "log store not created"); + ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); + ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); SSnapshot snapshot; if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { @@ -437,7 +437,7 @@ _out: } int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { - ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM"); + ASSERTS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM"); if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) { return 0; @@ -513,13 +513,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm if (!syncUtilUserCommit(pEntry->originalRpcType)) { sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); - pBuf->commitIndex = index; - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - continue; } + if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 ", role: %d, current term: %" PRId64, @@ -905,7 +900,7 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { ASSERT(pNode->logReplMgrs[i] == NULL); pNode->logReplMgrs[i] = syncLogReplMgrCreate(); pNode->logReplMgrs[i]->peerId = i; - ASSERT(pNode->logReplMgrs[i] != NULL && "Out of memory."); + ASSERTS(pNode->logReplMgrs[i] != NULL, "Out of memory."); } return 0; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index b8ecbe7515..42deb2c20a 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -103,6 +103,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->sendingMS = 0; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->startTime = taosGetTimestampMs(); + pSender->lastSendTime = pSender->startTime; pSender->finish = false; // build begin msg @@ -201,6 +202,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); + pSender->lastSendTime = taosGetTimestampMs(); + // event log if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { sSTrace(pSender, "snapshot sender finish"); @@ -213,33 +216,36 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { // send snapshot data from cache int32_t snapshotReSend(SSyncSnapshotSender *pSender) { // send current block data + + // build msg + SRpcMsg rpcMsg = {0}; + (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); + + SyncSnapshotSend *pMsg = rpcMsg.pCont; + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; + pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->beginIndex = pSender->snapshotParam.start; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; + pMsg->seq = pSender->seq; + if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { - // build msg - SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); - - SyncSnapshotSend *pMsg = rpcMsg.pCont; - pMsg->srcId = pSender->pSyncNode->myRaftId; - pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; - pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; - pMsg->beginIndex = pSender->snapshotParam.start; - pMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pMsg->lastConfig = pSender->lastConfig; - pMsg->seq = pSender->seq; - // pMsg->privateTerm = pSender->privateTerm; memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); - - // send msg - syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); - - // event log - sSTrace(pSender, "snapshot sender resend"); } + // send msg + syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); + + pSender->lastSendTime = taosGetTimestampMs(); + + // event log + sSTrace(pSender, "snapshot sender resend"); + return 0; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 9c39fc1e8e..16e593d0e4 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -20,6 +20,7 @@ #include "syncRaftLog.h" #include "syncReplication.h" #include "syncRespMgr.h" +#include "syncSnapshot.h" #include "syncUtil.h" static void syncNodeCleanConfigIndex(SSyncNode* ths) { @@ -70,6 +71,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { } int64_t timeNow = taosGetTimestampMs(); + + for (int i = 0; i < ths->peersNum; ++i) { + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i])); + if (pSender != NULL) { + if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start && + timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) { + snapshotReSend(pSender); + } else { + sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId, + ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime); + } + } + } + if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) { // end timeout wal snapshot if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS && diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 7787438dfe..a034e6ad83 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -160,8 +160,6 @@ void syncUtilMsgNtoH(void* msg) { bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } -bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } - bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { @@ -568,7 +566,7 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 + "send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 ", stime:%" PRId64 ", seq:%d}, %s", host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); } @@ -595,7 +593,7 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + "send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 ", stime:%" PRId64 ", ack:%d}, %s", host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); } diff --git a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp index 1eb803e846..e1a8a5c832 100644 --- a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp @@ -19,7 +19,7 @@ SyncAppendEntriesReply *createMsg() { pMsg->success = true; pMsg->matchIndex = 77; pMsg->term = 33; - pMsg->privateTerm = 44; + // pMsg->privateTerm = 44; pMsg->startTime = taosGetTimestampMs(); return pMsg; } diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 9ae79e11fb..7b636085f2 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,5 +1,5 @@ #include "syncTest.h" -#include +// #include /* typedef enum { diff --git a/source/libs/sync/test/sync_test_lib/inc/syncIO.h b/source/libs/sync/test/sync_test_lib/inc/syncIO.h index 19f896182e..98b013b46d 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncIO.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncIO.h @@ -81,6 +81,8 @@ int32_t syncIOQTimerStop(); int32_t syncIOPingTimerStart(); int32_t syncIOPingTimerStop(); +void syncEntryDestory(SSyncRaftEntry* pEntry); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c index 2c24451713..415fecd9f2 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncIO.c +++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c @@ -469,3 +469,5 @@ static void syncIOTickPing(void *param, void *tmrId) { taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); } + +void syncEntryDestory(SSyncRaftEntry* pEntry) {} \ No newline at end of file diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 1ea7629601..3b6007df6b 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1583,8 +1583,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); + // cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp index f4f09b20b8..b16bc643d3 100644 --- a/source/libs/tdb/test/tdbExOVFLTest.cpp +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -8,6 +8,7 @@ #include #include #include +#include "tlog.h" typedef struct SPoolMem { int64_t size; diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 54e80a0009..cd02eb8d5e 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -8,6 +8,7 @@ #include #include #include +#include "tlog.h" typedef struct SPoolMem { int64_t size; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 73c6dc4011..2b1f68d5f6 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1195,6 +1195,8 @@ void transCloseServer(void* arg) { sendQuitToWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]); } + } else { + uv_loop_close(srv->loop); } taosMemoryFree(srv->pThreadObj); diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 421901184b..d219873b5a 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -163,7 +163,7 @@ int32_t taosMulMkDir(const char *dirname) { code = mkdir(temp, 0755); #endif if (code < 0 && errno != EEXIST) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return code; } *pos = TD_DIRSEP[0]; @@ -179,7 +179,7 @@ int32_t taosMulMkDir(const char *dirname) { code = mkdir(temp, 0755); #endif if (code < 0 && errno != EEXIST) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return code; } } @@ -225,7 +225,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) { code = mkdir(temp, mode); #endif if (code < 0 && errno != EEXIST) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return code; } *pos = TD_DIRSEP[0]; @@ -241,7 +241,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) { code = mkdir(temp, mode); #endif if (code < 0 && errno != EEXIST) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return code; } } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 68365be481..8c2170239f 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -313,7 +313,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { assert(!(tdFileOptions & TD_FILE_EXCL)); fp = fopen(path, mode); if (fp == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } } else { @@ -336,14 +336,14 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO); #endif if (fd == -1) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } } TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile)); if (pFile == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + // terrno = TSDB_CODE_OUT_OF_MEMORY; if (fd >= 0) close(fd); if (fp != NULL) fclose(fp); return NULL; diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 230ca5b968..2e68bd5ccd 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -348,7 +348,7 @@ void taosMemoryTrim(int32_t size) { void* taosMemoryMallocAlign(uint32_t alignment, int64_t size) { #ifdef USE_TD_MEMORY - ASSERT(0); + assert(0); #else #if defined(LINUX) void* p = memalign(alignment, size); diff --git a/source/os/src/osString.c b/source/os/src/osString.c index e2d8ce95db..f03778de2f 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -143,15 +143,17 @@ SConv *gConv = NULL; int32_t convUsed = 0; int32_t gConvMaxNum = 0; -void taosConvInit(void) { +int32_t taosConvInit(void) { gConvMaxNum = 512; gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv)); for (int32_t i = 0; i < gConvMaxNum; ++i) { gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); if ((iconv_t)-1 == gConv[i].conv || (iconv_t)0 == gConv[i].conv) { - ASSERT(0); + return -1; } } + + return 0; } void taosConvDestroy() { diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 7ec1da0530..e28abf131d 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -617,14 +617,14 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { return 0; } else { // printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return -1; } #elif defined(_TD_DARWIN_64) struct statvfs info; if (statvfs(dataDir, &info)) { // printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return -1; } else { diskSize->total = info.f_blocks * info.f_frsize; @@ -635,7 +635,7 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { #else struct statvfs info; if (statvfs(dataDir, &info)) { - terrno = TAOS_SYSTEM_ERROR(errno); + // terrno = TAOS_SYSTEM_ERROR(errno); return -1; } else { diskSize->total = info.f_blocks * info.f_frsize; diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index 4d6875d263..057d3a620c 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "talgo.h" +#include "tlog.h" #define doswap(__left, __right, __size, __buf) \ do { \ diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index cc823decf4..48638af8d5 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -181,7 +181,7 @@ int32_t tjsonGetObjectValueString(const SJson* pJson, char** pValueString) { int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } strcpy(pVal, p); return TSDB_CODE_SUCCESS; @@ -190,7 +190,7 @@ int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) { int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } *pVal = strdup(p); return TSDB_CODE_SUCCESS; @@ -199,7 +199,7 @@ int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal) int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } #ifdef WINDOWS sscanf(p, "%" PRId64, pVal); @@ -233,7 +233,7 @@ int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } #ifdef WINDOWS sscanf(p, "%" PRIu64, pVal); @@ -259,6 +259,9 @@ int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pV int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) { const SJson* pObject = tjsonGetObjectItem(pJson, pName); + if (NULL == pObject) { + return TSDB_CODE_SUCCESS; + } if (!cJSON_IsBool(pObject)) { return TSDB_CODE_FAILED; } @@ -268,6 +271,9 @@ int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) { int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal) { const SJson* pObject = tjsonGetObjectItem(pJson, pName); + if (NULL == pObject) { + return TSDB_CODE_SUCCESS; + } if (!cJSON_IsNumber(pObject)) { return TSDB_CODE_FAILED; } @@ -282,7 +288,7 @@ SJson* tjsonGetArrayItem(const SJson* pJson, int32_t index) { return cJSON_GetAr int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj) { SJson* pJsonObj = tjsonGetObjectItem(pJson, pName); if (NULL == pJsonObj) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } return func(pJsonObj, pObj); } @@ -294,7 +300,7 @@ int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, v SJson* pJsonObj = tjsonGetObjectItem(pJson, pName); if (NULL == pJsonObj) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } *pObj = taosMemoryCalloc(1, objSize); if (NULL == *pObj) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index be1db74f1a..cea08e46fe 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -72,6 +72,7 @@ static int32_t tsDaylightActive; /* Currently in daylight saving time. */ bool tsLogEmbedded = 0; bool tsAsyncLog = true; +bool tsAssert = true; int32_t tsNumOfLogLines = 10000000; int32_t tsLogKeepDays = 0; LogFp tsLogFp = NULL; @@ -778,3 +779,37 @@ cmp_end: return ret; } + +bool taosAssertLog(bool condition, const char *file, int32_t line, const char *format, ...) { + if (condition) return false; + + const char *flags = "UTL FATAL "; + ELogLevel level = DEBUG_FATAL; + int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag + char buffer[LOG_MAX_LINE_BUFFER_SIZE]; + int32_t len = taosBuildLogHead(buffer, flags); + + va_list argpointer; + va_start(argpointer, format); + len = len + vsnprintf(buffer + len, LOG_MAX_LINE_BUFFER_SIZE - len, format, argpointer); + va_end(argpointer); + buffer[len++] = '\n'; + buffer[len] = 0; + taosPrintLogImp(1, 255, buffer, len); + + taosPrintLog(flags, level, dflag, "tAssert at file %s:%d exit:%d", file, line, tsAssert); + taosPrintTrace(flags, level, dflag); + + if (tsAssert) { + taosCloseLog(); + taosMsleep(300); + +#ifdef NDEBUG + abort(); +#else + assert(0); +#endif + } + + return true; +} \ No newline at end of file diff --git a/source/util/test/hashTest.cpp b/source/util/test/hashTest.cpp index 76fb9c7067..00379a755b 100644 --- a/source/util/test/hashTest.cpp +++ b/source/util/test/hashTest.cpp @@ -6,6 +6,7 @@ #include "taos.h" #include "taosdef.h" #include "thash.h" +#include "tlog.h" namespace { diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c90b2499a4..28ba7ea60f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1022,9 +1022,9 @@ ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py -#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py +,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insert_alltypes_json.py -#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/invalid_commandline.py +,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/invalid_commandline.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 47b7adbf18..df416b3822 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -20,6 +20,7 @@ #include #include "taos.h" #include "types.h" +#include "tlog.h" int smlProcess_influx_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);