From a4827ad086be85aea1dfdc95d6fea5a2a01825f1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 21 Aug 2024 15:54:46 +0800 Subject: [PATCH 1/4] add network random error --- include/os/osFile.h | 26 ++++++++++++++++++++------ include/util/tdef.h | 9 ++------- source/libs/transport/src/thttp.c | 4 ++++ source/libs/transport/src/transCli.c | 9 ++++++++- source/libs/transport/src/transComm.c | 5 +++++ source/libs/transport/src/transSvr.c | 9 ++++++++- source/os/src/osFile.c | 4 ++-- source/os/src/osMemory.c | 4 ++-- 8 files changed, 51 insertions(+), 19 deletions(-) diff --git a/include/os/osFile.h b/include/os/osFile.h index 8bacb1bf7c..a56c54b086 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -120,15 +120,29 @@ int32_t taosSetFileHandlesLimit(); int32_t taosLinkFile(char *src, char *dst); -FILE* taosOpenCFile(const char* filename, const char* mode); -int taosSeekCFile(FILE* file, int64_t offset, int whence); -size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream ); -size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream); -int taosCloseCFile(FILE *); -int taosSetAutoDelFile(char* path); +FILE *taosOpenCFile(const char *filename, const char *mode); +int taosSeekCFile(FILE *file, int64_t offset, int whence); +size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream); +size_t taosWriteToCFile(const void *ptr, size_t size, size_t nitems, FILE *stream); +int taosCloseCFile(FILE *); +int taosSetAutoDelFile(char *path); bool lastErrorIsFileNotExist(); +#ifdef BUILD_WITH_RAND_ERR +#define STUB_RAND_NETWORK_ERR(status) \ + do { \ + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_NETWORK)) { \ + uint32_t r = taosRand() % tsRandErrDivisor; \ + if ((r + 1) <= tsRandErrChance) { \ + status = TSDB_CODE_RPC_NETWORK_UNAVAIL; \ + } \ + } \ + } while (0) +#else +#define STUB_RAND_NETWORK_ERR(status) +#endif + #ifdef __cplusplus } #endif diff --git a/include/util/tdef.h b/include/util/tdef.h 
index 35c4adab50..f087c28684 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -294,7 +294,7 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 -#define TSDB_MAX_EP_NUM 10 +#define TSDB_MAX_EP_NUM 10 #define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_TOKEN_SIZE 32 @@ -568,12 +568,7 @@ enum { SND_WORKER_TYPE__UNIQUE, }; -enum { - RAND_ERR_MEMORY = 1, - RAND_ERR_FILE = 2, - // RAND_ERR_SCOPE_XXX... = 4, - // ... -}; +enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define DEFAULT_HANDLE 0 #define MNODE_HANDLE 1 diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 908468f094..e517b8a0bc 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -345,6 +345,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested } static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { + STUB_RAND_NETWORK_ERR(nread); SHttpClient* cli = handle->data; if (nread < 0) { tError("http-report recv error:%s", uv_strerror(nread)); @@ -356,6 +357,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const } } static void clientSentCb(uv_write_t* req, int32_t status) { + STUB_RAND_NETWORK_ERR(status); SHttpClient* cli = req->data; if (status != 0) { tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, @@ -367,6 +369,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } else { tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); } + status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, @@ -377,6 +380,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } } static void 
clientConnCb(uv_connect_t* req, int32_t status) { + STUB_RAND_NETWORK_ERR(status); SHttpClient* cli = req->data; int64_t chanId = cli->chanId; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 862a74c72b..a5cc492b2b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -923,10 +923,12 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_ } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { - // impl later + STUB_RAND_NETWORK_ERR(nread); + if (handle->data == NULL) { return; } + SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { @@ -1117,6 +1119,8 @@ static bool cliHandleNoResp(SCliConn* conn) { return res; } static void cliSendCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); + SCliConn* pConn = transReqQueueRemove(req); if (pConn == NULL) return; @@ -1434,6 +1438,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliSendBatch(conn); } static void cliSendBatchCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); SCliConn* conn = req->data; SCliThrd* thrd = conn->hostThrd; SCliBatch* p = conn->pBatch; @@ -1523,6 +1528,8 @@ void cliConnCb(uv_connect_t* req, int status) { pConn->timer = NULL; } + STUB_RAND_NETWORK_ERR(status); + if (status != 0) { cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); if (timeout == false) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5d82e157b3..aea8282d44 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -869,3 +869,8 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { *ppBuf = pBuf; return len; } + +// int32_t transGenRandomError(int32_t status) { +// STUB_RAND_NETWORK_ERR(status) +// return status; +// } diff --git a/source/libs/transport/src/transSvr.c 
b/source/libs/transport/src/transSvr.c index c1b934c812..84938126c3 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -493,6 +493,8 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SSvrConn* conn = cli->data; SWorkThrd* pThrd = conn->hostThrd; + STUB_RAND_NETWORK_ERR(nread); + if (true == pThrd->quit) { tInfo("work thread received quit msg, destroy conn"); destroyConn(conn, true); @@ -553,6 +555,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { } void uvOnSendCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); SSvrConn* conn = transReqQueueRemove(req); if (conn == NULL) return; @@ -602,6 +605,7 @@ void uvOnSendCb(uv_write_t* req, int status) { } } static void uvOnPipeWriteCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); if (status == 0) { tTrace("success to dispatch conn to work thread"); } else { @@ -949,6 +953,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + STUB_RAND_NETWORK_ERR(nread); if (nread < 0) { if (nread != UV_EOF) { tError("read error %s", uv_err_name(nread)); @@ -1041,9 +1046,11 @@ void* transAcceptThread(void* arg) { return NULL; } void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { + STUB_RAND_NETWORK_ERR(status); if (status != 0) { return; - } + }; + SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); (void)uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index c274c7fbab..144baeb148 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -904,7 +904,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { int32_t code = _fstat64(pFile->fd, &fileStat); #else struct stat fileStat; - int32_t code = fstat(pFile->fd, &fileStat); + int32_t code = fstat(pFile->fd, &fileStat); #endif if (-1 == code) { terrno = 
TAOS_SYSTEM_ERROR(errno); @@ -1611,7 +1611,7 @@ int taosSeekCFile(FILE *file, int64_t offset, int whence) { #ifdef WINDOWS return _fseeki64(file, offset, whence); #else - int code = fseeko(file, offset, whence); + int code = fseeko(file, offset, whence); if (-1 == code) { terrno = TAOS_SYSTEM_ERROR(errno); } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 91eb7763bc..49a5c2a2a2 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -24,7 +24,7 @@ int32_t tsRandErrChance = 1; int64_t tsRandErrDivisor = 10001; -int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE); +int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE | RAND_ERR_NETWORK); threadlocal bool tsEnableRandErr = 0; #if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE) @@ -385,7 +385,7 @@ char *taosStrdup(const char *ptr) { } #endif - return tstrdup(ptr); + return tstrdup(ptr); #endif } From f464aec00af90173fa720aa7154367c62109318c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 22 Aug 2024 20:01:10 +0800 Subject: [PATCH 2/4] add network error --- include/util/taoserror.h | 2 ++ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 15 +++++++++------ source/util/src/terror.c | 1 + 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a0898fa94b..a7bcfbba64 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -468,6 +468,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427) #define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428) #define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429) +#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A) + // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index fc070d0d05..e8e7a75889 100644 --- 
a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -22,7 +22,7 @@ static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { int32_t code = 0; (void)taosThreadRwlockRdlock(&pMgmt->lock); if (pMgmt->stopped) { - code = -1; + code = TSDB_CODE_MNODE_STOPPED; } else { (void)atomic_add_fetch_32(&pMgmt->refCount, 1); } @@ -134,16 +134,19 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t code = 0; if (NULL == pMgmt->pMnode) { const STraceId *trace = &pMsg->info.traceId; - dGError("msg:%p, stop to pre-process in mnode since mnode is NULL, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - return -1; + code = TSDB_CODE_MNODE_NOT_FOUND; + dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType)); + return code; } pMsg->info.node = pMgmt->pMnode; - if (mndPreProcessQueryMsg(pMsg) != 0) { + if ((code = mndPreProcessQueryMsg(pMsg)) != 0) { const STraceId *trace = &pMsg->info.traceId; - dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); - return -1; + dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), + TMSG_INFO(pMsg->msgType)); + return code; } return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a58baf5883..60d7e335d7 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -389,6 +389,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match") +TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "enableWhiteList not match") // vnode 
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed") From 3a20e9c24123ee9d1a2bc4474bedbccf4247b53f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 22 Aug 2024 20:04:18 +0800 Subject: [PATCH 3/4] add network error --- source/util/src/terror.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 60d7e335d7..87876a1450 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -389,7 +389,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match") -TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "enableWhiteList not match") +TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "mnode stopped") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed") From 08a51e21dc1ceb28cfafa88ac4512b6ed27cfa8d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 22 Aug 2024 20:05:14 +0800 Subject: [PATCH 4/4] add network error --- source/util/src/terror.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 87876a1450..0423850320 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -389,7 +389,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match") -TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "mnode stopped") +TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "Mnode stopped") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or 
removed")