diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h index 594900d0cf..a2d50d6b7f 100644 --- a/src/plugins/http/inc/httpContext.h +++ b/src/plugins/http/inc/httpContext.h @@ -31,7 +31,4 @@ void httpCloseContextByApp(HttpContext *pContext); void httpNotifyContextClose(HttpContext *pContext); bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); -void ehttpIncContextRef(HttpContext *pContext); -void ehttpDecContextRef(HttpContext **ppContext); - #endif diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 40f980f101..044b5cc4cc 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -212,8 +212,6 @@ typedef struct HttpContext { void * timer; HttpEncodeMethod * encodeMethod; struct HttpThread *pThread; - - int closed:2; } HttpContext; typedef struct HttpThread { @@ -244,8 +242,6 @@ typedef struct HttpServer { pthread_mutex_t serverMutex; HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; bool (*processData)(HttpContext *pContext); - - int fallback:2; } HttpServer; extern const char *httpKeepAliveStr[]; diff --git a/src/plugins/http/src/gcHandle.c b/src/plugins/http/src/gcHandle.c index 72b73b4bad..4aed6eb5cc 100644 --- a/src/plugins/http/src/gcHandle.c +++ b/src/plugins/http/src/gcHandle.c @@ -67,8 +67,7 @@ bool gcGetPassFromUrl(HttpContext* pContext) { } bool gcProcessLoginRequest(HttpContext* pContext) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process grafana login msg", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, process grafana login msg", pContext, pContext->fd, pContext->user); pContext->reqType = HTTP_REQTYPE_LOGIN; return true; } @@ -143,7 +142,7 @@ bool gcProcessLoginRequest(HttpContext* pContext) { //}] bool gcProcessQueryRequest(HttpContext* pContext) { - httpDebug("context:%p, fd:%d, ip:%s, process grafana query msg", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, process grafana query msg", pContext, pContext->fd); HttpParser* pParser = &pContext->parser; char* filter = pParser->data.pos; @@ -183,15 +182,13 @@ bool gcProcessQueryRequest(HttpContext* pContext) { cJSON* refId = cJSON_GetObjectItem(query, "refId"); if (refId == NULL || refId->valuestring == NULL || strlen(refId->valuestring) == 0) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, refId is null", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, refId is null", pContext, pContext->fd, pContext->user); continue; } int refIdBuffer = httpAddToSqlCmdBuffer(pContext, refId->valuestring); if (refIdBuffer == -1) { - httpWarn("context:%p, fd:%d, ip:%s, user:%s, refId buffer is full", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user); break; } @@ -200,8 +197,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) { if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) { aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring); if (aliasBuffer == -1) { - httpWarn("context:%p, fd:%d, ip:%s, user:%s, alias buffer is full", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user); break; } } @@ -211,15 +207,13 @@ bool gcProcessQueryRequest(HttpContext* pContext) { cJSON* sql = cJSON_GetObjectItem(query, "sql"); if (sql == NULL || sql->valuestring == NULL || strlen(sql->valuestring) == 0) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, sql is null", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, sql is null", pContext, pContext->fd, pContext->user); continue; } int sqlBuffer = httpAddToSqlCmdBuffer(pContext, sql->valuestring); if (sqlBuffer == -1) { - httpWarn("context:%p, fd:%d, ip:%s, user:%s, sql buffer is full", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user); break; } @@ -237,8 +231,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) { cmd->timestamp = httpAddToSqlCmdBufferWithSize(pContext, HTTP_GC_TARGET_SIZE + 1); // hack way if (cmd->timestamp == -1) { - httpWarn("context:%p, fd:%d, ip:%s, user:%s, cant't malloc target size, sql buffer is full", - pContext, pContext->fd, pContext->ipstr, pContext->user); + httpWarn("context:%p, fd:%d, user:%s, cant't malloc target size, sql buffer is full", pContext, pContext->fd, + pContext->user); break; } } @@ -251,7 +245,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) { } bool gcProcessHeartbeatRequest(HttpContext* pContext) { - httpDebug("context:%p, fd:%d, ip:%s, process grafana heartbeat msg", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, process grafana heartbeat msg", pContext, pContext->fd); pContext->reqType = HTTP_REQTYPE_HEARTBEAT; pContext->encodeMethod = &gcHeartBeatMethod; return true; diff --git a/src/plugins/http/src/httpAuth.c b/src/plugins/http/src/httpAuth.c index ea7024fad6..dd4d14c709 100644 --- a/src/plugins/http/src/httpAuth.c +++ b/src/plugins/http/src/httpAuth.c @@ -28,23 +28,21 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { int outlen = 0; char *base64 = (char *)base64_decode(token, len, &outlen); if (base64 == NULL || outlen == 0) { - httpError("context:%p, fd:%d, ip:%s, basic token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); + httpError("context:%p, fd:%d, basic token:%s parsed error", pContext, pContext->fd, token); free(base64); return false; } char *user = strstr(base64, ":"); if (user == NULL) { - httpError("context:%p, fd:%d, ip:%s, basic token:%s invalid format", pContext, pContext->fd, pContext->ipstr, - token); + httpError("context:%p, fd:%d, basic token:%s invalid format", pContext, pContext->fd, token); free(base64); return false; } int user_len = (int)(user - base64); if (user_len < 1 || user_len >= TSDB_USER_LEN) { - httpError("context:%p, fd:%d, ip:%s, basic token:%s parse user error", pContext, pContext->fd, pContext->ipstr, - token); + httpError("context:%p, fd:%d, basic token:%s parse user error", pContext, pContext->fd, token); free(base64); return false; } @@ -54,8 +52,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { char *password = user + 1; int pass_len = (int)((base64 + outlen) - password); if (pass_len < 1 || pass_len >= TSDB_PASSWORD_LEN) { - httpError("context:%p, fd:%d, ip:%s, basic token:%s parse password error", pContext, pContext->fd, pContext->ipstr, - token); + httpError("context:%p, fd:%d, basic token:%s parse password error", pContext, pContext->fd, token); free(base64); return false; } @@ -63,8 +60,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { pContext->pass[pass_len] = 0; free(base64); - httpDebug("context:%p, fd:%d, ip:%s, basic token parsed success, user:%s", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, basic token parsed success, user:%s", pContext, pContext->fd, pContext->user); return true; } @@ -73,28 +69,27 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) { int outlen = 0; unsigned char *base64 = base64_decode(token, len, &outlen); if (base64 == NULL || outlen == 0) { - httpError("context:%p, fd:%d, ip:%s, taosd token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); + httpError("context:%p, fd:%d, taosd token:%s parsed error", pContext, pContext->fd, token); if (base64) free(base64); return false; } if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) { - httpError("context:%p, fd:%d, ip:%s, taosd token:%s length error", pContext, pContext->fd, pContext->ipstr, token); + httpError("context:%p, fd:%d, taosd token:%s length error", pContext, pContext->fd, token); free(base64); return false; } char *descrypt = taosDesDecode(KEY_DES_4, (char *)base64, outlen); if (descrypt == NULL) { - httpError("context:%p, fd:%d, ip:%s, taosd token:%s descrypt error", pContext, pContext->fd, pContext->ipstr, - token); + httpError("context:%p, fd:%d, taosd token:%s descrypt error", pContext, pContext->fd, token); free(base64); return false; } else { tstrncpy(pContext->user, descrypt, sizeof(pContext->user)); tstrncpy(pContext->pass, descrypt + TSDB_USER_LEN, sizeof(pContext->pass)); - httpDebug("context:%p, fd:%d, ip:%s, taosd token:%s parsed success, user:%s", pContext, pContext->fd, - pContext->ipstr, token, pContext->user); + httpDebug("context:%p, fd:%d, taosd token:%s parsed success, user:%s", pContext, pContext->fd, token, + pContext->user); free(base64); free(descrypt); return true; @@ -116,7 +111,7 @@ bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen) { free(encrypt); free(base64); - httpDebug("context:%p, fd:%d, ip:%s, gen taosd token:%s", pContext, pContext->fd, pContext->ipstr, token); + httpDebug("context:%p, fd:%d, gen taosd token:%s", pContext, pContext->fd, token); return true; } diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 16d8e91899..59c81f5960 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -37,16 +37,14 @@ extern bool httpParseHttpVersion(HttpContext* pContext); extern bool httpGetDecodeMethod(HttpContext* pContext); extern bool httpParseHead(HttpContext* pContext); -static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw); -static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase); -static void on_header_field(void *arg, const char *key, const char *val); -static void on_body(void *arg, const char *chunk, size_t len); -static void on_end(void *arg); -static void on_error(void *arg, int status_code); +static void httpParseOnRequestLine(void *arg, const char *method, const char *target, const char *version, const char *target_raw); +static void httpParseOnStatusLine(void *arg, const char *version, int status_code, const char *reason_phrase); +static void httpParseOnHeaderField(void *arg, const char *key, const char *val); +static void httpParseOnBody(void *arg, const char *chunk, size_t len); +static void httpParseOnEnd(void *arg); +static void httpParseOnError(void *arg, int status_code); static void httpDestroyContext(void *data); -static void httpMightDestroyContext(void *data); -static void ehttpReleaseContext(HttpContext *pContext); static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; @@ -54,15 +52,11 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1); taosCloseSocket(fd); - if (!tsHttpServer.fallback) { - ehttpDecContextRef(&pContext); - } } } static void httpDestroyContext(void *data) { HttpContext *pContext = *(HttpContext **)data; - D("==context[%p] destroyed==", pContext); if (pContext->fd > 0) taosClose(pContext->fd); HttpThread *pThread = pContext->pThread; @@ -79,18 +73,16 @@ static void httpDestroyContext(void *data) { httpFreeJsonBuf(pContext); httpFreeMultiCmds(pContext); - if (!tsHttpServer.fallback) { - if (pContext->parser.parser) { - ehttp_parser_destroy(pContext->parser.parser); - pContext->parser.parser = NULL; - } + if (pContext->parser.parser) { + ehttp_parser_destroy(pContext->parser.parser); + pContext->parser.parser = NULL; } taosTFree(pContext); } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc"); + tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc"); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; @@ -135,20 +127,16 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *pContext = calloc(1, sizeof(HttpContext)); if (pContext == NULL) return NULL; - D("==context[%p] created==", pContext); - pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; - ehttpIncContextRef(pContext); uint64_t handleVal = (uint64_t)pContext; HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(int64_t), &pContext, sizeof(int64_t), 3000); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); - ehttpIncContextRef(pContext); // set the ref to 0 taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); @@ -164,7 +152,6 @@ HttpContext *httpGetContext(void *ptr) { if (ppContext) { HttpContext *pContext = *ppContext; if (pContext) { - if (!tsHttpServer.fallback) return pContext; int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount); return pContext; @@ -174,10 +161,6 @@ HttpContext *httpGetContext(void *ptr) { } void httpReleaseContext(HttpContext *pContext) { - if (!tsHttpServer.fallback) { - ehttpReleaseContext(pContext); - return; - } int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); if (refCount < 0) { httpError("context:%p, is already released, refCount:%d", pContext, refCount); @@ -212,31 +195,25 @@ bool httpInitContext(HttpContext *pContext) { memset(pParser, 0, sizeof(HttpParser)); pParser->pCur = pParser->pLast = pParser->buffer; - if (!tsHttpServer.fallback) { - ehttp_parser_callbacks_t callbacks = { - on_request_line, - on_status_line, - on_header_field, - on_body, - on_end, - on_error - }; - ehttp_parser_conf_t conf = { - .flush_block_size = 0 - }; - pParser->parser = ehttp_parser_create(callbacks, conf, pContext); - pParser->inited = 1; - } + ehttp_parser_callbacks_t callbacks = { + httpParseOnRequestLine, + httpParseOnStatusLine, + httpParseOnHeaderField, + httpParseOnBody, + httpParseOnEnd, + httpParseOnError + }; + ehttp_parser_conf_t conf = { + .flush_block_size = 0 + }; + pParser->parser = ehttp_parser_create(callbacks, conf, pContext); + pParser->inited = 1; - httpDebug("context:%p, fd:%d, ip:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr, - pContext->accessTimes, pContext->parsed); + httpDebug("context:%p, fd:%d, parsed:%d", pContext, pContext->fd, pContext->parsed); return true; } void httpCloseContextByApp(HttpContext *pContext) { - if (!tsHttpServer.fallback) { - if (pContext->parsed == false) return; - } pContext->parsed = false; bool keepAlive = true; @@ -249,150 +226,132 @@ void httpCloseContextByApp(HttpContext *pContext) { if (keepAlive) { if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { - httpDebug("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd, - pContext->ipstr); + httpDebug("context:%p, fd:%d, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) { httpRemoveContextFromEpoll(pContext); - httpDebug("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect", pContext, pContext->fd, - pContext->ipstr); + httpDebug("context:%p, fd:%d, ast state:dropping, keepAlive:true, close connect", pContext, pContext->fd); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { - httpDebug("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse context", pContext, pContext->fd, - pContext->ipstr); + httpDebug("context:%p, fd:%d, last state:ready, keepAlive:true, reuse context", pContext, pContext->fd); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { httpRemoveContextFromEpoll(pContext); - httpDebug("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect", pContext, pContext->fd, - pContext->ipstr); + httpDebug("context:%p, fd:%d, last state:ready, keepAlive:true, close connect", pContext, pContext->fd); } else { httpRemoveContextFromEpoll(pContext); - httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect", pContext, pContext->fd, - pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); + httpError("context:%p, fd:%d, last state:%s:%d, keepAlive:true, close connect", pContext, pContext->fd, + httpContextStateStr(pContext->state), pContext->state); } } else { httpRemoveContextFromEpoll(pContext); - httpDebug("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close context", pContext, pContext->fd, - pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); + httpDebug("context:%p, fd:%d, ilast state:%s:%d, keepAlive:false, close context", pContext, pContext->fd, + httpContextStateStr(pContext->state), pContext->state); } - if (tsHttpServer.fallback) httpReleaseContext(pContext); + httpReleaseContext(pContext); } void httpCloseContextByServer(HttpContext *pContext) { - if (!tsHttpServer.fallback) { - if (pContext->closed) return; - pContext->closed = 1; - } if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { - httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, epoll finished, still used by app", pContext, pContext->fd); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { - httpDebug("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, epoll already finished, wait app finished", pContext, pContext->fd); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) { - httpDebug("context:%p, fd:%d, ip:%s, epoll finished, close connect", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, epoll finished, close connect", pContext, pContext->fd); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { - httpDebug("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, epoll finished, will be closed soon", pContext, pContext->fd); } else { - httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state); + httpError("context:%p, fd:%d, unknown state:%d", pContext, pContext->fd, pContext->state); } pContext->parsed = false; httpRemoveContextFromEpoll(pContext); - if (tsHttpServer.fallback) httpReleaseContext(pContext); } - - - - -static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) { +static void httpParseOnRequestLine(void *arg, const char *method, const char *target, const char *version, const char *target_raw) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); - int n = snprintf(pParser->pLast, avail, - "%s %s %s\r\n", method, target_raw, version); - + int n = snprintf(pParser->pLast, avail, "%s %s %s\r\n", method, target_raw, version); char *last = pParser->pLast; do { - if (n>=avail) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), exceeding buffer size", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + if (n >= avail) { + httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), exceeding buffer size", pContext, pContext->fd, method, + target, version, target_raw); break; } pParser->bufsize += n; if (!httpGetHttpMethod(pContext)) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http method failed", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), parse http method failed", pContext, pContext->fd, + method, target, version, target_raw); break; } if (!httpParseURL(pContext)) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http url failed", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), parse http url failed", pContext, pContext->fd, method, + target, version, target_raw); break; } if (!httpParseHttpVersion(pContext)) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http version failed", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), parse http version failed", pContext, pContext->fd, + method, target, version, target_raw); break; } if (!httpGetDecodeMethod(pContext)) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), get decode method failed", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), get decode method failed", pContext, pContext->fd, + method, target, version, target_raw); break; } - last += n; - pParser->pLast = last; + last += n; + pParser->pLast = last; return; } while (0); pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; } -static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) { +static void httpParseOnStatusLine(void *arg, const char *version, int status_code, const char *reason_phrase) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; + httpDebug("context:%p, fd:%d, failed to parse status line ", pContext, pContext->fd); pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; } -static void on_header_field(void *arg, const char *key, const char *val) { +static void httpParseOnHeaderField(void *arg, const char *key, const char *val) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; if (pParser->failed) return; - D("==key:[%s], val:[%s]==", key, val); - int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); - int n = snprintf(pParser->pLast, avail, - "%s: %s\r\n", key, val); - + httpDebug("context:%p, fd:%d, key:%s val:%s", pContext, pContext->fd, key, val); + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + int n = snprintf(pParser->pLast, avail, "%s: %s\r\n", key, val); char *last = pParser->pLast; do { - if (n>=avail) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), exceeding buffer size", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val); + if (n >= avail) { + httpDebug("context:%p, fd:%d, header field(%s,%s), exceeding buffer size", pContext, pContext->fd, key, val); break; } pParser->bufsize += n; - pParser->pCur = pParser->pLast + n; + pParser->pCur = pParser->pLast + n; if (!httpParseHead(pContext)) { - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), parse head failed", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val); + httpDebug("context:%p, fd:%d, header field(%s,%s), parse failed", pContext, pContext->fd, key, val); break; } - last += n; - pParser->pLast = last; + last += n; + pParser->pLast = last; return; } while (0); pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; } -static void on_body(void *arg, const char *chunk, size_t len) { +static void httpParseOnBody(void *arg, const char *chunk, size_t len) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; @@ -404,16 +363,18 @@ static void on_body(void *arg, const char *chunk, size_t len) { } int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); - if (len+1>=avail) { + if (len + 1 >= avail) { + httpError("context:%p, fd:%d, failed parse body, exceeding buffer size", pContext, pContext->fd); pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; return; } + memcpy(pParser->pLast, chunk, len); pParser->pLast += len; pParser->data.len += len; } -static void on_end(void *arg) { +static void httpParseOnEnd(void *arg) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; @@ -424,47 +385,14 @@ static void on_end(void *arg) { if (!pContext->parsed) { pContext->parsed = true; } + + httpDebug("context:%p, fd:%d, parse success", pContext, pContext->fd); } -static void on_error(void *arg, int status_code) { - HttpContext *pContext = (HttpContext*)arg; - HttpParser *pParser = &pContext->parser; +static void httpParseOnError(void *arg, int status_code) { + HttpContext *pContext = (HttpContext *)arg; + HttpParser * pParser = &pContext->parser; + httpError("context:%p, fd:%d, failed to parse, status_code:%d", pContext, pContext->fd, status_code); pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED; } - -static void httpMightDestroyContext(void *data) { - HttpContext *pContext = *(HttpContext **)data; - if (!tsHttpServer.fallback) { - httpRemoveContextFromEpoll(pContext); - ehttpDecContextRef(&pContext); - return; - } - httpDestroyContext(data); -} - -static void ehttpReleaseContext(HttpContext *pContext) { - HttpContext **ppContext = pContext->ppContext; - - if (tsHttpServer.contextCache != NULL) { - taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); - } else { - httpDebug("context:%p, won't be destroyed for cache is already released", pContext); - // httpDestroyContext((void **)(&ppContext)); - } -} - -void ehttpIncContextRef(HttpContext *pContext) { - if (tsHttpServer.fallback) return; - atomic_add_fetch_32(&pContext->refCount, 1); -} - -void ehttpDecContextRef(HttpContext **ppContext) { - if (tsHttpServer.fallback) return; - HttpContext *pContext = *ppContext; - int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); - if (refCount>0) return; - EQ_ASSERT(refCount==0); - httpDestroyContext(ppContext); -} - diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index 407d19b307..59b3268392 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -103,15 +103,13 @@ bool httpParseHttpVersion(HttpContext* pContext) { HttpParser* pParser = &pContext->parser; char* pEnd = strchr(pParser->pLast, '1'); if (pEnd == NULL) { - httpError("context:%p, fd:%d, ip:%s, can't find http version at position:%s", pContext, pContext->fd, - pContext->ipstr, pParser->pLast); + httpError("context:%p, fd:%d, can't find http version at position:%s", pContext, pContext->fd, pParser->pLast); httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR); return false; } if (*(pEnd + 1) != '.') { - httpError("context:%p, fd:%d, ip:%s, can't find http version at position:%s", pContext, pContext->fd, - pContext->ipstr, pParser->pLast); + httpError("context:%p, fd:%d, can't find http version at position:%s", pContext, pContext->fd, pParser->pLast); httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR); return false; } @@ -125,8 +123,7 @@ bool httpParseHttpVersion(HttpContext* pContext) { else pContext->httpVersion = HTTP_VERSION_10; - httpDebug("context:%p, fd:%d, ip:%s, httpVersion:1.%d", pContext, pContext->fd, pContext->ipstr, - pContext->httpVersion); + httpDebug("context:%p, fd:%d, httpVersion:1.%d", pContext, pContext->fd, pContext->httpVersion); return true; } @@ -147,18 +144,20 @@ bool httpGetNextLine(HttpContext* pContext) { bool httpGetHttpMethod(HttpContext* pContext) { HttpParser* pParser = &pContext->parser; - char* pSeek = strchr(pParser->pLast, ' '); + if (pSeek == NULL) { + httpError("context:%p, fd:%d, failed to parse httpMethod", pContext, pContext->fd); httpSendErrorResp(pContext, HTTP_PARSE_HTTP_METHOD_ERROR); return false; } + pParser->method.pos = pParser->pLast; pParser->method.len = (int16_t)(pSeek - pParser->pLast); pParser->method.pos[pParser->method.len] = 0; pParser->pLast = pSeek + 1; - httpTrace("context:%p, fd:%d, ip:%s, httpMethod:%s", pContext, pContext->fd, pContext->ipstr, pParser->method.pos); + httpTrace("context:%p, fd:%d, httpMethod:%s", pContext, pContext->fd, pParser->method.pos); return true; } @@ -176,8 +175,8 @@ bool httpGetDecodeMethod(HttpContext* pContext) { return true; } - httpError("context:%p, fd:%d, ip:%s, error:the url is not support, method:%s, path:%s", - pContext, pContext->fd, pContext->ipstr, pParser->method.pos, pParser->path[0].pos); + httpError("context:%p, fd:%d, error:the url is not support, method:%s, path:%s", + pContext, pContext->fd, pParser->method.pos, pParser->path[0].pos); httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL); return false; @@ -187,23 +186,23 @@ bool httpParseHead(HttpContext* pContext) { HttpParser* pParser = &pContext->parser; if (strncasecmp(pParser->pLast, "Content-Length: ", 16) == 0) { pParser->data.len = (int32_t)atoi(pParser->pLast + 16); - httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr, + httpTrace("context:%p, fd:%d, Content-Length:%d", pContext, pContext->fd, pParser->data.len); } else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) { if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) { pContext->acceptEncoding = HTTP_COMPRESS_GZIP; - httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, Accept-Encoding:gzip", pContext, pContext->fd); } else { pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; - httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, Accept-Encoding:identity", pContext, pContext->fd); } } else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) { if (strstr(pParser->pLast + 18, "gzip") != NULL) { pContext->contentEncoding = HTTP_COMPRESS_GZIP; - httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, Content-Encoding:gzip", pContext, pContext->fd); } else { pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; - httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, Content-Encoding:identity", pContext, pContext->fd); } } else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) { if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) { @@ -211,8 +210,7 @@ bool httpParseHead(HttpContext* pContext) { } else { pContext->httpKeepAlive = HTTP_KEEPALIVE_DISABLE; } - httpTrace("context:%p, fd:%d, ip:%s, keepAlive:%d", pContext, pContext->fd, pContext->ipstr, - pContext->httpKeepAlive); + httpTrace("context:%p, fd:%d, keepAlive:%d", pContext, pContext->fd, pContext->httpKeepAlive); } else if (strncasecmp(pParser->pLast, "Transfer-Encoding: ", 19) == 0) { if (strncasecmp(pParser->pLast + 19, "chunked", 7) == 0) { pContext->httpChunked = HTTP_CHUNKED; @@ -244,129 +242,6 @@ bool httpParseHead(HttpContext* pContext) { return true; } -bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test) { - char* pEnd = pParser->buffer + pParser->bufsize; - char* pRet = pParser->data.pos; - char* pSize = pParser->data.pos; - size_t size = strtoul(pSize, NULL, 16); - if (size <= 0) return false; - - while (size > 0) { - char* pData = strstr(pSize, "\r\n"); - if (pData == NULL || pData >= pEnd) return false; - pData += 2; - - pSize = strstr(pData, "\r\n"); - if (pSize == NULL || pSize >= pEnd) return false; - if ((size_t)(pSize - pData) != size) return false; - pSize += 2; - - if (!test) { - memmove(pRet, pData, size); - pRet += size; - } - - size = strtoul(pSize, NULL, 16); - } - - if (!test) { - *pRet = '\0'; - } - - return true; -} - -int httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { - bool parsedOk = httpParseChunkedBody(pContext, pParser, true); - if (parsedOk) { - httpParseChunkedBody(pContext, pParser, false); - return HTTP_CHECK_BODY_SUCCESS; - } else { - httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); - if (httpReadDataImp(pContext) != HTTP_READ_DATA_SUCCESS) { - httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return HTTP_CHECK_BODY_ERROR; - } else { - return HTTP_CHECK_BODY_CONTINUE; - } - } -} - -int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { - int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); - if (dataReadLen > pParser->data.len) { - httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d", - pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); - return HTTP_CHECK_BODY_ERROR; - } else if (dataReadLen < pParser->data.len) { - httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", - pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); - return HTTP_CHECK_BODY_CONTINUE; - } else { - return HTTP_CHECK_BODY_SUCCESS; - } -} - -bool httpParseRequest(HttpContext* pContext) { - HttpParser *pParser = &pContext->parser; - if (pContext->parsed) { - return true; - } - - httpTraceL("context:%p, fd:%d, ip:%s, thread:%s, numOfContexts:%d, read size:%d, raw data:\n%s", pContext, - pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfContexts, - pContext->parser.bufsize, pContext->parser.buffer); - - if (!httpGetHttpMethod(pContext)) { - return false; - } - - if (!httpParseURL(pContext)) { - return false; - } - - if (!httpParseHttpVersion(pContext)) { - return false; - } - - if (!httpGetDecodeMethod(pContext)) { - return false; - } - - do { - if (!httpGetNextLine(pContext)) { - return false; - } - - // Empty line, end of the HTTP HEAD - if (pParser->pCur - pParser->pLast == 1) { - pParser->data.pos = ++pParser->pCur; - break; - } - - if (!httpParseHead(pContext)) { - return false; - } - - pParser->pLast = ++pParser->pCur; - } while (1); - - httpDebug("context:%p, fd:%d, ip:%s, parse http head ok", pContext, pContext->fd, pContext->ipstr); - - pContext->parsed = true; - return true; -} - -int httpCheckReadCompleted(HttpContext* pContext) { - HttpParser* pParser = &pContext->parser; - - if (pContext->httpChunked == HTTP_UNCUNKED) { - return httpReadUnChunkedBody(pContext, pParser); - } else { - return httpReadChunkedBody(pContext, pParser); - } -} - bool httpDecodeRequest(HttpContext* pContext) { HttpParser* pParser = &pContext->parser; if (pParser->pMethod->decodeFp == NULL) { @@ -380,17 +255,16 @@ bool httpDecodeRequest(HttpContext* pContext) { * Process the request from http pServer */ bool httpProcessData(HttpContext* pContext) { - if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s not in ready state, stop process request", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); + httpDebug("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd, + httpContextStateStr(pContext->state)); httpCloseContextByApp(pContext); return false; } // handle Cross-domain request if (strcmp(pContext->parser.method.pos, "OPTIONS") == 0) { - httpDebug("context:%p, fd:%d, ip:%s, process options request", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, process options request", pContext, pContext->fd); httpSendOptionResp(pContext, "process options request success"); } else { if (!httpDecodeRequest(pContext)) { diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index 4748f03b66..e200efbcef 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -52,14 +52,12 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) { } if (len < 0) { - httpDebug("context:%p, fd:%d, ip:%s, socket write errno:%d, times:%d", - pContext, pContext->fd, pContext->ipstr, errno, countWait); + httpDebug("context:%p, fd:%d, socket write errno:%d, times:%d", pContext, pContext->fd, errno, countWait); if (++countWait > HTTP_WRITE_RETRY_TIMES) break; taosMsleep(HTTP_WRITE_WAIT_TIME_MS); continue; } else if (len == 0) { - httpDebug("context:%p, fd:%d, ip:%s, socket write errno:%d, connect already closed", - pContext, pContext->fd, pContext->ipstr, errno); + httpDebug("context:%p, fd:%d, socket write errno:%d, connect already closed", pContext, pContext->fd, errno); break; } else { countWait = 0; @@ -70,14 +68,13 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) { return writeLen; } -int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) { +int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) { int writeSz = httpWriteBufByFd(pContext, buf, sz); if (writeSz != sz) { - httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:\n%s", - pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); + httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response:\n%s", pContext, pContext->fd, sz, + writeSz, buf); } else { - httpTrace("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, - pContext->ipstr, sz, writeSz, buf); + httpTrace("context:%p, fd:%d, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, sz, writeSz, buf); } return writeSz; @@ -86,8 +83,8 @@ int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) { int httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int sz) { int writeSz = httpWriteBufByFd(pContext, buf, sz); if (writeSz != sz) { - httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response", - pContext, pContext->fd, pContext->ipstr, sz, writeSz); + httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response", pContext, pContext->fd, sz, + writeSz); } return writeSz; @@ -99,7 +96,7 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { uint64_t srcLen = (uint64_t) (buf->lst - buf->buf); if (buf->pContext->fd <= 0) { - httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd); buf->pContext->fd = -1; } @@ -113,12 +110,12 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) { if (buf->lst == buf->buf) { - httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd); return 0; // there is no data to dump. } else { int len = sprintf(sLen, "%lx\r\n", srcLen); - httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", response:\n%s", - buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, buf->buf); + httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", response:\n%s", buf->pContext, buf->pContext->fd, + srcLen, buf->buf); httpWriteBufNoTrace(buf->pContext, sLen, len); remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen); } @@ -129,18 +126,18 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { if (ret == 0) { if (compressBufLen > 0) { int len = sprintf(sLen, "%x\r\n", compressBufLen); - httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", - buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, compressBufLen, isTheLast, buf->buf); + httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", + buf->pContext, buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf); httpWriteBufNoTrace(buf->pContext, sLen, len); - remain = httpWriteBufNoTrace(buf->pContext, (const char *) compressBuf, (int) compressBufLen); + remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, (int)compressBufLen); } else { - httpTrace("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s", - buf->pContext, buf->pContext->fd, buf->pContext->ipstr, isTheLast, buf->buf); + httpTrace("context:%p, fd:%d, last:%d, compress already dumped, response:\n%s", buf->pContext, + buf->pContext->fd, isTheLast, buf->buf); return 0; // there is no data to dump. } } else { - httpError("context:%p, fd:%d, ip:%s, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s", - buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, isTheLast, ret, buf->buf); + httpError("context:%p, fd:%d, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s", + buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf); return 0; } } @@ -173,7 +170,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) { void httpWriteJsonBufEnd(JsonBuf* buf) { if (buf->pContext->fd <= 0) { - httpTrace("context:%p, fd:%d, ip:%s, json buf fd is 0", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, json buf fd is 0", buf->pContext, buf->pContext->fd); buf->pContext->fd = -1; } @@ -192,7 +189,7 @@ void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) { httpGzipCompressInit(buf->pContext); } - httpDebug("context:%p, fd:%d, ip:%s, json buffer initialized", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpDebug("context:%p, fd:%d, json buffer initialized", buf->pContext, buf->pContext->fd); } void httpJsonItemToken(JsonBuf* buf) { diff --git a/src/plugins/http/src/httpResp.c b/src/plugins/http/src/httpResp.c index f53aff7831..a7c17dfdbb 100644 --- a/src/plugins/http/src/httpResp.c +++ b/src/plugins/http/src/httpResp.c @@ -46,7 +46,7 @@ const char *httpRespTemplate[] = { }; static void httpSendErrorRespImp(HttpContext *pContext, int httpCode, char *httpCodeStr, int errNo, char *desc) { - httpError("context:%p, fd:%d, ip:%s, code:%d, error:%s", pContext, pContext->fd, pContext->ipstr, httpCode, desc); + httpError("context:%p, fd:%d, code:%d, error:%s", pContext, pContext->fd, httpCode, desc); char head[512] = {0}; char body[512] = {0}; @@ -169,7 +169,7 @@ void httpSendErrorRespWithDesc(HttpContext *pContext, int errNo, char *desc) { httpCodeStr = "Bad Request"; break; default: - httpError("context:%p, fd:%d, ip:%s, error:%d not recognized", pContext, pContext->fd, pContext->ipstr, errNo); + httpError("context:%p, fd:%d, error:%d not recognized", pContext, pContext->fd, errNo); break; } diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 6802d3624a..614cc92700 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -31,7 +31,7 @@ #define EPOLLWAKEUP (1u << 29) #endif -static bool ehttpReadData(HttpContext *pContext); +static bool httpReadData(HttpContext *pContext); static void httpStopThread(HttpThread* pThread) { pThread->stop = true; @@ -73,43 +73,9 @@ void httpCleanUpConnect() { httpDebug("http server:%s is cleaned up", pServer->label); } -int httpReadDataImp(HttpContext *pContext) { - HttpParser *pParser = &pContext->parser; - - while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { - int nread = (int)taosReadSocket(pContext->fd, pParser->buffer + pParser->bufsize, HTTP_STEP_SIZE); - if (nread >= 0 && nread < HTTP_STEP_SIZE) { - pParser->bufsize += nread; - break; - } else if (nread < 0) { - if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { - httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event", - pContext, pContext->fd, pContext->ipstr, errno); - break; - } else { - httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", - pContext, pContext->fd, pContext->ipstr, errno); - return HTTP_READ_DATA_FAILED; - } - } else { - pParser->bufsize += nread; - } - - if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { - httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE); - return HTTP_REQUSET_TOO_BIG; - } - } - - pParser->buffer[pParser->bufsize] = 0; - - return HTTP_READ_DATA_SUCCESS; -} - static bool httpDecompressData(HttpContext *pContext) { if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) { - httpTraceL("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); + httpTraceL("context:%p, fd:%d, content:%s", pContext, pContext->fd, pContext->parser.data.pos); return true; } @@ -125,64 +91,18 @@ static bool httpDecompressData(HttpContext *pContext) { if (ret == 0) { memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen); pContext->parser.data.pos[decompressBufLen] = 0; - httpTraceL("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd, - pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf); + httpTraceL("context:%p, fd:%d, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd, + pContext->parser.data.len, decompressBufLen, decompressBuf); pContext->parser.data.len = decompressBufLen; } else { - httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d", - pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, ret); + httpError("context:%p, fd:%d, failed to decompress data, rawSize:%d, error:%d", pContext, pContext->fd, + pContext->parser.data.len, ret); } free(decompressBuf); return ret == 0; } -static bool httpReadData(HttpContext *pContext) { - if (!tsHttpServer.fallback) return ehttpReadData(pContext); - - if (!pContext->parsed) { - httpInitContext(pContext); - } - - int32_t code = httpReadDataImp(pContext); - if (code != HTTP_READ_DATA_SUCCESS) { - if (code == HTTP_READ_DATA_FAILED) { - httpReleaseContext(pContext); - } else { - httpSendErrorResp(pContext, code); - httpNotifyContextClose(pContext); - } - return false; - } - - if (!httpParseRequest(pContext)) { - httpNotifyContextClose(pContext); - return false; - } - - int ret = httpCheckReadCompleted(pContext); - if (ret == HTTP_CHECK_BODY_CONTINUE) { - //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); - httpReleaseContext(pContext); - return false; - } else if (ret == HTTP_CHECK_BODY_SUCCESS){ - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len); - if (httpDecompressData(pContext)) { - return true; - } else { - httpNotifyContextClose(pContext); - httpReleaseContext(pContext); - return false; - } - } else { - httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); - httpNotifyContextClose(pContext); - httpReleaseContext(pContext); - return false; - } -} - static void httpProcessHttpData(void *param) { HttpServer *pServer = &tsHttpServer; HttpThread *pThread = (HttpThread *)param; @@ -194,8 +114,6 @@ static void httpProcessHttpData(void *param) { sigaddset(&set, SIGPIPE); pthread_sigmask(SIG_SETMASK, &set, NULL); - elog_set_thread_name("httpProcessHttpData"); - while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 @@ -215,66 +133,51 @@ static void httpProcessHttpData(void *param) { continue; } - ehttpIncContextRef(pContext); - if (events[i].events & EPOLLPRI) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); + httpDebug("context:%p, fd:%d, state:%s, EPOLLPRI events occured, accessed:%d, close connect", pContext, + pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); - if (!tsHttpServer.fallback) httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); continue; } if (events[i].events & EPOLLRDHUP) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); + httpDebug("context:%p, fd:%d, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", pContext, + pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); - httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); continue; } if (events[i].events & EPOLLERR) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); + httpDebug("context:%p, fd:%d, state:%s, EPOLLERR events occured, accessed:%d, close connect", pContext, + pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); - if (!tsHttpServer.fallback) httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); continue; } if (events[i].events & EPOLLHUP) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); + httpDebug("context:%p, fd:%d, state:%s, EPOLLHUP events occured, accessed:%d, close connect", pContext, + pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); - if (!tsHttpServer.fallback) httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); continue; } if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); + httpDebug("context:%p, fd:%d, state:%s, not in ready state, ignore read events", pContext, pContext->fd, + httpContextStateStr(pContext->state)); httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); continue; } if (pServer->status != HTTP_SERVER_RUNNING) { - httpDebug("context:%p, fd:%d, ip:%s, state:%s, server is not running, accessed:%d, close connect", pContext, - pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); + httpDebug("context:%p, fd:%d, state:%s, server is not running, accessed:%d, close connect", pContext, + pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); httpNotifyContextClose(pContext); - if (!tsHttpServer.fallback) httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); } else { if (httpReadData(pContext)) { (*(pThread->processData))(pContext); atomic_fetch_add_32(&pServer->requestNum, 1); } - if (!tsHttpServer.fallback) httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); } } } @@ -355,8 +258,7 @@ static void *httpAcceptHttpConnection(void *arg) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, pContext->ipstr, pThread->label, strerror(errno)); taosClose(pContext->fd); - if (tsHttpServer.fallback) httpReleaseContext(pContext); - ehttpDecContextRef(&pContext); + httpReleaseContext(pContext); continue; } @@ -430,12 +332,10 @@ bool httpInitConnect() { return true; } - - - -static bool ehttpReadData(HttpContext *pContext) { +static bool httpReadData(HttpContext *pContext) { HttpParser *pParser = &pContext->parser; EQ_ASSERT(!pContext->parsed); + if (!pParser->parser) { if (!pParser->inited) { httpInitContext(pContext); @@ -448,61 +348,44 @@ static bool ehttpReadData(HttpContext *pContext) { pContext->accessTimes++; pContext->lastAccessTime = taosGetTimestampSec(); - char buf[HTTP_STEP_SIZE+1] = {0}; - int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf)); + char buf[HTTP_STEP_SIZE + 1] = {0}; + int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf)); if (nread > 0) { buf[nread] = '\0'; - if (strstr(buf, "GET ")==buf && !strchr(buf, '\r') && !strchr(buf, '\n')) { - D("==half of request line received:\n%s\n==", buf); - } - if (ehttp_parser_parse(pParser->parser, buf, nread)) { - D("==parsing failed=="); - httpCloseContextByServer(pContext); + httpError("context:%p, fd:%d, init parse failed, close connect", pContext, pContext->fd); + httpNotifyContextClose(pContext); return false; } if (pContext->parser.failed) { - D("==parsing failed: [0x%x]==", pContext->parser.failed); + httpError("context:%p, fd:%d, parse failed, close connect", pContext, pContext->fd); httpNotifyContextClose(pContext); return false; } + if (pContext->parsed) { - // int ret = httpCheckReadCompleted(pContext); - // already done in ehttp_parser - int ret = HTTP_CHECK_BODY_SUCCESS; - if (ret == HTTP_CHECK_BODY_CONTINUE) { - //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); - return false; - } else if (ret == HTTP_CHECK_BODY_SUCCESS){ - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len); - if (httpDecompressData(pContext)) { - return true; - } else { - httpNotifyContextClose(pContext); - return false; - } + httpDebug("context:%p, fd:%d, read size:%d, dataLen:%d", pContext, pContext->fd, pContext->parser.bufsize, + pContext->parser.data.len); + if (httpDecompressData(pContext)) { + return true; } else { - httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); httpNotifyContextClose(pContext); return false; } } + return pContext->parsed; } else if (nread < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { - httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event", - pContext, pContext->fd, pContext->ipstr, errno); - return false; // later again + httpDebug("context:%p, fd:%d, read from socket error:%d, wait another event", pContext, pContext->fd, errno); + return false; // later again } else { - httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", - pContext, pContext->fd, pContext->ipstr, errno); + httpError("context:%p, fd:%d, read from socket error:%d, close connect", pContext, pContext->fd, errno); return false; } } else { - // eof + httpError("context:%p, fd:%d, nread:%d, wait another event", pContext, pContext->fd, nread); return false; } } - diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index f19679e072..4549192407 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -39,15 +39,15 @@ void httpCreateSession(HttpContext *pContext, void *taos) { // taosCacheRelease(server->sessionCache, (void **)&temp, false); if (pContext->session == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, + httpError("context:%p, fd:%d, user:%s, error:%s", pContext, pContext->fd, pContext->user, httpMsg[HTTP_SESSION_FULL]); taos_close(taos); pthread_mutex_unlock(&server->serverMutex); return; } - httpDebug("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd, - pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); + httpDebug("context:%p, fd:%d, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd, + pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pthread_mutex_unlock(&server->serverMutex); } @@ -61,11 +61,10 @@ static void httpFetchSessionImp(HttpContext *pContext) { pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len); if (pContext->session != NULL) { atomic_add_fetch_32(&pContext->session->refCount, 1); - httpDebug("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, - pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); + httpDebug("context:%p, fd:%d, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, + pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); } else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, session not found", pContext, pContext->fd, pContext->user); } pthread_mutex_unlock(&server->serverMutex); diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 07cdea1380..cbaa0b36d8 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -56,18 +56,18 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n if (isContinue) { // retrieve next batch of rows - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, numOfRows, sql); + httpDebug("context:%p, fd:%d, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", pContext, + pContext->fd, pContext->user, multiCmds->pos, numOfRows, sql); taos_fetch_rows_a(result, httpProcessMultiSqlRetrieveCallBack, param); } else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, numOfRows, sql); + httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos, numOfRows, sql); if (numOfRows < 0) { - httpError("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(numOfRows), sql); - } - + httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos, tstrerror(numOfRows), sql); + } + taos_free_result(result); if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->stopJsonFp) { @@ -94,20 +94,20 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { char * sql = httpGetCmdsString(pContext, singleCmd->sql); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - httpWarn("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, code:%s:inprogress, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(code), sql); + httpWarn("context:%p, fd:%d, user:%s, process pos:%d, code:%s:inprogress, sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos, tstrerror(code), sql); return; } if (code < 0) { if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) { singleCmd->code = code; - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos jump to:%d, last code:%s, last sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos + 1, tstrerror(code), sql); + httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos + 1, tstrerror(code), sql); } else { singleCmd->code = code; - httpError("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, error code:%s, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(code), sql); + httpError("context:%p, fd:%d, user:%s, process pos:%d, error code:%s, sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos, tstrerror(code), sql); if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN) { if (encode->startJsonFp) (encode->startJsonFp)(pContext, singleCmd, result); @@ -125,8 +125,8 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { if (isUpdate) { // not select or show commands int affectRows = taos_affected_rows(result); - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, affectRows, sql); + httpDebug("context:%p, fd:%d, user:%s, process pos:%d, affect rows:%d, sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos, affectRows, sql); singleCmd->code = 0; @@ -151,8 +151,8 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { taos_free_result(result); httpProcessMultiSql(pContext); } else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start retrieve, sql:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, sql); + httpDebug("context:%p, fd:%d, user:%s, process pos:%d, start retrieve, sql:%s", pContext, pContext->fd, + pContext->user, multiCmds->pos, sql); if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->startJsonFp) { (encode->startJsonFp)(pContext, singleCmd, result); @@ -170,8 +170,8 @@ void httpProcessMultiSql(HttpContext *pContext) { HttpEncodeMethod *encode = pContext->encodeMethod; if (multiCmds->pos >= multiCmds->size) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, size:%d, stop mulit-querys", - pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, multiCmds->size); + httpDebug("context:%p, fd:%d, user:%s, process pos:%d, size:%d, stop mulit-querys", pContext, pContext->fd, + pContext->user, multiCmds->pos, multiCmds->size); if (encode->cleanJsonFp) { (encode->cleanJsonFp)(pContext); } @@ -182,8 +182,8 @@ void httpProcessMultiSql(HttpContext *pContext) { HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos; char *sql = httpGetCmdsString(pContext, cmd->sql); - httpTraceL("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, - pContext->ipstr, pContext->user, multiCmds->pos, sql); + httpTraceL("context:%p, fd:%d, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, pContext->user, + multiCmds->pos, sql); taosNotePrintHttp(sql); taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext); } @@ -197,8 +197,8 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { return; } - httpDebug("context:%p, fd:%d, ip:%s, user:%s, start multi-querys pos:%d, size:%d", pContext, pContext->fd, - pContext->ipstr, pContext->user, multiCmds->pos, multiCmds->size); + httpDebug("context:%p, fd:%d, user:%s, start multi-querys pos:%d, size:%d", pContext, pContext->fd, pContext->user, + multiCmds->pos, multiCmds->size); HttpEncodeMethod *encode = pContext->encodeMethod; if (encode->initJsonFp) { (encode->initJsonFp)(pContext); @@ -226,24 +226,23 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int #if 0 // todo refactor if (tscResultsetFetchCompleted(result)) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, resultset fetch completed", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, resultset fetch completed", pContext, pContext->fd, pContext->user); isContinue = false; } #endif if (isContinue) { // retrieve next batch of rows - httpDebug("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd, - pContext->ipstr, pContext->user, numOfRows); + httpDebug("context:%p, fd:%d, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user, + numOfRows); taos_fetch_rows_a(result, httpProcessSingleSqlRetrieveCallBack, param); } else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->ipstr, - pContext->user, numOfRows); + httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user, + numOfRows); if (numOfRows < 0) { - httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr, - pContext->user, tstrerror(numOfRows)); + httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user, + tstrerror(numOfRows)); } taos_free_result(result); @@ -269,20 +268,20 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo HttpEncodeMethod *encode = pContext->encodeMethod; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - httpError("context:%p, fd:%d, ip:%s, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", - pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result); + httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd, + pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result); return; } if (code < 0) { SSqlObj *pObj = (SSqlObj *)result; if (code == TSDB_CODE_TSC_INVALID_SQL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload); + httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", pContext, + pContext->fd, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload); httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload); } else { - httpError("context:%p, fd:%d, ip:%s, user:%s, query error, taos:%p, code:%s, sqlObj:%p", - pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), pObj); + httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p", pContext, pContext->fd, + pContext->user, pContext->session->taos, tstrerror(code), pObj); httpSendTaosdErrorResp(pContext, code); } taos_free_result(result); @@ -294,8 +293,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo // not select or show commands int affectRows = taos_affected_rows(result); - httpDebug("context:%p, fd:%d, ip:%s, user:%s, affect rows:%d, stop query, sqlObj:%p", - pContext, pContext->fd, pContext->ipstr, pContext->user, affectRows, result); + httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd, + pContext->user, affectRows, result); if (encode->startJsonFp) { (encode->startJsonFp)(pContext, &pContext->singleCmd, result); @@ -312,8 +311,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo taos_free_result(result); httpCloseContextByApp(pContext); } else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, start retrieve", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, start retrieve", pContext, pContext->fd, pContext->user); if (encode->startJsonFp) { (encode->startJsonFp)(pContext, &pContext->singleCmd, result); @@ -333,14 +331,12 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { HttpSession *pSession = pContext->session; if (sql == NULL || sql[0] == 0) { - httpError("context:%p, fd:%d, ip:%s, user:%s, error:no sql input", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpError("context:%p, fd:%d, user:%s, error:no sql input", pContext, pContext->fd, pContext->user); httpSendErrorResp(pContext, HTTP_NO_SQL_INPUT); return; } - httpTraceL("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr, - pContext->user, sql); + httpTraceL("context:%p, fd:%d, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->user, sql); taosNotePrintHttp(sql); taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext); } @@ -350,8 +346,8 @@ void httpProcessLoginCmd(HttpContext *pContext) { if (!httpGenTaosdAuthToken(pContext, token, 128)) { httpSendErrorResp(pContext, HTTP_GEN_TAOSD_TOKEN_ERR); } else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, login via http, return token:%s", - pContext, pContext->fd, pContext->ipstr, pContext->user, token); + httpDebug("context:%p, fd:%d, user:%s, login via http, return token:%s", pContext, pContext->fd, pContext->user, + token); httpSendSuccResp(pContext, token); } } @@ -397,17 +393,16 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { if (pContext == NULL) return; if (code < 0) { - httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr, - pContext->user, tstrerror(code)); + httpError("context:%p, fd:%d, user:%s, login error, code:%s", pContext, pContext->fd, pContext->user, + tstrerror(code)); httpSendTaosdErrorResp(pContext, code); return; } - httpDebug("context:%p, fd:%d, ip:%s, user:%s, connect tdengine success, taos:%p", pContext, pContext->fd, - pContext->ipstr, pContext->user, pContext->taos); + httpDebug("context:%p, fd:%d, user:%s, connect tdengine success, taos:%p", pContext, pContext->fd, pContext->user, + pContext->taos); if (pContext->taos == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, login error, taos is empty", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpError("context:%p, fd:%d, user:%s, login error, taos is empty", pContext, pContext->fd, pContext->user); httpSendErrorResp(pContext, HTTP_NO_ENOUGH_SESSIONS); return; } @@ -428,8 +423,8 @@ void httpProcessRequest(HttpContext *pContext) { if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) { taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext, &(pContext->taos)); - httpDebug("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, - pContext->ipstr, pContext->user, pContext->taos); + httpDebug("context:%p, fd:%d, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, pContext->user, + pContext->taos); } else { httpExecCmd(pContext); } diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index dc1b9a93be..e51c8dd4f7 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -40,12 +40,6 @@ HttpServer tsHttpServer; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); int httpInitSystem() { - tsHttpServer.fallback = 0; - const char *v = getenv("FALLBACK"); - if (v) { - tsHttpServer.fallback = 1; - } - strcpy(tsHttpServer.label, "rest"); tsHttpServer.serverIp = 0; tsHttpServer.serverPort = tsHttpPort; diff --git a/src/plugins/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c index d1a0eb90f0..f6e05dc1ae 100644 --- a/src/plugins/http/src/httpUtil.c +++ b/src/plugins/http/src/httpUtil.c @@ -141,16 +141,15 @@ int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize) { bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) { if (cmdSize > HTTP_MAX_CMD_SIZE) { - httpError("context:%p, fd:%d, ip:%s, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, - pContext->ipstr, pContext->user, cmdSize, HTTP_MAX_CMD_SIZE); + httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user, + cmdSize, HTTP_MAX_CMD_SIZE); return false; } if (pContext->multiCmds == NULL) { pContext->multiCmds = (HttpSqlCmds *)malloc(sizeof(HttpSqlCmds)); if (pContext->multiCmds == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, malloc multiCmds error", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpError("context:%p, fd:%d, user:%s, malloc multiCmds error", pContext, pContext->fd, pContext->user); return false; } memset(pContext->multiCmds, 0, sizeof(HttpSqlCmds)); @@ -161,7 +160,7 @@ bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) { free(multiCmds->cmds); multiCmds->cmds = (HttpSqlCmd *)malloc((size_t)cmdSize * sizeof(HttpSqlCmd)); if (multiCmds->cmds == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->ipstr, + httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize); return false; } @@ -172,8 +171,8 @@ bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) { free(multiCmds->buffer); multiCmds->buffer = (char *)malloc((size_t)bufferSize); if (multiCmds->buffer == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->ipstr, - pContext->user, bufferSize); + httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user, + bufferSize); return false; } multiCmds->bufferSize = bufferSize; @@ -191,15 +190,14 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize) { HttpSqlCmds *multiCmds = pContext->multiCmds; if (cmdSize > HTTP_MAX_CMD_SIZE) { - httpError("context:%p, fd:%d, ip:%s, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, - pContext->ipstr, pContext->user, cmdSize, HTTP_MAX_CMD_SIZE); + httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user, + cmdSize, HTTP_MAX_CMD_SIZE); return false; } multiCmds->cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd)); if (multiCmds->cmds == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->ipstr, - pContext->user, cmdSize); + httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize); return false; } memset(multiCmds->cmds + multiCmds->maxSize, 0, (size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd)); @@ -212,15 +210,14 @@ bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize) { HttpSqlCmds *multiCmds = pContext->multiCmds; if (bufferSize > HTTP_MAX_BUFFER_SIZE) { - httpError("context:%p, fd:%d, ip:%s, user:%s, mulitcmd buffer size:%d large then %d", - pContext, pContext->fd, pContext->ipstr, pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE); + httpError("context:%p, fd:%d, user:%s, mulitcmd buffer size:%d large then %d", pContext, pContext->fd, + pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE); return false; } multiCmds->buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize); if (multiCmds->buffer == NULL) { - httpError("context:%p, fd:%d, ip:%s, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->ipstr, - pContext->user, bufferSize); + httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user, bufferSize); return false; } memset(multiCmds->buffer + multiCmds->bufferSize, 0, (size_t)(bufferSize - multiCmds->bufferSize)); diff --git a/src/plugins/http/src/restHandle.c b/src/plugins/http/src/restHandle.c index f0841e2f99..8f998420de 100644 --- a/src/plugins/http/src/restHandle.c +++ b/src/plugins/http/src/restHandle.c @@ -80,15 +80,13 @@ bool restGetPassFromUrl(HttpContext* pContext) { } bool restProcessLoginRequest(HttpContext* pContext) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process restful login msg", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, process restful login msg", pContext, pContext->fd, pContext->user); pContext->reqType = HTTP_REQTYPE_LOGIN; return true; } bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, process restful sql msg", pContext, pContext->fd, pContext->ipstr, - pContext->user); + httpDebug("context:%p, fd:%d, user:%s, process restful sql msg", pContext, pContext->fd, pContext->user); char* sql = pContext->parser.data.pos; if (sql == NULL) { diff --git a/src/plugins/http/src/restJson.c b/src/plugins/http/src/restJson.c index 7a73f6559f..4a7836ad7a 100644 --- a/src/plugins/http/src/restJson.c +++ b/src/plugins/http/src/restJson.c @@ -155,19 +155,17 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, } if (cmd->numOfRows >= tsRestRowLimit) { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, retrieve rows:%d larger than limit:%d, abort retrieve", pContext, - pContext->fd, pContext->ipstr, pContext->user, cmd->numOfRows, tsRestRowLimit); + httpDebug("context:%p, fd:%d, user:%s, retrieve rows:%d larger than limit:%d, abort retrieve", pContext, + pContext->fd, pContext->user, cmd->numOfRows, tsRestRowLimit); return false; - } - else { + } else { if (pContext->fd <= 0) { - httpError("context:%p, fd:%d, ip:%s, user:%s, connection is closed, abort retrieve", pContext, pContext->fd, - pContext->ipstr, pContext->user); + httpError("context:%p, fd:%d, user:%s, connection is closed, abort retrieve", pContext, pContext->fd, + pContext->user); return false; - } - else { - httpDebug("context:%p, fd:%d, ip:%s, user:%s, total rows:%d retrieved", pContext, pContext->fd, pContext->ipstr, - pContext->user, cmd->numOfRows); + } else { + httpDebug("context:%p, fd:%d, user:%s, total rows:%d retrieved", pContext, pContext->fd, pContext->user, + cmd->numOfRows); return true; } } diff --git a/src/plugins/http/src/tgHandle.c b/src/plugins/http/src/tgHandle.c index 48c66c4c07..cae46f222a 100644 --- a/src/plugins/http/src/tgHandle.c +++ b/src/plugins/http/src/tgHandle.c @@ -800,7 +800,7 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { } */ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { - httpDebug("context:%p, fd:%d, ip:%s, process telegraf query msg", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, process telegraf query msg", pContext, pContext->fd); HttpParser *pParser = &pContext->parser; char * filter = pParser->data.pos; @@ -818,7 +818,7 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { cJSON *metrics = cJSON_GetObjectItem(root, "metrics"); if (metrics != NULL) { int size = cJSON_GetArraySize(metrics); - httpDebug("context:%p, fd:%d, ip:%s, multiple metrics:%d at one time", pContext, pContext->fd, pContext->ipstr, + httpDebug("context:%p, fd:%d, multiple metrics:%d at one time", pContext, pContext->fd, size); if (size <= 0) { httpSendErrorResp(pContext, HTTP_TG_METRICS_NULL); @@ -859,7 +859,7 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { } } } else { - httpDebug("context:%p, fd:%d, ip:%s, single metric", pContext, pContext->fd, pContext->ipstr); + httpDebug("context:%p, fd:%d, single metric", pContext, pContext->fd); if (!httpMallocMultiCmds(pContext, 3, HTTP_BUFFER_SIZE)) { httpSendErrorResp(pContext, HTTP_NO_ENOUGH_MEMORY); diff --git a/src/plugins/http/src/tgJson.c b/src/plugins/http/src/tgJson.c index ed4ee0d7de..170a1b343e 100644 --- a/src/plugins/http/src/tgJson.c +++ b/src/plugins/http/src/tgJson.c @@ -98,8 +98,8 @@ void tgBuildSqlAffectRowsJson(HttpContext *pContext, HttpSqlCmd *cmd, int affect bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { HttpSqlCmds *multiCmds = pContext->multiCmds; - httpDebug("context:%p, fd:%d, ip:%s, check telegraf command, code:%s, state:%d, type:%d, rettype:%d, tags:%d", - pContext, pContext->fd, pContext->ipstr, tstrerror(code), cmd->cmdState, cmd->cmdType, cmd->cmdReturnType, cmd->tagNum); + httpDebug("context:%p, fd:%d, check telegraf command, code:%s, state:%d, type:%d, rettype:%d, tags:%d", pContext, + pContext->fd, tstrerror(code), cmd->cmdState, cmd->cmdType, cmd->cmdReturnType, cmd->tagNum); if (cmd->cmdType == HTTP_CMD_TYPE_INSERT) { if (cmd->cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { @@ -107,16 +107,14 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; if (multiCmds->cmds[0].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { multiCmds->pos = (int16_t)-1; - httpDebug("context:%p, fd:%d, ip:%s, import failed, try create database", pContext, pContext->fd, - pContext->ipstr); + httpDebug("context:%p, fd:%d, import failed, try create database", pContext, pContext->fd); return false; } } else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; if (multiCmds->cmds[multiCmds->pos - 1].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { multiCmds->pos = (int16_t)(multiCmds->pos - 2); - httpDebug("context:%p, fd:%d, ip:%s, import failed, try create stable", pContext, pContext->fd, - pContext->ipstr); + httpDebug("context:%p, fd:%d, import failed, try create stable", pContext, pContext->fd); return false; } } else { @@ -125,11 +123,10 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { } } else if (cmd->cmdType == HTTP_CMD_TYPE_CREATE_DB) { cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; - httpDebug("context:%p, fd:%d, ip:%s, code:%s, create database failed", pContext, pContext->fd, pContext->ipstr, - tstrerror(code)); + httpDebug("context:%p, fd:%d, code:%s, create database failed", pContext, pContext->fd, tstrerror(code)); } else if (cmd->cmdType == HTTP_CMD_TYPE_CREATE_STBALE) { cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; - httpDebug("context:%p, fd:%d, ip:%s, code:%s, create stable failed", pContext, pContext->fd, pContext->ipstr, tstrerror(code)); + httpDebug("context:%p, fd:%d, code:%s, create stable failed", pContext, pContext->fd, tstrerror(code)); } else { } @@ -138,9 +135,9 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { void tgSetNextCmd(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { HttpSqlCmds *multiCmds = pContext->multiCmds; - httpDebug("context:%p, fd:%d, ip:%s, get telegraf next command, pos:%d, code:%s, state:%d, type:%d, rettype:%d, tags:%d", - pContext, pContext->fd, pContext->ipstr, multiCmds->pos, tstrerror(code), cmd->cmdState, cmd->cmdType, - cmd->cmdReturnType, cmd->tagNum); + httpDebug("context:%p, fd:%d, get telegraf next command, pos:%d, code:%s, state:%d, type:%d, rettype:%d, tags:%d", + pContext, pContext->fd, multiCmds->pos, tstrerror(code), cmd->cmdState, cmd->cmdType, cmd->cmdReturnType, + cmd->tagNum); if (cmd->cmdType == HTTP_CMD_TYPE_INSERT) { multiCmds->pos = (int16_t)(multiCmds->pos + 2);