Merge branch 'feature/http' of https://github.com/taosdata/TDengine into feature/http
This commit is contained in:
commit
20f95567de
|
@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
|
|||
}
|
||||
|
||||
void mnodeReleaseConn(SConnObj *pConn) {
|
||||
if(pConn == NULL) return;
|
||||
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
|
||||
if (pConn == NULL) return;
|
||||
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
|
||||
}
|
||||
|
||||
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
|
||||
|
@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p
|
|||
static void mnodeFreeConn(void *data) {
|
||||
SConnObj *pConn = data;
|
||||
tfree(pConn->pQueries);
|
||||
tfree(pConn->pQueries);
|
||||
tfree(pConn->pStreams);
|
||||
|
||||
mTrace("connId:%d, is destroyed", pConn->connId);
|
||||
}
|
||||
|
|
|
@ -16,15 +16,11 @@
|
|||
#ifndef TDENGINE_GC_HANDLE_H
|
||||
#define TDENGINE_GC_HANDLE_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "http.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpUtil.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpSql.h"
|
||||
|
||||
#define GC_ROOT_URL_POS 0
|
||||
#define GC_ACTION_URL_POS 1
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_TOKEN_H
|
||||
#define TDENGINE_HTTP_TOKEN_H
|
||||
|
||||
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
|
||||
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
|
||||
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
|
||||
|
||||
#endif
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_CONTEXT_H
|
||||
#define TDENGINE_HTTP_CONTEXT_H
|
||||
|
||||
#include "httpInt.h"
|
||||
|
||||
bool httpInitContexts();
|
||||
void httpCleanupContexts();
|
||||
const char *httpContextStateStr(HttpContextState state);
|
||||
|
||||
HttpContext *httpCreateContext(int32_t fd);
|
||||
bool httpInitContext(HttpContext *pContext);
|
||||
HttpContext *httpGetContext(void * pContext);
|
||||
void httpReleaseContext(HttpContext *pContext);
|
||||
void httpCloseContextByServer(HttpContext *pContext);
|
||||
void httpCloseContextByApp(HttpContext *pContext);
|
||||
void httpNotifyContextClose(HttpContext *pContext);
|
||||
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
|
||||
|
||||
#endif
|
|
@ -13,304 +13,11 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_SERVER_H
|
||||
#define TDENGINE_HTTP_SERVER_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include "pthread.h"
|
||||
#include "semaphore.h"
|
||||
#include "tmempool.h"
|
||||
#include "taosdef.h"
|
||||
#include "tutil.h"
|
||||
#include "zlib.h"
|
||||
#include "http.h"
|
||||
#include "httpJson.h"
|
||||
|
||||
#define HTTP_MAX_CMD_SIZE 1024
|
||||
#define HTTP_MAX_BUFFER_SIZE 1024*1024
|
||||
|
||||
#define HTTP_LABEL_SIZE 8
|
||||
#define HTTP_MAX_EVENTS 10
|
||||
#define HTTP_BUFFER_SIZE 1024*65 //65k
|
||||
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
|
||||
#define HTTP_STEP_SIZE 1024 //http message get process step by step
|
||||
#define HTTP_MAX_URL 5 //http url stack size
|
||||
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
|
||||
#define HTTP_GC_TARGET_SIZE 512
|
||||
|
||||
#define HTTP_VERSION_10 0
|
||||
#define HTTP_VERSION_11 1
|
||||
//#define HTTP_VERSION_12 2
|
||||
|
||||
#define HTTP_UNCUNKED 0
|
||||
#define HTTP_CHUNKED 1
|
||||
|
||||
#define HTTP_KEEPALIVE_NO_INPUT 0
|
||||
#define HTTP_KEEPALIVE_ENABLE 1
|
||||
#define HTTP_KEEPALIVE_DISABLE 2
|
||||
|
||||
#define HTTP_REQTYPE_OTHERS 0
|
||||
#define HTTP_REQTYPE_LOGIN 1
|
||||
#define HTTP_REQTYPE_HEARTBEAT 2
|
||||
#define HTTP_REQTYPE_SINGLE_SQL 3
|
||||
#define HTTP_REQTYPE_MULTI_SQL 4
|
||||
|
||||
#define HTTP_CHECK_BODY_ERROR -1
|
||||
#define HTTP_CHECK_BODY_CONTINUE 0
|
||||
#define HTTP_CHECK_BODY_SUCCESS 1
|
||||
|
||||
#define HTTP_WRITE_RETRY_TIMES 500
|
||||
#define HTTP_WRITE_WAIT_TIME_MS 5
|
||||
#define HTTP_EXPIRED_TIME 60000
|
||||
#define HTTP_DELAY_CLOSE_TIME_MS 500
|
||||
|
||||
#define HTTP_COMPRESS_IDENTITY 0
|
||||
#define HTTP_COMPRESS_GZIP 2
|
||||
|
||||
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
|
||||
|
||||
typedef enum {
|
||||
HTTP_CONTEXT_STATE_READY,
|
||||
HTTP_CONTEXT_STATE_HANDLING,
|
||||
HTTP_CONTEXT_STATE_DROPPING,
|
||||
HTTP_CONTEXT_STATE_CLOSED
|
||||
} HttpContextState;
|
||||
|
||||
struct HttpContext;
|
||||
struct HttpThread;
|
||||
|
||||
typedef struct {
|
||||
void *signature;
|
||||
int expire;
|
||||
int access;
|
||||
void *taos;
|
||||
char id[HTTP_SESSION_ID_LEN];
|
||||
} HttpSession;
|
||||
|
||||
typedef enum {
|
||||
HTTP_CMD_TYPE_UN_SPECIFIED,
|
||||
HTTP_CMD_TYPE_CREATE_DB,
|
||||
HTTP_CMD_TYPE_CREATE_STBALE,
|
||||
HTTP_CMD_TYPE_INSERT
|
||||
} HttpSqlCmdType;
|
||||
|
||||
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
|
||||
|
||||
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
|
||||
|
||||
typedef struct {
|
||||
// used by single cmd
|
||||
char *nativSql;
|
||||
int32_t numOfRows;
|
||||
int32_t code;
|
||||
|
||||
// these are the locations in the buffer
|
||||
int32_t tagNames[TSDB_MAX_TAGS];
|
||||
int32_t tagValues[TSDB_MAX_TAGS];
|
||||
int32_t timestamp;
|
||||
int32_t metric;
|
||||
int32_t stable;
|
||||
int32_t table;
|
||||
int32_t values;
|
||||
int32_t sql;
|
||||
|
||||
// used by multi-cmd
|
||||
int8_t cmdType;
|
||||
int8_t cmdReturnType;
|
||||
int8_t cmdState;
|
||||
int8_t tagNum;
|
||||
} HttpSqlCmd;
|
||||
|
||||
typedef struct {
|
||||
HttpSqlCmd *cmds;
|
||||
int16_t pos;
|
||||
int16_t size;
|
||||
int16_t maxSize;
|
||||
int32_t bufferPos;
|
||||
int32_t bufferSize;
|
||||
char * buffer;
|
||||
} HttpSqlCmds;
|
||||
|
||||
typedef struct {
|
||||
char *module;
|
||||
bool (*decodeFp)(struct HttpContext *pContext);
|
||||
} HttpDecodeMethod;
|
||||
|
||||
typedef struct {
|
||||
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
|
||||
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
|
||||
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
|
||||
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
|
||||
void (*initJsonFp)(struct HttpContext *pContext);
|
||||
void (*cleanJsonFp)(struct HttpContext *pContext);
|
||||
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
|
||||
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
|
||||
} HttpEncodeMethod;
|
||||
|
||||
typedef struct {
|
||||
char *pos;
|
||||
int32_t len;
|
||||
} HttpBuf;
|
||||
|
||||
typedef struct {
|
||||
char buffer[HTTP_BUFFER_SIZE];
|
||||
int bufsize;
|
||||
char *pLast;
|
||||
char *pCur;
|
||||
HttpBuf method;
|
||||
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
|
||||
HttpBuf data; // body content
|
||||
HttpBuf token; // auth token
|
||||
HttpDecodeMethod *pMethod;
|
||||
} HttpParser;
|
||||
|
||||
typedef struct HttpContext {
|
||||
void * signature;
|
||||
int fd;
|
||||
uint32_t accessTimes;
|
||||
uint32_t lastAccessTime;
|
||||
uint8_t httpVersion;
|
||||
uint8_t httpChunked;
|
||||
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
|
||||
uint8_t fromMemPool;
|
||||
uint8_t acceptEncoding;
|
||||
uint8_t contentEncoding;
|
||||
uint8_t reqType;
|
||||
uint8_t parsed;
|
||||
int32_t state;
|
||||
char ipstr[22];
|
||||
char user[TSDB_USER_LEN]; // parsed from auth token or login message
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
void *taos;
|
||||
HttpSession *session;
|
||||
z_stream gzipStream;
|
||||
HttpEncodeMethod *encodeMethod;
|
||||
HttpSqlCmd singleCmd;
|
||||
HttpSqlCmds *multiCmds;
|
||||
JsonBuf *jsonBuf;
|
||||
HttpParser parser;
|
||||
void *timer;
|
||||
struct HttpThread *pThread;
|
||||
struct HttpContext *prev;
|
||||
struct HttpContext *next;
|
||||
} HttpContext;
|
||||
|
||||
typedef struct HttpThread {
|
||||
pthread_t thread;
|
||||
HttpContext * pHead;
|
||||
pthread_mutex_t threadMutex;
|
||||
bool stop;
|
||||
int pollFd;
|
||||
int numOfFds;
|
||||
int threadId;
|
||||
char label[HTTP_LABEL_SIZE];
|
||||
bool (*processData)(HttpContext *pContext);
|
||||
struct HttpServer *pServer; // handle passed by upper layer during pServer initialization
|
||||
} HttpThread;
|
||||
|
||||
typedef struct HttpServer {
|
||||
char label[HTTP_LABEL_SIZE];
|
||||
uint32_t serverIp;
|
||||
uint16_t serverPort;
|
||||
bool online;
|
||||
int fd;
|
||||
int cacheContext;
|
||||
int sessionExpire;
|
||||
int numOfThreads;
|
||||
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
|
||||
int methodScannerLen;
|
||||
pthread_mutex_t serverMutex;
|
||||
void *pSessionHash;
|
||||
void *pContextPool;
|
||||
void *expireTimer;
|
||||
HttpThread *pThreads;
|
||||
pthread_t thread;
|
||||
bool (*processData)(HttpContext *pContext);
|
||||
int requestNum;
|
||||
void *timerHandle;
|
||||
} HttpServer;
|
||||
|
||||
// http util method
|
||||
bool httpCheckUsedbSql(char *sql);
|
||||
void httpTimeToString(time_t t, char *buf, int buflen);
|
||||
|
||||
// http init method
|
||||
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
|
||||
void httpCleanUpServer(HttpServer *pServer);
|
||||
|
||||
// http server connection
|
||||
void httpCleanUpConnect(HttpServer *pServer);
|
||||
bool httpInitConnect(HttpServer *pServer);
|
||||
|
||||
// http context for each client connection
|
||||
HttpContext *httpCreateContext(HttpServer *pServer);
|
||||
bool httpInitContext(HttpContext *pContext);
|
||||
void httpCloseContextByApp(HttpContext *pContext);
|
||||
void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext);
|
||||
|
||||
// http session method
|
||||
void httpCreateSession(HttpContext *pContext, void *taos);
|
||||
void httpAccessSession(HttpContext *pContext);
|
||||
void httpFetchSession(HttpContext *pContext);
|
||||
void httpRestoreSession(HttpContext *pContext);
|
||||
void httpRemoveExpireSessions(HttpServer *pServer);
|
||||
bool httpInitAllSessions(HttpServer *pServer);
|
||||
void httpRemoveAllSessions(HttpServer *pServer);
|
||||
void httpProcessSessionExpire(void *handle, void *tmrId);
|
||||
|
||||
// http request parser
|
||||
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
|
||||
|
||||
// http token method
|
||||
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
|
||||
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
|
||||
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
|
||||
|
||||
// util
|
||||
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
|
||||
bool httpProcessData(HttpContext *pContext);
|
||||
bool httpReadDataImp(HttpContext *pContext);
|
||||
bool httpParseRequest(HttpContext* pContext);
|
||||
int httpCheckReadCompleted(HttpContext* pContext);
|
||||
void httpReadDirtyData(HttpContext *pContext);
|
||||
#ifndef TDENGINE_HTTP_HANDLE_H
|
||||
#define TDENGINE_HTTP_HANDLE_H
|
||||
|
||||
// http request handler
|
||||
void httpProcessRequest(HttpContext *pContext);
|
||||
|
||||
// http json printer
|
||||
JsonBuf *httpMallocJsonBuf(HttpContext *pContext);
|
||||
void httpFreeJsonBuf(HttpContext *pContext);
|
||||
|
||||
// http multicmds util
|
||||
|
||||
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
|
||||
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
|
||||
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
|
||||
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
|
||||
|
||||
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
|
||||
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
|
||||
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
|
||||
void httpFreeMultiCmds(HttpContext *pContext);
|
||||
|
||||
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
|
||||
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
|
||||
int httpCurSqlCmdPos(HttpContext *pContext);
|
||||
|
||||
void httpTrimTableName(char *name);
|
||||
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
|
||||
char *httpGetCmdsString(HttpContext *pContext, int pos);
|
||||
|
||||
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
|
||||
int httpGzipCompressInit(HttpContext *pContext);
|
||||
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
|
||||
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
|
||||
|
||||
extern const char *httpKeepAliveStr[];
|
||||
extern const char *httpVersionStr[];
|
||||
const char* httpContextStateStr(HttpContextState state);
|
||||
|
||||
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
|
||||
void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext);
|
||||
bool httpProcessData(HttpContext *pContext);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -0,0 +1,237 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_INT_H
|
||||
#define TDENGINE_HTTP_INT_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include "pthread.h"
|
||||
#include "semaphore.h"
|
||||
#include "tmempool.h"
|
||||
#include "taosdef.h"
|
||||
#include "tutil.h"
|
||||
#include "zlib.h"
|
||||
#include "http.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpJson.h"
|
||||
|
||||
#define HTTP_MAX_CMD_SIZE 1024
|
||||
#define HTTP_MAX_BUFFER_SIZE 1024*1024
|
||||
|
||||
#define HTTP_LABEL_SIZE 8
|
||||
#define HTTP_MAX_EVENTS 10
|
||||
#define HTTP_BUFFER_SIZE 1024*65 //65k
|
||||
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
|
||||
#define HTTP_STEP_SIZE 1024 //http message get process step by step
|
||||
#define HTTP_MAX_URL 5 //http url stack size
|
||||
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
|
||||
#define HTTP_GC_TARGET_SIZE 512
|
||||
|
||||
#define HTTP_VERSION_10 0
|
||||
#define HTTP_VERSION_11 1
|
||||
//#define HTTP_VERSION_12 2
|
||||
|
||||
#define HTTP_UNCUNKED 0
|
||||
#define HTTP_CHUNKED 1
|
||||
|
||||
#define HTTP_KEEPALIVE_NO_INPUT 0
|
||||
#define HTTP_KEEPALIVE_ENABLE 1
|
||||
#define HTTP_KEEPALIVE_DISABLE 2
|
||||
|
||||
#define HTTP_REQTYPE_OTHERS 0
|
||||
#define HTTP_REQTYPE_LOGIN 1
|
||||
#define HTTP_REQTYPE_HEARTBEAT 2
|
||||
#define HTTP_REQTYPE_SINGLE_SQL 3
|
||||
#define HTTP_REQTYPE_MULTI_SQL 4
|
||||
|
||||
#define HTTP_CHECK_BODY_ERROR -1
|
||||
#define HTTP_CHECK_BODY_CONTINUE 0
|
||||
#define HTTP_CHECK_BODY_SUCCESS 1
|
||||
|
||||
#define HTTP_WRITE_RETRY_TIMES 500
|
||||
#define HTTP_WRITE_WAIT_TIME_MS 5
|
||||
#define HTTP_EXPIRED_TIME 60000
|
||||
#define HTTP_DELAY_CLOSE_TIME_MS 500
|
||||
|
||||
#define HTTP_COMPRESS_IDENTITY 0
|
||||
#define HTTP_COMPRESS_GZIP 2
|
||||
|
||||
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
|
||||
|
||||
typedef enum {
|
||||
HTTP_SERVER_INIT,
|
||||
HTTP_SERVER_RUNNING,
|
||||
HTTP_SERVER_CLOSING,
|
||||
HTTP_SERVER_CLOSED
|
||||
} HttpServerStatus;
|
||||
|
||||
typedef enum {
|
||||
HTTP_CONTEXT_STATE_READY,
|
||||
HTTP_CONTEXT_STATE_HANDLING,
|
||||
HTTP_CONTEXT_STATE_DROPPING,
|
||||
HTTP_CONTEXT_STATE_CLOSED
|
||||
} HttpContextState;
|
||||
|
||||
struct HttpContext;
|
||||
struct HttpThread;
|
||||
|
||||
typedef struct {
|
||||
char id[HTTP_SESSION_ID_LEN];
|
||||
int refCount;
|
||||
void *taos;
|
||||
} HttpSession;
|
||||
|
||||
typedef enum {
|
||||
HTTP_CMD_TYPE_UN_SPECIFIED,
|
||||
HTTP_CMD_TYPE_CREATE_DB,
|
||||
HTTP_CMD_TYPE_CREATE_STBALE,
|
||||
HTTP_CMD_TYPE_INSERT
|
||||
} HttpSqlCmdType;
|
||||
|
||||
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
|
||||
|
||||
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
|
||||
|
||||
typedef struct {
|
||||
// used by single cmd
|
||||
char *nativSql;
|
||||
int32_t numOfRows;
|
||||
int32_t code;
|
||||
|
||||
// these are the locations in the buffer
|
||||
int32_t tagNames[TSDB_MAX_TAGS];
|
||||
int32_t tagValues[TSDB_MAX_TAGS];
|
||||
int32_t timestamp;
|
||||
int32_t metric;
|
||||
int32_t stable;
|
||||
int32_t table;
|
||||
int32_t values;
|
||||
int32_t sql;
|
||||
|
||||
// used by multi-cmd
|
||||
int8_t cmdType;
|
||||
int8_t cmdReturnType;
|
||||
int8_t cmdState;
|
||||
int8_t tagNum;
|
||||
} HttpSqlCmd;
|
||||
|
||||
typedef struct {
|
||||
HttpSqlCmd *cmds;
|
||||
int16_t pos;
|
||||
int16_t size;
|
||||
int16_t maxSize;
|
||||
int32_t bufferPos;
|
||||
int32_t bufferSize;
|
||||
char * buffer;
|
||||
} HttpSqlCmds;
|
||||
|
||||
typedef struct {
|
||||
char *module;
|
||||
bool (*decodeFp)(struct HttpContext *pContext);
|
||||
} HttpDecodeMethod;
|
||||
|
||||
typedef struct {
|
||||
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
|
||||
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
|
||||
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
|
||||
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
|
||||
void (*initJsonFp)(struct HttpContext *pContext);
|
||||
void (*cleanJsonFp)(struct HttpContext *pContext);
|
||||
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
|
||||
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
|
||||
} HttpEncodeMethod;
|
||||
|
||||
typedef struct {
|
||||
char *pos;
|
||||
int32_t len;
|
||||
} HttpBuf;
|
||||
|
||||
typedef struct {
|
||||
char buffer[HTTP_BUFFER_SIZE];
|
||||
int bufsize;
|
||||
char *pLast;
|
||||
char *pCur;
|
||||
HttpBuf method;
|
||||
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
|
||||
HttpBuf data; // body content
|
||||
HttpBuf token; // auth token
|
||||
HttpDecodeMethod *pMethod;
|
||||
} HttpParser;
|
||||
|
||||
typedef struct HttpContext {
|
||||
int32_t refCount;
|
||||
int fd;
|
||||
uint32_t accessTimes;
|
||||
uint32_t lastAccessTime;
|
||||
int32_t state;
|
||||
uint8_t httpVersion;
|
||||
uint8_t httpChunked;
|
||||
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
|
||||
uint8_t acceptEncoding;
|
||||
uint8_t contentEncoding;
|
||||
uint8_t reqType;
|
||||
uint8_t parsed;
|
||||
char ipstr[22];
|
||||
char user[TSDB_USER_LEN]; // parsed from auth token or login message
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
void * taos;
|
||||
void * ppContext;
|
||||
HttpSession *session;
|
||||
z_stream gzipStream;
|
||||
HttpParser parser;
|
||||
HttpSqlCmd singleCmd;
|
||||
HttpSqlCmds *multiCmds;
|
||||
JsonBuf * jsonBuf;
|
||||
void * timer;
|
||||
HttpEncodeMethod * encodeMethod;
|
||||
struct HttpThread *pThread;
|
||||
} HttpContext;
|
||||
|
||||
typedef struct HttpThread {
|
||||
pthread_t thread;
|
||||
HttpContext * pHead;
|
||||
pthread_mutex_t threadMutex;
|
||||
bool stop;
|
||||
int pollFd;
|
||||
int numOfFds;
|
||||
int threadId;
|
||||
char label[HTTP_LABEL_SIZE];
|
||||
bool (*processData)(HttpContext *pContext);
|
||||
} HttpThread;
|
||||
|
||||
typedef struct HttpServer {
|
||||
char label[HTTP_LABEL_SIZE];
|
||||
uint32_t serverIp;
|
||||
uint16_t serverPort;
|
||||
int fd;
|
||||
int numOfThreads;
|
||||
int methodScannerLen;
|
||||
int32_t requestNum;
|
||||
int32_t status;
|
||||
pthread_t thread;
|
||||
HttpThread * pThreads;
|
||||
void * contextCache;
|
||||
void * sessionCache;
|
||||
pthread_mutex_t serverMutex;
|
||||
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
|
||||
bool (*processData)(HttpContext *pContext);
|
||||
} HttpServer;
|
||||
|
||||
extern const char *httpKeepAliveStr[];
|
||||
extern const char *httpVersionStr[];
|
||||
extern HttpServer tsHttpServer;
|
||||
|
||||
#endif
|
|
@ -97,4 +97,8 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len);
|
|||
// quick
|
||||
void httpJsonPairStatus(JsonBuf* buf, int code);
|
||||
|
||||
// http json printer
|
||||
JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext);
|
||||
void httpFreeJsonBuf(struct HttpContext* pContext);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef TDENGINE_HTTP_RESP_H
|
||||
#define TDENGINE_HTTP_RESP_H
|
||||
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
|
||||
enum _httpRespTempl {
|
||||
HTTP_RESPONSE_JSON_OK,
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_SERVER_H
|
||||
#define TDENGINE_HTTP_SERVER_H
|
||||
|
||||
#include "httpInt.h"
|
||||
|
||||
bool httpInitConnect();
|
||||
void httpCleanUpConnect();
|
||||
|
||||
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
|
||||
void httpCleanUpServer(HttpServer *pServer);
|
||||
bool httpReadDataImp(HttpContext *pContext);
|
||||
|
||||
#endif
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_SESSION_H
|
||||
#define TDENGINE_HTTP_SESSION_H
|
||||
|
||||
bool httpInitSessions();
|
||||
void httpCleanUpSessions();
|
||||
|
||||
// http session method
|
||||
void httpCreateSession(HttpContext *pContext, void *taos);
|
||||
void httpGetSession(HttpContext *pContext);
|
||||
void httpReleaseSession(HttpContext *pContext);
|
||||
|
||||
#endif
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_SQL_H
|
||||
#define TDENGINE_HTTP_SQL_H
|
||||
|
||||
|
||||
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
|
||||
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
|
||||
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
|
||||
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
|
||||
|
||||
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
|
||||
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
|
||||
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
|
||||
void httpFreeMultiCmds(HttpContext *pContext);
|
||||
|
||||
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
|
||||
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
|
||||
int httpCurSqlCmdPos(HttpContext *pContext);
|
||||
|
||||
void httpTrimTableName(char *name);
|
||||
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
|
||||
char *httpGetCmdsString(HttpContext *pContext, int pos);
|
||||
|
||||
#endif
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_HTTP_UTIL_H
|
||||
#define TDENGINE_HTTP_UTIL_H
|
||||
|
||||
bool httpCheckUsedbSql(char *sql);
|
||||
void httpTimeToString(time_t t, char *buf, int buflen);
|
||||
|
||||
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
|
||||
bool httpParseRequest(HttpContext *pContext);
|
||||
int httpCheckReadCompleted(HttpContext *pContext);
|
||||
void httpReadDirtyData(HttpContext *pContext);
|
||||
|
||||
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
|
||||
int httpGzipCompressInit(HttpContext *pContext);
|
||||
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
|
||||
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
|
||||
|
||||
// http request parser
|
||||
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
|
||||
|
||||
|
||||
|
||||
#endif
|
|
@ -16,15 +16,11 @@
|
|||
#ifndef TDENGINE_REST_HANDLE_H
|
||||
#define TDENGINE_REST_HANDLE_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "http.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpUtil.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpSql.h"
|
||||
|
||||
#define REST_ROOT_URL_POS 0
|
||||
#define REST_ACTION_URL_POS 1
|
||||
|
|
|
@ -16,16 +16,11 @@
|
|||
#ifndef TDENGINE_TG_HANDLE_H
|
||||
#define TDENGINE_TG_HANDLE_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "cJSON.h"
|
||||
#include "http.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpUtil.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpSql.h"
|
||||
|
||||
#define TG_ROOT_URL_POS 0
|
||||
#define TG_DB_URL_POS 1
|
||||
|
|
|
@ -18,8 +18,8 @@
|
|||
#include "tkey.h"
|
||||
#include "tutil.h"
|
||||
#include "http.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpAuth.h"
|
||||
|
||||
#define KEY_DES_4 4971256377704625728L
|
||||
|
||||
|
|
|
@ -0,0 +1,228 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tsocket.h"
|
||||
#include "tutil.h"
|
||||
#include "ttime.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
#include "tcache.h"
|
||||
#include "hash.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpSql.h"
|
||||
#include "httpSession.h"
|
||||
|
||||
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
|
||||
HttpThread *pThread = pContext->pThread;
|
||||
if (pContext->fd >= 0) {
|
||||
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
|
||||
taosCloseSocket(pContext->fd);
|
||||
pContext->fd = -1;
|
||||
}
|
||||
}
|
||||
|
||||
static void httpDestroyContext(void *data) {
|
||||
HttpContext *pContext = *(HttpContext **)data;
|
||||
if (pContext->fd > 0) tclose(pContext->fd);
|
||||
|
||||
HttpThread *pThread = pContext->pThread;
|
||||
httpRemoveContextFromEpoll(pContext);
|
||||
httpReleaseSession(pContext);
|
||||
atomic_sub_fetch_32(&pThread->numOfFds, 1);
|
||||
|
||||
pContext->pThread = 0;
|
||||
pContext->state = HTTP_CONTEXT_STATE_CLOSED;
|
||||
|
||||
// avoid double free
|
||||
httpFreeJsonBuf(pContext);
|
||||
httpFreeMultiCmds(pContext);
|
||||
|
||||
httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
|
||||
tfree(pContext);
|
||||
}
|
||||
|
||||
bool httpInitContexts() {
|
||||
tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext);
|
||||
if (tsHttpServer.contextCache == NULL) {
|
||||
httpError("failed to init context cache");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void httpCleanupContexts() {
|
||||
// TODO: wait until all context is closed
|
||||
if (tsHttpServer.contextCache != NULL) {
|
||||
SCacheObj *cache = tsHttpServer.contextCache;
|
||||
httpPrint("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
|
||||
taosCacheCleanup(tsHttpServer.contextCache);
|
||||
tsHttpServer.contextCache = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
const char *httpContextStateStr(HttpContextState state) {
|
||||
switch (state) {
|
||||
case HTTP_CONTEXT_STATE_READY:
|
||||
return "ready";
|
||||
case HTTP_CONTEXT_STATE_HANDLING:
|
||||
return "handling";
|
||||
case HTTP_CONTEXT_STATE_DROPPING:
|
||||
return "dropping";
|
||||
case HTTP_CONTEXT_STATE_CLOSED:
|
||||
return "closed";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
void httpNotifyContextClose(HttpContext *pContext) {
|
||||
shutdown(pContext->fd, SHUT_WR);
|
||||
}
|
||||
|
||||
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
|
||||
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
|
||||
}
|
||||
|
||||
HttpContext *httpCreateContext(int32_t fd) {
|
||||
HttpContext *pContext = calloc(1, sizeof(HttpContext));
|
||||
if (pContext == NULL) return NULL;
|
||||
|
||||
char contextStr[16] = {0};
|
||||
snprintf(contextStr, sizeof(contextStr), "%p", pContext);
|
||||
|
||||
pContext->fd = fd;
|
||||
pContext->httpVersion = HTTP_VERSION_10;
|
||||
pContext->lastAccessTime = taosGetTimestampSec();
|
||||
pContext->state = HTTP_CONTEXT_STATE_READY;
|
||||
|
||||
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3);
|
||||
pContext->ppContext = ppContext;
|
||||
httpTrace("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext);
|
||||
|
||||
// set the ref to 0
|
||||
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
|
||||
|
||||
return pContext;
|
||||
}
|
||||
|
||||
HttpContext *httpGetContext(void *ptr) {
|
||||
char contextStr[16] = {0};
|
||||
snprintf(contextStr, sizeof(contextStr), "%p", ptr);
|
||||
|
||||
HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr);
|
||||
|
||||
if (ppContext) {
|
||||
HttpContext *pContext = *ppContext;
|
||||
if (pContext) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
|
||||
httpTrace("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount);
|
||||
return pContext;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void httpReleaseContext(HttpContext *pContext) {
|
||||
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
|
||||
assert(refCount >= 0);
|
||||
httpTrace("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount);
|
||||
|
||||
HttpContext **ppContext = pContext->ppContext;
|
||||
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
|
||||
}
|
||||
|
||||
bool httpInitContext(HttpContext *pContext) {
|
||||
pContext->accessTimes++;
|
||||
pContext->lastAccessTime = taosGetTimestampSec();
|
||||
pContext->httpVersion = HTTP_VERSION_10;
|
||||
pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
|
||||
pContext->httpChunked = HTTP_UNCUNKED;
|
||||
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
|
||||
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
|
||||
pContext->reqType = HTTP_REQTYPE_OTHERS;
|
||||
pContext->encodeMethod = NULL;
|
||||
pContext->timer = NULL;
|
||||
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
|
||||
|
||||
HttpParser *pParser = &pContext->parser;
|
||||
memset(pParser, 0, sizeof(HttpParser));
|
||||
pParser->pCur = pParser->pLast = pParser->buffer;
|
||||
|
||||
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
|
||||
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
|
||||
return true;
|
||||
}
|
||||
|
||||
void httpCloseContextByApp(HttpContext *pContext) {
|
||||
pContext->parsed = false;
|
||||
|
||||
bool keepAlive = true;
|
||||
if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) {
|
||||
keepAlive = false;
|
||||
} else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) {
|
||||
keepAlive = false;
|
||||
} else {}
|
||||
|
||||
if (keepAlive) {
|
||||
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpRemoveContextFromEpoll(pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpRemoveContextFromEpoll(pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
} else {
|
||||
httpRemoveContextFromEpoll(pContext);
|
||||
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
|
||||
}
|
||||
} else {
|
||||
httpRemoveContextFromEpoll(pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
|
||||
}
|
||||
|
||||
httpReleaseContext(pContext);
|
||||
}
|
||||
|
||||
void httpCloseContextByServer(HttpContext *pContext) {
|
||||
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr);
|
||||
} else {
|
||||
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
|
||||
}
|
||||
|
||||
pContext->parsed = false;
|
||||
httpRemoveContextFromEpoll(pContext);
|
||||
httpReleaseContext(pContext);
|
||||
}
|
|
@ -19,11 +19,12 @@
|
|||
#include "tglobal.h"
|
||||
#include "tsocket.h"
|
||||
#include "ttimer.h"
|
||||
#include "http.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpAuth.h"
|
||||
#include "httpServer.h"
|
||||
#include "httpContext.h"
|
||||
#include "httpHandle.h"
|
||||
|
||||
void httpToLowerUrl(char* url) {
|
||||
/*ignore case */
|
||||
|
@ -159,7 +160,7 @@ bool httpGetHttpMethod(HttpContext* pContext) {
|
|||
bool httpGetDecodeMethod(HttpContext* pContext) {
|
||||
HttpParser* pParser = &pContext->parser;
|
||||
|
||||
HttpServer* pServer = pContext->pThread->pServer;
|
||||
HttpServer* pServer = &tsHttpServer;
|
||||
int methodLen = pServer->methodScannerLen;
|
||||
for (int i = 0; i < methodLen; i++) {
|
||||
HttpDecodeMethod* method = pServer->methodScanner[i];
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "httpCode.h"
|
||||
#include "httpJson.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpUtil.h"
|
||||
|
||||
#define MAX_NUM_STR_SZ 25
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "httpResp.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpJson.h"
|
||||
#include "httpContext.h"
|
||||
|
||||
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
|
||||
|
||||
|
|
|
@ -21,244 +21,15 @@
|
|||
#include "ttime.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
#include "http.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpContext.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpUtil.h"
|
||||
|
||||
#ifndef EPOLLWAKEUP
|
||||
#define EPOLLWAKEUP (1u << 29)
|
||||
#endif
|
||||
|
||||
const char* httpContextStateStr(HttpContextState state) {
|
||||
switch (state) {
|
||||
case HTTP_CONTEXT_STATE_READY:
|
||||
return "ready";
|
||||
case HTTP_CONTEXT_STATE_HANDLING:
|
||||
return "handling";
|
||||
case HTTP_CONTEXT_STATE_DROPPING:
|
||||
return "dropping";
|
||||
case HTTP_CONTEXT_STATE_CLOSED:
|
||||
return "closed";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) {
|
||||
if (pContext->fd >= 0) {
|
||||
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
|
||||
taosCloseSocket(pContext->fd);
|
||||
pContext->fd = -1;
|
||||
}
|
||||
}
|
||||
|
||||
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
|
||||
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
|
||||
}
|
||||
|
||||
void httpFreeContext(HttpServer *pServer, HttpContext *pContext);
|
||||
|
||||
/**
|
||||
* context will be reused while connection exist
|
||||
* multiCmds and jsonBuf will be malloc after taos_query_a called
|
||||
* and won't be freed until connection closed
|
||||
*/
|
||||
HttpContext *httpCreateContext(HttpServer *pServer) {
|
||||
HttpContext *pContext = (HttpContext *)taosMemPoolMalloc(pServer->pContextPool);
|
||||
if (pContext != NULL) {
|
||||
pContext->fromMemPool = 1;
|
||||
httpTrace("context:%p, is malloced from mempool", pContext);
|
||||
} else {
|
||||
pContext = (HttpContext *)malloc(sizeof(HttpContext));
|
||||
if (pContext == NULL) {
|
||||
return NULL;
|
||||
} else {
|
||||
memset(pContext, 0, sizeof(HttpContext));
|
||||
}
|
||||
httpTrace("context:%p, is malloced from raw memory", pContext);
|
||||
}
|
||||
|
||||
pContext->signature = pContext;
|
||||
pContext->httpVersion = HTTP_VERSION_10;
|
||||
pContext->lastAccessTime = taosGetTimestampSec();
|
||||
pContext->state = HTTP_CONTEXT_STATE_READY;
|
||||
return pContext;
|
||||
}
|
||||
|
||||
void httpFreeContext(HttpServer *pServer, HttpContext *pContext) {
|
||||
if (pContext->fromMemPool) {
|
||||
httpTrace("context:%p, is freed from mempool", pContext);
|
||||
taosMemPoolFree(pServer->pContextPool, (char *)pContext);
|
||||
} else {
|
||||
httpTrace("context:%p, is freed from raw memory", pContext);
|
||||
tfree(pContext);
|
||||
}
|
||||
}
|
||||
|
||||
void httpCleanUpContextTimer(HttpContext *pContext) {
|
||||
if (pContext->timer != NULL) {
|
||||
taosTmrStopA(&pContext->timer);
|
||||
//httpTrace("context:%p, ip:%s, close timer:%p", pContext, pContext->ipstr, pContext->timer);
|
||||
pContext->timer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void httpCleanUpContext(HttpContext *pContext, void *unused) {
|
||||
httpTrace("context:%p, start the clean up operation, sig:%p", pContext, pContext->signature);
|
||||
void *sig = atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0);
|
||||
if (sig == NULL) {
|
||||
httpTrace("context:%p is freed by another thread.", pContext);
|
||||
return;
|
||||
}
|
||||
|
||||
HttpThread *pThread = pContext->pThread;
|
||||
|
||||
httpCleanUpContextTimer(pContext);
|
||||
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
|
||||
httpRestoreSession(pContext);
|
||||
|
||||
pthread_mutex_lock(&pThread->threadMutex);
|
||||
|
||||
pThread->numOfFds--;
|
||||
if (pThread->numOfFds < 0) {
|
||||
httpError("context:%p, ip:%s, thread:%s, number of FDs:%d shall never be negative",
|
||||
pContext, pContext->ipstr, pThread->label, pThread->numOfFds);
|
||||
pThread->numOfFds = 0;
|
||||
}
|
||||
|
||||
// remove from the link list
|
||||
if (pContext->prev) {
|
||||
(pContext->prev)->next = pContext->next;
|
||||
} else {
|
||||
pThread->pHead = pContext->next;
|
||||
}
|
||||
|
||||
if (pContext->next) {
|
||||
(pContext->next)->prev = pContext->prev;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pThread->threadMutex);
|
||||
|
||||
httpTrace("context:%p, ip:%s, thread:%s, numOfFds:%d, context is cleaned up", pContext, pContext->ipstr,
|
||||
pThread->label, pThread->numOfFds);
|
||||
|
||||
pContext->signature = 0;
|
||||
pContext->fd = -1;
|
||||
pContext->pThread = 0;
|
||||
pContext->prev = 0;
|
||||
pContext->next = 0;
|
||||
pContext->state = HTTP_CONTEXT_STATE_READY;
|
||||
|
||||
// avoid double free
|
||||
httpFreeJsonBuf(pContext);
|
||||
httpFreeMultiCmds(pContext);
|
||||
httpFreeContext(pThread->pServer, pContext);
|
||||
}
|
||||
|
||||
bool httpInitContext(HttpContext *pContext) {
|
||||
pContext->accessTimes++;
|
||||
pContext->lastAccessTime = taosGetTimestampSec();
|
||||
pContext->httpVersion = HTTP_VERSION_10;
|
||||
pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
|
||||
pContext->httpChunked = HTTP_UNCUNKED;
|
||||
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
|
||||
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
|
||||
pContext->reqType = HTTP_REQTYPE_OTHERS;
|
||||
pContext->encodeMethod = NULL;
|
||||
pContext->timer = NULL;
|
||||
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
|
||||
|
||||
HttpParser *pParser = &pContext->parser;
|
||||
memset(pParser, 0, sizeof(HttpParser));
|
||||
pParser->pCur = pParser->pLast = pParser->buffer;
|
||||
|
||||
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
|
||||
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void httpCloseContext(HttpThread *pThread, HttpContext *pContext) {
|
||||
taosTmrReset((TAOS_TMR_CALLBACK)httpCleanUpContext, HTTP_DELAY_CLOSE_TIME_MS, pContext, pThread->pServer->timerHandle, &pContext->timer);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s will be closed after:%d ms, timer:%p",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), HTTP_DELAY_CLOSE_TIME_MS, pContext->timer);
|
||||
}
|
||||
|
||||
void httpCloseContextByApp(HttpContext *pContext) {
|
||||
HttpThread *pThread = pContext->pThread;
|
||||
pContext->parsed = false;
|
||||
|
||||
bool keepAlive = true;
|
||||
if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) {
|
||||
keepAlive = false;
|
||||
} else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) {
|
||||
keepAlive = false;
|
||||
} else {}
|
||||
|
||||
if (keepAlive) {
|
||||
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
httpCloseContext(pThread, pContext);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr);
|
||||
httpCloseContext(pThread, pContext);
|
||||
} else {
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
|
||||
httpCloseContext(pThread, pContext);
|
||||
}
|
||||
} else {
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
|
||||
httpCloseContext(pThread, pContext);
|
||||
}
|
||||
}
|
||||
|
||||
void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) {
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
pContext->parsed = false;
|
||||
|
||||
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr);
|
||||
httpCloseContext(pThread, pContext);
|
||||
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr);
|
||||
httpCloseContext(pThread, pContext);
|
||||
} else {
|
||||
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
|
||||
httpCloseContext(pThread, pContext);
|
||||
}
|
||||
}
|
||||
|
||||
void httpCloseContextByServerForExpired(void *param, void *tmrId) {
|
||||
HttpContext *pContext = (HttpContext *)param;
|
||||
httpRemoveContextFromEpoll(pContext->pThread, pContext);
|
||||
httpError("context:%p, fd:%d, ip:%s, read http body error, time expired, timer:%p", pContext, pContext->fd, pContext->ipstr, tmrId);
|
||||
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
|
||||
httpCloseContextByServer(pContext->pThread, pContext);
|
||||
}
|
||||
|
||||
|
||||
static void httpStopThread(HttpThread* pThread) {
|
||||
pThread->stop = true;
|
||||
|
||||
|
@ -281,17 +52,10 @@ static void httpStopThread(HttpThread* pThread) {
|
|||
|
||||
close(pThread->pollFd);
|
||||
pthread_mutex_destroy(&(pThread->threadMutex));
|
||||
|
||||
//while (pThread->pHead) {
|
||||
// httpCleanUpContext(pThread->pHead, 0);
|
||||
//}
|
||||
}
|
||||
|
||||
|
||||
void httpCleanUpConnect(HttpServer *pServer) {
|
||||
if (pServer == NULL) return;
|
||||
|
||||
shutdown(pServer->fd, SHUT_RD);
|
||||
void httpCleanUpConnect() {
|
||||
HttpServer *pServer = &tsHttpServer;
|
||||
pthread_join(pServer->thread, NULL);
|
||||
|
||||
for (int i = 0; i < pServer->numOfThreads; ++i) {
|
||||
|
@ -302,19 +66,10 @@ void httpCleanUpConnect(HttpServer *pServer) {
|
|||
}
|
||||
|
||||
tfree(pServer->pThreads);
|
||||
pServer->pThreads = NULL;
|
||||
httpTrace("http server:%s is cleaned up", pServer->label);
|
||||
}
|
||||
|
||||
// read all the data, then just discard it
|
||||
void httpReadDirtyData(HttpContext *pContext) {
|
||||
int fd = pContext->fd;
|
||||
char data[1024] = {0};
|
||||
int len = (int)taosReadSocket(fd, data, 1024);
|
||||
while (len >= sizeof(data)) {
|
||||
len = (int)taosReadSocket(fd, data, 1024);
|
||||
}
|
||||
}
|
||||
|
||||
bool httpReadDataImp(HttpContext *pContext) {
|
||||
HttpParser *pParser = &pContext->parser;
|
||||
|
||||
|
@ -338,11 +93,10 @@ bool httpReadDataImp(HttpContext *pContext) {
|
|||
}
|
||||
|
||||
if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
|
||||
httpReadDirtyData(pContext);
|
||||
httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d",
|
||||
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE);
|
||||
httpRemoveContextFromEpoll(pContext->pThread, pContext);
|
||||
httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG);
|
||||
httpNotifyContextClose(pContext);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -352,7 +106,7 @@ bool httpReadDataImp(HttpContext *pContext) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool httpDecompressData(HttpContext *pContext) {
|
||||
static bool httpDecompressData(HttpContext *pContext) {
|
||||
if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) {
|
||||
httpDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
|
||||
return true;
|
||||
|
@ -382,45 +136,43 @@ bool httpDecompressData(HttpContext *pContext) {
|
|||
return ret == 0;
|
||||
}
|
||||
|
||||
bool httpReadData(HttpThread *pThread, HttpContext *pContext) {
|
||||
static bool httpReadData(HttpContext *pContext) {
|
||||
if (!pContext->parsed) {
|
||||
httpInitContext(pContext);
|
||||
}
|
||||
|
||||
if (!httpReadDataImp(pContext)) {
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpNotifyContextClose(pContext);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!httpParseRequest(pContext)) {
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpNotifyContextClose(pContext);
|
||||
return false;
|
||||
}
|
||||
|
||||
int ret = httpCheckReadCompleted(pContext);
|
||||
if (ret == HTTP_CHECK_BODY_CONTINUE) {
|
||||
taosTmrReset(httpCloseContextByServerForExpired, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->timer);
|
||||
//httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times, timer:%p", pContext, pContext->fd, pContext->ipstr, pContext->timer);
|
||||
//httpTrace("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
|
||||
return false;
|
||||
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
|
||||
httpCleanUpContextTimer(pContext);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
|
||||
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len);
|
||||
if (httpDecompressData(pContext)) {
|
||||
return true;
|
||||
} else {
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpNotifyContextClose(pContext);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
httpCleanUpContextTimer(pContext);
|
||||
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpNotifyContextClose(pContext);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void httpProcessHttpData(void *param) {
|
||||
static void httpProcessHttpData(void *param) {
|
||||
HttpServer *pServer = &tsHttpServer;
|
||||
HttpThread *pThread = (HttpThread *)param;
|
||||
HttpContext *pContext;
|
||||
int fdNum;
|
||||
|
@ -441,77 +193,72 @@ void httpProcessHttpData(void *param) {
|
|||
if (fdNum <= 0) continue;
|
||||
|
||||
for (int i = 0; i < fdNum; ++i) {
|
||||
pContext = events[i].data.ptr;
|
||||
if (pContext->signature != pContext || pContext->pThread != pThread || pContext->fd <= 0) {
|
||||
pContext = httpGetContext(events[i].data.ptr);
|
||||
if (pContext == NULL) {
|
||||
httpError("fd:%d, is already released, close connect", events[i].data.fd);
|
||||
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
|
||||
tclose(events[i].data.fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLPRI) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpCloseContextByServer(pContext);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLRDHUP) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpCloseContextByServer(pContext);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLERR) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpCloseContextByServer(pContext);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLHUP) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
httpCloseContextByServer(pContext);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state));
|
||||
httpReleaseContext(pContext);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pContext->pThread->pServer->online) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, server is not online, accessed:%d, close connect",
|
||||
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
|
||||
httpRemoveContextFromEpoll(pThread, pContext);
|
||||
httpReadDirtyData(pContext);
|
||||
if (pServer->status != HTTP_SERVER_RUNNING) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, state:%s, server is not running, accessed:%d, close connect", pContext,
|
||||
pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
|
||||
httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE);
|
||||
httpCloseContextByServer(pThread, pContext);
|
||||
continue;
|
||||
httpNotifyContextClose(pContext);
|
||||
} else {
|
||||
if (httpReadData(pThread, pContext)) {
|
||||
if (httpReadData(pContext)) {
|
||||
(*(pThread->processData))(pContext);
|
||||
atomic_fetch_add_32(&pThread->pServer->requestNum, 1);
|
||||
atomic_fetch_add_32(&pServer->requestNum, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void* httpAcceptHttpConnection(void *arg) {
|
||||
static void *httpAcceptHttpConnection(void *arg) {
|
||||
int connFd = -1;
|
||||
struct sockaddr_in clientAddr;
|
||||
int threadId = 0;
|
||||
HttpThread * pThread;
|
||||
HttpServer * pServer;
|
||||
HttpContext * pContext;
|
||||
int totalFds;
|
||||
|
||||
pServer = (HttpServer *)arg;
|
||||
HttpServer * pServer = &tsHttpServer;
|
||||
HttpThread * pThread = NULL;
|
||||
HttpContext * pContext = NULL;
|
||||
int totalFds = 0;
|
||||
|
||||
sigset_t set;
|
||||
sigemptyset(&set);
|
||||
|
@ -521,12 +268,12 @@ void* httpAcceptHttpConnection(void *arg) {
|
|||
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
|
||||
|
||||
if (pServer->fd < 0) {
|
||||
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp),
|
||||
pServer->serverPort, strerror(errno));
|
||||
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label,
|
||||
taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno));
|
||||
return NULL;
|
||||
} else {
|
||||
httpPrint("http service init success at %u", pServer->serverPort);
|
||||
pServer->online = true;
|
||||
httpPrint("http server init success at %u", pServer->serverPort);
|
||||
pServer->status = HTTP_SERVER_RUNNING;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -534,10 +281,10 @@ void* httpAcceptHttpConnection(void *arg) {
|
|||
connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
|
||||
if (connFd == -1) {
|
||||
if (errno == EINVAL) {
|
||||
httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label);
|
||||
httpTrace("http server:%s socket was shutdown, exiting...", pServer->label);
|
||||
break;
|
||||
}
|
||||
httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno));
|
||||
httpError("http server:%s, accept connect failure, errno:%d reason:%s", pServer->label, errno, strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -547,8 +294,8 @@ void* httpAcceptHttpConnection(void *arg) {
|
|||
}
|
||||
|
||||
if (totalFds > tsHttpCacheSessions * 100) {
|
||||
httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection",
|
||||
connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions);
|
||||
httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection", connFd,
|
||||
inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions);
|
||||
taosCloseSocket(connFd);
|
||||
continue;
|
||||
}
|
||||
|
@ -559,7 +306,7 @@ void* httpAcceptHttpConnection(void *arg) {
|
|||
// pick up the thread to handle this connection
|
||||
pThread = pServer->pThreads + threadId;
|
||||
|
||||
pContext = httpCreateContext(pServer);
|
||||
pContext = httpCreateContext(connFd);
|
||||
if (pContext == NULL) {
|
||||
httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd, inet_ntoa(clientAddr.sin_addr),
|
||||
htons(clientAddr.sin_port));
|
||||
|
@ -567,39 +314,24 @@ void* httpAcceptHttpConnection(void *arg) {
|
|||
continue;
|
||||
}
|
||||
|
||||
httpTrace("context:%p, fd:%d, ip:%s:%u, thread:%s, numOfFds:%d, totalFds:%d, accept a new connection",
|
||||
pContext, connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), pThread->label,
|
||||
pThread->numOfFds, totalFds);
|
||||
|
||||
pContext->fd = connFd;
|
||||
sprintf(pContext->ipstr, "%s:%d", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
|
||||
pContext->pThread = pThread;
|
||||
|
||||
sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
|
||||
|
||||
struct epoll_event event;
|
||||
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
|
||||
|
||||
event.data.ptr = pContext;
|
||||
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
||||
httpError("context:%p, fd:%d, ip:%s:%u, thread:%s, failed to add http fd for epoll, error:%s",
|
||||
pContext, connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), pThread->label,
|
||||
strerror(errno));
|
||||
httpFreeContext(pThread->pServer, pContext);
|
||||
tclose(connFd);
|
||||
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
|
||||
pContext->ipstr, pThread->label, strerror(errno));
|
||||
tclose(pContext->fd);
|
||||
httpReleaseContext(pContext);
|
||||
continue;
|
||||
}
|
||||
|
||||
// notify the data process, add into the FdObj list
|
||||
pthread_mutex_lock(&(pThread->threadMutex));
|
||||
|
||||
pContext->next = pThread->pHead;
|
||||
|
||||
if (pThread->pHead) (pThread->pHead)->prev = pContext;
|
||||
|
||||
pThread->pHead = pContext;
|
||||
|
||||
pThread->numOfFds++;
|
||||
|
||||
pthread_mutex_unlock(&(pThread->threadMutex));
|
||||
atomic_add_fetch_32(&pThread->numOfFds, 1);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, thread:%s numOfFds:%d totalFds:%d, accept a new connection", pContext, connFd,
|
||||
pContext->ipstr, pThread->label, pThread->numOfFds, totalFds);
|
||||
|
||||
// pick up next thread for next connection
|
||||
threadId++;
|
||||
|
@ -610,21 +342,17 @@ void* httpAcceptHttpConnection(void *arg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
bool httpInitConnect(HttpServer *pServer) {
|
||||
int i;
|
||||
HttpThread * pThread;
|
||||
|
||||
pServer->pThreads = (HttpThread *)malloc(sizeof(HttpThread) * (size_t)pServer->numOfThreads);
|
||||
bool httpInitConnect() {
|
||||
HttpServer *pServer = &tsHttpServer;
|
||||
pServer->pThreads = calloc(pServer->numOfThreads, sizeof(HttpThread));
|
||||
if (pServer->pThreads == NULL) {
|
||||
httpError("init error no enough memory");
|
||||
return false;
|
||||
}
|
||||
memset(pServer->pThreads, 0, sizeof(HttpThread) * (size_t)pServer->numOfThreads);
|
||||
|
||||
pThread = pServer->pThreads;
|
||||
for (i = 0; i < pServer->numOfThreads; ++i) {
|
||||
HttpThread *pThread = pServer->pThreads;
|
||||
for (int i = 0; i < pServer->numOfThreads; ++i) {
|
||||
sprintf(pThread->label, "%s%d", pServer->label, i);
|
||||
pThread->pServer = pServer;
|
||||
pThread->processData = pServer->processData;
|
||||
pThread->threadId = i;
|
||||
|
||||
|
@ -643,8 +371,8 @@ bool httpInitConnect(HttpServer *pServer) {
|
|||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) {
|
||||
httpError("http thread:%s, failed to create HTTP process data thread, reason:%s",
|
||||
pThread->label, strerror(errno));
|
||||
httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label,
|
||||
strerror(errno));
|
||||
return false;
|
||||
}
|
||||
pthread_attr_destroy(&thattr);
|
||||
|
|
|
@ -15,44 +15,28 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "hash.h"
|
||||
#include "taos.h"
|
||||
#include "ttime.h"
|
||||
#include "ttimer.h"
|
||||
#include "http.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpResp.h"
|
||||
|
||||
void httpAccessSession(HttpContext *pContext) {
|
||||
HttpServer *server = pContext->pThread->pServer;
|
||||
pthread_mutex_lock(&server->serverMutex);
|
||||
if (pContext->session == pContext->session->signature) {
|
||||
pContext->session->expire = (int) taosGetTimestampSec() + pContext->pThread->pServer->sessionExpire;
|
||||
}
|
||||
pthread_mutex_unlock(&server->serverMutex);
|
||||
}
|
||||
#include "tglobal.h"
|
||||
#include "tcache.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpContext.h"
|
||||
#include "httpSession.h"
|
||||
|
||||
void httpCreateSession(HttpContext *pContext, void *taos) {
|
||||
HttpServer *server = pContext->pThread->pServer;
|
||||
HttpServer *server = &tsHttpServer;
|
||||
httpReleaseSession(pContext);
|
||||
|
||||
pthread_mutex_lock(&server->serverMutex);
|
||||
|
||||
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd,
|
||||
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos);
|
||||
pContext->session->expire = 0;
|
||||
pContext->session->access--;
|
||||
}
|
||||
|
||||
HttpSession session;
|
||||
HttpSession session = {0};
|
||||
session.taos = taos;
|
||||
session.expire = (int)taosGetTimestampSec() + server->sessionExpire;
|
||||
session.access = 1;
|
||||
int sessionIdLen = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
|
||||
session.refCount = 1;
|
||||
snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
|
||||
|
||||
taosHashPut(server->pSessionHash, session.id, sessionIdLen, (char *)(&session), sizeof(HttpSession));
|
||||
pContext->session = taosHashGet(server->pSessionHash, session.id, sessionIdLen);
|
||||
pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire);
|
||||
// void *temp = pContext->session;
|
||||
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
|
||||
|
||||
if (pContext->session == NULL) {
|
||||
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
|
||||
|
@ -62,26 +46,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
|
|||
return;
|
||||
}
|
||||
|
||||
pContext->session->signature = pContext->session;
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr,
|
||||
pContext->user, pContext->session, pContext->session->taos);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd,
|
||||
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
|
||||
pthread_mutex_unlock(&server->serverMutex);
|
||||
}
|
||||
|
||||
void httpFetchSessionImp(HttpContext *pContext) {
|
||||
HttpServer *server = pContext->pThread->pServer;
|
||||
static void httpFetchSessionImp(HttpContext *pContext) {
|
||||
HttpServer *server = &tsHttpServer;
|
||||
pthread_mutex_lock(&server->serverMutex);
|
||||
|
||||
char sessionId[HTTP_SESSION_ID_LEN];
|
||||
int sessonIdLen = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
|
||||
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
|
||||
|
||||
pContext->session = taosHashGet(server->pSessionHash, sessionId, sessonIdLen);
|
||||
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
|
||||
pContext->session->access++;
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d",
|
||||
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session,
|
||||
pContext->session->taos, pContext->session->access, pContext->session->expire);
|
||||
pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire;
|
||||
pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
|
||||
if (pContext->session != NULL) {
|
||||
atomic_add_fetch_32(&pContext->session->refCount, 1);
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
|
||||
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
|
||||
} else {
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr,
|
||||
pContext->user);
|
||||
|
@ -90,113 +71,54 @@ void httpFetchSessionImp(HttpContext *pContext) {
|
|||
pthread_mutex_unlock(&server->serverMutex);
|
||||
}
|
||||
|
||||
void httpFetchSession(HttpContext *pContext) {
|
||||
void httpGetSession(HttpContext *pContext) {
|
||||
if (pContext->session == NULL) {
|
||||
httpFetchSessionImp(pContext);
|
||||
} else {
|
||||
char sessionId[HTTP_SESSION_ID_LEN];
|
||||
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
|
||||
if (strcmp(pContext->session->id, sessionId) != 0) {
|
||||
httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user);
|
||||
httpRestoreSession(pContext);
|
||||
httpFetchSessionImp(pContext);
|
||||
}
|
||||
httpReleaseSession(pContext);
|
||||
httpFetchSessionImp(pContext);
|
||||
}
|
||||
}
|
||||
|
||||
void httpRestoreSession(HttpContext *pContext) {
|
||||
HttpServer * server = pContext->pThread->pServer;
|
||||
void httpReleaseSession(HttpContext *pContext) {
|
||||
if (pContext == NULL || pContext->session == NULL) return;
|
||||
|
||||
// all access to the session is via serverMutex
|
||||
pthread_mutex_lock(&server->serverMutex);
|
||||
HttpSession *session = pContext->session;
|
||||
if (session == NULL || session != session->signature) {
|
||||
pthread_mutex_unlock(&server->serverMutex);
|
||||
return;
|
||||
}
|
||||
session->access--;
|
||||
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d",
|
||||
pContext, pContext->ipstr, pContext->user, session, session->taos,
|
||||
session->access, pContext->session->expire);
|
||||
int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1);
|
||||
assert(refCount >= 0);
|
||||
httpTrace("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos,
|
||||
pContext->session->refCount);
|
||||
|
||||
taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false);
|
||||
pContext->session = NULL;
|
||||
pthread_mutex_unlock(&server->serverMutex);
|
||||
}
|
||||
|
||||
void httpResetSession(HttpSession *pSession) {
|
||||
httpTrace("close session:%p:%p", pSession, pSession->taos);
|
||||
if (pSession->taos != NULL) {
|
||||
taos_close(pSession->taos);
|
||||
pSession->taos = NULL;
|
||||
static void httpDestroySession(void *data) {
|
||||
HttpSession *session = data;
|
||||
httpTrace("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
|
||||
|
||||
if (session->taos != NULL) {
|
||||
taos_close(session->taos);
|
||||
session->taos = NULL;
|
||||
}
|
||||
pSession->signature = NULL;
|
||||
}
|
||||
|
||||
void httpRemoveAllSessions(HttpServer *pServer) {
|
||||
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash);
|
||||
|
||||
while (taosHashIterNext(pIter)) {
|
||||
HttpSession *pSession = taosHashIterGet(pIter);
|
||||
if (pSession == NULL) continue;
|
||||
httpResetSession(pSession);
|
||||
void httpCleanUpSessions() {
|
||||
if (tsHttpServer.sessionCache != NULL) {
|
||||
SCacheObj *cache = tsHttpServer.sessionCache;
|
||||
httpPrint("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
|
||||
taosCacheCleanup(tsHttpServer.sessionCache);
|
||||
tsHttpServer.sessionCache = NULL;
|
||||
}
|
||||
|
||||
taosHashDestroyIter(pIter);
|
||||
}
|
||||
|
||||
bool httpInitAllSessions(HttpServer *pServer) {
|
||||
if (pServer->pSessionHash == NULL) {
|
||||
pServer->pSessionHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
|
||||
}
|
||||
if (pServer->pSessionHash == NULL) {
|
||||
httpError("http init session pool failed");
|
||||
bool httpInitSessions() {
|
||||
tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession);
|
||||
if (tsHttpServer.sessionCache == NULL) {
|
||||
httpError("failed to init session cache");
|
||||
return false;
|
||||
}
|
||||
if (pServer->expireTimer == NULL) {
|
||||
taosTmrReset(httpProcessSessionExpire, 50000, pServer, pServer->timerHandle, &pServer->expireTimer);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool httpSessionExpired(HttpSession *pSession) {
|
||||
time_t cur = taosGetTimestampSec();
|
||||
|
||||
if (pSession->taos != NULL) {
|
||||
if (pSession->expire > cur) {
|
||||
return false; // un-expired, so return false
|
||||
}
|
||||
if (pSession->access > 0) {
|
||||
httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos,
|
||||
pSession->access);
|
||||
return false; // still used, so return false
|
||||
}
|
||||
httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d",
|
||||
pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void httpRemoveExpireSessions(HttpServer *pServer) {
|
||||
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash);
|
||||
|
||||
while (taosHashIterNext(pIter)) {
|
||||
HttpSession *pSession = taosHashIterGet(pIter);
|
||||
if (pSession == NULL) continue;
|
||||
|
||||
pthread_mutex_lock(&pServer->serverMutex);
|
||||
if (httpSessionExpired(pSession)) {
|
||||
httpResetSession(pSession);
|
||||
taosHashRemove(pServer->pSessionHash, pSession->id, strlen(pSession->id));
|
||||
}
|
||||
pthread_mutex_unlock(&pServer->serverMutex);
|
||||
}
|
||||
|
||||
taosHashDestroyIter(pIter);
|
||||
}
|
||||
|
||||
void httpProcessSessionExpire(void *handle, void *tmrId) {
|
||||
HttpServer *pServer = (HttpServer *)handle;
|
||||
httpRemoveExpireSessions(pServer);
|
||||
taosTmrReset(httpProcessSessionExpire, 60000, pServer, pServer->timerHandle, &pServer->expireTimer);
|
||||
}
|
|
@ -18,11 +18,12 @@
|
|||
#include "tnote.h"
|
||||
#include "taos.h"
|
||||
#include "tsclient.h"
|
||||
#include "http.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpContext.h"
|
||||
#include "httpSql.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpAuth.h"
|
||||
#include "httpSession.h"
|
||||
|
||||
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||
void *param, void **taos);
|
||||
|
@ -30,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext);
|
|||
|
||||
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
|
||||
HttpContext *pContext = (HttpContext *)param;
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
HttpSqlCmds * multiCmds = pContext->multiCmds;
|
||||
HttpEncodeMethod *encode = pContext->encodeMethod;
|
||||
|
@ -72,7 +73,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
|
|||
|
||||
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
|
||||
HttpContext *pContext = (HttpContext *)param;
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
HttpSqlCmds * multiCmds = pContext->multiCmds;
|
||||
HttpEncodeMethod *encode = pContext->encodeMethod;
|
||||
|
@ -172,7 +173,7 @@ void httpProcessMultiSql(HttpContext *pContext) {
|
|||
}
|
||||
|
||||
void httpProcessMultiSqlCmd(HttpContext *pContext) {
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
HttpSqlCmds *multiCmds = pContext->multiCmds;
|
||||
if (multiCmds == NULL || multiCmds->size <= 0 || multiCmds->pos >= multiCmds->size || multiCmds->pos < 0) {
|
||||
|
@ -192,7 +193,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
|
|||
|
||||
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
|
||||
HttpContext *pContext = (HttpContext *)param;
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
HttpEncodeMethod *encode = pContext->encodeMethod;
|
||||
|
||||
|
@ -230,7 +231,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
|
|||
|
||||
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) {
|
||||
HttpContext *pContext = (HttpContext *)param;
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
HttpEncodeMethod *encode = pContext->encodeMethod;
|
||||
|
||||
|
@ -354,7 +355,7 @@ void httpExecCmd(HttpContext *pContext) {
|
|||
|
||||
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
|
||||
HttpContext *pContext = param;
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
if (code < 0) {
|
||||
httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr,
|
||||
|
@ -383,16 +384,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
|
|||
}
|
||||
|
||||
void httpProcessRequest(HttpContext *pContext) {
|
||||
httpFetchSession(pContext);
|
||||
httpGetSession(pContext);
|
||||
|
||||
if (pContext->session == NULL || pContext->session != pContext->session->signature ||
|
||||
pContext->reqType == HTTP_REQTYPE_LOGIN) {
|
||||
if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) {
|
||||
taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext,
|
||||
&(pContext->taos));
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd,
|
||||
pContext->ipstr, pContext->user, pContext->taos);
|
||||
} else {
|
||||
httpAccessSession(pContext);
|
||||
httpExecCmd(pContext);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,84 +20,64 @@
|
|||
#include "tsocket.h"
|
||||
#include "ttimer.h"
|
||||
#include "tadmin.h"
|
||||
#include "http.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpContext.h"
|
||||
#include "httpSession.h"
|
||||
#include "httpServer.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpLog.h"
|
||||
#include "gcHandle.h"
|
||||
#include "httpHandle.h"
|
||||
#include "gcHandle.h"
|
||||
#include "restHandle.h"
|
||||
#include "tgHandle.h"
|
||||
|
||||
#ifndef _ADMIN
|
||||
|
||||
void adminInitHandle(HttpServer* pServer) {}
|
||||
void opInitHandle(HttpServer* pServer) {}
|
||||
|
||||
#endif
|
||||
|
||||
static HttpServer *httpServer = NULL;
|
||||
HttpServer tsHttpServer;
|
||||
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
|
||||
|
||||
int httpInitSystem() {
|
||||
// taos_init();
|
||||
strcpy(tsHttpServer.label, "rest");
|
||||
tsHttpServer.serverIp = 0;
|
||||
tsHttpServer.serverPort = tsHttpPort;
|
||||
tsHttpServer.numOfThreads = tsHttpMaxThreads;
|
||||
tsHttpServer.processData = httpProcessData;
|
||||
|
||||
httpServer = (HttpServer *)malloc(sizeof(HttpServer));
|
||||
memset(httpServer, 0, sizeof(HttpServer));
|
||||
|
||||
strcpy(httpServer->label, "rest");
|
||||
httpServer->serverIp = 0;
|
||||
httpServer->serverPort = tsHttpPort;
|
||||
httpServer->cacheContext = tsHttpCacheSessions;
|
||||
httpServer->sessionExpire = tsHttpSessionExpire;
|
||||
httpServer->numOfThreads = tsHttpMaxThreads;
|
||||
httpServer->processData = httpProcessData;
|
||||
|
||||
pthread_mutex_init(&httpServer->serverMutex, NULL);
|
||||
pthread_mutex_init(&tsHttpServer.serverMutex, NULL);
|
||||
|
||||
if (tsHttpEnableRecordSql != 0) {
|
||||
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note");
|
||||
}
|
||||
restInitHandle(httpServer);
|
||||
adminInitHandle(httpServer);
|
||||
gcInitHandle(httpServer);
|
||||
tgInitHandle(httpServer);
|
||||
opInitHandle(httpServer);
|
||||
restInitHandle(&tsHttpServer);
|
||||
adminInitHandle(&tsHttpServer);
|
||||
gcInitHandle(&tsHttpServer);
|
||||
tgInitHandle(&tsHttpServer);
|
||||
opInitHandle(&tsHttpServer);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int httpStartSystem() {
|
||||
httpPrint("starting to initialize http service ...");
|
||||
httpPrint("start http server ...");
|
||||
|
||||
if (httpServer == NULL) {
|
||||
httpError("http server is null");
|
||||
httpInitSystem();
|
||||
}
|
||||
|
||||
if (httpServer->pContextPool == NULL) {
|
||||
httpServer->pContextPool = taosMemPoolInit(httpServer->cacheContext, sizeof(HttpContext));
|
||||
}
|
||||
if (httpServer->pContextPool == NULL) {
|
||||
httpError("http init context pool failed");
|
||||
if (tsHttpServer.status != HTTP_SERVER_INIT) {
|
||||
httpError("http server is already started");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (httpServer->timerHandle == NULL) {
|
||||
httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 100 + 100, 200, 60000, "http");
|
||||
}
|
||||
if (httpServer->timerHandle == NULL) {
|
||||
httpError("http init timer failed");
|
||||
if (!httpInitContexts()) {
|
||||
httpError("http init contexts failed");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!httpInitAllSessions(httpServer)) {
|
||||
if (!httpInitSessions()) {
|
||||
httpError("http init session failed");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!httpInitConnect(httpServer)) {
|
||||
if (!httpInitConnect()) {
|
||||
httpError("http init server failed");
|
||||
return -1;
|
||||
}
|
||||
|
@ -106,53 +86,23 @@ int httpStartSystem() {
|
|||
}
|
||||
|
||||
void httpStopSystem() {
|
||||
if (httpServer != NULL) {
|
||||
httpServer->online = false;
|
||||
}
|
||||
tsHttpServer.status = HTTP_SERVER_CLOSING;
|
||||
shutdown(tsHttpServer.fd, SHUT_RD);
|
||||
tgCleanupHandle();
|
||||
}
|
||||
|
||||
void httpCleanUpSystem() {
|
||||
httpPrint("http service cleanup");
|
||||
httpPrint("http server cleanup");
|
||||
httpStopSystem();
|
||||
|
||||
//#if 0
|
||||
if (httpServer == NULL) {
|
||||
return;
|
||||
}
|
||||
httpCleanupContexts();
|
||||
httpCleanUpSessions();
|
||||
httpCleanUpConnect();
|
||||
pthread_mutex_destroy(&tsHttpServer.serverMutex);
|
||||
|
||||
if (httpServer->expireTimer != NULL) {
|
||||
taosTmrStopA(&(httpServer->expireTimer));
|
||||
}
|
||||
|
||||
if (httpServer->timerHandle != NULL) {
|
||||
taosTmrCleanUp(httpServer->timerHandle);
|
||||
httpServer->timerHandle = NULL;
|
||||
}
|
||||
|
||||
if (httpServer->pThreads != NULL) {
|
||||
httpCleanUpConnect(httpServer);
|
||||
httpServer->pThreads = NULL;
|
||||
}
|
||||
|
||||
|
||||
#if 0
|
||||
httpRemoveAllSessions(httpServer);
|
||||
|
||||
if (httpServer->pContextPool != NULL) {
|
||||
taosMemPoolCleanUp(httpServer->pContextPool);
|
||||
httpServer->pContextPool = NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&httpServer->serverMutex);
|
||||
|
||||
tfree(httpServer);
|
||||
#endif
|
||||
tsHttpServer.status = HTTP_SERVER_CLOSED;
|
||||
}
|
||||
|
||||
int32_t httpGetReqCount() {
|
||||
if (httpServer != NULL) {
|
||||
return atomic_exchange_32(&httpServer->requestNum, 0);
|
||||
}
|
||||
return 0;
|
||||
return atomic_exchange_32(&tsHttpServer.requestNum, 0);
|
||||
}
|
||||
|
|
|
@ -17,11 +17,10 @@
|
|||
#include "os.h"
|
||||
#include "tmd5.h"
|
||||
#include "taos.h"
|
||||
#include "http.h"
|
||||
#include "httpLog.h"
|
||||
#include "httpCode.h"
|
||||
#include "httpHandle.h"
|
||||
#include "httpInt.h"
|
||||
#include "httpResp.h"
|
||||
#include "httpSql.h"
|
||||
#include "httpUtil.h"
|
||||
|
||||
bool httpCheckUsedbSql(char *sql) {
|
||||
if (strstr(sql, "use ") != NULL) {
|
||||
|
|
|
@ -18,9 +18,10 @@
|
|||
#include "tglobal.h"
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "httpInt.h"
|
||||
#include "tgHandle.h"
|
||||
#include "tgJson.h"
|
||||
#include "httpLog.h"
|
||||
#include "cJSON.h"
|
||||
|
||||
/*
|
||||
* taos.telegraf.cfg formats like
|
||||
|
|
|
@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
|
|||
#endif
|
||||
}
|
||||
|
||||
#if 0
|
||||
static FORCE_INLINE void taosFreeNode(void *data) {
|
||||
SCacheDataNode *pNode = *(SCacheDataNode **)data;
|
||||
free(pNode);
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @param key key of object for hash, usually a null-terminated string
|
||||
|
@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
|
|||
}
|
||||
|
||||
// set free cache node callback function for hash table
|
||||
taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
|
||||
// taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
|
||||
|
||||
pCacheObj->freeFp = freeCb;
|
||||
pCacheObj->refreshTime = refreshTime * 1000;
|
||||
|
@ -565,7 +567,17 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
|||
|
||||
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
||||
__cache_wr_lock(pCacheObj);
|
||||
taosHashCleanup(pCacheObj->pHashTable);
|
||||
|
||||
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
|
||||
while (taosHashIterNext(pIter)) {
|
||||
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
|
||||
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
|
||||
taosCacheReleaseNode(pCacheObj, pNode);
|
||||
//}
|
||||
}
|
||||
taosHashDestroyIter(pIter);
|
||||
|
||||
taosHashCleanup(pCacheObj->pHashTable);
|
||||
__cache_unlock(pCacheObj);
|
||||
|
||||
taosTrashCanEmpty(pCacheObj, true);
|
||||
|
|
Loading…
Reference in New Issue