From 521d7c1519ce5b8bee984189e392c5308dafd7a0 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 08:22:03 +0800 Subject: [PATCH 1/7] add ehttp_* --- src/common/inc/elog.h | 71 ++ src/common/src/elog.c | 95 +++ src/plugins/http/inc/ehttp_gzip.h | 30 + src/plugins/http/inc/ehttp_parser.h | 36 + src/plugins/http/inc/ehttp_util_string.h | 18 + src/plugins/http/inc/httpInt.h | 11 + src/plugins/http/src/ehttp_gzip.c | 154 ++++ src/plugins/http/src/ehttp_parser.c | 965 +++++++++++++++++++++++ src/plugins/http/src/ehttp_util_string.c | 30 + src/plugins/http/src/httpContext.c | 161 ++++ src/plugins/http/src/httpServer.c | 63 ++ 11 files changed, 1634 insertions(+) create mode 100644 src/common/inc/elog.h create mode 100644 src/common/src/elog.c create mode 100644 src/plugins/http/inc/ehttp_gzip.h create mode 100644 src/plugins/http/inc/ehttp_parser.h create mode 100644 src/plugins/http/inc/ehttp_util_string.h create mode 100644 src/plugins/http/src/ehttp_gzip.c create mode 100644 src/plugins/http/src/ehttp_parser.c create mode 100644 src/plugins/http/src/ehttp_util_string.c diff --git a/src/common/inc/elog.h b/src/common/inc/elog.h new file mode 100644 index 0000000000..8e11c3e761 --- /dev/null +++ b/src/common/inc/elog.h @@ -0,0 +1,71 @@ +#ifndef _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_ +#define _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_ + +#include + +typedef enum { + ELOG_DEBUG, + ELOG_INFO, + ELOG_WARN, + ELOG_ERROR, + ELOG_CRITICAL, + ELOG_VERBOSE, + ELOG_ABORT, +} ELOG_LEVEL; + +void elog_set_level(ELOG_LEVEL base); // only log those not less than base +void elog_set_thread_name(const char *name); + +void elog(ELOG_LEVEL level, int fd, const char *file, int line, const char *func, const char *fmt, ...) +#ifdef __GNUC__ + __attribute__((format(printf, 6, 7))) +#endif +; + +#define DLOG(fd, fmt, ...) elog(ELOG_DEBUG, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define ILOG(fd, fmt, ...) elog(ELOG_INFO, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define WLOG(fd, fmt, ...) elog(ELOG_WARN, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define ELOG(fd, fmt, ...) elog(ELOG_ERROR, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define CLOG(fd, fmt, ...) elog(ELOG_CRITICAL, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define VLOG(fd, fmt, ...) elog(ELOG_VERBOSE, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define ALOG(fd, fmt, ...) elog(ELOG_ABORT, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) + +#define D(fmt, ...) elog(ELOG_DEBUG, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define I(fmt, ...) elog(ELOG_INFO, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define W(fmt, ...) elog(ELOG_WARN, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define E(fmt, ...) elog(ELOG_ERROR, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define C(fmt, ...) elog(ELOG_CRITICAL, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define V(fmt, ...) elog(ELOG_VERBOSE, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define A(fmt, ...) elog(ELOG_ABORT, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) + + + +// NOTE: https://en.wikipedia.org/wiki/Fail-fast +// for the sake of simplicity, both implementation and usage, +// we'll follow `fail-fast` or `let-it-crash` philosophy. + +// assertion in both debug/release build +#define EQ_ABORT(fmt, ...) A("Assertion failure: "fmt, ##__VA_ARGS__) + +#define EQ_ASSERT(statement) do { \ + if (statement) break; \ + A("Assertion failure: %s", #statement); \ +} while (0) + +#define EQ_ASSERT_EXT(statement, fmt, ...) do { \ + if (statement) break; \ + A("Assertion failure: %s: "fmt, #statement, ##__VA_ARGS__); \ +} while (0) + +#define EQ_ASSERT_API0(statement) do { \ + if (statement) break; \ + A("Assertion failure: %s failed: [%d]%s", #statement, errno, strerror(errno)); \ +} while (0) + +#define EQ_ASSERT_API(api) do { \ + A("Assertion failure: %s failed: [%d]%s", #api, errno, strerror(errno)); \ +} while (0) + + +#endif // _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_ + diff --git a/src/common/src/elog.c b/src/common/src/elog.c new file mode 100644 index 0000000000..95b580962c --- /dev/null +++ b/src/common/src/elog.c @@ -0,0 +1,95 @@ +#include "elog.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#define gettid() syscall(__NR_gettid) + +static ELOG_LEVEL elog_level_base = ELOG_DEBUG; + +static __thread long elog_thread_id; +static __thread char elog_thread_name[24] = {0}; + +void elog_set_level(ELOG_LEVEL base) { + elog_level_base = base; +} + +void elog_set_thread_name(const char *name) { + elog_thread_id = gettid(); + snprintf(elog_thread_name, sizeof(elog_thread_name), "%s", name); +} + +void elog(ELOG_LEVEL level, int fd, const char *file, int line, const char *func, const char *fmt, ...) +{ + if (level < elog_level_base) return; + if (fd == -1) return; + + if (elog_thread_name[0]=='\0') { + elog_set_thread_name("unknown"); + } + + char *p; + int n; + size_t bytes; + + char buf[4096]; + snprintf(buf, sizeof(buf), "%s", file); + + char fn[1024]; + snprintf(fn, sizeof(fn), "%s", basename(buf)); + + char C; + switch (level) { + case ELOG_DEBUG: C = 'D'; break; + case ELOG_INFO: C = 'I'; break; + case ELOG_WARN: C = 'W'; break; + case ELOG_ERROR: C = 'E'; break; + case ELOG_CRITICAL: C = 'C'; break; + case ELOG_VERBOSE: C = 'V'; break; + case ELOG_ABORT: C = 'A'; break; + default: return; + } + + struct tm t; + struct timeval tv; + + if (gettimeofday(&tv, NULL)) return; + if (!localtime_r(&tv.tv_sec, &t)) return; + + p = buf; + bytes = sizeof(buf); + + n = snprintf(p, bytes, "%c[%02d/%02d %02d:%02d:%02d.%06ld][%06ld]: ==", + C, + t.tm_mon + 1, t.tm_mday, + t.tm_hour, t.tm_min, t.tm_sec, + tv.tv_usec, + elog_thread_id); + p += n; bytes -= n; + + va_list arg; + va_start(arg, fmt); + if (bytes>0) { + n = vsnprintf(p, bytes, fmt, arg); + p += n; bytes -= n; + } + va_end(arg); + + if (bytes>0) { + n = snprintf(p, bytes, "== t:%s#%s[%d]#%s()", + elog_thread_name, fn, line, func); + } + + dprintf(fd, "%s\n", buf); + + if (level == ELOG_ABORT) { + abort(); + } +} + diff --git a/src/plugins/http/inc/ehttp_gzip.h b/src/plugins/http/inc/ehttp_gzip.h new file mode 100644 index 0000000000..b2d6ace9b0 --- /dev/null +++ b/src/plugins/http/inc/ehttp_gzip.h @@ -0,0 +1,30 @@ +#ifndef _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_ +#define _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_ + +#include + +#define EHTTP_GZIP_CHUNK_SIZE_DEFAULT (1024*16) + +typedef struct ehttp_gzip_s ehttp_gzip_t; + +typedef struct ehttp_gzip_callbacks_s ehttp_gzip_callbacks_t; +typedef struct ehttp_gzip_conf_s ehttp_gzip_conf_t; + +struct ehttp_gzip_callbacks_s { + void (*on_data)(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len); +}; + +struct ehttp_gzip_conf_s { + int get_header:2; // 0: not fetching header info + size_t chunk_size; // 0: fallback to default: EHTTP_GZIP_CHUNK_SIZE_DEFAULT +}; + +ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); +ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); +void ehttp_gzip_destroy(ehttp_gzip_t *gzip); + +int ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, size_t len); +int ehttp_gzip_finish(ehttp_gzip_t *gzip); + +#endif // _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_ + diff --git a/src/plugins/http/inc/ehttp_parser.h b/src/plugins/http/inc/ehttp_parser.h new file mode 100644 index 0000000000..be87650128 --- /dev/null +++ b/src/plugins/http/inc/ehttp_parser.h @@ -0,0 +1,36 @@ +#ifndef _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e +#define _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e + +#include + +typedef struct ehttp_parser_s ehttp_parser_t; +typedef struct ehttp_parser_callbacks_s ehttp_parser_callbacks_t; +typedef struct ehttp_parser_conf_s ehttp_parser_conf_t; +typedef struct ehttp_status_code_s ehttp_status_code_t; + +struct ehttp_parser_callbacks_s { + void (*on_request_line)(void *arg, const char *method, const char *target, const char *version, const char *target_raw); + void (*on_status_line)(void *arg, const char *version, int status_code, const char *reason_phrase); + void (*on_header_field)(void *arg, const char *key, const char *val); + void (*on_body)(void *arg, const char *chunk, size_t len); + void (*on_end)(void *arg); + void (*on_error)(void *arg, int status_code); +}; + +struct ehttp_parser_conf_s { + size_t flush_block_size; // <=0: immediately +}; + +ehttp_parser_t* ehttp_parser_create(ehttp_parser_callbacks_t callbacks, ehttp_parser_conf_t conf, void *arg); +void ehttp_parser_destroy(ehttp_parser_t *parser); +int ehttp_parser_parse(ehttp_parser_t *parser, const char *buf, size_t len); +int ehttp_parser_parse_string(ehttp_parser_t *parser, const char *str); +int ehttp_parser_parse_char(ehttp_parser_t *parser, const char c); +int ehttp_parser_parse_end(ehttp_parser_t *parser); + +char* ehttp_parser_urldecode(const char *enc); + +const char* ehttp_status_code_get_desc(const int status_code); + +#endif // _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e + diff --git a/src/plugins/http/inc/ehttp_util_string.h b/src/plugins/http/inc/ehttp_util_string.h new file mode 100644 index 0000000000..46c5a42827 --- /dev/null +++ b/src/plugins/http/inc/ehttp_util_string.h @@ -0,0 +1,18 @@ +#ifndef _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_ +#define _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_ + +#include + +typedef struct ehttp_util_string_s ehttp_util_string_t; + +struct ehttp_util_string_s { + char *str; + size_t len; +}; + +void ehttp_util_string_cleanup(ehttp_util_string_t *str); +int ehttp_util_string_append(ehttp_util_string_t *str, const char *s, size_t len); +void ehttp_util_string_clear(ehttp_util_string_t *str); + +#endif // _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_ + diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index ffd621be7a..044b5cc4cc 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -28,6 +28,8 @@ #include "httpLog.h" #include "httpJson.h" +#include "ehttp_parser.h" + #define HTTP_MAX_CMD_SIZE 1024 #define HTTP_MAX_BUFFER_SIZE 1024*1024 @@ -162,6 +164,11 @@ typedef struct { int32_t len; } HttpBuf; +typedef enum { + EHTTP_CONTEXT_PROCESS_FAILED = 0x01, + EHTTP_CONTEXT_PARSER_FAILED = 0x02 +} EHTTP_CONTEXT_FAILED_CAUSE; + typedef struct { char buffer[HTTP_BUFFER_SIZE]; int bufsize; @@ -172,6 +179,10 @@ typedef struct { HttpBuf data; // body content HttpBuf token; // auth token HttpDecodeMethod *pMethod; + + ehttp_parser_t *parser; + int inited:2; + int failed:4; } HttpParser; typedef struct HttpContext { diff --git a/src/plugins/http/src/ehttp_gzip.c b/src/plugins/http/src/ehttp_gzip.c new file mode 100644 index 0000000000..ded344dfea --- /dev/null +++ b/src/plugins/http/src/ehttp_gzip.c @@ -0,0 +1,154 @@ +#include "ehttp_gzip.h" + +#include "os.h" +#include "zlib.h" + +#include + +typedef enum { + EHTTP_GZIP_INITING, + EHTTP_GZIP_READY, + EHTTP_GZIP_CLOSED, +} EHTTP_GZIP_STATE; + +struct ehttp_gzip_s { + ehttp_gzip_conf_t conf; + ehttp_gzip_callbacks_t callbacks; + void *arg; + z_stream *gzip; + gz_header *header; + char *chunk; + + int state; +}; + +static void dummy_on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len) { +} + +static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) { + switch(gzip->state) { + case EHTTP_GZIP_READY: { + inflateEnd(gzip->gzip); + } break; + default: break; + } + if (gzip->gzip) { + free(gzip->gzip); + gzip->gzip = NULL; + } + if (gzip->header) { + free(gzip->header); + gzip->header = NULL; + } + if (gzip->chunk) { + free(gzip->chunk); + gzip->chunk = NULL; + } + gzip->state = EHTTP_GZIP_CLOSED; +} + +ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg) { + ehttp_gzip_t *gzip = (ehttp_gzip_t*)calloc(1, sizeof(*gzip)); + if (!gzip) return NULL; + + do { + gzip->conf = conf; + gzip->callbacks = callbacks; + gzip->arg = arg; + if (gzip->callbacks.on_data == NULL) gzip->callbacks.on_data = dummy_on_data; + gzip->gzip = (z_stream*)calloc(1, sizeof(*gzip->gzip)); + if (gzip->conf.get_header) { + gzip->header = (gz_header*)calloc(1, sizeof(*gzip->header)); + } + if (gzip->conf.chunk_size<=0) gzip->conf.chunk_size = EHTTP_GZIP_CHUNK_SIZE_DEFAULT; + gzip->chunk = (char*)malloc(gzip->conf.chunk_size); + if (!gzip->gzip || (gzip->conf.get_header && !gzip->header) || !gzip->chunk) break; + gzip->gzip->zalloc = Z_NULL; + gzip->gzip->zfree = Z_NULL; + gzip->gzip->opaque = Z_NULL; + + // 863 windowBits can also be greater than 15 for optional gzip decoding. Add + // 864 32 to windowBits to enable zlib and gzip decoding with automatic header + // 865 detection, or add 16 to decode only the gzip format (the zlib format will + // 866 return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a + // 867 CRC-32 instead of an Adler-32. Unlike the gunzip utility and gzread() (see + // 868 below), inflate() will not automatically decode concatenated gzip streams. + // 869 inflate() will return Z_STREAM_END at the end of the gzip stream. The state + // 870 would need to be reset to continue decoding a subsequent gzip stream. + int ret = inflateInit2(gzip->gzip, 32); // 32/16? 32/16 + MAX_WBITS + if (ret != Z_OK) break; + if (gzip->header) { + ret = inflateGetHeader(gzip->gzip, gzip->header); + } + if (ret != Z_OK) break; + + gzip->gzip->next_out = (z_const Bytef*)gzip->chunk; + gzip->gzip->avail_out = gzip->conf.chunk_size; + gzip->state = EHTTP_GZIP_READY; + return gzip; + } while (0); + + ehttp_gzip_destroy(gzip); + return NULL; +} + +ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); + +void ehttp_gzip_destroy(ehttp_gzip_t *gzip) { + ehttp_gzip_cleanup(gzip); + + free(gzip); +} + +int ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, size_t len) { + if (gzip->state != EHTTP_GZIP_READY) return -1; + if (len <= 0) return 0; + + gzip->gzip->next_in = (z_const Bytef*)buf; + gzip->gzip->avail_in = len; + + while (gzip->gzip->avail_in) { + int ret; + if (gzip->header) { + ret = inflate(gzip->gzip, Z_BLOCK); + } else { + ret = inflate(gzip->gzip, Z_SYNC_FLUSH); + } + if (ret != Z_OK && ret != Z_STREAM_END) return -1; + + if (gzip->gzip->avail_out>0) { + if (ret!=Z_STREAM_END) continue; + } + + size_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; + + gzip->gzip->next_out[0] = '\0'; + gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); + gzip->gzip->next_out = (z_const Bytef*)gzip->chunk; + gzip->gzip->avail_out = gzip->conf.chunk_size; + } + + return 0; +} + +int ehttp_gzip_finish(ehttp_gzip_t *gzip) { + if (gzip->state != EHTTP_GZIP_READY) return -1; + + gzip->gzip->next_in = NULL; + gzip->gzip->avail_in = 0; + + int ret; + ret = inflate(gzip->gzip, Z_FINISH); + + if (ret != Z_STREAM_END) return -1; + + size_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; + + gzip->gzip->next_out[0] = '\0'; + gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); + gzip->gzip->next_out = NULL; + gzip->gzip->avail_out = 0; + + return 0; +} + diff --git a/src/plugins/http/src/ehttp_parser.c b/src/plugins/http/src/ehttp_parser.c new file mode 100644 index 0000000000..fbe15661b5 --- /dev/null +++ b/src/plugins/http/src/ehttp_parser.c @@ -0,0 +1,965 @@ +#include "ehttp_parser.h" + +#include "ehttp_gzip.h" +#include "ehttp_util_string.h" +#include "elog.h" + +#include +#include +#include + +struct ehttp_status_code_s { + int status_code; + const char *status_desc; +}; + +static ehttp_status_code_t status_codes[] = { + {100, "Continue"}, + {101, "Switching Protocol"}, + {102, "Processing (WebDAV)"}, + {103, "Early Hints"}, + {200, "OK"}, + {201, "Created"}, + {202, "Accepted"}, + {203, "Non-Authoritative Information"}, + {204, "No Content"}, + {205, "Reset Content"}, + {206, "Partial Content"}, + {207, "Multi-Status (WebDAV)"}, + {208, "Already Reported (WebDAV)"}, + {226, "IM Used (HTTP Delta encoding)"}, + {300, "Multiple Choice"}, + {301, "Moved Permanently"}, + {302, "Found"}, + {303, "See Other"}, + {304, "Not Modified"}, + {305, "Use Proxy"}, + {306, "unused"}, + {307, "Temporary Redirect"}, + {308, "Permanent Redirect"}, + {400, "Bad Request"}, + {401, "Unauthorized"}, + {402, "Payment Required"}, + {403, "Forbidden"}, + {404, "Not Found"}, + {405, "Method Not Allowed"}, + {406, "Not Acceptable"}, + {407, "Proxy Authentication Required"}, + {408, "Request Timeout"}, + {409, "Conflict"}, + {410, "Gone"}, + {411, "Length Required"}, + {412, "Precondition Failed"}, + {413, "Payload Too Large"}, + {414, "URI Too Long"}, + {415, "Unsupported Media Type"}, + {416, "Range Not Satisfiable"}, + {417, "Expectation Failed"}, + {418, "I'm a teapot"}, + {421, "Misdirected Request"}, + {422, "Unprocessable Entity (WebDAV)"}, + {423, "Locked (WebDAV)"}, + {424, "Failed Dependency (WebDAV)"}, + {425, "Too Early"}, + {426, "Upgrade Required"}, + {428, "Precondition Required"}, + {429, "Too Many Requests"}, + {431, "Request Header Fields Too Large"}, + {451, "Unavailable For Legal Reasons"}, + {500, "Internal Server Error"}, + {501, "Not Implemented"}, + {502, "Bad Gateway"}, + {503, "Service Unavailable"}, + {504, "Gateway Timeout"}, + {505, "HTTP Version Not Supported"}, + {506, "Variant Also Negotiates"}, + {507, "Insufficient Storage"}, + {508, "Loop Detected (WebDAV)"}, + {510, "Not Extended"}, + {511, "Network Authentication Required"}, + {0, NULL} +}; + +const char* ehttp_status_code_get_desc(const int status_code) { + ehttp_status_code_t *p = status_codes; + while (p->status_code!=0) { + if (p->status_code==status_code) return p->status_desc; + ++p; + } + return "Unknow status code"; +} + +typedef enum HTTP_PARSER_STATE { + HTTP_PARSER_BEGIN, + HTTP_PARSER_REQUEST_OR_RESPONSE, + HTTP_PARSER_METHOD, + HTTP_PARSER_TARGET, + HTTP_PARSER_HTTP_VERSION, + HTTP_PARSER_SP, + HTTP_PARSER_STATUS_CODE, + HTTP_PARSER_REASON_PHRASE, + HTTP_PARSER_CRLF, + HTTP_PARSER_HEADER, + HTTP_PARSER_HEADER_KEY, + HTTP_PARSER_HEADER_VAL, + HTTP_PARSER_CHUNK_SIZE, + HTTP_PARSER_CHUNK, + HTTP_PARSER_END, + HTTP_PARSER_ERROR, +} HTTP_PARSER_STATE; + +typedef struct ehttp_parser_kv_s ehttp_parser_kv_t; + +struct ehttp_parser_kv_s { + char *key; + char *val; +}; + +struct ehttp_parser_s { + ehttp_parser_callbacks_t callbacks; + void *arg; + ehttp_parser_conf_t conf; + + char *method; + char *target; + char *target_raw; + char *version; + + int http_10:2; + int http_11:2; + int accept_encoding_gzip:2; + int accept_encoding_chunked:2; + int transfer_gzip:2; + int transfer_chunked:2; + int content_length_specified:2; + int content_chunked:2; + + + int status_code; + char *reason_phrase; + + char *key; + char *val; + ehttp_parser_kv_t *kvs; + size_t kvs_count; + + char *auth_basic; + + size_t content_length; + + size_t chunk_size; + size_t received_chunk_size; + size_t received_size; + + ehttp_gzip_t *gzip; + ehttp_util_string_t str; + HTTP_PARSER_STATE *stacks; + size_t stacks_count; +}; + +static void dummy_on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) { +} + +static void dummy_on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) { +} + +static void dummy_on_header_field(void *arg, const char *key, const char *val) { +} + +static void dummy_on_body(void *arg, const char *chunk, size_t len) { +} + +static void dummy_on_end(void *arg) { +} + +static void dummy_on_error(void *arg, int status_code) { +} + + +static HTTP_PARSER_STATE ehttp_parser_top(ehttp_parser_t *parser) { + EQ_ASSERT(parser->stacks_count >= 1); + EQ_ASSERT(parser->stacks); + + return parser->stacks[parser->stacks_count-1]; +} + +static int ehttp_parser_push(ehttp_parser_t *parser, HTTP_PARSER_STATE state) { + size_t n = parser->stacks_count + 1; + HTTP_PARSER_STATE *stacks = (HTTP_PARSER_STATE*)reallocarray(parser->stacks, n, sizeof(*stacks)); + if (!stacks) return -1; + + parser->stacks_count = n; + parser->stacks = stacks; + parser->stacks[n-1] = state; + + return 0; +} + +static int ehttp_parser_pop(ehttp_parser_t *parser) { + if (parser->stacks_count <= 0) return -1; + --parser->stacks_count; + + return 0; +} + +ehttp_parser_t *ehttp_parser_create(ehttp_parser_callbacks_t callbacks, ehttp_parser_conf_t conf, void *arg) { + ehttp_parser_t *parser = (ehttp_parser_t*)calloc(1, sizeof(*parser)); + if (!parser) return NULL; + + parser->callbacks = callbacks; + parser->arg = arg; + parser->conf = conf; + + if (parser->callbacks.on_request_line == NULL) { + parser->callbacks.on_request_line = dummy_on_request_line; + } + if (parser->callbacks.on_status_line == NULL) { + parser->callbacks.on_status_line = dummy_on_status_line; + } + if (parser->callbacks.on_header_field == NULL) { + parser->callbacks.on_header_field = dummy_on_header_field; + } + if (parser->callbacks.on_body == NULL) { + parser->callbacks.on_body = dummy_on_body; + } + if (parser->callbacks.on_end == NULL) { + parser->callbacks.on_end = dummy_on_end; + } + if (parser->callbacks.on_error == NULL) { + parser->callbacks.on_error = dummy_on_error; + } + + ehttp_parser_push(parser, HTTP_PARSER_BEGIN); + + return parser; +} + +static void ehttp_parser_kvs_destroy(ehttp_parser_t *parser) { + if (!parser->kvs) return; + + for (size_t i=0; ikvs_count; ++i) { + ehttp_parser_kv_t *p = &parser->kvs[i]; + free(p->key); p->key = NULL; + free(p->val); p->val = NULL; + } + free(parser->kvs); + parser->kvs = NULL; + parser->kvs_count = 0; + + free(parser->auth_basic); + parser->auth_basic = NULL; +} + +void ehttp_parser_destroy(ehttp_parser_t *parser) { + if (!parser) return; + + free(parser->method); parser->method = NULL; + free(parser->target); parser->target = NULL; + free(parser->target_raw); parser->target_raw = NULL; + free(parser->version); parser->version = NULL; + free(parser->reason_phrase); parser->reason_phrase = NULL; + free(parser->key); parser->key = NULL; + free(parser->val); parser->val = NULL; + free(parser->auth_basic); parser->auth_basic = NULL; + free(parser->stacks); parser->stacks = NULL; + + parser->stacks_count = 0; + + ehttp_parser_kvs_destroy(parser); + + ehttp_util_string_cleanup(&parser->str); + if (parser->gzip) { + ehttp_gzip_destroy(parser->gzip); + parser->gzip = NULL; + } + + free(parser); +} + +#define is_token(c) (strchr("!#$%&'*+-.^_`|~", c) || isdigit(c) || isalpha(c)) + +char *ehttp_parser_urldecode(const char *enc) { + int ok = 1; + ehttp_util_string_t str = {0}; + while (*enc) { + char *p = strchr(enc, '%'); + if (!p) break; + int hex, cnt; + int n = sscanf(p+1, "%2x%n", &hex, &cnt); + if (n!=1 && cnt !=2) { ok = 0; break; } + if (ehttp_util_string_append(&str, enc, p-enc)) { ok = 0; break; } + char c = (char)hex; + if (ehttp_util_string_append(&str, &c, 1)) { ok = 0; break; } + enc = p+3; + } + char *dec = NULL; + if (ok && *enc) { + if (ehttp_util_string_append(&str, enc, strlen(enc))) { ok = 0; } + } + if (ok) { + dec = str.str; + str.str = NULL; + } + ehttp_util_string_cleanup(&str); + return dec; +} + +static void on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len) { + ehttp_parser_t *parser = (ehttp_parser_t*)arg; + parser->callbacks.on_body(parser->arg, buf, len); +} + +static int ehttp_parser_check_field(ehttp_parser_t *parser, const char *key, const char *val) { + int ok = 0; + do { + if (0==strcasecmp(key, "Content-Length")) { + size_t len = 0; + int bytes = 0; + int n = sscanf(val, "%ld%n", &len, &bytes); + if (n==1 && bytes==strlen(val)) { + parser->content_length = len; + parser->chunk_size = len; + parser->content_length_specified = 1; + break; + } + ok = -1; + break; + } + if (0==strcasecmp(key, "Accept-Encoding")) { + if (strstr(val, "gzip")) { + parser->accept_encoding_gzip = 1; + } + if (strstr(val, "chunked")) { + parser->accept_encoding_chunked = 1; + } + break; + } + if (0==strcasecmp(key, "Content-Encoding")) { + if (0==strcmp(val, "gzip")) { + parser->content_chunked = 1; + } + break; + } + if (0==strcasecmp(key, "Transfer-Encoding")) { + if (strstr(val, "gzip")) { + parser->transfer_gzip = 1; + ehttp_gzip_conf_t conf = {0}; + ehttp_gzip_callbacks_t callbacks = {0}; + + callbacks.on_data = on_data; + + parser->gzip = ehttp_gzip_create_decompressor(conf, callbacks, parser); + + if (!parser->gzip) { + E("failed to create gzip decompressor"); + ok = -1; + break; + } + } + if (strstr(val, "chunked")) { + parser->transfer_chunked = 1; + } + break; + } + if (0==strcasecmp(key, "Authorization")) { + char *t = NULL; + char *s = NULL; + int bytes = 0; + int n = sscanf(val, "%ms %ms%n", &t, &s, &bytes); + if (n==2 && t && s && bytes==strlen(val) && strcmp(t, "Basic")) { + free(parser->auth_basic); + parser->auth_basic = s; s = NULL; + } else { + ok = -1; + } + free(t); free(s); + break; + } + } while (0); + return ok; +} + +static int ehttp_parser_kvs_append_kv(ehttp_parser_t *parser, const char *key, const char *val) { + ehttp_parser_kv_t *kvs = (ehttp_parser_kv_t*)reallocarray(parser->kvs, parser->kvs_count + 1, sizeof(*kvs)); + if (!kvs) return -1; + + parser->kvs = kvs; + + kvs[parser->kvs_count].key = strdup(key); + kvs[parser->kvs_count].val = strdup(val); + + if (kvs[parser->kvs_count].key && kvs[parser->kvs_count].val) { + ++parser->kvs_count; + return 0; + } + + free(kvs[parser->kvs_count].key); + kvs[parser->kvs_count].key = NULL; + free(kvs[parser->kvs_count].val); + kvs[parser->kvs_count].val = NULL; + + return -1; +} + +static int on_begin(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c=='G' || c=='P' || c=='H' || c=='D' || c=='C' || c=='O' || c=='T') { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_REQUEST_OR_RESPONSE); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_request_or_response(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (parser->str.len==1) { + if (c=='T' && parser->str.str[0]=='H') { + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_END); + ehttp_parser_push(parser, HTTP_PARSER_HEADER); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_REASON_PHRASE); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_STATUS_CODE); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_HTTP_VERSION); + *again = 1; + break; + } + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_END); + ehttp_parser_push(parser, HTTP_PARSER_HEADER); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_HTTP_VERSION); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_TARGET); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_METHOD); + *again = 1; + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_method(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + parser->method = strdup(parser->str.str); + if (!parser->method) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + } while (0); + return ok; +} + +static int on_target(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (!isspace(c) && c!='\r' && c!='\n') { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + parser->target_raw = strdup(parser->str.str); + parser->target = ehttp_parser_urldecode(parser->str.str); + if (!parser->target_raw || !parser->target) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + } while (0); + return ok; +} + +static int on_version(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + const char *prefix = "HTTP/1."; + int len = strlen(prefix); + if (parser->str.len < len) { + if (prefix[parser->str.len]!=c) { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + + if (c!='0' && c!='1') { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + if (c=='0') parser->http_10 = 1; + if (c=='1') parser->http_11 = 1; + + parser->version = strdup(parser->str.str); + if (!parser->version) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + + if (parser->method) { + parser->callbacks.on_request_line(parser->arg, parser->method, parser->target, parser->version, parser->target_raw); + } + + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + } while (0); + return ok; +} + +static int on_sp(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c==' ') { + ehttp_parser_pop(parser); + break; + } + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + } while (0); + return ok; +} + +static int on_status_code(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (isdigit(c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + if (parser->str.len < 3) break; + + sscanf(parser->str.str, "%d", &parser->status_code); + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_reason_phrase(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c=='\r') { + parser->reason_phrase = strdup(parser->str.str); + if (!parser->reason_phrase) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + parser->callbacks.on_status_line(parser->arg, parser->version, parser->status_code, parser->reason_phrase); + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + } while (0); + return ok; +} + +static int post_process(ehttp_parser_t *parser) { + if (parser->gzip) { + if (ehttp_gzip_finish(parser->gzip)) { + E("gzip failed"); + parser->callbacks.on_error(parser->arg, 507); + return -1; + } + } + parser->callbacks.on_end(parser->arg); + return 0; +} + +static int on_crlf(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + const char *s = "\r\n"; + int len = strlen(s); + if (s[parser->str.len]!=c) { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + if (parser->str.len == len) { + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + if (ehttp_parser_top(parser) == HTTP_PARSER_END) { + ok = post_process(parser); + } + } + break; + } while (0); + return ok; +} + +static int on_header(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c=='\r') { + ehttp_parser_pop(parser); + if (parser->transfer_chunked) { + ehttp_parser_push(parser, HTTP_PARSER_CHUNK_SIZE); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + } else { + if (parser->content_length > 0) { + ehttp_parser_push(parser, HTTP_PARSER_CHUNK); + } + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + } + *again = 1; + break; + } + if (c!=' ' && c!='\t' && c!=':' ) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_HEADER_VAL); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_HEADER_KEY); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_header_key(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + if (c==':') { + parser->key = strdup(parser->str.str); + if (!parser->key) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_header_val(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c != '\r' && c != '\n' && (!isspace(c) || parser->str.len>0)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + const char *val = parser->str.str; + ok = ehttp_parser_check_field(parser, parser->key, val); + if (ehttp_parser_kvs_append_kv(parser, parser->key, val)) { + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + } else { + parser->callbacks.on_header_field(parser->arg, parser->key, val); + } + free(parser->key); parser->key = NULL; + val = NULL; + if (ok==-1) break; + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + } while (0); + return ok; +} + +static int on_chunk_size(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + int bytes; + size_t len; + int n; + do { + if (isxdigit(c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + if (c=='\r') { + n = sscanf(parser->str.str, "%lx%n", &len, &bytes); + if (n==1 && bytes==strlen(parser->str.str) && len>=0) { + if (len==0) { + if (parser->content_length_specified == 0 || parser->received_size == parser->content_length) { + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + *again = 1; + break; + } + } else { + if (parser->content_length_specified == 0 || parser->received_size + len <= parser->content_length) { + parser->chunk_size = len; + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_CHUNK_SIZE); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_CHUNK); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + *again = 1; + break; + } + } + } + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_chunk(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ++parser->received_size; + ++parser->received_chunk_size; + if (parser->received_chunk_size < parser->chunk_size) break; + + if (parser->gzip) { + if (ehttp_gzip_write(parser->gzip, parser->str.str, parser->str.len)) { + E("gzip failed"); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + } else { + parser->callbacks.on_body(parser->arg, parser->str.str, parser->str.len); + } + parser->received_chunk_size = 0; + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + if (ehttp_parser_top(parser) == HTTP_PARSER_END) { + ok = post_process(parser); + } + } while (0); + return ok; +} + +static int on_end(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + } while (0); + return ok; +} + +static int parse_char(ehttp_parser_t *parser, const char c, int *again) { + int ok = 0; + HTTP_PARSER_STATE state = ehttp_parser_top(parser); + do { + if (state == HTTP_PARSER_BEGIN) { + ok = on_begin(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_REQUEST_OR_RESPONSE) { + ok = on_request_or_response(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_METHOD) { + ok = on_method(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_TARGET) { + ok = on_target(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HTTP_VERSION) { + ok = on_version(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_SP) { + ok = on_sp(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_STATUS_CODE) { + ok = on_status_code(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_REASON_PHRASE) { + ok = on_reason_phrase(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_CRLF) { + ok = on_crlf(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HEADER) { + ok = on_header(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HEADER_KEY) { + ok = on_header_key(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HEADER_VAL) { + ok = on_header_val(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_CHUNK_SIZE) { + ok = on_chunk_size(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_CHUNK) { + ok = on_chunk(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_END) { + ok = on_end(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_ERROR) { + ok = -2; + break; + } + E("unknown parser state: %d", state); + ok = -1; + parser->callbacks.on_error(parser->arg, 500); + } while (0); + if (ok==-1) { + ehttp_parser_push(parser, HTTP_PARSER_ERROR); + } + if (ok==-2) ok = -1; + return ok; +} + +int ehttp_parser_parse_string(ehttp_parser_t *parser, const char *str) { + return ehttp_parser_parse(parser, str, str?strlen(str):0); +} + +int ehttp_parser_parse_char(ehttp_parser_t *parser, const char c) { + return ehttp_parser_parse(parser, &c, 1); +} + +int ehttp_parser_parse(ehttp_parser_t *parser, const char *buf, size_t len) { + const char *p = buf; + int ret = 0; + size_t i = 0; + while (i < len) { + int again = 0; + ret = parse_char(parser, *p, &again); + if (ret) break; + if (again) continue; + ++p; + ++i; + } + return ret; +} + diff --git a/src/plugins/http/src/ehttp_util_string.c b/src/plugins/http/src/ehttp_util_string.c new file mode 100644 index 0000000000..94ebaaafa6 --- /dev/null +++ b/src/plugins/http/src/ehttp_util_string.c @@ -0,0 +1,30 @@ +#include "ehttp_util_string.h" + +#include +#include + +void ehttp_util_string_cleanup(ehttp_util_string_t *str) { + free(str->str); + str->str = NULL; + str->len = 0; +} + +int ehttp_util_string_append(ehttp_util_string_t *str, const char *s, size_t len) { + // int n = str->str?strlen(str->str):0; + int n = str->len; + char *p = (char*)realloc(str->str, n + len + 1); + if (!p) return -1; + strncpy(p+n, s, len); + p[n+len] = '\0'; + str->str = p; + str->len = n+len; + return 0; +} + +void ehttp_util_string_clear(ehttp_util_string_t *str) { + if (str->str) { + str->str[0] = '\0'; + str->len = 0; + } +} + diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index f46d3fb427..4e235e24e8 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -28,6 +28,22 @@ #include "httpSql.h" #include "httpSession.h" +#include "elog.h" + +// dirty tweak +extern bool httpGetHttpMethod(HttpContext* pContext); +extern bool httpParseURL(HttpContext* pContext); +extern bool httpParseHttpVersion(HttpContext* pContext); +extern bool httpGetDecodeMethod(HttpContext* pContext); +extern bool httpParseHead(HttpContext* pContext); + +static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw); +static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase); +static void on_header_field(void *arg, const char *key, const char *val); +static void on_body(void *arg, const char *chunk, size_t len); +static void on_end(void *arg); +static void on_error(void *arg, int status_code); + static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; if (pContext->fd >= 0) { @@ -149,6 +165,11 @@ void httpReleaseContext(HttpContext *pContext) { httpDebug("context:%p, won't be destroyed for cache is already released", pContext); // httpDestroyContext((void **)(&ppContext)); } + + if (pContext->parser.parser) { + ehttp_parser_destroy(pContext->parser.parser); + pContext->parser.parser = NULL; + } } bool httpInitContext(HttpContext *pContext) { @@ -168,6 +189,20 @@ bool httpInitContext(HttpContext *pContext) { memset(pParser, 0, sizeof(HttpParser)); pParser->pCur = pParser->pLast = pParser->buffer; + ehttp_parser_callbacks_t callbacks = { + on_request_line, + on_status_line, + on_header_field, + on_body, + on_end, + on_error + }; + ehttp_parser_conf_t conf = { + .flush_block_size = 0 + }; + pParser->parser = ehttp_parser_create(callbacks, conf, pContext); + pParser->inited = 1; + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed); return true; @@ -230,3 +265,129 @@ void httpCloseContextByServer(HttpContext *pContext) { httpRemoveContextFromEpoll(pContext); httpReleaseContext(pContext); } + + + + + +static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + int n = snprintf(pParser->pLast, avail, + "%s %s %s\r\n", method, target_raw, version); + + char *last = pParser->pLast; + + do { + if (n>=avail) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), exceeding buffer size", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + pParser->bufsize += n; + + if (!httpGetHttpMethod(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http method failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + if (!httpParseURL(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http url failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + if (!httpParseHttpVersion(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http version failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + if (!httpGetDecodeMethod(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), get decode method failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + + last += n; + pParser->pLast = last; + return; + } while (0); + + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; +} + +static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; +} + +static void on_header_field(void *arg, const char *key, const char *val) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + if (pParser->failed) return; + + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + int n = snprintf(pParser->pLast, avail, + "%s: %s\r\n", key, val); + + char *last = pParser->pLast; + + do { + if (n>=avail) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), exceeding buffer size", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val); + break; + } + pParser->bufsize += n; + pParser->pCur = pParser->pLast; + + if (!httpParseHead(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), parse head failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val); + break; + } + + last += n; + pParser->pLast = last; + return; + } while (0); + + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; +} + +static void on_body(void *arg, const char *chunk, size_t len) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + if (pParser->failed) return; + + if (!pContext->parsed) { + pContext->parsed = true; + } + + A("not implemented yet"); +} + +static void on_end(void *arg) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + if (pParser->failed) return; + + if (!pContext->parsed) { + pContext->parsed = true; + } +} + +static void on_error(void *arg, int status_code) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + D("=="); + pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED; +} + diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index ec3d2c0d44..d0b01c628c 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -26,10 +26,14 @@ #include "httpResp.h" #include "httpUtil.h" +#include "elog.h" + #ifndef EPOLLWAKEUP #define EPOLLWAKEUP (1u << 29) #endif +static bool ehttpReadData(HttpContext *pContext); + static void httpStopThread(HttpThread* pThread) { pThread->stop = true; @@ -134,6 +138,8 @@ static bool httpDecompressData(HttpContext *pContext) { } static bool httpReadData(HttpContext *pContext) { + if (1) return ehttpReadData(pContext); + if (!pContext->parsed) { httpInitContext(pContext); } @@ -405,3 +411,60 @@ bool httpInitConnect() { pServer->serverPort, pServer->numOfThreads); return true; } + + + + +static bool ehttpReadData(HttpContext *pContext) { + HttpParser *pParser = &pContext->parser; + EQ_ASSERT(!pContext->parsed); + if (!pParser->parser) { + if (!pParser->inited) { + httpInitContext(pContext); + } + if (!pParser->parser) { + return false; + } + } + + pContext->accessTimes++; + pContext->lastAccessTime = taosGetTimestampSec(); + + char buf[HTTP_STEP_SIZE+1] = {0}; + int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf)); + if (nread > 0) { + buf[nread] = '\0'; + if (strstr(buf, "GET ")==buf && !strchr(buf, '\r') && !strchr(buf, '\n')) { + D("==half of request line received:\n%s\n==", buf); + } + if (ehttp_parser_parse(pParser->parser, buf, nread)) { + D("==parsing failed=="); + httpCloseContextByServer(pContext); + return false; + } + if (pContext->parser.failed) { + D("==parsing failed: [0x%x]==", pContext->parser.failed); + httpNotifyContextClose(pContext); + return false; + } + return pContext->parsed; + } else if (nread < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event", + pContext, pContext->fd, pContext->ipstr, errno); + return false; // later again + } else { + httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", + pContext, pContext->fd, pContext->ipstr, errno); + D("==releasing because of reading failed=="); + httpReleaseContext(pContext); + return false; + } + } else { + // eof + D("==releasing because of remote close/reset=="); + httpReleaseContext(pContext); + return false; + } +} + From d9c04b18e9b96a888e363210b8b903e19735ec7d Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 10:59:12 +0800 Subject: [PATCH 2/7] elog to util; parser.data.pos --- src/plugins/http/src/httpContext.c | 13 ++++++++++--- src/plugins/http/src/httpServer.c | 25 ++++++++++++++++++++++++- src/{common => util}/inc/elog.h | 0 src/{common => util}/src/elog.c | 0 4 files changed, 34 insertions(+), 4 deletions(-) rename src/{common => util}/inc/elog.h (100%) rename src/{common => util}/src/elog.c (100%) diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 4e235e24e8..1e36fdef75 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -55,6 +55,7 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { static void httpDestroyContext(void *data) { HttpContext *pContext = *(HttpContext **)data; + D("==context[%p] destroyed==", pContext); if (pContext->fd > 0) tclose(pContext->fd); HttpThread *pThread = pContext->pThread; @@ -80,6 +81,7 @@ bool httpInitContexts() { httpError("failed to init context cache"); return false; } + D("==cache [%p] created==", tsHttpServer.contextCache); return true; } @@ -120,6 +122,8 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *pContext = calloc(1, sizeof(HttpContext)); if (pContext == NULL) return NULL; + D("==context[%p] created==", pContext); + pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); @@ -209,6 +213,7 @@ bool httpInitContext(HttpContext *pContext) { } void httpCloseContextByApp(HttpContext *pContext) { + D("=="); pContext->parsed = false; bool keepAlive = true; @@ -220,6 +225,7 @@ void httpCloseContextByApp(HttpContext *pContext) { } if (keepAlive) { + D("==keepAlive=="); if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { httpDebug("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd, pContext->ipstr); @@ -240,6 +246,7 @@ void httpCloseContextByApp(HttpContext *pContext) { pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); } } else { + D("==not keepAlive=="); httpRemoveContextFromEpoll(pContext); httpDebug("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close context", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); @@ -365,9 +372,7 @@ static void on_body(void *arg, const char *chunk, size_t len) { if (pParser->failed) return; - if (!pContext->parsed) { - pContext->parsed = true; - } + if (pParser->data.pos == 0) pParser->data.pos = pParser->pLast; A("not implemented yet"); } @@ -378,6 +383,8 @@ static void on_end(void *arg) { if (pParser->failed) return; + if (pParser->data.pos == 0) pParser->data.pos = pParser->pLast; + if (!pContext->parsed) { pContext->parsed = true; } diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index d0b01c628c..d21fa89d5c 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -138,7 +138,7 @@ static bool httpDecompressData(HttpContext *pContext) { } static bool httpReadData(HttpContext *pContext) { - if (1) return ehttpReadData(pContext); + if (0) return ehttpReadData(pContext); if (!pContext->parsed) { httpInitContext(pContext); @@ -447,6 +447,29 @@ static bool ehttpReadData(HttpContext *pContext) { httpNotifyContextClose(pContext); return false; } + if (pContext->parsed) { + int ret = httpCheckReadCompleted(pContext); + if (ret == HTTP_CHECK_BODY_CONTINUE) { + //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); + httpReleaseContext(pContext); + return false; + } else if (ret == HTTP_CHECK_BODY_SUCCESS){ + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len); + if (httpDecompressData(pContext)) { + return true; + } else { + httpNotifyContextClose(pContext); + httpReleaseContext(pContext); + return false; + } + } else { + httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); + httpNotifyContextClose(pContext); + httpReleaseContext(pContext); + return false; + } + } return pContext->parsed; } else if (nread < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { diff --git a/src/common/inc/elog.h b/src/util/inc/elog.h similarity index 100% rename from src/common/inc/elog.h rename to src/util/inc/elog.h diff --git a/src/common/src/elog.c b/src/util/src/elog.c similarity index 100% rename from src/common/src/elog.c rename to src/util/src/elog.c From 93cb6386df27403bc04d957586b36140a477bf31 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 13:28:39 +0800 Subject: [PATCH 3/7] body process --- src/plugins/http/src/httpContext.c | 17 ++++++++++++++--- src/plugins/http/src/httpServer.c | 6 ++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 1e36fdef75..5012fd15f5 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -337,6 +337,7 @@ static void on_header_field(void *arg, const char *key, const char *val) { if (pParser->failed) return; + D("==key:[%s], val:[%s]==", key, val); int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); int n = snprintf(pParser->pLast, avail, "%s: %s\r\n", key, val); @@ -350,7 +351,7 @@ static void on_header_field(void *arg, const char *key, const char *val) { break; } pParser->bufsize += n; - pParser->pCur = pParser->pLast; + pParser->pCur = pParser->pLast + n; if (!httpParseHead(pContext)) { httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), parse head failed", @@ -372,9 +373,19 @@ static void on_body(void *arg, const char *chunk, size_t len) { if (pParser->failed) return; - if (pParser->data.pos == 0) pParser->data.pos = pParser->pLast; + if (pParser->data.pos == 0) { + pParser->data.pos = pParser->pLast; + pParser->data.len = 0; + } - A("not implemented yet"); + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + if (len+1>=avail) { + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; + return; + } + memcpy(pParser->pLast, chunk, len); + pParser->pLast += len; + pParser->data.len += len; } static void on_end(void *arg) { diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index d21fa89d5c..4f2ea63dc2 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -138,7 +138,7 @@ static bool httpDecompressData(HttpContext *pContext) { } static bool httpReadData(HttpContext *pContext) { - if (0) return ehttpReadData(pContext); + if (1) return ehttpReadData(pContext); if (!pContext->parsed) { httpInitContext(pContext); @@ -448,7 +448,9 @@ static bool ehttpReadData(HttpContext *pContext) { return false; } if (pContext->parsed) { - int ret = httpCheckReadCompleted(pContext); + // int ret = httpCheckReadCompleted(pContext); + // already done in ehttp_parser + int ret = HTTP_CHECK_BODY_SUCCESS; if (ret == HTTP_CHECK_BODY_CONTINUE) { //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); httpReleaseContext(pContext); From 7c434d6108e26a52995069978e1911efe40287b4 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 15:19:43 +0800 Subject: [PATCH 4/7] add env FALLBACK, for the sake of easy debug in different mode --- src/plugins/http/inc/httpInt.h | 2 ++ src/plugins/http/src/httpContext.c | 40 ++++++++++++++++-------------- src/plugins/http/src/httpServer.c | 6 +++-- src/plugins/http/src/httpSystem.c | 6 +++++ 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 044b5cc4cc..bde799d6d6 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -242,6 +242,8 @@ typedef struct HttpServer { pthread_mutex_t serverMutex; HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; bool (*processData)(HttpContext *pContext); + + int fallback:2; } HttpServer; extern const char *httpKeepAliveStr[]; diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 5012fd15f5..4440da6d45 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -72,6 +72,13 @@ static void httpDestroyContext(void *data) { httpFreeJsonBuf(pContext); httpFreeMultiCmds(pContext); + if (!tsHttpServer.fallback) { + if (pContext->parser.parser) { + ehttp_parser_destroy(pContext->parser.parser); + pContext->parser.parser = NULL; + } + } + tfree(pContext); } @@ -169,11 +176,6 @@ void httpReleaseContext(HttpContext *pContext) { httpDebug("context:%p, won't be destroyed for cache is already released", pContext); // httpDestroyContext((void **)(&ppContext)); } - - if (pContext->parser.parser) { - ehttp_parser_destroy(pContext->parser.parser); - pContext->parser.parser = NULL; - } } bool httpInitContext(HttpContext *pContext) { @@ -193,19 +195,21 @@ bool httpInitContext(HttpContext *pContext) { memset(pParser, 0, sizeof(HttpParser)); pParser->pCur = pParser->pLast = pParser->buffer; - ehttp_parser_callbacks_t callbacks = { - on_request_line, - on_status_line, - on_header_field, - on_body, - on_end, - on_error - }; - ehttp_parser_conf_t conf = { - .flush_block_size = 0 - }; - pParser->parser = ehttp_parser_create(callbacks, conf, pContext); - pParser->inited = 1; + if (!tsHttpServer.fallback) { + ehttp_parser_callbacks_t callbacks = { + on_request_line, + on_status_line, + on_header_field, + on_body, + on_end, + on_error + }; + ehttp_parser_conf_t conf = { + .flush_block_size = 0 + }; + pParser->parser = ehttp_parser_create(callbacks, conf, pContext); + pParser->inited = 1; + } httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed); diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 4f2ea63dc2..819f7a5f4a 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -138,7 +138,7 @@ static bool httpDecompressData(HttpContext *pContext) { } static bool httpReadData(HttpContext *pContext) { - if (1) return ehttpReadData(pContext); + if (!tsHttpServer.fallback) return ehttpReadData(pContext); if (!pContext->parsed) { httpInitContext(pContext); @@ -437,11 +437,13 @@ static bool ehttpReadData(HttpContext *pContext) { if (strstr(buf, "GET ")==buf && !strchr(buf, '\r') && !strchr(buf, '\n')) { D("==half of request line received:\n%s\n==", buf); } + if (ehttp_parser_parse(pParser->parser, buf, nread)) { D("==parsing failed=="); httpCloseContextByServer(pContext); return false; } + if (pContext->parser.failed) { D("==parsing failed: [0x%x]==", pContext->parser.failed); httpNotifyContextClose(pContext); @@ -450,7 +452,7 @@ static bool ehttpReadData(HttpContext *pContext) { if (pContext->parsed) { // int ret = httpCheckReadCompleted(pContext); // already done in ehttp_parser - int ret = HTTP_CHECK_BODY_SUCCESS; + int ret = HTTP_CHECK_BODY_SUCCESS; if (ret == HTTP_CHECK_BODY_CONTINUE) { //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); httpReleaseContext(pContext); diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 3a0998f2e8..43466ee57e 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -39,6 +39,12 @@ HttpServer tsHttpServer; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); int httpInitSystem() { + tsHttpServer.fallback = 0; + const char *v = getenv("FALLBACK"); + if (v) { + tsHttpServer.fallback = 1; + } + strcpy(tsHttpServer.label, "rest"); tsHttpServer.serverIp = 0; tsHttpServer.serverPort = tsHttpPort; From e55a55a6f82e523e65a81785db6d3c36fb875561 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 21:57:07 +0800 Subject: [PATCH 5/7] add ehttpInc/DecContextRef --- src/plugins/http/inc/httpContext.h | 3 ++ src/plugins/http/inc/httpInt.h | 2 + src/plugins/http/src/httpContext.c | 76 ++++++++++++++++++++++++++---- src/plugins/http/src/httpServer.c | 29 ++++++++---- 4 files changed, 92 insertions(+), 18 deletions(-) diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h index a2d50d6b7f..594900d0cf 100644 --- a/src/plugins/http/inc/httpContext.h +++ b/src/plugins/http/inc/httpContext.h @@ -31,4 +31,7 @@ void httpCloseContextByApp(HttpContext *pContext); void httpNotifyContextClose(HttpContext *pContext); bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); +void ehttpIncContextRef(HttpContext *pContext); +void ehttpDecContextRef(HttpContext **ppContext); + #endif diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index bde799d6d6..40f980f101 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -212,6 +212,8 @@ typedef struct HttpContext { void * timer; HttpEncodeMethod * encodeMethod; struct HttpThread *pThread; + + int closed:2; } HttpContext; typedef struct HttpThread { diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 4440da6d45..b229673df2 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -28,6 +28,7 @@ #include "httpSql.h" #include "httpSession.h" +#include "httpContext.h" #include "elog.h" // dirty tweak @@ -44,12 +45,20 @@ static void on_body(void *arg, const char *chunk, size_t len); static void on_end(void *arg); static void on_error(void *arg, int status_code); +static void httpDestroyContext(void *data); +static void httpMightDestroyContext(void *data); +static void ehttpReleaseContext(HttpContext *pContext); + static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; if (pContext->fd >= 0) { epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); - taosCloseSocket(pContext->fd); + int32_t fd = pContext->fd; pContext->fd = -1; + taosCloseSocket(fd); + if (!tsHttpServer.fallback) { + ehttpDecContextRef(&pContext); + } } } @@ -83,12 +92,11 @@ static void httpDestroyContext(void *data) { } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc"); + tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc"); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; } - D("==cache [%p] created==", tsHttpServer.contextCache); return true; } @@ -136,10 +144,12 @@ HttpContext *httpCreateContext(int32_t fd) { pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; + ehttpIncContextRef(pContext); HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(int64_t), &pContext, sizeof(int64_t), 3); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); + ehttpIncContextRef(pContext); // set the ref to 0 taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); @@ -148,10 +158,13 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *httpGetContext(void *ptr) { HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &ptr, sizeof(HttpContext *)); + EQ_ASSERT(ppContext); + EQ_ASSERT(*ppContext); if (ppContext) { HttpContext *pContext = *ppContext; if (pContext) { + if (!tsHttpServer.fallback) return pContext; int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount); return pContext; @@ -161,6 +174,10 @@ HttpContext *httpGetContext(void *ptr) { } void httpReleaseContext(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + ehttpReleaseContext(pContext); + return; + } int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); if (refCount < 0) { httpError("context:%p, is already released, refCount:%d", pContext, refCount); @@ -217,7 +234,9 @@ bool httpInitContext(HttpContext *pContext) { } void httpCloseContextByApp(HttpContext *pContext) { - D("=="); + if (!tsHttpServer.fallback) { + if (pContext->parsed == false) return; + } pContext->parsed = false; bool keepAlive = true; @@ -229,7 +248,6 @@ void httpCloseContextByApp(HttpContext *pContext) { } if (keepAlive) { - D("==keepAlive=="); if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { httpDebug("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd, pContext->ipstr); @@ -250,16 +268,19 @@ void httpCloseContextByApp(HttpContext *pContext) { pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); } } else { - D("==not keepAlive=="); httpRemoveContextFromEpoll(pContext); httpDebug("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close context", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); } - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); } void httpCloseContextByServer(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + if (pContext->closed) return; + pContext->closed = 1; + } if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { @@ -274,7 +295,7 @@ void httpCloseContextByServer(HttpContext *pContext) { pContext->parsed = false; httpRemoveContextFromEpoll(pContext); - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); } @@ -409,7 +430,44 @@ static void on_error(void *arg, int status_code) { HttpContext *pContext = (HttpContext*)arg; HttpParser *pParser = &pContext->parser; - D("=="); pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED; } +static void httpMightDestroyContext(void *data) { + HttpContext *pContext = *(HttpContext **)data; + if (!tsHttpServer.fallback) { + httpRemoveContextFromEpoll(pContext); + ehttpDecContextRef(&pContext); + return; + } + int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + if (refCount>0) return; + EQ_ASSERT(refCount==0); + httpDestroyContext(data); +} + +static void ehttpReleaseContext(HttpContext *pContext) { + HttpContext **ppContext = pContext->ppContext; + + if (tsHttpServer.contextCache != NULL) { + taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); + } else { + httpDebug("context:%p, won't be destroyed for cache is already released", pContext); + // httpDestroyContext((void **)(&ppContext)); + } +} + +void ehttpIncContextRef(HttpContext *pContext) { + if (tsHttpServer.fallback) return; + atomic_add_fetch_32(&pContext->refCount, 1); +} + +void ehttpDecContextRef(HttpContext **ppContext) { + if (tsHttpServer.fallback) return; + HttpContext *pContext = *ppContext; + int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + if (refCount>0) return; + EQ_ASSERT(refCount==0); + httpDestroyContext(ppContext); +} + diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 819f7a5f4a..5a785d2e55 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -194,6 +194,8 @@ static void httpProcessHttpData(void *param) { sigaddset(&set, SIGPIPE); pthread_sigmask(SIG_SETMASK, &set, NULL); + elog_set_thread_name("httpProcessHttpData"); + while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 @@ -209,14 +211,18 @@ static void httpProcessHttpData(void *param) { if (pContext == NULL) { httpError("context:%p, is already released, close connect", events[i].data.ptr); //epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL); - //tclose(events[i].data.fd); + //taosClose(events[i].data.fd); continue; } + ehttpIncContextRef(pContext); + if (events[i].events & EPOLLPRI) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -224,6 +230,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -231,6 +239,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -238,6 +248,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -245,6 +257,7 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -253,11 +266,15 @@ static void httpProcessHttpData(void *param) { pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); httpNotifyContextClose(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); } else { if (httpReadData(pContext)) { (*(pThread->processData))(pContext); atomic_fetch_add_32(&pServer->requestNum, 1); } + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); } } } @@ -338,7 +355,8 @@ static void *httpAcceptHttpConnection(void *arg) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, pContext->ipstr, pThread->label, strerror(errno)); tclose(pContext->fd); - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -455,7 +473,6 @@ static bool ehttpReadData(HttpContext *pContext) { int ret = HTTP_CHECK_BODY_SUCCESS; if (ret == HTTP_CHECK_BODY_CONTINUE) { //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); - httpReleaseContext(pContext); return false; } else if (ret == HTTP_CHECK_BODY_SUCCESS){ httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", @@ -464,13 +481,11 @@ static bool ehttpReadData(HttpContext *pContext) { return true; } else { httpNotifyContextClose(pContext); - httpReleaseContext(pContext); return false; } } else { httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); httpNotifyContextClose(pContext); - httpReleaseContext(pContext); return false; } } @@ -483,14 +498,10 @@ static bool ehttpReadData(HttpContext *pContext) { } else { httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", pContext, pContext->fd, pContext->ipstr, errno); - D("==releasing because of reading failed=="); - httpReleaseContext(pContext); return false; } } else { // eof - D("==releasing because of remote close/reset=="); - httpReleaseContext(pContext); return false; } } From aac52f9691fcb4aa13eac068c22b1b287070e7e0 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 22:12:35 +0800 Subject: [PATCH 6/7] no need to dec ref in fallback mode --- src/plugins/http/src/httpContext.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 98028fdbb4..ab10234662 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -438,9 +438,6 @@ static void httpMightDestroyContext(void *data) { ehttpDecContextRef(&pContext); return; } - int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); - if (refCount>0) return; - EQ_ASSERT(refCount==0); httpDestroyContext(data); } From 46cf2db0a267994c8e1113fe1f4d750cfcdcaa26 Mon Sep 17 00:00:00 2001 From: freemine Date: Sat, 1 Aug 2020 22:41:29 +0800 Subject: [PATCH 7/7] reallocarray => realloc, because of configuration in travis --- src/plugins/http/src/ehttp_parser.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/plugins/http/src/ehttp_parser.c b/src/plugins/http/src/ehttp_parser.c index fbe15661b5..30d37f8a0d 100644 --- a/src/plugins/http/src/ehttp_parser.c +++ b/src/plugins/http/src/ehttp_parser.c @@ -185,7 +185,8 @@ static HTTP_PARSER_STATE ehttp_parser_top(ehttp_parser_t *parser) { static int ehttp_parser_push(ehttp_parser_t *parser, HTTP_PARSER_STATE state) { size_t n = parser->stacks_count + 1; - HTTP_PARSER_STATE *stacks = (HTTP_PARSER_STATE*)reallocarray(parser->stacks, n, sizeof(*stacks)); + // HTTP_PARSER_STATE *stacks = (HTTP_PARSER_STATE*)reallocarray(parser->stacks, n, sizeof(*stacks)); + HTTP_PARSER_STATE *stacks = (HTTP_PARSER_STATE*)realloc(parser->stacks, n * sizeof(*stacks)); if (!stacks) return -1; parser->stacks_count = n; @@ -380,7 +381,8 @@ static int ehttp_parser_check_field(ehttp_parser_t *parser, const char *key, con } static int ehttp_parser_kvs_append_kv(ehttp_parser_t *parser, const char *key, const char *val) { - ehttp_parser_kv_t *kvs = (ehttp_parser_kv_t*)reallocarray(parser->kvs, parser->kvs_count + 1, sizeof(*kvs)); + // ehttp_parser_kv_t *kvs = (ehttp_parser_kv_t*)reallocarray(parser->kvs, parser->kvs_count + 1, sizeof(*kvs)); + ehttp_parser_kv_t *kvs = (ehttp_parser_kv_t*)realloc(parser->kvs, (parser->kvs_count + 1) * sizeof(*kvs)); if (!kvs) return -1; parser->kvs = kvs;