Merge branch 'develop' into test

This commit is contained in:
Ping Xiao 2020-06-15 19:05:02 +08:00
commit c59edd65ab
47 changed files with 1026 additions and 975 deletions

View File

@ -286,7 +286,7 @@ typedef struct STscObj {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
char acctId[TSDB_ACCT_LEN]; char acctId[TSDB_ACCT_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;

View File

@ -368,7 +368,9 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
pSql->fp = pSql->fetchFp; pSql->fp = pSql->fetchFp;
} }
(*pSql->fp)(pSql->param, taosres, code); if (pSql->fp) {
(*pSql->fp)(pSql->param, taosres, code);
}
if (shouldFree) { if (shouldFree) {
tscTrace("%p sqlObj is automatically freed in async res", pSql); tscTrace("%p sqlObj is automatically freed in async res", pSql);

View File

@ -97,7 +97,7 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1
useconds = str2int64(pToken->z); useconds = str2int64(pToken->z);
} else { } else {
// strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm); // strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
if (taosParseTime(pToken->z, time, pToken->n, timePrec) != TSDB_CODE_SUCCESS) { if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z); return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
} }

View File

@ -138,7 +138,7 @@ static int setColumnFilterInfoForTimestamp(SQueryInfo* pQueryInfo, tVariant* pVa
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (seg != NULL) { if (seg != NULL) {
if (taosParseTime(pVar->pz, &time, pVar->nLen, tinfo.precision) != TSDB_CODE_SUCCESS) { if (taosParseTime(pVar->pz, &time, pVar->nLen, tinfo.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg); return invalidSqlErrMsg(pQueryInfo->msg, msg);
} }
} else { } else {
@ -1042,7 +1042,7 @@ int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pDB, SSQL
/* db name is not specified, the tableName dose not include db name */ /* db name is not specified, the tableName dose not include db name */
if (pDB != NULL) { if (pDB != NULL) {
if (pDB->n >= TSDB_DB_NAME_LEN) { if (pDB->n >= TSDB_ACCT_LEN + TSDB_DB_NAME_LEN) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
@ -3950,7 +3950,7 @@ int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t t
char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen, false); char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen, false);
if (seg != NULL) { if (seg != NULL) {
if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO) == TSDB_CODE_SUCCESS) { if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO, tsDaylight) == TSDB_CODE_SUCCESS) {
parsed = true; parsed = true;
} else { } else {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
@ -4508,7 +4508,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) { if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
len = tDataTypeDesc[pTagsSchema->type].nSize; len = tDataTypeDesc[pTagsSchema->type].nSize;
} else { } else {
len = varDataLen(pUpdateMsg->data); len = varDataTLen(pUpdateMsg->data);
} }
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data

View File

@ -1451,8 +1451,6 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
return false; return false;
} }
assert(pSql->fp != NULL);
STscObj* pTscObj = pSql->pTscObj; STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
return false; return false;

View File

@ -170,6 +170,7 @@ extern char gitinfo[];
extern char gitinfoOfInternal[]; extern char gitinfoOfInternal[];
extern char buildinfo[]; extern char buildinfo[];
extern int8_t tsDaylight;
extern char tsTimezone[64]; extern char tsTimezone[64];
extern char tsLocale[64]; extern char tsLocale[64];
extern char tsCharset[64]; // default encode string extern char tsCharset[64]; // default encode string

View File

@ -198,6 +198,7 @@ char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey"; char tsInternalPass[] = "secretkey";
int32_t tsMonitorInterval = 30; // seconds int32_t tsMonitorInterval = 30; // seconds
int8_t tsDaylight = 0;
char tsTimezone[64] = {0}; char tsTimezone[64] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string

View File

@ -58,6 +58,7 @@ void tsSetTimeZone() {
* (BST, +0100) * (BST, +0100)
*/ */
sprintf(tsTimezone, "(%s, %s%02d00)", tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); sprintf(tsTimezone, "(%s, %s%02d00)", tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
tsDaylight = daylight;
uPrint("timezone format changed to %s", tsTimezone); uPrint("timezone format changed to %s", tsTimezone);
} }

View File

@ -250,7 +250,7 @@ typedef struct {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int8_t igExists; int8_t igExists;
int8_t getMeta; int8_t getMeta;
int16_t numOfTags; int16_t numOfTags;
@ -268,7 +268,7 @@ typedef struct {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int16_t type; /* operation type */ int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */ int16_t numOfCols; /* number of schema */
int32_t tagValLen; int32_t tagValLen;
@ -670,7 +670,7 @@ typedef struct {
*/ */
typedef struct { typedef struct {
int8_t type; int8_t type;
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
uint16_t payloadLen; uint16_t payloadLen;
char payload[]; char payload[];
} SCMShowMsg; } SCMShowMsg;

View File

@ -469,7 +469,6 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) {
} while( row != NULL); } while( row != NULL);
fclose(fp); fclose(fp);
taos_free_result(result);
return numOfRows; return numOfRows;
} }

View File

@ -297,7 +297,7 @@ void *deleteTable();
void *asyncWrite(void *sarg); void *asyncWrite(void *sarg);
void generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary); int generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary);
void rand_string(char *str, int size); void rand_string(char *str, int size);
@ -817,7 +817,7 @@ void queryDB(TAOS *taos, char *command) {
i--; i--;
} }
if (i == 0) { if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql); taos_free_result(pSql);
@ -846,14 +846,19 @@ void *syncWrite(void *sarg) {
int k; int k;
for (k = 0; k < winfo->nrecords_per_request;) { for (k = 0; k < winfo->nrecords_per_request;) {
int rand_num = rand() % 100; int rand_num = rand() % 100;
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) int len = -1;
{ if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) {
long d = tmp_time - rand() % 1000000 + rand_num; long d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, data_type, ncols_per_record, d, len_of_binary); len = generateData(data, data_type, ncols_per_record, d, len_of_binary);
} else } else {
{ len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
} }
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= BUFFER_SIZE) { // too long
break;
}
pstr += sprintf(pstr, " %s", data); pstr += sprintf(pstr, " %s", data);
inserted++; inserted++;
k++; k++;
@ -968,7 +973,7 @@ double getCurrentTime() {
return tv.tv_sec + tv.tv_usec / 1E6; return tv.tv_sec + tv.tv_usec / 1E6;
} }
void generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary) { int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary) {
memset(res, 0, MAX_DATA_SIZE); memset(res, 0, MAX_DATA_SIZE);
char *pstr = res; char *pstr = res;
pstr += sprintf(pstr, "(%" PRId64, timestamp); pstr += sprintf(pstr, "(%" PRId64, timestamp);
@ -1002,9 +1007,16 @@ void generateData(char *res, char **data_type, int num_of_cols, int64_t timestam
rand_string(s, len_of_binary); rand_string(s, len_of_binary);
pstr += sprintf(pstr, ", \"%s\"", s); pstr += sprintf(pstr, ", \"%s\"", s);
} }
if (pstr - res > MAX_DATA_SIZE) {
perror("column length too long, abort");
exit(-1);
}
} }
pstr += sprintf(pstr, ")"); pstr += sprintf(pstr, ")");
return pstr - res;
} }
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890"; static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890";

View File

@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
} }
void mnodeReleaseConn(SConnObj *pConn) { void mnodeReleaseConn(SConnObj *pConn) {
if(pConn == NULL) return; if (pConn == NULL) return;
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
} }
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p
static void mnodeFreeConn(void *data) { static void mnodeFreeConn(void *data) {
SConnObj *pConn = data; SConnObj *pConn = data;
tfree(pConn->pQueries); tfree(pConn->pQueries);
tfree(pConn->pQueries); tfree(pConn->pStreams);
mTrace("connId:%d, is destroyed", pConn->connId); mTrace("connId:%d, is destroyed", pConn->connId);
} }

View File

@ -16,15 +16,11 @@
#ifndef TDENGINE_GC_HANDLE_H #ifndef TDENGINE_GC_HANDLE_H
#define TDENGINE_GC_HANDLE_H #define TDENGINE_GC_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h" #include "http.h"
#include "httpCode.h" #include "httpInt.h"
#include "httpHandle.h" #include "httpUtil.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h"
#define GC_ROOT_URL_POS 0 #define GC_ROOT_URL_POS 0
#define GC_ACTION_URL_POS 1 #define GC_ACTION_URL_POS 1

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_TOKEN_H
#define TDENGINE_HTTP_TOKEN_H
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
#endif

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_CONTEXT_H
#define TDENGINE_HTTP_CONTEXT_H
#include "httpInt.h"
bool httpInitContexts();
void httpCleanupContexts();
const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(int32_t fd);
bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(void * pContext);
void httpReleaseContext(HttpContext *pContext);
void httpCloseContextByServer(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
#endif

View File

@ -13,304 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_HTTP_SERVER_H #ifndef TDENGINE_HTTP_HANDLE_H
#define TDENGINE_HTTP_SERVER_H #define TDENGINE_HTTP_HANDLE_H
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpJson.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*65 //65k
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_VERSION_10 0
#define HTTP_VERSION_11 1
//#define HTTP_VERSION_12 2
#define HTTP_UNCUNKED 0
#define HTTP_CHUNKED 1
#define HTTP_KEEPALIVE_NO_INPUT 0
#define HTTP_KEEPALIVE_ENABLE 1
#define HTTP_KEEPALIVE_DISABLE 2
#define HTTP_REQTYPE_OTHERS 0
#define HTTP_REQTYPE_LOGIN 1
#define HTTP_REQTYPE_HEARTBEAT 2
#define HTTP_REQTYPE_SINGLE_SQL 3
#define HTTP_REQTYPE_MULTI_SQL 4
#define HTTP_CHECK_BODY_ERROR -1
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
struct HttpContext;
struct HttpThread;
typedef struct {
void *signature;
int expire;
int access;
void *taos;
char id[HTTP_SESSION_ID_LEN];
} HttpSession;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef struct {
char *pos;
int32_t len;
} HttpBuf;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
char *pLast;
char *pCur;
HttpBuf method;
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
} HttpParser;
typedef struct HttpContext {
void * signature;
int fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
uint8_t httpVersion;
uint8_t httpChunked;
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
uint8_t fromMemPool;
uint8_t acceptEncoding;
uint8_t contentEncoding;
uint8_t reqType;
uint8_t parsed;
int32_t state;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void *taos;
HttpSession *session;
z_stream gzipStream;
HttpEncodeMethod *encodeMethod;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf *jsonBuf;
HttpParser parser;
void *timer;
struct HttpThread *pThread;
struct HttpContext *prev;
struct HttpContext *next;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
int pollFd;
int numOfFds;
int threadId;
char label[HTTP_LABEL_SIZE];
bool (*processData)(HttpContext *pContext);
struct HttpServer *pServer; // handle passed by upper layer during pServer initialization
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
bool online;
int fd;
int cacheContext;
int sessionExpire;
int numOfThreads;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
int methodScannerLen;
pthread_mutex_t serverMutex;
void *pSessionHash;
void *pContextPool;
void *expireTimer;
HttpThread *pThreads;
pthread_t thread;
bool (*processData)(HttpContext *pContext);
int requestNum;
void *timerHandle;
} HttpServer;
// http util method
bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen);
// http init method
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
// http server connection
void httpCleanUpConnect(HttpServer *pServer);
bool httpInitConnect(HttpServer *pServer);
// http context for each client connection
HttpContext *httpCreateContext(HttpServer *pServer);
bool httpInitContext(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext);
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpAccessSession(HttpContext *pContext);
void httpFetchSession(HttpContext *pContext);
void httpRestoreSession(HttpContext *pContext);
void httpRemoveExpireSessions(HttpServer *pServer);
bool httpInitAllSessions(HttpServer *pServer);
void httpRemoveAllSessions(HttpServer *pServer);
void httpProcessSessionExpire(void *handle, void *tmrId);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
// http token method
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
// util
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
bool httpProcessData(HttpContext *pContext);
bool httpReadDataImp(HttpContext *pContext);
bool httpParseRequest(HttpContext* pContext);
int httpCheckReadCompleted(HttpContext* pContext);
void httpReadDirtyData(HttpContext *pContext);
// http request handler // http request handler
void httpProcessRequest(HttpContext *pContext); void httpProcessRequest(HttpContext *pContext);
bool httpProcessData(HttpContext *pContext);
// http json printer
JsonBuf *httpMallocJsonBuf(HttpContext *pContext);
void httpFreeJsonBuf(HttpContext *pContext);
// http multicmds util
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
char *httpGetCmdsString(HttpContext *pContext, int pos);
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int httpGzipCompressInit(HttpContext *pContext);
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
const char* httpContextStateStr(HttpContextState state);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext);
#endif #endif

View File

@ -0,0 +1,237 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_INT_H
#define TDENGINE_HTTP_INT_H
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpCode.h"
#include "httpLog.h"
#include "httpJson.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*65 //65k
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_VERSION_10 0
#define HTTP_VERSION_11 1
//#define HTTP_VERSION_12 2
#define HTTP_UNCUNKED 0
#define HTTP_CHUNKED 1
#define HTTP_KEEPALIVE_NO_INPUT 0
#define HTTP_KEEPALIVE_ENABLE 1
#define HTTP_KEEPALIVE_DISABLE 2
#define HTTP_REQTYPE_OTHERS 0
#define HTTP_REQTYPE_LOGIN 1
#define HTTP_REQTYPE_HEARTBEAT 2
#define HTTP_REQTYPE_SINGLE_SQL 3
#define HTTP_REQTYPE_MULTI_SQL 4
#define HTTP_CHECK_BODY_ERROR -1
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
typedef enum {
HTTP_SERVER_INIT,
HTTP_SERVER_RUNNING,
HTTP_SERVER_CLOSING,
HTTP_SERVER_CLOSED
} HttpServerStatus;
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
struct HttpContext;
struct HttpThread;
typedef struct {
char id[HTTP_SESSION_ID_LEN];
int refCount;
void *taos;
} HttpSession;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef struct {
char *pos;
int32_t len;
} HttpBuf;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
char *pLast;
char *pCur;
HttpBuf method;
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
} HttpParser;
typedef struct HttpContext {
int32_t refCount;
int fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
int32_t state;
uint8_t httpVersion;
uint8_t httpChunked;
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
uint8_t acceptEncoding;
uint8_t contentEncoding;
uint8_t reqType;
uint8_t parsed;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void * taos;
void * ppContext;
HttpSession *session;
z_stream gzipStream;
HttpParser parser;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf * jsonBuf;
void * timer;
HttpEncodeMethod * encodeMethod;
struct HttpThread *pThread;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
int pollFd;
int numOfFds;
int threadId;
char label[HTTP_LABEL_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
int fd;
int numOfThreads;
int methodScannerLen;
int32_t requestNum;
int32_t status;
pthread_t thread;
HttpThread * pThreads;
void * contextCache;
void * sessionCache;
pthread_mutex_t serverMutex;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpServer;
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
extern HttpServer tsHttpServer;
#endif

View File

@ -97,4 +97,8 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len);
// quick // quick
void httpJsonPairStatus(JsonBuf* buf, int code); void httpJsonPairStatus(JsonBuf* buf, int code);
// http json printer
JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext);
void httpFreeJsonBuf(struct HttpContext* pContext);
#endif #endif

View File

@ -16,7 +16,7 @@
#ifndef TDENGINE_HTTP_RESP_H #ifndef TDENGINE_HTTP_RESP_H
#define TDENGINE_HTTP_RESP_H #define TDENGINE_HTTP_RESP_H
#include "httpHandle.h" #include "httpInt.h"
enum _httpRespTempl { enum _httpRespTempl {
HTTP_RESPONSE_JSON_OK, HTTP_RESPONSE_JSON_OK,

View File

@ -0,0 +1,28 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SERVER_H
#define TDENGINE_HTTP_SERVER_H
#include "httpInt.h"
bool httpInitConnect();
void httpCleanUpConnect();
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
bool httpReadDataImp(HttpContext *pContext);
#endif

View File

@ -0,0 +1,27 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SESSION_H
#define TDENGINE_HTTP_SESSION_H
bool httpInitSessions();
void httpCleanUpSessions();
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpGetSession(HttpContext *pContext);
void httpReleaseSession(HttpContext *pContext);
#endif

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SQL_H
#define TDENGINE_HTTP_SQL_H
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
char *httpGetCmdsString(HttpContext *pContext, int pos);
#endif

View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_UTIL_H
#define TDENGINE_HTTP_UTIL_H
bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen);
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
bool httpParseRequest(HttpContext *pContext);
int httpCheckReadCompleted(HttpContext *pContext);
void httpReadDirtyData(HttpContext *pContext);
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int httpGzipCompressInit(HttpContext *pContext);
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
#endif

View File

@ -16,15 +16,11 @@
#ifndef TDENGINE_REST_HANDLE_H #ifndef TDENGINE_REST_HANDLE_H
#define TDENGINE_REST_HANDLE_H #define TDENGINE_REST_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h" #include "http.h"
#include "httpCode.h" #include "httpInt.h"
#include "httpHandle.h" #include "httpUtil.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h"
#define REST_ROOT_URL_POS 0 #define REST_ROOT_URL_POS 0
#define REST_ACTION_URL_POS 1 #define REST_ACTION_URL_POS 1

View File

@ -16,16 +16,11 @@
#ifndef TDENGINE_TG_HANDLE_H #ifndef TDENGINE_TG_HANDLE_H
#define TDENGINE_TG_HANDLE_H #define TDENGINE_TG_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "cJSON.h"
#include "http.h" #include "http.h"
#include "httpCode.h" #include "httpInt.h"
#include "httpHandle.h" #include "httpUtil.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h"
#define TG_ROOT_URL_POS 0 #define TG_ROOT_URL_POS 0
#define TG_DB_URL_POS 1 #define TG_DB_URL_POS 1

View File

@ -18,8 +18,8 @@
#include "tkey.h" #include "tkey.h"
#include "tutil.h" #include "tutil.h"
#include "http.h" #include "http.h"
#include "httpLog.h" #include "httpInt.h"
#include "httpHandle.h" #include "httpAuth.h"
#define KEY_DES_4 4971256377704625728L #define KEY_DES_4 4971256377704625728L
@ -73,6 +73,7 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) {
unsigned char *base64 = base64_decode(token, len, &outlen); unsigned char *base64 = base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) { if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, ip:%s, taosd token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); httpError("context:%p, fd:%d, ip:%s, taosd token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token);
if (base64) free(base64);
return false; return false;
} }
if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) { if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) {

View File

@ -0,0 +1,227 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tsocket.h"
#include "tutil.h"
#include "ttime.h"
#include "ttimer.h"
#include "tglobal.h"
#include "tcache.h"
#include "hash.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSql.h"
#include "httpSession.h"
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
taosCloseSocket(pContext->fd);
pContext->fd = -1;
}
}
static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
if (pContext->fd > 0) tclose(pContext->fd);
HttpThread *pThread = pContext->pThread;
httpRemoveContextFromEpoll(pContext);
httpReleaseSession(pContext);
atomic_sub_fetch_32(&pThread->numOfFds, 1);
pContext->pThread = 0;
pContext->state = HTTP_CONTEXT_STATE_CLOSED;
// avoid double free
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
tfree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext);
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
}
return true;
}
void httpCleanupContexts() {
if (tsHttpServer.contextCache != NULL) {
SCacheObj *cache = tsHttpServer.contextCache;
httpPrint("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL;
}
}
const char *httpContextStateStr(HttpContextState state) {
switch (state) {
case HTTP_CONTEXT_STATE_READY:
return "ready";
case HTTP_CONTEXT_STATE_HANDLING:
return "handling";
case HTTP_CONTEXT_STATE_DROPPING:
return "dropping";
case HTTP_CONTEXT_STATE_CLOSED:
return "closed";
default:
return "unknown";
}
}
void httpNotifyContextClose(HttpContext *pContext) {
shutdown(pContext->fd, SHUT_WR);
}
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
}
HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
char contextStr[16] = {0};
snprintf(contextStr, sizeof(contextStr), "%p", pContext);
pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3);
pContext->ppContext = ppContext;
httpTrace("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
return pContext;
}
HttpContext *httpGetContext(void *ptr) {
char contextStr[16] = {0};
snprintf(contextStr, sizeof(contextStr), "%p", ptr);
HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr);
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpTrace("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount);
return pContext;
}
}
return NULL;
}
void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpTrace("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount);
HttpContext **ppContext = pContext->ppContext;
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
}
bool httpInitContext(HttpContext *pContext) {
pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->httpVersion = HTTP_VERSION_10;
pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
pContext->httpChunked = HTTP_UNCUNKED;
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
pContext->reqType = HTTP_REQTYPE_OTHERS;
pContext->encodeMethod = NULL;
pContext->timer = NULL;
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
HttpParser *pParser = &pContext->parser;
memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer;
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
return true;
}
void httpCloseContextByApp(HttpContext *pContext) {
pContext->parsed = false;
bool keepAlive = true;
if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false;
} else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false;
} else {}
if (keepAlive) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
} else {
httpRemoveContextFromEpoll(pContext);
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
} else {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
httpReleaseContext(pContext);
}
void httpCloseContextByServer(HttpContext *pContext) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr);
} else {
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
}
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext);
}

View File

@ -19,11 +19,12 @@
#include "tglobal.h" #include "tglobal.h"
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "http.h" #include "httpInt.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpAuth.h"
#include "httpServer.h"
#include "httpContext.h"
#include "httpHandle.h"
void httpToLowerUrl(char* url) { void httpToLowerUrl(char* url) {
/*ignore case */ /*ignore case */
@ -58,6 +59,10 @@ bool httpParseURL(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
char* pSeek; char* pSeek;
char* pEnd = strchr(pParser->pLast, ' '); char* pEnd = strchr(pParser->pLast, ' ');
if (pEnd == NULL) {
return false;
}
if (*pParser->pLast != '/') { if (*pParser->pLast != '/') {
httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL); httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL);
return false; return false;
@ -159,7 +164,7 @@ bool httpGetHttpMethod(HttpContext* pContext) {
bool httpGetDecodeMethod(HttpContext* pContext) { bool httpGetDecodeMethod(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
HttpServer* pServer = pContext->pThread->pServer; HttpServer* pServer = &tsHttpServer;
int methodLen = pServer->methodScannerLen; int methodLen = pServer->methodScannerLen;
for (int i = 0; i < methodLen; i++) { for (int i = 0; i < methodLen; i++) {
HttpDecodeMethod* method = pServer->methodScanner[i]; HttpDecodeMethod* method = pServer->methodScanner[i];

View File

@ -22,6 +22,7 @@
#include "httpCode.h" #include "httpCode.h"
#include "httpJson.h" #include "httpJson.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpUtil.h"
#define MAX_NUM_STR_SZ 25 #define MAX_NUM_STR_SZ 25

View File

@ -21,6 +21,7 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpCode.h" #include "httpCode.h"
#include "httpJson.h" #include "httpJson.h"
#include "httpContext.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"}; const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};

View File

@ -21,244 +21,15 @@
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tglobal.h" #include "tglobal.h"
#include "http.h" #include "httpInt.h"
#include "httpLog.h" #include "httpContext.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpUtil.h"
#ifndef EPOLLWAKEUP #ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29) #define EPOLLWAKEUP (1u << 29)
#endif #endif
const char* httpContextStateStr(HttpContextState state) {
switch (state) {
case HTTP_CONTEXT_STATE_READY:
return "ready";
case HTTP_CONTEXT_STATE_HANDLING:
return "handling";
case HTTP_CONTEXT_STATE_DROPPING:
return "dropping";
case HTTP_CONTEXT_STATE_CLOSED:
return "closed";
default:
return "unknown";
}
}
void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) {
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
taosCloseSocket(pContext->fd);
pContext->fd = -1;
}
}
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
}
void httpFreeContext(HttpServer *pServer, HttpContext *pContext);
/**
* context will be reused while connection exist
* multiCmds and jsonBuf will be malloc after taos_query_a called
* and won't be freed until connection closed
*/
HttpContext *httpCreateContext(HttpServer *pServer) {
HttpContext *pContext = (HttpContext *)taosMemPoolMalloc(pServer->pContextPool);
if (pContext != NULL) {
pContext->fromMemPool = 1;
httpTrace("context:%p, is malloced from mempool", pContext);
} else {
pContext = (HttpContext *)malloc(sizeof(HttpContext));
if (pContext == NULL) {
return NULL;
} else {
memset(pContext, 0, sizeof(HttpContext));
}
httpTrace("context:%p, is malloced from raw memory", pContext);
}
pContext->signature = pContext;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
return pContext;
}
void httpFreeContext(HttpServer *pServer, HttpContext *pContext) {
if (pContext->fromMemPool) {
httpTrace("context:%p, is freed from mempool", pContext);
taosMemPoolFree(pServer->pContextPool, (char *)pContext);
} else {
httpTrace("context:%p, is freed from raw memory", pContext);
tfree(pContext);
}
}
void httpCleanUpContextTimer(HttpContext *pContext) {
if (pContext->timer != NULL) {
taosTmrStopA(&pContext->timer);
//httpTrace("context:%p, ip:%s, close timer:%p", pContext, pContext->ipstr, pContext->timer);
pContext->timer = NULL;
}
}
void httpCleanUpContext(HttpContext *pContext, void *unused) {
httpTrace("context:%p, start the clean up operation, sig:%p", pContext, pContext->signature);
void *sig = atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0);
if (sig == NULL) {
httpTrace("context:%p is freed by another thread.", pContext);
return;
}
HttpThread *pThread = pContext->pThread;
httpCleanUpContextTimer(pContext);
httpRemoveContextFromEpoll(pThread, pContext);
httpRestoreSession(pContext);
pthread_mutex_lock(&pThread->threadMutex);
pThread->numOfFds--;
if (pThread->numOfFds < 0) {
httpError("context:%p, ip:%s, thread:%s, number of FDs:%d shall never be negative",
pContext, pContext->ipstr, pThread->label, pThread->numOfFds);
pThread->numOfFds = 0;
}
// remove from the link list
if (pContext->prev) {
(pContext->prev)->next = pContext->next;
} else {
pThread->pHead = pContext->next;
}
if (pContext->next) {
(pContext->next)->prev = pContext->prev;
}
pthread_mutex_unlock(&pThread->threadMutex);
httpTrace("context:%p, ip:%s, thread:%s, numOfFds:%d, context is cleaned up", pContext, pContext->ipstr,
pThread->label, pThread->numOfFds);
pContext->signature = 0;
pContext->fd = -1;
pContext->pThread = 0;
pContext->prev = 0;
pContext->next = 0;
pContext->state = HTTP_CONTEXT_STATE_READY;
// avoid double free
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
httpFreeContext(pThread->pServer, pContext);
}
bool httpInitContext(HttpContext *pContext) {
pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->httpVersion = HTTP_VERSION_10;
pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
pContext->httpChunked = HTTP_UNCUNKED;
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
pContext->reqType = HTTP_REQTYPE_OTHERS;
pContext->encodeMethod = NULL;
pContext->timer = NULL;
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
HttpParser *pParser = &pContext->parser;
memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer;
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
return true;
}
void httpCloseContext(HttpThread *pThread, HttpContext *pContext) {
taosTmrReset((TAOS_TMR_CALLBACK)httpCleanUpContext, HTTP_DELAY_CLOSE_TIME_MS, pContext, pThread->pServer->timerHandle, &pContext->timer);
httpTrace("context:%p, fd:%d, ip:%s, state:%s will be closed after:%d ms, timer:%p",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), HTTP_DELAY_CLOSE_TIME_MS, pContext->timer);
}
void httpCloseContextByApp(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
pContext->parsed = false;
bool keepAlive = true;
if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false;
} else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false;
} else {}
if (keepAlive) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pThread, pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
httpCloseContext(pThread, pContext);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pThread, pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
httpCloseContext(pThread, pContext);
} else {
httpRemoveContextFromEpoll(pThread, pContext);
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
httpCloseContext(pThread, pContext);
}
} else {
httpRemoveContextFromEpoll(pThread, pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
httpCloseContext(pThread, pContext);
}
}
void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) {
httpRemoveContextFromEpoll(pThread, pContext);
pContext->parsed = false;
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr);
httpCloseContext(pThread, pContext);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr);
httpCloseContext(pThread, pContext);
} else {
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
httpCloseContext(pThread, pContext);
}
}
void httpCloseContextByServerForExpired(void *param, void *tmrId) {
HttpContext *pContext = (HttpContext *)param;
httpRemoveContextFromEpoll(pContext->pThread, pContext);
httpError("context:%p, fd:%d, ip:%s, read http body error, time expired, timer:%p", pContext, pContext->fd, pContext->ipstr, tmrId);
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
httpCloseContextByServer(pContext->pThread, pContext);
}
static void httpStopThread(HttpThread* pThread) { static void httpStopThread(HttpThread* pThread) {
pThread->stop = true; pThread->stop = true;
@ -281,19 +52,13 @@ static void httpStopThread(HttpThread* pThread) {
close(pThread->pollFd); close(pThread->pollFd);
pthread_mutex_destroy(&(pThread->threadMutex)); pthread_mutex_destroy(&(pThread->threadMutex));
//while (pThread->pHead) {
// httpCleanUpContext(pThread->pHead, 0);
//}
} }
void httpCleanUpConnect() {
HttpServer *pServer = &tsHttpServer;
if (pServer->pThreads == NULL) return;
void httpCleanUpConnect(HttpServer *pServer) {
if (pServer == NULL) return;
shutdown(pServer->fd, SHUT_RD);
pthread_join(pServer->thread, NULL); pthread_join(pServer->thread, NULL);
for (int i = 0; i < pServer->numOfThreads; ++i) { for (int i = 0; i < pServer->numOfThreads; ++i) {
HttpThread* pThread = pServer->pThreads + i; HttpThread* pThread = pServer->pThreads + i;
if (pThread != NULL) { if (pThread != NULL) {
@ -302,19 +67,10 @@ void httpCleanUpConnect(HttpServer *pServer) {
} }
tfree(pServer->pThreads); tfree(pServer->pThreads);
pServer->pThreads = NULL;
httpTrace("http server:%s is cleaned up", pServer->label); httpTrace("http server:%s is cleaned up", pServer->label);
} }
// read all the data, then just discard it
void httpReadDirtyData(HttpContext *pContext) {
int fd = pContext->fd;
char data[1024] = {0};
int len = (int)taosReadSocket(fd, data, 1024);
while (len >= sizeof(data)) {
len = (int)taosReadSocket(fd, data, 1024);
}
}
bool httpReadDataImp(HttpContext *pContext) { bool httpReadDataImp(HttpContext *pContext) {
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
@ -338,11 +94,10 @@ bool httpReadDataImp(HttpContext *pContext) {
} }
if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
httpReadDirtyData(pContext);
httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d", httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE); pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE);
httpRemoveContextFromEpoll(pContext->pThread, pContext);
httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG); httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG);
httpNotifyContextClose(pContext);
return false; return false;
} }
} }
@ -352,7 +107,7 @@ bool httpReadDataImp(HttpContext *pContext) {
return true; return true;
} }
bool httpDecompressData(HttpContext *pContext) { static bool httpDecompressData(HttpContext *pContext) {
if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) { if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) {
httpDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); httpDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
return true; return true;
@ -382,45 +137,43 @@ bool httpDecompressData(HttpContext *pContext) {
return ret == 0; return ret == 0;
} }
bool httpReadData(HttpThread *pThread, HttpContext *pContext) { static bool httpReadData(HttpContext *pContext) {
if (!pContext->parsed) { if (!pContext->parsed) {
httpInitContext(pContext); httpInitContext(pContext);
} }
if (!httpReadDataImp(pContext)) { if (!httpReadDataImp(pContext)) {
httpCloseContextByServer(pThread, pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
if (!httpParseRequest(pContext)) { if (!httpParseRequest(pContext)) {
httpCloseContextByServer(pThread, pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
int ret = httpCheckReadCompleted(pContext); int ret = httpCheckReadCompleted(pContext);
if (ret == HTTP_CHECK_BODY_CONTINUE) { if (ret == HTTP_CHECK_BODY_CONTINUE) {
taosTmrReset(httpCloseContextByServerForExpired, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->timer); //httpTrace("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
//httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times, timer:%p", pContext, pContext->fd, pContext->ipstr, pContext->timer);
return false; return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){ } else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpCleanUpContextTimer(pContext);
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", httpTrace("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len); pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len);
if (httpDecompressData(pContext)) { if (httpDecompressData(pContext)) {
return true; return true;
} else { } else {
httpCloseContextByServer(pThread, pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
} else { } else {
httpCleanUpContextTimer(pContext);
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpCloseContextByServer(pThread, pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
} }
void httpProcessHttpData(void *param) { static void httpProcessHttpData(void *param) {
HttpServer *pServer = &tsHttpServer;
HttpThread *pThread = (HttpThread *)param; HttpThread *pThread = (HttpThread *)param;
HttpContext *pContext; HttpContext *pContext;
int fdNum; int fdNum;
@ -441,77 +194,72 @@ void httpProcessHttpData(void *param) {
if (fdNum <= 0) continue; if (fdNum <= 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
pContext = events[i].data.ptr; pContext = httpGetContext(events[i].data.ptr);
if (pContext->signature != pContext || pContext->pThread != pThread || pContext->fd <= 0) { if (pContext == NULL) {
httpError("context:%p, is already released, close connect", events[i].data.ptr);
//epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
//tclose(events[i].data.fd);
continue; continue;
} }
if (events[i].events & EPOLLPRI) { if (events[i].events & EPOLLPRI) {
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpRemoveContextFromEpoll(pThread, pContext); httpCloseContextByServer(pContext);
httpCloseContextByServer(pThread, pContext);
continue; continue;
} }
if (events[i].events & EPOLLRDHUP) { if (events[i].events & EPOLLRDHUP) {
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpRemoveContextFromEpoll(pThread, pContext); httpCloseContextByServer(pContext);
httpCloseContextByServer(pThread, pContext);
continue; continue;
} }
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpRemoveContextFromEpoll(pThread, pContext); httpCloseContextByServer(pContext);
httpCloseContextByServer(pThread, pContext);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpRemoveContextFromEpoll(pThread, pContext); httpCloseContextByServer(pContext);
httpCloseContextByServer(pThread, pContext);
continue; continue;
} }
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", httpTrace("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state));
httpReleaseContext(pContext);
continue; continue;
} }
if (!pContext->pThread->pServer->online) { if (pServer->status != HTTP_SERVER_RUNNING) {
httpTrace("context:%p, fd:%d, ip:%s, state:%s, server is not online, accessed:%d, close connect", httpTrace("context:%p, fd:%d, ip:%s, state:%s, server is not running, accessed:%d, close connect", pContext,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpRemoveContextFromEpoll(pThread, pContext);
httpReadDirtyData(pContext);
httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE);
httpCloseContextByServer(pThread, pContext); httpNotifyContextClose(pContext);
continue;
} else { } else {
if (httpReadData(pThread, pContext)) { if (httpReadData(pContext)) {
(*(pThread->processData))(pContext); (*(pThread->processData))(pContext);
atomic_fetch_add_32(&pThread->pServer->requestNum, 1); atomic_fetch_add_32(&pServer->requestNum, 1);
} }
} }
} }
} }
} }
void* httpAcceptHttpConnection(void *arg) { static void *httpAcceptHttpConnection(void *arg) {
int connFd = -1; int connFd = -1;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
int threadId = 0; int threadId = 0;
HttpThread * pThread; HttpServer * pServer = &tsHttpServer;
HttpServer * pServer; HttpThread * pThread = NULL;
HttpContext * pContext; HttpContext * pContext = NULL;
int totalFds; int totalFds = 0;
pServer = (HttpServer *)arg;
sigset_t set; sigset_t set;
sigemptyset(&set); sigemptyset(&set);
@ -521,12 +269,12 @@ void* httpAcceptHttpConnection(void *arg) {
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
if (pServer->fd < 0) { if (pServer->fd < 0) {
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp), httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label,
pServer->serverPort, strerror(errno)); taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno));
return NULL; return NULL;
} else { } else {
httpPrint("http service init success at %u", pServer->serverPort); httpPrint("http server init success at %u", pServer->serverPort);
pServer->online = true; pServer->status = HTTP_SERVER_RUNNING;
} }
while (1) { while (1) {
@ -534,10 +282,10 @@ void* httpAcceptHttpConnection(void *arg) {
connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd == -1) { if (connFd == -1) {
if (errno == EINVAL) { if (errno == EINVAL) {
httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label); httpTrace("http server:%s socket was shutdown, exiting...", pServer->label);
break; break;
} }
httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno)); httpError("http server:%s, accept connect failure, errno:%d reason:%s", pServer->label, errno, strerror(errno));
continue; continue;
} }
@ -547,8 +295,8 @@ void* httpAcceptHttpConnection(void *arg) {
} }
if (totalFds > tsHttpCacheSessions * 100) { if (totalFds > tsHttpCacheSessions * 100) {
httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection", httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection", connFd,
connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions); inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions);
taosCloseSocket(connFd); taosCloseSocket(connFd);
continue; continue;
} }
@ -559,7 +307,7 @@ void* httpAcceptHttpConnection(void *arg) {
// pick up the thread to handle this connection // pick up the thread to handle this connection
pThread = pServer->pThreads + threadId; pThread = pServer->pThreads + threadId;
pContext = httpCreateContext(pServer); pContext = httpCreateContext(connFd);
if (pContext == NULL) { if (pContext == NULL) {
httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd, inet_ntoa(clientAddr.sin_addr), httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd, inet_ntoa(clientAddr.sin_addr),
htons(clientAddr.sin_port)); htons(clientAddr.sin_port));
@ -567,39 +315,24 @@ void* httpAcceptHttpConnection(void *arg) {
continue; continue;
} }
httpTrace("context:%p, fd:%d, ip:%s:%u, thread:%s, numOfFds:%d, totalFds:%d, accept a new connection",
pContext, connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), pThread->label,
pThread->numOfFds, totalFds);
pContext->fd = connFd;
sprintf(pContext->ipstr, "%s:%d", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
pContext->pThread = pThread; pContext->pThread = pThread;
sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
struct epoll_event event; struct epoll_event event;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
event.data.ptr = pContext; event.data.ptr = pContext;
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
httpError("context:%p, fd:%d, ip:%s:%u, thread:%s, failed to add http fd for epoll, error:%s", httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
pContext, connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), pThread->label, pContext->ipstr, pThread->label, strerror(errno));
strerror(errno)); tclose(pContext->fd);
httpFreeContext(pThread->pServer, pContext); httpReleaseContext(pContext);
tclose(connFd);
continue; continue;
} }
// notify the data process, add into the FdObj list // notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThread->threadMutex)); atomic_add_fetch_32(&pThread->numOfFds, 1);
httpTrace("context:%p, fd:%d, ip:%s, thread:%s numOfFds:%d totalFds:%d, accept a new connection", pContext, connFd,
pContext->next = pThread->pHead; pContext->ipstr, pThread->label, pThread->numOfFds, totalFds);
if (pThread->pHead) (pThread->pHead)->prev = pContext;
pThread->pHead = pContext;
pThread->numOfFds++;
pthread_mutex_unlock(&(pThread->threadMutex));
// pick up next thread for next connection // pick up next thread for next connection
threadId++; threadId++;
@ -610,21 +343,17 @@ void* httpAcceptHttpConnection(void *arg) {
return NULL; return NULL;
} }
bool httpInitConnect(HttpServer *pServer) { bool httpInitConnect() {
int i; HttpServer *pServer = &tsHttpServer;
HttpThread * pThread; pServer->pThreads = calloc(pServer->numOfThreads, sizeof(HttpThread));
pServer->pThreads = (HttpThread *)malloc(sizeof(HttpThread) * (size_t)pServer->numOfThreads);
if (pServer->pThreads == NULL) { if (pServer->pThreads == NULL) {
httpError("init error no enough memory"); httpError("init error no enough memory");
return false; return false;
} }
memset(pServer->pThreads, 0, sizeof(HttpThread) * (size_t)pServer->numOfThreads);
pThread = pServer->pThreads; HttpThread *pThread = pServer->pThreads;
for (i = 0; i < pServer->numOfThreads; ++i) { for (int i = 0; i < pServer->numOfThreads; ++i) {
sprintf(pThread->label, "%s%d", pServer->label, i); sprintf(pThread->label, "%s%d", pServer->label, i);
pThread->pServer = pServer;
pThread->processData = pServer->processData; pThread->processData = pServer->processData;
pThread->threadId = i; pThread->threadId = i;
@ -643,8 +372,8 @@ bool httpInitConnect(HttpServer *pServer) {
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) { if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) {
httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label,
pThread->label, strerror(errno)); strerror(errno));
return false; return false;
} }
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);

View File

@ -15,44 +15,28 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "hash.h"
#include "taos.h" #include "taos.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "tglobal.h"
#include "http.h" #include "tcache.h"
#include "httpLog.h" #include "httpInt.h"
#include "httpCode.h" #include "httpContext.h"
#include "httpHandle.h" #include "httpSession.h"
#include "httpResp.h"
void httpAccessSession(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex);
if (pContext->session == pContext->session->signature) {
pContext->session->expire = (int) taosGetTimestampSec() + pContext->pThread->pServer->sessionExpire;
}
pthread_mutex_unlock(&server->serverMutex);
}
void httpCreateSession(HttpContext *pContext, void *taos) { void httpCreateSession(HttpContext *pContext, void *taos) {
HttpServer *server = pContext->pThread->pServer; HttpServer *server = &tsHttpServer;
httpReleaseSession(pContext);
pthread_mutex_lock(&server->serverMutex); pthread_mutex_lock(&server->serverMutex);
if (pContext->session != NULL && pContext->session == pContext->session->signature) { HttpSession session = {0};
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos);
pContext->session->expire = 0;
pContext->session->access--;
}
HttpSession session;
session.taos = taos; session.taos = taos;
session.expire = (int)taosGetTimestampSec() + server->sessionExpire; session.refCount = 1;
session.access = 1; snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
int sessionIdLen = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
taosHashPut(server->pSessionHash, session.id, sessionIdLen, (char *)(&session), sizeof(HttpSession)); pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire);
pContext->session = taosHashGet(server->pSessionHash, session.id, sessionIdLen); // void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
@ -62,26 +46,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
return; return;
} }
pContext->session->signature = pContext->session; httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd,
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr, pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pContext->user, pContext->session, pContext->session->taos);
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
void httpFetchSessionImp(HttpContext *pContext) { static void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer; HttpServer *server = &tsHttpServer;
pthread_mutex_lock(&server->serverMutex); pthread_mutex_lock(&server->serverMutex);
char sessionId[HTTP_SESSION_ID_LEN]; char sessionId[HTTP_SESSION_ID_LEN];
int sessonIdLen = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosHashGet(server->pSessionHash, sessionId, sessonIdLen); pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
if (pContext->session != NULL && pContext->session == pContext->session->signature) { if (pContext->session != NULL) {
pContext->session->access++; atomic_add_fetch_32(&pContext->session->refCount, 1);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d", httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pContext->session->taos, pContext->session->access, pContext->session->expire);
pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire;
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr, httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr,
pContext->user); pContext->user);
@ -90,113 +71,54 @@ void httpFetchSessionImp(HttpContext *pContext) {
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
void httpFetchSession(HttpContext *pContext) { void httpGetSession(HttpContext *pContext) {
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpFetchSessionImp(pContext); httpFetchSessionImp(pContext);
} else { } else {
char sessionId[HTTP_SESSION_ID_LEN]; char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
if (strcmp(pContext->session->id, sessionId) != 0) { httpReleaseSession(pContext);
httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user); httpFetchSessionImp(pContext);
httpRestoreSession(pContext);
httpFetchSessionImp(pContext);
}
} }
} }
void httpRestoreSession(HttpContext *pContext) { void httpReleaseSession(HttpContext *pContext) {
HttpServer * server = pContext->pThread->pServer; if (pContext == NULL || pContext->session == NULL) return;
// all access to the session is via serverMutex int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1);
pthread_mutex_lock(&server->serverMutex); assert(refCount >= 0);
HttpSession *session = pContext->session; httpTrace("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos,
if (session == NULL || session != session->signature) { pContext->session->refCount);
pthread_mutex_unlock(&server->serverMutex);
return; taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false);
}
session->access--;
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d",
pContext, pContext->ipstr, pContext->user, session, session->taos,
session->access, pContext->session->expire);
pContext->session = NULL; pContext->session = NULL;
pthread_mutex_unlock(&server->serverMutex);
} }
void httpResetSession(HttpSession *pSession) { static void httpDestroySession(void *data) {
httpTrace("close session:%p:%p", pSession, pSession->taos); HttpSession *session = data;
if (pSession->taos != NULL) { httpTrace("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
taos_close(pSession->taos);
pSession->taos = NULL; if (session->taos != NULL) {
taos_close(session->taos);
session->taos = NULL;
} }
pSession->signature = NULL;
} }
void httpRemoveAllSessions(HttpServer *pServer) { void httpCleanUpSessions() {
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash); if (tsHttpServer.sessionCache != NULL) {
SCacheObj *cache = tsHttpServer.sessionCache;
while (taosHashIterNext(pIter)) { httpPrint("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
HttpSession *pSession = taosHashIterGet(pIter); taosCacheCleanup(tsHttpServer.sessionCache);
if (pSession == NULL) continue; tsHttpServer.sessionCache = NULL;
httpResetSession(pSession);
} }
taosHashDestroyIter(pIter);
} }
bool httpInitAllSessions(HttpServer *pServer) { bool httpInitSessions() {
if (pServer->pSessionHash == NULL) { tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession);
pServer->pSessionHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (tsHttpServer.sessionCache == NULL) {
} httpError("failed to init session cache");
if (pServer->pSessionHash == NULL) {
httpError("http init session pool failed");
return false; return false;
} }
if (pServer->expireTimer == NULL) {
taosTmrReset(httpProcessSessionExpire, 50000, pServer, pServer->timerHandle, &pServer->expireTimer);
}
return true; return true;
} }
bool httpSessionExpired(HttpSession *pSession) {
time_t cur = taosGetTimestampSec();
if (pSession->taos != NULL) {
if (pSession->expire > cur) {
return false; // un-expired, so return false
}
if (pSession->access > 0) {
httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos,
pSession->access);
return false; // still used, so return false
}
httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d",
pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire);
}
return true;
}
void httpRemoveExpireSessions(HttpServer *pServer) {
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash);
while (taosHashIterNext(pIter)) {
HttpSession *pSession = taosHashIterGet(pIter);
if (pSession == NULL) continue;
pthread_mutex_lock(&pServer->serverMutex);
if (httpSessionExpired(pSession)) {
httpResetSession(pSession);
taosHashRemove(pServer->pSessionHash, pSession->id, strlen(pSession->id));
}
pthread_mutex_unlock(&pServer->serverMutex);
}
taosHashDestroyIter(pIter);
}
void httpProcessSessionExpire(void *handle, void *tmrId) {
HttpServer *pServer = (HttpServer *)handle;
httpRemoveExpireSessions(pServer);
taosTmrReset(httpProcessSessionExpire, 60000, pServer, pServer->timerHandle, &pServer->expireTimer);
}

View File

@ -18,11 +18,12 @@
#include "tnote.h" #include "tnote.h"
#include "taos.h" #include "taos.h"
#include "tsclient.h" #include "tsclient.h"
#include "http.h" #include "httpInt.h"
#include "httpLog.h" #include "httpContext.h"
#include "httpCode.h" #include "httpSql.h"
#include "httpHandle.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpAuth.h"
#include "httpSession.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos); void *param, void **taos);
@ -30,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return; if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds; HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
@ -72,7 +73,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return; if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds; HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
@ -172,7 +173,7 @@ void httpProcessMultiSql(HttpContext *pContext) {
} }
void httpProcessMultiSqlCmd(HttpContext *pContext) { void httpProcessMultiSqlCmd(HttpContext *pContext) {
if (pContext == NULL || pContext->signature != pContext) return; if (pContext == NULL) return;
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
if (multiCmds == NULL || multiCmds->size <= 0 || multiCmds->pos >= multiCmds->size || multiCmds->pos < 0) { if (multiCmds == NULL || multiCmds->size <= 0 || multiCmds->pos >= multiCmds->size || multiCmds->pos < 0) {
@ -192,7 +193,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return; if (pContext == NULL) return;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
@ -230,7 +231,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) { void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return; if (pContext == NULL) return;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
@ -354,7 +355,7 @@ void httpExecCmd(HttpContext *pContext) {
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = param; HttpContext *pContext = param;
if (pContext == NULL || pContext->signature != pContext) return; if (pContext == NULL) return;
if (code < 0) { if (code < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr,
@ -383,16 +384,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
} }
void httpProcessRequest(HttpContext *pContext) { void httpProcessRequest(HttpContext *pContext) {
httpFetchSession(pContext); httpGetSession(pContext);
if (pContext->session == NULL || pContext->session != pContext->session->signature || if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) {
pContext->reqType == HTTP_REQTYPE_LOGIN) {
taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext, taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext,
&(pContext->taos)); &(pContext->taos));
httpTrace("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->taos); pContext->ipstr, pContext->user, pContext->taos);
} else { } else {
httpAccessSession(pContext);
httpExecCmd(pContext); httpExecCmd(pContext);
} }
} }

View File

@ -20,84 +20,64 @@
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "tadmin.h" #include "tadmin.h"
#include "http.h" #include "httpInt.h"
#include "httpCode.h" #include "httpContext.h"
#include "httpHandle.h" #include "httpSession.h"
#include "httpServer.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpLog.h"
#include "gcHandle.h"
#include "httpHandle.h" #include "httpHandle.h"
#include "gcHandle.h"
#include "restHandle.h" #include "restHandle.h"
#include "tgHandle.h" #include "tgHandle.h"
#ifndef _ADMIN #ifndef _ADMIN
void adminInitHandle(HttpServer* pServer) {} void adminInitHandle(HttpServer* pServer) {}
void opInitHandle(HttpServer* pServer) {} void opInitHandle(HttpServer* pServer) {}
#endif #endif
static HttpServer *httpServer = NULL; HttpServer tsHttpServer;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
int httpInitSystem() { int httpInitSystem() {
// taos_init(); strcpy(tsHttpServer.label, "rest");
tsHttpServer.serverIp = 0;
tsHttpServer.serverPort = tsHttpPort;
tsHttpServer.numOfThreads = tsHttpMaxThreads;
tsHttpServer.processData = httpProcessData;
httpServer = (HttpServer *)malloc(sizeof(HttpServer)); pthread_mutex_init(&tsHttpServer.serverMutex, NULL);
memset(httpServer, 0, sizeof(HttpServer));
strcpy(httpServer->label, "rest");
httpServer->serverIp = 0;
httpServer->serverPort = tsHttpPort;
httpServer->cacheContext = tsHttpCacheSessions;
httpServer->sessionExpire = tsHttpSessionExpire;
httpServer->numOfThreads = tsHttpMaxThreads;
httpServer->processData = httpProcessData;
pthread_mutex_init(&httpServer->serverMutex, NULL);
if (tsHttpEnableRecordSql != 0) { if (tsHttpEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note");
} }
restInitHandle(httpServer); restInitHandle(&tsHttpServer);
adminInitHandle(httpServer); adminInitHandle(&tsHttpServer);
gcInitHandle(httpServer); gcInitHandle(&tsHttpServer);
tgInitHandle(httpServer); tgInitHandle(&tsHttpServer);
opInitHandle(httpServer); opInitHandle(&tsHttpServer);
return 0; return 0;
} }
int httpStartSystem() { int httpStartSystem() {
httpPrint("starting to initialize http service ..."); httpPrint("start http server ...");
if (httpServer == NULL) { if (tsHttpServer.status != HTTP_SERVER_INIT) {
httpError("http server is null"); httpError("http server is already started");
httpInitSystem();
}
if (httpServer->pContextPool == NULL) {
httpServer->pContextPool = taosMemPoolInit(httpServer->cacheContext, sizeof(HttpContext));
}
if (httpServer->pContextPool == NULL) {
httpError("http init context pool failed");
return -1; return -1;
} }
if (httpServer->timerHandle == NULL) { if (!httpInitContexts()) {
httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 100 + 100, 200, 60000, "http"); httpError("http init contexts failed");
}
if (httpServer->timerHandle == NULL) {
httpError("http init timer failed");
return -1; return -1;
} }
if (!httpInitAllSessions(httpServer)) { if (!httpInitSessions()) {
httpError("http init session failed"); httpError("http init session failed");
return -1; return -1;
} }
if (!httpInitConnect(httpServer)) { if (!httpInitConnect()) {
httpError("http init server failed"); httpError("http init server failed");
return -1; return -1;
} }
@ -106,53 +86,23 @@ int httpStartSystem() {
} }
void httpStopSystem() { void httpStopSystem() {
if (httpServer != NULL) { tsHttpServer.status = HTTP_SERVER_CLOSING;
httpServer->online = false; shutdown(tsHttpServer.fd, SHUT_RD);
}
tgCleanupHandle(); tgCleanupHandle();
} }
void httpCleanUpSystem() { void httpCleanUpSystem() {
httpPrint("http service cleanup"); httpPrint("http server cleanup");
httpStopSystem(); httpStopSystem();
//#if 0 httpCleanupContexts();
if (httpServer == NULL) { httpCleanUpSessions();
return; httpCleanUpConnect();
} pthread_mutex_destroy(&tsHttpServer.serverMutex);
if (httpServer->expireTimer != NULL) { tsHttpServer.status = HTTP_SERVER_CLOSED;
taosTmrStopA(&(httpServer->expireTimer));
}
if (httpServer->timerHandle != NULL) {
taosTmrCleanUp(httpServer->timerHandle);
httpServer->timerHandle = NULL;
}
if (httpServer->pThreads != NULL) {
httpCleanUpConnect(httpServer);
httpServer->pThreads = NULL;
}
#if 0
httpRemoveAllSessions(httpServer);
if (httpServer->pContextPool != NULL) {
taosMemPoolCleanUp(httpServer->pContextPool);
httpServer->pContextPool = NULL;
}
pthread_mutex_destroy(&httpServer->serverMutex);
tfree(httpServer);
#endif
} }
int32_t httpGetReqCount() { int32_t httpGetReqCount() {
if (httpServer != NULL) { return atomic_exchange_32(&tsHttpServer.requestNum, 0);
return atomic_exchange_32(&httpServer->requestNum, 0);
}
return 0;
} }

View File

@ -17,11 +17,10 @@
#include "os.h" #include "os.h"
#include "tmd5.h" #include "tmd5.h"
#include "taos.h" #include "taos.h"
#include "http.h" #include "httpInt.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h"
#include "httpUtil.h"
bool httpCheckUsedbSql(char *sql) { bool httpCheckUsedbSql(char *sql) {
if (strstr(sql, "use ") != NULL) { if (strstr(sql, "use ") != NULL) {

View File

@ -18,9 +18,10 @@
#include "tglobal.h" #include "tglobal.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "httpInt.h"
#include "tgHandle.h" #include "tgHandle.h"
#include "tgJson.h" #include "tgJson.h"
#include "httpLog.h" #include "cJSON.h"
/* /*
* taos.telegraf.cfg formats like * taos.telegraf.cfg formats like

View File

@ -800,12 +800,13 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
break; break;
} }
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
int32_t newlen = 0;
if (!includeLengthPrefix) { if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) { if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*(uint32_t *)payload = TSDB_DATA_NCHAR_NULL; *(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
} else { } else {
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) { if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
toNchar(pVariant, &payload, &pVariant->nLen); toNchar(pVariant, &payload, &newlen);
} else { } else {
wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen); wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen);
} }
@ -817,12 +818,13 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
char *p = varDataVal(payload); char *p = varDataVal(payload);
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) { if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
toNchar(pVariant, &p, &pVariant->nLen); toNchar(pVariant, &p, &newlen);
} else { } else {
wcsncpy((wchar_t *)p, pVariant->wpz, pVariant->nLen); wcsncpy((wchar_t *)p, pVariant->wpz, pVariant->nLen);
newlen = pVariant->nLen;
} }
varDataSetLen(payload, pVariant->nLen); // the length may be changed after toNchar function called varDataSetLen(payload, newlen); // the length may be changed after toNchar function called
assert(p == varDataVal(payload)); assert(p == varDataVal(payload));
} }
} }

View File

@ -924,7 +924,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
// underlying UDP layer does not know it is server or client // underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType; pRecv->connType = pRecv->connType | pRpc->connType;
if (pRecv->ip == 0) { if (pRecv->msg == NULL) {
rpcProcessBrokenLink(pConn); rpcProcessBrokenLink(pConn);
return NULL; return NULL;
} }

View File

@ -56,7 +56,7 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts);
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec); int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce(); void deltaToUtcInitOnce();
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#endif #endif
} }
#if 0
static FORCE_INLINE void taosFreeNode(void *data) { static FORCE_INLINE void taosFreeNode(void *data) {
SCacheDataNode *pNode = *(SCacheDataNode **)data; SCacheDataNode *pNode = *(SCacheDataNode **)data;
free(pNode); free(pNode);
} }
#endif
/** /**
* @param key key of object for hash, usually a null-terminated string * @param key key of object for hash, usually a null-terminated string
@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
} }
// set free cache node callback function for hash table // set free cache node callback function for hash table
taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); // taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
pCacheObj->freeFp = freeCb; pCacheObj->freeFp = freeCb;
pCacheObj->refreshTime = refreshTime * 1000; pCacheObj->refreshTime = refreshTime * 1000;
@ -565,7 +567,17 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
void doCleanupDataCache(SCacheObj *pCacheObj) { void doCleanupDataCache(SCacheObj *pCacheObj) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
taosHashCleanup(pCacheObj->pHashTable);
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
//}
}
taosHashDestroyIter(pIter);
taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
taosTrashCanEmpty(pCacheObj, true); taosTrashCanEmpty(pCacheObj, true);

View File

@ -24,7 +24,6 @@
#include "taosdef.h" #include "taosdef.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
/* /*
* mktime64 - Converts date to seconds. * mktime64 - Converts date to seconds.
* Converts Gregorian date to seconds since 1970-01-01 00:00:00. * Converts Gregorian date to seconds since 1970-01-01 00:00:00.
@ -119,15 +118,21 @@ static int month[12] = {
static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec);
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
parseLocaltime,
parseLocaltimeWithDst
};
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); } int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) { int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t daylight) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec); return parseTimeWithTz(timestr, time, timePrec);
} else { } else {
return parseLocaltime(timestr, time, timePrec); return (*parseLocaltimeFp[daylight])(timestr, time, timePrec);
} }
} }
@ -304,9 +309,6 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return -1; return -1;
} }
/* mktime will be affected by TZ, set by using taos_options */
//int64_t seconds = mktime(&tm);
//int64_t seconds = (int64_t)user_mktime(&tm);
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
int64_t fraction = 0; int64_t fraction = 0;
@ -324,6 +326,32 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return 0; return 0;
} }
int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
*time = 0;
struct tm tm = {0};
tm.tm_isdst = -1;
char* str = strptime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
if (str == NULL) {
return -1;
}
/* mktime will be affected by TZ, set by using taos_options */
int64_t seconds = mktime(&tm);
int64_t fraction = 0;
if (*str == '.') {
/* parse the second fraction part */
if ((fraction = parseFraction(str + 1, &str, timePrec)) < 0) {
return -1;
}
}
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : 1000000;
*time = factor * seconds + fraction;
return 0;
}
static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* result) { static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* result) {
*result = val; *result = val;

View File

@ -11,6 +11,8 @@
4. pip install ../src/connector/python/linux/python2 ; pip3 install 4. pip install ../src/connector/python/linux/python2 ; pip3 install
../src/connector/python/linux/python3 ../src/connector/python/linux/python3
5. pip install numpy; pip3 install numpy
> Note: Both Python2 and Python3 are currently supported by the Python test > Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by Python Software > framework. Since Python2 is no longer officially supported by Python Software
> Foundation since January 1, 2020, it is recommended that subsequent test case > Foundation since January 1, 2020, it is recommended that subsequent test case

View File

@ -296,13 +296,13 @@ class TDTestCase:
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data23 != 1 then # TSIM: if $data23 != 1 then
tdLog.info('tdSql.checkData(2, 3, 1)') tdLog.info('tdSql.checkData(2, 3, TAG)')
tdSql.checkData(2, 3, 1) tdSql.checkData(2, 3, "TAG")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data33 != 2.000000 then # TSIM: if $data33 != 2.000000 then
tdLog.info('tdSql.checkData(3, 3, 2.000000)') tdLog.info('tdSql.checkData(3, 3, 2.000000)')
tdSql.checkData(3, 3, 2.000000) tdSql.checkData(3, 3, "TAG")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -396,7 +396,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 2 then # TSIM: if $data03 != 2 then
tdLog.info('tdSql.checkData(0, 3, 2)') tdLog.info('tdSql.checkData(0, 3, 2)')
tdSql.checkData(0, 3, 2) tdSql.checkData(0, 3, "2")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -553,12 +553,12 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 5 then # TSIM: if $data03 != 5 then
tdLog.info('tdSql.checkData(0, 3, 5)') tdLog.info('tdSql.checkData(0, 3, 5)')
tdSql.checkData(0, 3, 5) tdSql.checkData(0, 3, "5")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 6 then # TSIM: if $data04 != 6 then
tdLog.info('tdSql.checkData(0, 4, 6)') tdLog.info('tdSql.checkData(0, 4, 6)')
tdSql.checkData(0, 4, 6) tdSql.checkData(0, 4, "6")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -584,12 +584,12 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 5 then # TSIM: if $data03 != 5 then
tdLog.info('tdSql.checkData(0, 3, 5)') tdLog.info('tdSql.checkData(0, 3, 5)')
tdSql.checkData(0, 3, 5) tdSql.checkData(0, 3, "5")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 6 then # TSIM: if $data04 != 6 then
tdLog.info('tdSql.checkData(0, 4, 6)') tdLog.info('tdSql.checkData(0, 4, 6)')
tdSql.checkData(0, 4, 6) tdSql.checkData(0, 4, "6")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -654,7 +654,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 3 then # TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)') tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3) tdSql.checkData(0, 4, "3")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -779,7 +779,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 3 then # TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)') tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3) tdSql.checkData(0, 4, "3")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -838,7 +838,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 5 then # TSIM: if $data03 != 5 then
tdLog.info('tdSql.checkData(0, 3, 5)') tdLog.info('tdSql.checkData(0, 3, 5)')
tdSql.checkData(0, 3, 5) tdSql.checkData(0, 3, "5")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 1 then # TSIM: if $data04 != 1 then
@ -900,12 +900,12 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 2 then # TSIM: if $data03 != 2 then
tdLog.info('tdSql.checkData(0, 3, 2)') tdLog.info('tdSql.checkData(0, 3, 2)')
tdSql.checkData(0, 3, 2) tdSql.checkData(0, 3, "2")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 3 then # TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)') tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3) tdSql.checkData(0, 4, "3")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -1024,28 +1024,28 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data02 != 1 then # TSIM: if $data02 != 1 then
tdLog.info('tdSql.checkData(0, 2, 1)') tdLog.info('tdSql.checkData(0, 2, 1)')
tdSql.checkData(0, 2, 1) tdSql.checkData(0, 2, "1")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 2 then # TSIM: if $data03 != 2 then
tdLog.info('tdSql.checkData(0, 3, 2)') tdLog.info('tdSql.checkData(0, 3, 2)')
tdSql.checkData(0, 3, 2) tdSql.checkData(0, 3, "2")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 3 then # TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)') tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3) tdSql.checkData(0, 4, "3")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data05 != 4 then # TSIM: if $data05 != 4 then
tdLog.info('tdSql.checkData(0, 5, 4)') tdLog.info('tdSql.checkData(0, 5, 4)')
tdSql.checkData(0, 5, 4) tdSql.checkData(0, 5, "4")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
# TSIM: sql alter table $mt change tag tgcol1 tgcol4 -x step103 # TSIM: sql alter table $mt change tag tgcol1 tgcol4 -x step103
tdLog.info('alter table %s change tag tgcol1 tgcol4 -x step103' % (mt)) tdLog.info('alter table %s change tag tgcol1 tgcol4 -x step103' % (mt))
tdSql.error('alter table %s change tag tgcol1 tgcol403' % (mt)) tdSql.error('alter table %s change tag tgcol1 tgcol4' % (mt))
# TSIM: return -1 # TSIM: return -1
# TSIM: step103: # TSIM: step103:
# TSIM: # TSIM:
@ -1098,12 +1098,12 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data02 != 1 then # TSIM: if $data02 != 1 then
tdLog.info('tdSql.checkData(0, 2, 1)') tdLog.info('tdSql.checkData(0, 2, 1)')
tdSql.checkData(0, 2, 1) tdSql.checkData(0, 2, "1")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 4 then # TSIM: if $data03 != 4 then
tdLog.info('tdSql.checkData(0, 3, 4)') tdLog.info('tdSql.checkData(0, 3, 4)')
tdSql.checkData(0, 3, 4) tdSql.checkData(0, 3, "4")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 0 then # TSIM: if $data04 != 0 then
@ -1112,8 +1112,8 @@ class TDTestCase:
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data05 != NULL then # TSIM: if $data05 != NULL then
tdLog.info('tdSql.checkData(0, 5, NULL)') #tdLog.info('tdSql.checkData(0, 5, NULL)')
tdSql.checkData(0, 5, None) # tdSql.checkData(0, 5, None)
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -1189,13 +1189,13 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data06 != 5 then # TSIM: if $data06 != 5 then
tdLog.info('tdSql.checkData(0, 6, 5)') tdLog.info('tdSql.checkData(0, 6, 5)')
tdSql.checkData(0, 6, 5) tdSql.checkData(0, 6, "5")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
# TSIM: sql alter table $mt change tag tgcol1 tgcol4 -x step114 # TSIM: sql alter table $mt change tag tgcol1 tgcol4 -x step114
tdLog.info('alter table %s change tag tgcol1 tgcol4 -x step114' % (mt)) tdLog.info('alter table %s change tag tgcol1 tgcol4 -x step114' % (mt))
tdSql.error('alter table %s change tag tgcol1 tgcol414' % (mt)) tdSql.error('alter table %s change tag tgcol1 tgcol4' % (mt))
# TSIM: return -1 # TSIM: return -1
# TSIM: step114: # TSIM: step114:
# TSIM: # TSIM:
@ -1274,7 +1274,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 4 then # TSIM: if $data03 != 4 then
tdLog.info('tdSql.checkData(0, 3, 4)') tdLog.info('tdSql.checkData(0, 3, 4)')
tdSql.checkData(0, 3, 4) tdSql.checkData(0, 3, "4")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 5 then # TSIM: if $data04 != 5 then
@ -1284,7 +1284,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data05 != 6 then # TSIM: if $data05 != 6 then
tdLog.info('tdSql.checkData(0, 5, 6)') tdLog.info('tdSql.checkData(0, 5, 6)')
tdSql.checkData(0, 5, 6) tdSql.checkData(0, 5, "6")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data06 != 7 then # TSIM: if $data06 != 7 then
@ -1376,12 +1376,12 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data06 != 5 then # TSIM: if $data06 != 5 then
tdLog.info('tdSql.checkData(0, 6, 5)') tdLog.info('tdSql.checkData(0, 6, 5)')
tdSql.checkData(0, 6, 5) tdSql.checkData(0, 6, "5")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data07 != 6 then # TSIM: if $data07 != 6 then
tdLog.info('tdSql.checkData(0, 7, 6)') tdLog.info('tdSql.checkData(0, 7, 6)')
tdSql.checkData(0, 7, 6) tdSql.checkData(0, 7, "6")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -1460,12 +1460,12 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 1 then # TSIM: if $data03 != 1 then
tdLog.info('tdSql.checkData(0, 3, 1)') tdLog.info('tdSql.checkData(0, 3, 1)')
tdSql.checkData(0, 3, 1) tdSql.checkData(0, 3, "1")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data04 != 5 then # TSIM: if $data04 != 5 then
tdLog.info('tdSql.checkData(0, 4, 5)') tdLog.info('tdSql.checkData(0, 4, 5)')
tdSql.checkData(0, 4, 5) tdSql.checkData(0, 4, "5")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data05 != 4 then # TSIM: if $data05 != 4 then
@ -1475,7 +1475,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data06 != 3 then # TSIM: if $data06 != 3 then
tdLog.info('tdSql.checkData(0, 6, 3)') tdLog.info('tdSql.checkData(0, 6, 3)')
tdSql.checkData(0, 6, 3) tdSql.checkData(0, 6, "3")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data07 != 2 then # TSIM: if $data07 != 2 then
@ -1562,7 +1562,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data02 != 1 then # TSIM: if $data02 != 1 then
tdLog.info('tdSql.checkData(0, 2, 1)') tdLog.info('tdSql.checkData(0, 2, 1)')
tdSql.checkData(0, 2, 1) tdSql.checkData(0, 2, "1")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 2 then # TSIM: if $data03 != 2 then
@ -1577,7 +1577,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data05 != 4 then # TSIM: if $data05 != 4 then
tdLog.info('tdSql.checkData(0, 5, 4)') tdLog.info('tdSql.checkData(0, 5, 4)')
tdSql.checkData(0, 5, 4) tdSql.checkData(0, 5, "4")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data06 != 5.000000000 then # TSIM: if $data06 != 5.000000000 then
@ -1587,7 +1587,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data07 != 6 then # TSIM: if $data07 != 6 then
tdLog.info('tdSql.checkData(0, 7, 6)') tdLog.info('tdSql.checkData(0, 7, 6)')
tdSql.checkData(0, 7, 6) tdSql.checkData(0, 7, "6")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: # TSIM:
@ -1655,7 +1655,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data02 != 7 then # TSIM: if $data02 != 7 then
tdLog.info('tdSql.checkData(0, 2, 7)') tdLog.info('tdSql.checkData(0, 2, 7)')
tdSql.checkData(0, 2, 7) tdSql.checkData(0, 2, "7")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data03 != 9 then # TSIM: if $data03 != 9 then
@ -1670,7 +1670,7 @@ class TDTestCase:
# TSIM: endi # TSIM: endi
# TSIM: if $data05 != 8 then # TSIM: if $data05 != 8 then
tdLog.info('tdSql.checkData(0, 5, 8)') tdLog.info('tdSql.checkData(0, 5, 8)')
tdSql.checkData(0, 5, 8) tdSql.checkData(0, 5, "8")
# TSIM: return -1 # TSIM: return -1
# TSIM: endi # TSIM: endi
# TSIM: if $data06 != 10 then # TSIM: if $data06 != 10 then
@ -1744,8 +1744,8 @@ class TDTestCase:
# TSIM: print =============== clear # TSIM: print =============== clear
tdLog.info('=============== clear') tdLog.info('=============== clear')
# TSIM: sql drop database $db # TSIM: sql drop database $db
tdLog.info('sql drop database $db') tdLog.info('sql drop database db')
tdSql.execute('sql drop database $db') tdSql.execute('drop database db')
# TSIM: sql show databases # TSIM: sql show databases
tdLog.info('show databases') tdLog.info('show databases')
tdSql.query('show databases') tdSql.query('show databases')

View File

@ -49,7 +49,7 @@ if $rows != 0 then
endi endi
print =============== step4 print =============== step4
sql create database a012345678901201234567890120123456789012 -x step4 sql create database a012345678901201234567890120123456789012a012345678901201234567890120123456789012 -x step4
return -1 return -1
step4: step4:
sql show databases sql show databases

View File

@ -38,7 +38,7 @@ endi
if $data02 != 1 then if $data02 != 1 then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi
sql alter table tb add column c3 nchar(4) sql alter table tb add column c3 nchar(4)
@ -72,7 +72,7 @@ endi
if $data02 != 1 then if $data02 != 1 then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi
@ -169,7 +169,7 @@ endi
if $data01 != 2 then if $data01 != 2 then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
sql alter table mt add column c2 int sql alter table mt add column c2 int
@ -239,4 +239,4 @@ if $rows != 0 then
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -37,7 +37,7 @@ sleep 4000
sql select * from mt sql select * from mt
sql select * from strm sql select * from strm
sql drop table tb1 sql drop table tb1
sleep 10000 sleep 100000
sql select * from strm sql select * from strm
if $rows != 2 then if $rows != 2 then
if $rows != 1 then if $rows != 1 then
@ -221,4 +221,4 @@ sql use $db
sql create table stb (ts timestamp, c1 int) tags(t1 int) sql create table stb (ts timestamp, c1 int) tags(t1 int)
sql create table tb1 using stb tags(1) sql create table tb1 using stb tags(1)
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -322,3 +322,39 @@ cd ../../../debug; make
./test.sh -f unique/vnode/replica3_basic.sim ./test.sh -f unique/vnode/replica3_basic.sim
./test.sh -f unique/vnode/replica3_repeat.sim ./test.sh -f unique/vnode/replica3_repeat.sim
./test.sh -f unique/vnode/replica3_vgroup.sim ./test.sh -f unique/vnode/replica3_vgroup.sim
./test.sh -f unique/arbitrator/dn2_mn1_cache_file_sync.sim
#./test.sh -f unique/arbitrator/dn2_mn1_cache_file_sync_second.sim
./test.sh -f unique/arbitrator/dn3_mn1_full_createTableFail.sim
./test.sh -f unique/arbitrator/dn3_mn1_full_dropDnodeFail.sim
./test.sh -f unique/arbitrator/dn3_mn1_multiCreateDropTable.sim
./test.sh -f unique/arbitrator/dn3_mn1_nw_disable_timeout_autoDropDnode.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica2_wal1_AddDelDnode.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_nomaster.sim
./test.sh -f unique/arbitrator/dn3_mn2_killDnode.sim
./test.sh -f unique/arbitrator/insert_duplicationTs.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica2_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica3_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropTable_online.sim
./test.sh -f unique/arbitrator/replica_changeWithArbitrator.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim