add http fail fast

This commit is contained in:
yihaoDeng 2023-11-22 20:23:27 +08:00
parent 8c7dd0a716
commit 40f6de4fc8
1 changed files with 54 additions and 2 deletions

View File

@ -32,6 +32,7 @@ typedef struct SHttpModule {
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
TdThread thread; TdThread thread;
SHashObj* connStatusTable;
} SHttpModule; } SHttpModule;
typedef struct SHttpMsg { typedef struct SHttpMsg {
@ -64,6 +65,8 @@ static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit(); static int32_t httpSendQuit();
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag); EHttpCompFlag flag);
@ -262,14 +265,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
} }
} }
static void clientConnCb(uv_connect_t* req, int32_t status) { static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
SHttpClient* cli = req->data; SHttpClient* cli = req->data;
if (status != 0) { if (status != 0) {
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
taosReleaseRef(httpRefMgt, httpRef);
return; return;
} }
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
if (0 != status) { if (0 != status) {
tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
@ -277,6 +286,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
} }
taosReleaseRef(httpRefMgt, httpRef);
} }
int32_t httpSendQuit() { int32_t httpSendQuit() {
@ -349,16 +359,51 @@ static void httpHandleQuit(SHttpMsg* msg) {
uv_walk(http->loop, httpWalkCb, NULL); uv_walk(http->loop, httpWalkCb, NULL);
taosReleaseRef(httpRefMgt, httpRef); taosReleaseRef(httpRefMgt, httpRef);
} }
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) {
char buf[256] = {0};
sprintf(buf, "%s:%d", server, port);
int64_t* failedTime = (int64_t*)taosHashGet(pTable, buf, strlen(buf));
if (failedTime == NULL) {
return false;
}
int64_t now = taosGetTimestampSec();
if (now - *failedTime < 10) {
tError("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf);
return true;
} else {
return false;
}
}
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) {
char buf[256] = {0};
sprintf(buf, "%s:%d", server, port);
if (succ) {
taosHashRemove(pTable, buf, strlen(buf));
} else {
int64_t st = taosGetTimestampSec();
taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st));
}
return;
}
static void httpHandleReq(SHttpMsg* msg) { static void httpHandleReq(SHttpMsg* msg) {
int32_t ignore = false;
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
if (http == NULL) { if (http == NULL) {
goto END; goto END;
} }
if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) {
ignore = true;
goto END;
}
struct sockaddr_in dest = {0}; struct sockaddr_in dest = {0};
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
goto END; goto END;
} }
if (msg->flag == HTTP_GZIP) { if (msg->flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
if (dstLen > 0) { if (dstLen > 0) {
@ -418,13 +463,16 @@ static void httpHandleReq(SHttpMsg* msg) {
if (ret != 0) { if (ret != 0) {
tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
cli->port); cli->port);
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
destroyHttpClient(cli); destroyHttpClient(cli);
} }
taosReleaseRef(httpRefMgt, httpRef); taosReleaseRef(httpRefMgt, httpRef);
return; return;
END: END:
if (ignore == false) {
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
}
httpDestroyMsg(msg); httpDestroyMsg(msg);
taosReleaseRef(httpRefMgt, httpRef); taosReleaseRef(httpRefMgt, httpRef);
} }
@ -441,6 +489,8 @@ static void transHttpEnvInit() {
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
uv_loop_init(http->loop); uv_loop_init(http->loop);
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
@ -474,6 +524,8 @@ void transHttpEnvDestroy() {
uv_loop_close(load->loop); uv_loop_close(load->loop);
taosMemoryFree(load->loop); taosMemoryFree(load->loop);
taosHashCleanup(load->connStatusTable);
taosReleaseRef(httpRefMgt, httpRef); taosReleaseRef(httpRefMgt, httpRef);
taosRemoveRef(httpRefMgt, httpRef); taosRemoveRef(httpRefMgt, httpRef);
} }