Merge pull request #27414 from taosdata/enh/TD-31494-3

Enh/TD-31494-3
This commit is contained in:
Hongze Cheng 2024-08-26 09:03:39 +08:00 committed by GitHub
commit e035eb1224
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 63 additions and 25 deletions

View File

@ -120,15 +120,29 @@ int32_t taosSetFileHandlesLimit();
int32_t taosLinkFile(char *src, char *dst); int32_t taosLinkFile(char *src, char *dst);
FILE* taosOpenCFile(const char* filename, const char* mode); FILE *taosOpenCFile(const char *filename, const char *mode);
int taosSeekCFile(FILE* file, int64_t offset, int whence); int taosSeekCFile(FILE *file, int64_t offset, int whence);
size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream ); size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream);
size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream); size_t taosWriteToCFile(const void *ptr, size_t size, size_t nitems, FILE *stream);
int taosCloseCFile(FILE *); int taosCloseCFile(FILE *);
int taosSetAutoDelFile(char* path); int taosSetAutoDelFile(char *path);
bool lastErrorIsFileNotExist(); bool lastErrorIsFileNotExist();
#ifdef BUILD_WITH_RAND_ERR
#define STUB_RAND_NETWORK_ERR(status) \
do { \
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_NETWORK)) { \
uint32_t r = taosRand() % tsRandErrDivisor; \
if ((r + 1) <= tsRandErrChance) { \
status = TSDB_CODE_RPC_NETWORK_UNAVAIL; \
} \
} \
while (0)
#else
#define STUB_RAND_NETWORK_ERR(status)
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -468,6 +468,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427) #define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427)
#define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428) #define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428)
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429) #define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A)
// mnode-sma // mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -294,7 +294,7 @@ typedef enum ELogicConditionType {
#define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_LOG_VAR_LEN 32 #define TSDB_LOG_VAR_LEN 32
#define TSDB_MAX_EP_NUM 10 #define TSDB_MAX_EP_NUM 10
#define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_GROUP_MEMBER_NUM 2
#define TSDB_ARB_TOKEN_SIZE 32 #define TSDB_ARB_TOKEN_SIZE 32
@ -568,12 +568,7 @@ enum {
SND_WORKER_TYPE__UNIQUE, SND_WORKER_TYPE__UNIQUE,
}; };
enum { enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
RAND_ERR_MEMORY = 1,
RAND_ERR_FILE = 2,
// RAND_ERR_SCOPE_XXX... = 4,
// ...
};
#define DEFAULT_HANDLE 0 #define DEFAULT_HANDLE 0
#define MNODE_HANDLE 1 #define MNODE_HANDLE 1

View File

@ -22,7 +22,7 @@ static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
(void)taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) { if (pMgmt->stopped) {
code = -1; code = TSDB_CODE_MNODE_STOPPED;
} else { } else {
(void)atomic_add_fetch_32(&pMgmt->refCount, 1); (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
} }
@ -134,16 +134,19 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
if (NULL == pMgmt->pMnode) { if (NULL == pMgmt->pMnode) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGError("msg:%p, stop to pre-process in mnode since mnode is NULL, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); code = TSDB_CODE_MNODE_NOT_FOUND;
return -1; dGError("msg:%p, stop to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
return code;
} }
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
if (mndPreProcessQueryMsg(pMsg) != 0) { if ((code = mndPreProcessQueryMsg(pMsg)) != 0) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, tstrerror(code),
return -1; TMSG_INFO(pMsg->msgType));
return code;
} }
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
} }

View File

@ -345,6 +345,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested
} }
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
STUB_RAND_NETWORK_ERR(nread);
SHttpClient* cli = handle->data; SHttpClient* cli = handle->data;
if (nread < 0) { if (nread < 0) {
tError("http-report recv error:%s", uv_strerror(nread)); tError("http-report recv error:%s", uv_strerror(nread));
@ -356,6 +357,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
} }
} }
static void clientSentCb(uv_write_t* req, int32_t status) { static void clientSentCb(uv_write_t* req, int32_t status) {
STUB_RAND_NETWORK_ERR(status);
SHttpClient* cli = req->data; SHttpClient* cli = req->data;
if (status != 0) { if (status != 0) {
tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
@ -367,6 +369,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
} else { } else {
tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId);
} }
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
if (status != 0) { if (status != 0) {
tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
@ -377,6 +380,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
} }
} }
static void clientConnCb(uv_connect_t* req, int32_t status) { static void clientConnCb(uv_connect_t* req, int32_t status) {
STUB_RAND_NETWORK_ERR(status);
SHttpClient* cli = req->data; SHttpClient* cli = req->data;
int64_t chanId = cli->chanId; int64_t chanId = cli->chanId;

View File

@ -923,10 +923,12 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_
} }
} }
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later STUB_RAND_NETWORK_ERR(nread);
if (handle->data == NULL) { if (handle->data == NULL) {
return; return;
} }
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
@ -1117,6 +1119,8 @@ static bool cliHandleNoResp(SCliConn* conn) {
return res; return res;
} }
static void cliSendCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SCliConn* pConn = transReqQueueRemove(req); SCliConn* pConn = transReqQueueRemove(req);
if (pConn == NULL) return; if (pConn == NULL) return;
@ -1434,6 +1438,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
cliSendBatch(conn); cliSendBatch(conn);
} }
static void cliSendBatchCb(uv_write_t* req, int status) { static void cliSendBatchCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SCliConn* conn = req->data; SCliConn* conn = req->data;
SCliThrd* thrd = conn->hostThrd; SCliThrd* thrd = conn->hostThrd;
SCliBatch* p = conn->pBatch; SCliBatch* p = conn->pBatch;
@ -1523,6 +1528,8 @@ void cliConnCb(uv_connect_t* req, int status) {
pConn->timer = NULL; pConn->timer = NULL;
} }
STUB_RAND_NETWORK_ERR(status);
if (status != 0) { if (status != 0) {
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
if (timeout == false) { if (timeout == false) {

View File

@ -869,3 +869,8 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
*ppBuf = pBuf; *ppBuf = pBuf;
return len; return len;
} }
// int32_t transGenRandomError(int32_t status) {
// STUB_RAND_NETWORK_ERR(status)
// return status;
// }

View File

@ -493,6 +493,8 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SSvrConn* conn = cli->data; SSvrConn* conn = cli->data;
SWorkThrd* pThrd = conn->hostThrd; SWorkThrd* pThrd = conn->hostThrd;
STUB_RAND_NETWORK_ERR(nread);
if (true == pThrd->quit) { if (true == pThrd->quit) {
tInfo("work thread received quit msg, destroy conn"); tInfo("work thread received quit msg, destroy conn");
destroyConn(conn, true); destroyConn(conn, true);
@ -553,6 +555,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
} }
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SSvrConn* conn = transReqQueueRemove(req); SSvrConn* conn = transReqQueueRemove(req);
if (conn == NULL) return; if (conn == NULL) return;
@ -602,6 +605,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
} }
} }
static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvOnPipeWriteCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
if (status == 0) { if (status == 0) {
tTrace("success to dispatch conn to work thread"); tTrace("success to dispatch conn to work thread");
} else { } else {
@ -949,6 +953,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
} }
} }
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
STUB_RAND_NETWORK_ERR(nread);
if (nread < 0) { if (nread < 0) {
if (nread != UV_EOF) { if (nread != UV_EOF) {
tError("read error %s", uv_err_name(nread)); tError("read error %s", uv_err_name(nread));
@ -1041,9 +1046,11 @@ void* transAcceptThread(void* arg) {
return NULL; return NULL;
} }
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
STUB_RAND_NETWORK_ERR(status);
if (status != 0) { if (status != 0) {
return; return;
} };
SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req);
(void)uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); (void)uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
} }

View File

@ -904,7 +904,7 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
int32_t code = _fstat64(pFile->fd, &fileStat); int32_t code = _fstat64(pFile->fd, &fileStat);
#else #else
struct stat fileStat; struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat); int32_t code = fstat(pFile->fd, &fileStat);
#endif #endif
if (-1 == code) { if (-1 == code) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -1611,7 +1611,7 @@ int taosSeekCFile(FILE *file, int64_t offset, int whence) {
#ifdef WINDOWS #ifdef WINDOWS
return _fseeki64(file, offset, whence); return _fseeki64(file, offset, whence);
#else #else
int code = fseeko(file, offset, whence); int code = fseeko(file, offset, whence);
if (-1 == code) { if (-1 == code) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} }

View File

@ -24,7 +24,7 @@
int32_t tsRandErrChance = 1; int32_t tsRandErrChance = 1;
int64_t tsRandErrDivisor = 10001; int64_t tsRandErrDivisor = 10001;
int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE); int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE | RAND_ERR_NETWORK);
threadlocal bool tsEnableRandErr = 0; threadlocal bool tsEnableRandErr = 0;
#if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE) #if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE)
@ -385,7 +385,7 @@ char *taosStrdup(const char *ptr) {
} }
#endif #endif
return tstrdup(ptr); return tstrdup(ptr);
#endif #endif
} }

View File

@ -389,6 +389,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "Mnode stopped")
// vnode // vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")