From 4e2c93f2620a0b67c35c19137156fee61858d4b8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 21 Aug 2024 15:54:46 +0800 Subject: [PATCH] add network random error --- include/os/osFile.h | 26 ++++++++++++++++++++------ include/util/tdef.h | 9 ++------- source/libs/transport/src/thttp.c | 4 ++++ source/libs/transport/src/transCli.c | 9 ++++++++- source/libs/transport/src/transComm.c | 5 +++++ source/libs/transport/src/transSvr.c | 9 ++++++++- source/os/src/osFile.c | 4 ++-- source/os/src/osMemory.c | 4 ++-- 8 files changed, 51 insertions(+), 19 deletions(-) diff --git a/include/os/osFile.h b/include/os/osFile.h index 8bacb1bf7c..a56c54b086 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -120,15 +120,29 @@ int32_t taosSetFileHandlesLimit(); int32_t taosLinkFile(char *src, char *dst); -FILE* taosOpenCFile(const char* filename, const char* mode); -int taosSeekCFile(FILE* file, int64_t offset, int whence); -size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream ); -size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream); -int taosCloseCFile(FILE *); -int taosSetAutoDelFile(char* path); +FILE *taosOpenCFile(const char *filename, const char *mode); +int taosSeekCFile(FILE *file, int64_t offset, int whence); +size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream); +size_t taosWriteToCFile(const void *ptr, size_t size, size_t nitems, FILE *stream); +int taosCloseCFile(FILE *); +int taosSetAutoDelFile(char *path); bool lastErrorIsFileNotExist(); +#ifdef BUILD_WITH_RAND_ERR +#define STUB_RAND_NETWORK_ERR(status) \ + do { \ + if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_NETWORK)) { \ + uint32_t r = taosRand() % tsRandErrDivisor; \ + if ((r + 1) <= tsRandErrChance) { \ + status = TSDB_CODE_RPC_NETWORK_UNAVAIL; \ + } \ + } \ + while (0) +#else +#define STUB_RAND_NETWORK_ERR(status) +#endif + #ifdef __cplusplus } #endif diff --git a/include/util/tdef.h b/include/util/tdef.h index 35c4adab50..f087c28684 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -294,7 +294,7 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 -#define TSDB_MAX_EP_NUM 10 +#define TSDB_MAX_EP_NUM 10 #define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_TOKEN_SIZE 32 @@ -568,12 +568,7 @@ enum { SND_WORKER_TYPE__UNIQUE, }; -enum { - RAND_ERR_MEMORY = 1, - RAND_ERR_FILE = 2, - // RAND_ERR_SCOPE_XXX... = 4, - // ... -}; +enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define DEFAULT_HANDLE 0 #define MNODE_HANDLE 1 diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 908468f094..e517b8a0bc 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -345,6 +345,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested } static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { + STUB_RAND_NETWORK_ERR(nread); SHttpClient* cli = handle->data; if (nread < 0) { tError("http-report recv error:%s", uv_strerror(nread)); @@ -356,6 +357,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const } } static void clientSentCb(uv_write_t* req, int32_t status) { + STUB_RAND_NETWORK_ERR(status); SHttpClient* cli = req->data; if (status != 0) { tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, @@ -367,6 +369,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } else { tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); } + status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, @@ -377,6 +380,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } } static void clientConnCb(uv_connect_t* req, int32_t status) { + STUB_RAND_NETWORK_ERR(status); SHttpClient* cli = req->data; int64_t chanId = cli->chanId; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 862a74c72b..a5cc492b2b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -923,10 +923,12 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_ } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { - // impl later + STUB_RAND_NETWORK_ERR(nread); + if (handle->data == NULL) { return; } + SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { @@ -1117,6 +1119,8 @@ static bool cliHandleNoResp(SCliConn* conn) { return res; } static void cliSendCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); + SCliConn* pConn = transReqQueueRemove(req); if (pConn == NULL) return; @@ -1434,6 +1438,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliSendBatch(conn); } static void cliSendBatchCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); SCliConn* conn = req->data; SCliThrd* thrd = conn->hostThrd; SCliBatch* p = conn->pBatch; @@ -1523,6 +1528,8 @@ void cliConnCb(uv_connect_t* req, int status) { pConn->timer = NULL; } + STUB_RAND_NETWORK_ERR(status); + if (status != 0) { cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); if (timeout == false) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5d82e157b3..aea8282d44 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -869,3 +869,8 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { *ppBuf = pBuf; return len; } + +// int32_t transGenRandomError(int32_t status) { +// STUB_RAND_NETWORK_ERR(status) +// return status; +// } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c1b934c812..84938126c3 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -493,6 +493,8 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SSvrConn* conn = cli->data; SWorkThrd* pThrd = conn->hostThrd; + STUB_RAND_NETWORK_ERR(nread); + if (true == pThrd->quit) { tInfo("work thread received quit msg, destroy conn"); destroyConn(conn, true); @@ -553,6 +555,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { } void uvOnSendCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); SSvrConn* conn = transReqQueueRemove(req); if (conn == NULL) return; @@ -602,6 +605,7 @@ void uvOnSendCb(uv_write_t* req, int status) { } } static void uvOnPipeWriteCb(uv_write_t* req, int status) { + STUB_RAND_NETWORK_ERR(status); if (status == 0) { tTrace("success to dispatch conn to work thread"); } else { @@ -949,6 +953,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + STUB_RAND_NETWORK_ERR(nread); if (nread < 0) { if (nread != UV_EOF) { tError("read error %s", uv_err_name(nread)); @@ -1041,9 +1046,11 @@ void* transAcceptThread(void* arg) { return NULL; } void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { + STUB_RAND_NETWORK_ERR(status); if (status != 0) { return; - } + }; + SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); (void)uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index a5df4f63f3..4f76dea5d3 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -906,7 +906,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { int32_t code = _fstat64(pFile->fd, &fileStat); #else struct stat fileStat; - int32_t code = fstat(pFile->fd, &fileStat); + int32_t code = fstat(pFile->fd, &fileStat); #endif if (-1 == code) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -1614,7 +1614,7 @@ int taosSeekCFile(FILE *file, int64_t offset, int whence) { #ifdef WINDOWS return _fseeki64(file, offset, whence); #else - int code = fseeko(file, offset, whence); + int code = fseeko(file, offset, whence); if (-1 == code) { terrno = TAOS_SYSTEM_ERROR(errno); } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 7a5a547354..7e25d7c817 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -24,7 +24,7 @@ int32_t tsRandErrChance = 1; int64_t tsRandErrDivisor = 10001; -int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE); +int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE | RAND_ERR_NETWORK); threadlocal bool tsEnableRandErr = 0; #if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE) @@ -388,7 +388,7 @@ char *taosStrdup(const char *ptr) { } #endif - return tstrdup(ptr); + return tstrdup(ptr); #endif }