diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 4720fb0ddc..b57521e759 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { } void mnodeReleaseConn(SConnObj *pConn) { - if(pConn == NULL) return; - taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); + if (pConn == NULL) return; + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); } SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 628ff805f4..5d94e8456e 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -188,6 +188,7 @@ typedef struct HttpContext { char user[TSDB_USER_LEN]; // parsed from auth token or login message char pass[TSDB_PASSWORD_LEN]; void * taos; + void * ppContext; HttpSession *session; z_stream gzipStream; HttpParser parser; diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index c0bef8ae9d..9262e3f609 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -105,15 +105,15 @@ HttpContext *httpCreateContext(int32_t fd) { char fdStr[12] = {0}; snprintf(fdStr, sizeof(fdStr), "%d", fd); - //atomic_add_fetch_32(&pContext->refCount, 1); - + pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; - taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5); - httpTrace("context:%p, fd:%d is created", pContext, fd); + HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5); + pContext->ppContext = ppContext; + httpTrace("context:%p, fd:%d, is created", pContext, fd); return pContext; } @@ -128,7 +128,7 @@ HttpContext *httpGetContext(int32_t fd) { HttpContext *pContext = *ppContext; if (pContext) { int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); - httpTrace("context:%p, fd:%d is accquired, refCount:%d", pContext, pContext->fd, refCount); + httpTrace("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount); return pContext; } } @@ -138,9 +138,10 @@ HttpContext *httpGetContext(int32_t fd) { void httpReleaseContext(HttpContext *pContext) { int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); assert(refCount >= 0); - httpTrace("context:%p, fd:%d is releasd, refCount:%d", pContext, pContext->fd, refCount); + httpTrace("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount); - taosCacheRelease(tsHttpServer.contextCache, (void **)(&pContext), false); + HttpContext **ppContext = pContext->ppContext; + taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); } bool httpInitContext(HttpContext *pContext) { @@ -205,9 +206,6 @@ void httpCloseContextByApp(HttpContext *pContext) { } void httpCloseContextByServer(HttpContext *pContext) { - httpRemoveContextFromEpoll(pContext); - pContext->parsed = false; - if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { @@ -220,5 +218,7 @@ void httpCloseContextByServer(HttpContext *pContext) { httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state); } + pContext->parsed = false; + httpRemoveContextFromEpoll(pContext); httpReleaseContext(pContext); } diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index d06c1fdbc6..06523540b7 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -272,7 +272,7 @@ static void *httpAcceptHttpConnection(void *arg) { taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno)); return NULL; } else { - httpPrint("http service init success at %u", pServer->serverPort); + httpPrint("http server init success at %u", pServer->serverPort); pServer->status = HTTP_SERVER_RUNNING; } @@ -316,12 +316,9 @@ static void *httpAcceptHttpConnection(void *arg) { pContext->pThread = pThread; sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); - httpTrace("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, totalFds:%d, accept a new connection", pContext, - connFd, pContext->ipstr, pThread->label, pThread->numOfFds, totalFds); - + struct epoll_event event; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; - event.data.fd = connFd; if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, @@ -333,6 +330,8 @@ static void *httpAcceptHttpConnection(void *arg) { // notify the data process, add into the FdObj list atomic_add_fetch_32(&pThread->numOfFds, 1); + httpTrace("context:%p, fd:%d, ip:%s, thread:%s numOfFds:%d totalFds:%d, accept a new connection", pContext, connFd, + pContext->ipstr, pThread->label, pThread->numOfFds, totalFds); // pick up next thread for next connection threadId++; diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index 62ec101353..af8a3e6002 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -58,7 +58,7 @@ static void httpFetchSessionImp(HttpContext *pContext) { pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); if (pContext->session != NULL) { - atomic_add_fetch_32(&pContext->refCount, 1); + atomic_add_fetch_32(&pContext->session->refCount, 1); httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, refCount:%d", pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); } else { @@ -83,7 +83,7 @@ void httpGetSession(HttpContext *pContext) { void httpReleaseSession(HttpContext *pContext) { if (pContext == NULL || pContext->session == NULL) return; - int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1); assert(refCount >= 0); httpTrace("context:%p, session:%p is releasd refCount:%d", pContext, pContext->session, pContext->session->refCount); diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index e6c8d95c88..6e21baca04 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -68,7 +68,7 @@ int httpStartSystem() { } if (!httpInitContexts()) { - httpError("http init session failed"); + httpError("http init contexts failed"); return -1; }