refactor code

This commit is contained in:
Yihao Deng 2024-07-09 09:48:26 +00:00
parent ddd3c23ff0
commit 1b052eefd0
1 changed files with 30 additions and 13 deletions

View File

@ -37,6 +37,7 @@ typedef struct SHttpModule {
SHashObj* connStatusTable;
SHashObj* connPool;
int8_t quit;
int16_t connNum;
} SHttpModule;
typedef struct SHttpMsg {
@ -272,6 +273,7 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
}
}
static void httpAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data;
SHttpModule* http = item->pThrd;
@ -317,6 +319,14 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data;
int64_t chanId = cli->chanId;
SHttpModule* http = taosAcquireRef(httpRefMgt, cli->chanId);
if (http != NULL) {
http->connNum -= 1;
taosReleaseRef(httpRefMgt, chanId);
}
destroyHttpClient(cli);
}
@ -374,6 +384,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
taosReleaseRef(httpRefMgt, chanId);
return;
}
http->connNum += 1;
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
@ -590,6 +602,8 @@ END:
}
static void httpModuleDestroy(SHttpModule* http) {
if (http == NULL) return;
if (http->asyncPool != NULL) {
TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
transAsyncPoolDestroy(http->asyncPool);
@ -598,11 +612,13 @@ static void httpModuleDestroy(SHttpModule* http) {
uv_loop_close(http->loop);
taosMemoryFree(http->loop);
}
taosHashCleanup(http->connStatusTable);
// not free http, http freeed by ref
}
void httpModuleDestroy2(SHttpModule* http) {
if (http == NULL) return;
httpModuleDestroy(http);
taosMemoryFree(http);
}
@ -669,49 +685,50 @@ int64_t transInitHttpChanImpl() {
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
if (http == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
goto _ERROR;
}
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (http->connStatusTable == NULL) {
httpModuleDestroy2(http);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
goto _ERROR;
}
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (http->loop == NULL) {
httpModuleDestroy2(http);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
goto _ERROR;
}
int err = uv_loop_init(http->loop);
if (err != 0) {
tError("http-report failed init uv, reason:%s", uv_strerror(err));
httpModuleDestroy2(http);
terrno = TSDB_CODE_THIRDPARTY_ERROR;
return terrno;
goto _ERROR;
}
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (NULL == http->asyncPool) {
httpModuleDestroy2(http);
return terrno;
if (http->asyncPool == NULL) {
goto _ERROR;
}
http->quit = 0;
err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) {
httpModuleDestroy2(http);
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
goto _ERROR;
}
int64_t ref = taosAddRef(httpRefMgt, http);
if (ref < 0) {
return terrno;
goto _ERROR;
}
return ref;
_ERROR:
httpModuleDestroy2(http);
return terrno;
}
int64_t taosInitHttpChan() {
taosThreadOnce(&transHttpInit, transHttpEnvInit);