[TD-1291]
This commit is contained in:
parent
dfa681df3c
commit
f878512588
|
@ -25,7 +25,7 @@ const char *httpContextStateStr(HttpContextState state);
|
||||||
HttpContext *httpCreateContext(int32_t fd);
|
HttpContext *httpCreateContext(int32_t fd);
|
||||||
bool httpInitContext(HttpContext *pContext);
|
bool httpInitContext(HttpContext *pContext);
|
||||||
HttpContext *httpGetContext(void * pContext);
|
HttpContext *httpGetContext(void * pContext);
|
||||||
void httpReleaseContext(HttpContext *pContext);
|
void httpReleaseContext(HttpContext *pContext, bool clearRes);
|
||||||
void httpCloseContextByServer(HttpContext *pContext);
|
void httpCloseContextByServer(HttpContext *pContext);
|
||||||
void httpCloseContextByApp(HttpContext *pContext);
|
void httpCloseContextByApp(HttpContext *pContext);
|
||||||
void httpNotifyContextClose(HttpContext *pContext);
|
void httpNotifyContextClose(HttpContext *pContext);
|
||||||
|
|
|
@ -32,9 +32,9 @@
|
||||||
#define HTTP_MAX_BUFFER_SIZE 1024*1024*8
|
#define HTTP_MAX_BUFFER_SIZE 1024*1024*8
|
||||||
#define HTTP_LABEL_SIZE 8
|
#define HTTP_LABEL_SIZE 8
|
||||||
#define HTTP_MAX_EVENTS 10
|
#define HTTP_MAX_EVENTS 10
|
||||||
#define HTTP_BUFFER_INIT 8192
|
#define HTTP_BUFFER_INIT 4096
|
||||||
#define HTTP_BUFFER_SIZE 8192000
|
#define HTTP_BUFFER_SIZE 8388608
|
||||||
#define HTTP_STEP_SIZE 1024 //http message get process step by step
|
#define HTTP_STEP_SIZE 4096 //http message get process step by step
|
||||||
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
|
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
|
||||||
#define TSDB_CODE_HTTP_GC_TARGET_SIZE 512
|
#define TSDB_CODE_HTTP_GC_TARGET_SIZE 512
|
||||||
#define HTTP_WRITE_RETRY_TIMES 500
|
#define HTTP_WRITE_RETRY_TIMES 500
|
||||||
|
|
|
@ -145,14 +145,17 @@ HttpContext *httpGetContext(void *ptr) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void httpReleaseContext(HttpContext *pContext) {
|
void httpReleaseContext(HttpContext *pContext, bool clearRes) {
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
|
||||||
if (refCount < 0) {
|
if (refCount < 0) {
|
||||||
httpError("context:%p, is already released, refCount:%d", pContext, refCount);
|
httpError("context:%p, is already released, refCount:%d", pContext, refCount);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (clearRes) {
|
||||||
httpClearParser(pContext->parser);
|
httpClearParser(pContext->parser);
|
||||||
|
}
|
||||||
|
|
||||||
HttpContext **ppContext = pContext->ppContext;
|
HttpContext **ppContext = pContext->ppContext;
|
||||||
httpTrace("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
|
httpTrace("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
|
||||||
|
|
||||||
|
@ -211,7 +214,7 @@ void httpCloseContextByApp(HttpContext *pContext) {
|
||||||
httpContextStateStr(pContext->state), pContext->state);
|
httpContextStateStr(pContext->state), pContext->state);
|
||||||
}
|
}
|
||||||
|
|
||||||
httpReleaseContext(pContext);
|
httpReleaseContext(pContext, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void httpCloseContextByServer(HttpContext *pContext) {
|
void httpCloseContextByServer(HttpContext *pContext) {
|
||||||
|
|
|
@ -110,11 +110,11 @@ static void httpCleanupString(HttpString *str) {
|
||||||
static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
|
static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
|
||||||
if (str->size == 0) {
|
if (str->size == 0) {
|
||||||
str->pos = 0;
|
str->pos = 0;
|
||||||
str->size = 32;
|
str->size = 64;
|
||||||
str->str = malloc(str->size);
|
str->str = malloc(str->size);
|
||||||
} else if (str->pos + len + 1 >= str->size) {
|
} else if (str->pos + len + 1 >= str->size) {
|
||||||
str->size += len;
|
str->size += len;
|
||||||
str->size *= 10;
|
str->size *= 4;
|
||||||
str->str = realloc(str->str, str->size);
|
str->str = realloc(str->str, str->size);
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
@ -153,17 +153,10 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target
|
||||||
for (int32_t i = 0; i < HTTP_MAX_URL; i++) {
|
for (int32_t i = 0; i < HTTP_MAX_URL; i++) {
|
||||||
char *pSeek = strchr(pStart, '/');
|
char *pSeek = strchr(pStart, '/');
|
||||||
if (pSeek == NULL) {
|
if (pSeek == NULL) {
|
||||||
pParser->path[i].str = strdup(pStart);
|
httpAppendString(pParser->path + i, pStart, strlen(pStart));
|
||||||
pParser->path[i].size = strlen(pStart);
|
|
||||||
pParser->path[i].pos = pParser->path[i].size;
|
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
int32_t len = (int32_t)(pSeek - pStart);
|
httpAppendString(pParser->path + i, pStart, (int32_t)(pSeek - pStart));
|
||||||
pParser->path[i].str = malloc(len + 1);
|
|
||||||
memcpy(pParser->path[i].str, pStart, len);
|
|
||||||
pParser->path[i].str[len] = 0;
|
|
||||||
pParser->path[i].size = len;
|
|
||||||
pParser->path[i].pos = len;
|
|
||||||
}
|
}
|
||||||
pStart = pSeek + 1;
|
pStart = pSeek + 1;
|
||||||
}
|
}
|
||||||
|
@ -336,24 +329,30 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
|
||||||
HttpString * buf = &parser->body;
|
HttpString * buf = &parser->body;
|
||||||
if (parser->parseCode != TSDB_CODE_SUCCESS) return -1;
|
if (parser->parseCode != TSDB_CODE_SUCCESS) return -1;
|
||||||
|
|
||||||
|
if (buf->size <= 0) {
|
||||||
|
buf->size = MIN(len + 2, HTTP_BUFFER_SIZE);
|
||||||
|
buf->str = malloc(buf->size);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t newSize = buf->pos + len + 1;
|
int32_t newSize = buf->pos + len + 1;
|
||||||
if (newSize >= buf->size) {
|
if (newSize >= buf->size) {
|
||||||
if (buf->size >= HTTP_BUFFER_SIZE) {
|
if (buf->size >= HTTP_BUFFER_SIZE) {
|
||||||
httpError("context:%p, fd:%d, failed parse body, exceeding buffer size %d", pContext, pContext->fd, buf->size);
|
httpError("context:%p, fd:%d, failed parse body, exceeding buffer size %d", pContext, pContext->fd, buf->size);
|
||||||
httpOnError(parser, 0, TSDB_CODE_HTTP_REQUSET_TOO_BIG);
|
httpOnError(parser, 0, TSDB_CODE_HTTP_REQUSET_TOO_BIG);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
}
|
||||||
newSize = MAX(newSize, 32);
|
|
||||||
newSize *= 10;
|
newSize = MAX(newSize, HTTP_BUFFER_INIT);
|
||||||
|
newSize *= 4;
|
||||||
newSize = MIN(newSize, HTTP_BUFFER_SIZE);
|
newSize = MIN(newSize, HTTP_BUFFER_SIZE);
|
||||||
buf->str = realloc(buf->str, newSize);
|
buf->str = realloc(buf->str, newSize);
|
||||||
|
buf->size = newSize;
|
||||||
|
|
||||||
if (buf->str == NULL) {
|
if (buf->str == NULL) {
|
||||||
httpError("context:%p, fd:%d, failed parse body, realloc %d failed", pContext, pContext->fd, newSize);
|
httpError("context:%p, fd:%d, failed parse body, realloc %d failed", pContext, pContext->fd, buf->size);
|
||||||
httpOnError(parser, 0, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
|
httpOnError(parser, 0, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
buf->size = newSize;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(buf->str + buf->pos, chunk, len);
|
memcpy(buf->str + buf->pos, chunk, len);
|
||||||
|
@ -389,7 +388,7 @@ static int32_t httpPushStack(HttpParser *parser, HTTP_PARSER_STATE state) {
|
||||||
stack->size = 32;
|
stack->size = 32;
|
||||||
stack->stacks = malloc(stack->size * sizeof(int8_t));
|
stack->stacks = malloc(stack->size * sizeof(int8_t));
|
||||||
} else if (stack->pos + 1 > stack->size) {
|
} else if (stack->pos + 1 > stack->size) {
|
||||||
stack->size *= 10;
|
stack->size *= 2;
|
||||||
stack->stacks = realloc(stack->stacks, stack->size * sizeof(int8_t));
|
stack->stacks = realloc(stack->stacks, stack->size * sizeof(int8_t));
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,7 +132,7 @@ static void httpProcessHttpData(void *param) {
|
||||||
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
|
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
|
||||||
httpDebug("context:%p, fd:%d, state:%s, not in ready state, ignore read events", pContext, pContext->fd,
|
httpDebug("context:%p, fd:%d, state:%s, not in ready state, ignore read events", pContext, pContext->fd,
|
||||||
httpContextStateStr(pContext->state));
|
httpContextStateStr(pContext->state));
|
||||||
httpReleaseContext(pContext);
|
httpReleaseContext(pContext, true);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,6 +145,8 @@ static void httpProcessHttpData(void *param) {
|
||||||
if (httpReadData(pContext)) {
|
if (httpReadData(pContext)) {
|
||||||
(*(pThread->processData))(pContext);
|
(*(pThread->processData))(pContext);
|
||||||
atomic_fetch_add_32(&pServer->requestNum, 1);
|
atomic_fetch_add_32(&pServer->requestNum, 1);
|
||||||
|
} else {
|
||||||
|
httpReleaseContext(pContext, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -226,7 +228,7 @@ static void *httpAcceptHttpConnection(void *arg) {
|
||||||
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
|
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
|
||||||
pContext->ipstr, pThread->label, strerror(errno));
|
pContext->ipstr, pThread->label, strerror(errno));
|
||||||
taosClose(pContext->fd);
|
taosClose(pContext->fd);
|
||||||
httpReleaseContext(pContext);
|
httpReleaseContext(pContext, true);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,7 +316,7 @@ static bool httpReadData(HttpContext *pContext) {
|
||||||
int32_t nread = (int32_t)taosReadSocket(pContext->fd, buf, sizeof(buf));
|
int32_t nread = (int32_t)taosReadSocket(pContext->fd, buf, sizeof(buf));
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
buf[nread] = '\0';
|
buf[nread] = '\0';
|
||||||
httpTrace("context:%p, fd:%d, nread:%d content:%s", pContext, pContext->fd, nread, buf);
|
httpTrace("context:%p, fd:%d, nread:%d", pContext, pContext->fd, nread);
|
||||||
int32_t ok = httpParseBuf(pParser, buf, nread);
|
int32_t ok = httpParseBuf(pParser, buf, nread);
|
||||||
|
|
||||||
if (ok) {
|
if (ok) {
|
||||||
|
|
Loading…
Reference in New Issue