Merge branch 'develop' into feature/mergeimport
This commit is contained in:
commit
99c6d1ec04
|
@ -125,7 +125,9 @@ IF (NOT DEFINED TD_CLUSTER)
|
|||
# debug flag
|
||||
#
|
||||
# ADD_DEFINITIONS(-D_CHECK_HEADER_FILE_)
|
||||
# ADD_DEFINITIONS(-D_TAOS_MEM_TEST_)
|
||||
IF (${MEM_CHECK} MATCHES "true")
|
||||
ADD_DEFINITIONS(-DTAOS_MEM_CHECK)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_CLUSTER)
|
||||
ADD_DEFINITIONS(-DCLUSTER)
|
||||
|
|
|
@ -58,6 +58,12 @@
|
|||
# The server and client should have the same socket type. Otherwise, connect will fail.
|
||||
# sockettype udp
|
||||
|
||||
# The compressed rpc message, option:
|
||||
# -1 (no compression)
|
||||
# 0 (all message compressed),
|
||||
# > 0 (rpc message body which larger than this value will be compressed)
|
||||
# compressMsgSize -1
|
||||
|
||||
# RPC re-try timer, millisecond
|
||||
# rpcTimer 300
|
||||
|
||||
|
|
|
@ -105,6 +105,7 @@ enum TSQL_TYPE {
|
|||
SHOW_MODULES = 0x6c,
|
||||
SHOW_CONNECTIONS = 0x6d,
|
||||
SHOW_GRANTS = 0x6e,
|
||||
SHOW_VNODES = 0x6f,
|
||||
|
||||
// create dnode
|
||||
CREATE_DNODE = 0x80,
|
||||
|
|
|
@ -34,8 +34,8 @@ extern "C" {
|
|||
#include "tglobalcfg.h"
|
||||
#include "tlog.h"
|
||||
#include "tscCache.h"
|
||||
#include "tsdb.h"
|
||||
#include "tscSQLParser.h"
|
||||
#include "tsdb.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "tutil.h"
|
||||
|
||||
|
@ -219,22 +219,22 @@ typedef struct STagCond {
|
|||
} STagCond;
|
||||
|
||||
typedef struct SParamInfo {
|
||||
int32_t idx;
|
||||
char type;
|
||||
uint8_t timePrec;
|
||||
short bytes;
|
||||
int32_t idx;
|
||||
char type;
|
||||
uint8_t timePrec;
|
||||
short bytes;
|
||||
uint32_t offset;
|
||||
} SParamInfo;
|
||||
|
||||
typedef struct STableDataBlocks {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
int8_t tsSource;
|
||||
bool ordered;
|
||||
bool ordered;
|
||||
|
||||
int64_t vgid;
|
||||
int64_t prevTS;
|
||||
|
||||
int32_t numOfMeters;
|
||||
int32_t numOfMeters;
|
||||
|
||||
int32_t rowSize;
|
||||
uint32_t nAllocSize;
|
||||
|
@ -245,9 +245,9 @@ typedef struct STableDataBlocks {
|
|||
};
|
||||
|
||||
// for parameter ('?') binding
|
||||
uint32_t numOfAllocedParams;
|
||||
uint32_t numOfParams;
|
||||
SParamInfo* params;
|
||||
uint32_t numOfAllocedParams;
|
||||
uint32_t numOfParams;
|
||||
SParamInfo *params;
|
||||
} STableDataBlocks;
|
||||
|
||||
typedef struct SDataBlockList {
|
||||
|
@ -262,18 +262,17 @@ typedef struct SDataBlockList {
|
|||
typedef struct {
|
||||
SOrderVal order;
|
||||
int command;
|
||||
|
||||
// TODO refactor
|
||||
int count;
|
||||
int16_t isInsertFromFile; // load data from file or not
|
||||
int count;// TODO refactor
|
||||
|
||||
union {
|
||||
bool existsCheck;
|
||||
int8_t showType;
|
||||
bool existsCheck; // check if the table exists
|
||||
int8_t showType; // show command type
|
||||
};
|
||||
|
||||
|
||||
int8_t isInsertFromFile; // load data from file or not
|
||||
bool import; // import/insert type
|
||||
char msgType;
|
||||
uint16_t type;
|
||||
uint16_t type; // query type
|
||||
char intervalTimeUnit;
|
||||
int64_t etime, stime;
|
||||
int64_t nAggTimeInterval; // aggregation time interval
|
||||
|
@ -286,20 +285,20 @@ typedef struct {
|
|||
*
|
||||
* In such cases, allocate the memory dynamically, and need to free the memory
|
||||
*/
|
||||
uint32_t allocSize;
|
||||
char * payload;
|
||||
int payloadLen;
|
||||
short numOfCols;
|
||||
uint32_t allocSize;
|
||||
char * payload;
|
||||
int payloadLen;
|
||||
short numOfCols;
|
||||
SColumnBaseInfo colList;
|
||||
SFieldInfo fieldsInfo;
|
||||
SSqlExprInfo exprsInfo;
|
||||
SLimitVal limit;
|
||||
SLimitVal slimit;
|
||||
int64_t globalLimit;
|
||||
STagCond tagCond;
|
||||
int16_t vnodeIdx; // vnode index in pMetricMeta for metric query
|
||||
int16_t interpoType; // interpolate type
|
||||
int16_t numOfTables;
|
||||
SFieldInfo fieldsInfo;
|
||||
SSqlExprInfo exprsInfo;
|
||||
SLimitVal limit;
|
||||
SLimitVal slimit;
|
||||
int64_t globalLimit;
|
||||
STagCond tagCond;
|
||||
int16_t vnodeIdx; // vnode index in pMetricMeta for metric query
|
||||
int16_t interpoType; // interpolate type
|
||||
int16_t numOfTables;
|
||||
|
||||
// submit data blocks branched according to vnode
|
||||
SDataBlockList * pDataBlocks;
|
||||
|
@ -430,11 +429,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion);
|
|||
|
||||
void tscInitMsgs();
|
||||
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle);
|
||||
int tscProcessSql(SSqlObj *pSql);
|
||||
int tscProcessSql(SSqlObj *pSql);
|
||||
|
||||
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId);
|
||||
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId);
|
||||
void tscQueueAsyncRes(SSqlObj *pSql);
|
||||
|
||||
void tscQueueAsyncError(void(*fp), void *param);
|
||||
|
@ -448,18 +447,12 @@ int taos_retrieve(TAOS_RES *res);
|
|||
* before send query message to vnode
|
||||
*/
|
||||
int32_t tscTansformSQLFunctionForMetricQuery(SSqlCmd *pCmd);
|
||||
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd);
|
||||
|
||||
/**
|
||||
* release both metric/meter meta information
|
||||
* @param pCmd SSqlCmd object that contains the metric/meter meta info
|
||||
*/
|
||||
void tscClearSqlMetaInfo(SSqlCmd *pCmd);
|
||||
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd);
|
||||
|
||||
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
|
||||
|
||||
int32_t tscCreateResPointerInfo(SSqlCmd *pCmd, SSqlRes *pRes);
|
||||
void tscDestroyResPointerInfo(SSqlRes *pRes);
|
||||
void tscDestroyResPointerInfo(SSqlRes *pRes);
|
||||
|
||||
void tscFreeSqlCmdData(SSqlCmd *pCmd);
|
||||
|
||||
|
@ -479,12 +472,12 @@ void tscFreeSqlObj(SSqlObj *pObj);
|
|||
|
||||
void tscCloseTscObj(STscObj *pObj);
|
||||
|
||||
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
|
||||
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
|
||||
void tscKillMetricQuery(SSqlObj *pSql);
|
||||
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
||||
int32_t tscBuildResultsForEmptyRetrieval(SSqlObj *pSql);
|
||||
bool tscIsUpdateQuery(STscObj *pObj);
|
||||
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
|
||||
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
|
||||
void tscKillMetricQuery(SSqlObj *pSql);
|
||||
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
||||
bool tscIsUpdateQuery(STscObj *pObj);
|
||||
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
|
||||
|
||||
// transfer SSqlInfo to SqlCmd struct
|
||||
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
|
||||
|
|
|
@ -9,6 +9,22 @@ extern "C" {
|
|||
#endif
|
||||
#undef com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE
|
||||
#define com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE 0LL
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||
* Method:
|
||||
* Signature: (Ljava/lang/String;)V
|
||||
*/
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp
|
||||
(JNIEnv *, jclass, jint, jstring, jboolean);
|
||||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||
* Method:
|
||||
* Signature: ()Ljava/lang/String;
|
||||
*/
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_dumpMemoryLeakImp
|
||||
(JNIEnv *, jclass);
|
||||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||
* Method: initImp
|
||||
|
|
|
@ -111,6 +111,20 @@ void jniGetGlobalMethod(JNIEnv *env) {
|
|||
jniTrace("native method register finished");
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode, jstring jPath, jboolean jAutoDump) {
|
||||
if (jPath != NULL) {
|
||||
const char *path = (*env)->GetStringUTFChars(env, jPath, NULL);
|
||||
taosSetAllocMode(jMode, path, !!jAutoDump);
|
||||
(*env)->ReleaseStringUTFChars(env, jPath, path);
|
||||
} else {
|
||||
taosSetAllocMode(jMode, NULL, !!jAutoDump);
|
||||
}
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_dumpMemoryLeakImp(JNIEnv *env, jobject jobj) {
|
||||
taosDumpMemoryLeak();
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_initImp(JNIEnv *env, jobject jobj, jstring jconfigDir) {
|
||||
if (jconfigDir != NULL) {
|
||||
const char *confDir = (*env)->GetStringUTFChars(env, jconfigDir, NULL);
|
||||
|
|
2897
src/client/src/sql.c
2897
src/client/src/sql.c
File diff suppressed because it is too large
Load Diff
|
@ -40,6 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
|||
*/
|
||||
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
// TODO return the correct error code to client in tscQueueAsyncError
|
||||
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
|
@ -54,18 +55,17 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
|
|||
tscError("sql string too long");
|
||||
tscQueueAsyncError(fp, param);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
taosNotePrintTsc(sqlstr);
|
||||
|
||||
SSqlObj *pSql = (SSqlObj *)malloc(sizeof(SSqlObj));
|
||||
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
||||
if (pSql == NULL) {
|
||||
tscError("failed to malloc sqlObj");
|
||||
tscQueueAsyncError(fp, param);
|
||||
return;
|
||||
}
|
||||
|
||||
memset(pSql, 0, sizeof(SSqlObj));
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
|
|
|
@ -34,18 +34,11 @@
|
|||
#include "tstoken.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define INVALID_SQL_RET_MSG(p, ...) \
|
||||
do { \
|
||||
sprintf(p, __VA_ARGS__); \
|
||||
return TSDB_CODE_INVALID_SQL; \
|
||||
} while (0)
|
||||
|
||||
enum {
|
||||
TSDB_USE_SERVER_TS = 0,
|
||||
TSDB_USE_CLI_TS = 1,
|
||||
};
|
||||
|
||||
static void setErrMsg(char *msg, const char *sql);
|
||||
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize);
|
||||
|
||||
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
|
||||
|
@ -63,6 +56,7 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
|
|||
radix = 2;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
*value = strtoll(pToken->z, endPtr, radix);
|
||||
|
||||
return numType;
|
||||
|
@ -73,6 +67,8 @@ static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
|
|||
if (TK_ILLEGAL == numType) {
|
||||
return numType;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
*value = strtod(pToken->z, endPtr);
|
||||
return numType;
|
||||
}
|
||||
|
@ -97,7 +93,7 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1
|
|||
} else {
|
||||
// strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
|
||||
if (taosParseTime(pToken->z, time, pToken->n, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -122,18 +118,21 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1
|
|||
index = 0;
|
||||
sToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
|
||||
pTokenEnd += index;
|
||||
|
||||
if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
|
||||
|
||||
index = 0;
|
||||
valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
|
||||
pTokenEnd += index;
|
||||
|
||||
if (valueToken.n < 2) {
|
||||
strcpy(error, "value is expected");
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(error, "value expected in timestamp", sToken.z);
|
||||
}
|
||||
|
||||
if (getTimestampInUsFromStr(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (timePrec == TSDB_TIME_PRECISION_MILLI) {
|
||||
interval /= 1000;
|
||||
}
|
||||
|
@ -156,7 +155,6 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
int64_t iv;
|
||||
int32_t numType;
|
||||
char * endptr = NULL;
|
||||
errno = 0; // reset global error code
|
||||
|
||||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BOOL: { // bool
|
||||
|
@ -168,7 +166,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
|
||||
*(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
|
||||
} else {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
} else if (pToken->type == TK_INTEGER) {
|
||||
iv = strtoll(pToken->z, NULL, 10);
|
||||
|
@ -179,7 +177,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else if (pToken->type == TK_NULL) {
|
||||
*(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
|
||||
} else {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -192,12 +190,12 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
numType = tscToInteger(pToken, &iv, &endptr);
|
||||
if (TK_ILLEGAL == numType) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
|
||||
} else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is overflow");
|
||||
return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
*((int8_t *)payload) = (int8_t)iv;
|
||||
*((int8_t *)payload) = (int8_t) iv;
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -211,9 +209,9 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
numType = tscToInteger(pToken, &iv, &endptr);
|
||||
if (TK_ILLEGAL == numType) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
|
||||
} else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is overflow");
|
||||
return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
*((int16_t *)payload) = (int16_t)iv;
|
||||
|
@ -229,9 +227,9 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
numType = tscToInteger(pToken, &iv, &endptr);
|
||||
if (TK_ILLEGAL == numType) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
|
||||
} else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is overflow");
|
||||
return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
*((int32_t *)payload) = (int32_t)iv;
|
||||
|
@ -248,9 +246,9 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
numType = tscToInteger(pToken, &iv, &endptr);
|
||||
if (TK_ILLEGAL == numType) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
|
||||
} else if (errno == ERANGE || iv > INT64_MAX || iv <= INT64_MIN) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is overflow");
|
||||
return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
*((int64_t *)payload) = iv;
|
||||
|
@ -266,12 +264,12 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
float fv = (float)dv;
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
if (isinf(fv) || isnan(fv)) {
|
||||
|
@ -291,11 +289,11 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
|
||||
INVALID_SQL_RET_MSG(msg, "data is illegal");
|
||||
return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
if (isinf(dv) || isnan(dv)) {
|
||||
|
@ -310,11 +308,11 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
|
||||
if (pToken->type == TK_NULL) {
|
||||
*payload = TSDB_DATA_BINARY_NULL;
|
||||
} else {
|
||||
// too long values will return invalid sql, not be truncated automatically
|
||||
} else { // too long values will return invalid sql, not be truncated automatically
|
||||
if (pToken->n > pSchema->bytes) {
|
||||
INVALID_SQL_RET_MSG(msg, "value too long");
|
||||
return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
|
||||
}
|
||||
|
||||
strncpy(payload, pToken->z, pToken->n);
|
||||
}
|
||||
|
||||
|
@ -326,8 +324,10 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
// if the converted output len is over than pSchema->bytes, return error: 'Argument list too long'
|
||||
if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
|
||||
sprintf(msg, "%s", strerror(errno));
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, 512, "%s", strerror(errno));
|
||||
|
||||
return tscInvalidSQLErrMsg(msg, buf, pToken->z);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -342,8 +342,9 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
} else {
|
||||
int64_t temp;
|
||||
if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
|
||||
}
|
||||
|
||||
*((int64_t *)payload) = temp;
|
||||
}
|
||||
|
||||
|
@ -351,18 +352,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
|
|||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo merge the error msg function with tSQLParser
|
||||
static void setErrMsg(char *msg, const char *sql) {
|
||||
const char * msgFormat = "near \"%s\" syntax error";
|
||||
const int32_t BACKWARD_CHAR_STEP = 15;
|
||||
|
||||
// only extract part of sql string,avoid too long sql string cause stack over flow
|
||||
char buf[64] = {0};
|
||||
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
|
||||
sprintf(msg, msgFormat, buf);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -385,7 +375,8 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
|
|||
}
|
||||
} else {
|
||||
if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
|
||||
return -1;
|
||||
return -1; // client time/server time can not be mixed
|
||||
|
||||
} else if (pDataBlocks->tsSource == -1) {
|
||||
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
|
||||
}
|
||||
|
@ -403,7 +394,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
|
|||
int16_t timePrec) {
|
||||
int32_t index = 0;
|
||||
bool isPrevOptr;
|
||||
SSQLToken sToken;
|
||||
SSQLToken sToken = {0};
|
||||
char * payload = pDataBlocks->pData + pDataBlocks->size;
|
||||
|
||||
// 1. set the parsed value from sql string
|
||||
|
@ -424,6 +415,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
|
|||
if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
strcpy(error, "client out of memory");
|
||||
return -1;
|
||||
}
|
||||
|
@ -431,7 +423,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
|
|||
if (((sToken.type != TK_NOW) && (sToken.type != TK_INTEGER) && (sToken.type != TK_STRING) &&
|
||||
(sToken.type != TK_FLOAT) && (sToken.type != TK_BOOL) && (sToken.type != TK_NULL)) ||
|
||||
(sToken.n == 0) || (sToken.type == TK_RP)) {
|
||||
setErrMsg(error, *str);
|
||||
tscInvalidSQLErrMsg(error, "invalid data or symbol", sToken.z);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -448,6 +440,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
|
|||
}
|
||||
|
||||
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
|
||||
tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -457,8 +450,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
|
|||
char *ptr = payload;
|
||||
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
if (!spd->hasVal[i]) {
|
||||
// current column do not have any value to insert, set it to null
|
||||
if (!spd->hasVal[i]) { // current column do not have any value to insert, set it to null
|
||||
setNull(ptr, schema[i].type, schema[i].bytes);
|
||||
}
|
||||
|
||||
|
@ -513,8 +505,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
|
|||
}
|
||||
|
||||
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision);
|
||||
if (len <= 0) {
|
||||
setErrMsg(error, *str);
|
||||
if (len <= 0) { // error message has been set in tsParseOneRowData
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -524,7 +515,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
|
|||
sToken = tStrGetToken(*str, &index, false, 0, NULL);
|
||||
*str += index;
|
||||
if (sToken.n == 0 || sToken.type != TK_RP) {
|
||||
setErrMsg(error, *str);
|
||||
tscInvalidSQLErrMsg(error, ") expected", *str);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -719,8 +710,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
|
|||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (sToken.type == TK_USING) {
|
||||
// create table if not exists
|
||||
if (sToken.type == TK_USING) { // create table if not exists
|
||||
index = 0;
|
||||
sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
||||
sql += index;
|
||||
|
@ -736,8 +726,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
if (!UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
|
||||
strcpy(pCmd->payload, "create table only from super table is allowed");
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
|
||||
}
|
||||
|
||||
char * tagVal = pTag->data;
|
||||
|
@ -747,8 +736,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
|
|||
sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
||||
sql += index;
|
||||
if (sToken.type != TK_TAGS) {
|
||||
setErrMsg(pCmd->payload, sql);
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sql);
|
||||
}
|
||||
|
||||
int32_t numOfTagValues = 0;
|
||||
|
@ -773,28 +761,23 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
|
|||
code = tsParseOneColumnData(&pTagSchema[numOfTagValues], &sToken, tagVal, pCmd->payload, &sql, false,
|
||||
pMeterMetaInfo->pMeterMeta->precision);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
setErrMsg(pCmd->payload, sql);
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((pTagSchema[numOfTagValues].type == TSDB_DATA_TYPE_BINARY ||
|
||||
pTagSchema[numOfTagValues].type == TSDB_DATA_TYPE_NCHAR) &&
|
||||
sToken.n > pTagSchema[numOfTagValues].bytes) {
|
||||
strcpy(pCmd->payload, "tag value too long");
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
pTagSchema[numOfTagValues].type == TSDB_DATA_TYPE_NCHAR) && sToken.n > pTagSchema[numOfTagValues].bytes) {
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
|
||||
}
|
||||
|
||||
tagVal += pTagSchema[numOfTagValues++].bytes;
|
||||
}
|
||||
|
||||
if (numOfTagValues != pMeterMetaInfo->pMeterMeta->numOfTags) {
|
||||
setErrMsg(pCmd->payload, sql);
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "number of tags mismatch", sql);
|
||||
}
|
||||
|
||||
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
|
||||
setErrMsg(pCmd->payload, sql);
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", sql);
|
||||
}
|
||||
|
||||
int32_t ret = setMeterID(pSql, &tableToken, 0);
|
||||
|
@ -844,25 +827,19 @@ int validateTableName(char *tblName, int len) {
|
|||
* @param pSql
|
||||
* @return
|
||||
*/
|
||||
int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
||||
int doParserInsertSql(SSqlObj *pSql, char *str) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
pCmd->command = TSDB_SQL_INSERT;
|
||||
pCmd->isInsertFromFile = -1;
|
||||
pCmd->count = 0;
|
||||
|
||||
pSql->res.numOfRows = 0;
|
||||
|
||||
int32_t code = TSDB_CODE_INVALID_SQL;
|
||||
int32_t totalNum = 0;
|
||||
|
||||
int code = TSDB_CODE_INVALID_SQL;
|
||||
|
||||
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd);
|
||||
|
||||
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
void *pTableHashList = taosInitIntHash(128, sizeof(void *), taosHashInt);
|
||||
void *pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt);
|
||||
|
||||
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
|
||||
tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);
|
||||
|
@ -885,11 +862,11 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
|
||||
// Check if the table name available or not
|
||||
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "table name is invalid");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
//TODO refactor
|
||||
if ((code = setMeterID(pSql, &sToken, 0)) != TSDB_CODE_SUCCESS) {
|
||||
goto _error_clean;
|
||||
}
|
||||
|
@ -909,8 +886,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
}
|
||||
|
||||
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "insert data into metric is not supported");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -918,8 +894,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
sToken = tStrGetToken(str, &index, false, 0, NULL);
|
||||
str += index;
|
||||
if (sToken.n == 0) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "keyword VALUES or FILE are required");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -933,8 +908,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
pCmd->isInsertFromFile = 0;
|
||||
} else {
|
||||
if (pCmd->isInsertFromFile == 1) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
}
|
||||
|
@ -953,8 +927,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
pCmd->isInsertFromFile = 1;
|
||||
} else {
|
||||
if (pCmd->isInsertFromFile == 0) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
}
|
||||
|
@ -963,8 +936,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
sToken = tStrGetToken(str, &index, false, 0, NULL);
|
||||
str += index;
|
||||
if (sToken.n == 0) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "file path is required following keyword FILE");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -974,8 +946,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
|
||||
wordexp_t full_path;
|
||||
if (wordexp(fname, &full_path, 0) != 0) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "invalid filename");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
strcpy(fname, full_path.we_wordv[0]);
|
||||
|
@ -994,8 +965,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
if (pCmd->isInsertFromFile == -1) {
|
||||
pCmd->isInsertFromFile = 0;
|
||||
} else if (pCmd->isInsertFromFile == 1) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -1032,8 +1002,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
pElem->colIndex = t;
|
||||
|
||||
if (spd.hasVal[t] == true) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "duplicated column name");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -1044,15 +1013,13 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
}
|
||||
|
||||
if (!findColumnIndex) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "invalid column name");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
}
|
||||
|
||||
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > pMeterMeta->numOfColumns) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "column name expected");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -1061,8 +1028,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
str += index;
|
||||
|
||||
if (sToken.type != TK_VALUES) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "keyword VALUES is expected");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
|
@ -1071,8 +1037,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
|
|||
goto _error_clean;
|
||||
}
|
||||
} else {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
sprintf(pCmd->payload, "keyword VALUES or FILE are required");
|
||||
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
|
||||
goto _error_clean;
|
||||
}
|
||||
}
|
||||
|
@ -1116,29 +1081,25 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
|
|||
return TSDB_CODE_NO_RIGHTS;
|
||||
}
|
||||
|
||||
int32_t index = 0;
|
||||
int32_t index = 0;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SSQLToken sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
||||
if (sToken.type == TK_IMPORT) {
|
||||
pCmd->order.order = TSQL_SO_ASC;
|
||||
} else if (sToken.type != TK_INSERT) {
|
||||
if (sToken.n) {
|
||||
sToken.z[sToken.n] = 0;
|
||||
sprintf(pCmd->payload, "invalid keyword:%s", sToken.z);
|
||||
} else {
|
||||
strcpy(pCmd->payload, "no any keywords");
|
||||
}
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
||||
assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);
|
||||
pCmd->import = (sToken.type == TK_IMPORT);
|
||||
|
||||
sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
||||
if (sToken.type != TK_INTO) {
|
||||
strcpy(pCmd->payload, "keyword INTO is expected");
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
|
||||
}
|
||||
|
||||
return tsParseInsertStatement(pSql, sql + index, acct, db);
|
||||
|
||||
pCmd->count = 0;
|
||||
pCmd->command = TSDB_SQL_INSERT;
|
||||
pCmd->isInsertFromFile = -1;
|
||||
pSql->res.numOfRows = 0;
|
||||
|
||||
return doParserInsertSql(pSql, sql + index);
|
||||
}
|
||||
|
||||
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
|
||||
|
@ -1259,6 +1220,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp) {
|
|||
pSql->res.code = TSDB_CODE_INVALID_SQL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTableDataBlock->size += len;
|
||||
|
||||
count++;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -746,7 +746,7 @@ void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken
|
|||
}
|
||||
|
||||
void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
|
||||
pDBInfo->numOfBlocksPerTable = -1;
|
||||
pDBInfo->numOfBlocksPerTable = 50;
|
||||
pDBInfo->compressionLevel = -1;
|
||||
|
||||
pDBInfo->commitLog = -1;
|
||||
|
|
|
@ -123,6 +123,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) {
|
|||
return memcmp(p1, p2, size) == 0;
|
||||
}
|
||||
|
||||
//todo refactor
|
||||
static FORCE_INLINE char* skipSegments(char* input, char delimiter, int32_t num) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
while (*input != 0 && *input++ != delimiter) {
|
||||
|
|
|
@ -1415,7 +1415,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
|
|||
pMsg = pStart;
|
||||
|
||||
pShellMsg = (SShellSubmitMsg *)pMsg;
|
||||
pShellMsg->import = pSql->cmd.order.order;
|
||||
pShellMsg->import = pSql->cmd.import;
|
||||
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
|
||||
pShellMsg->numOfSid = htonl(pSql->cmd.count); // number of meters to be inserted
|
||||
|
||||
|
@ -2175,7 +2175,7 @@ int tscBuildShowMsg(SSqlObj *pSql) {
|
|||
pShowMsg = (SShowMsg *)pMsg;
|
||||
pShowMsg->type = pCmd->showType;
|
||||
|
||||
if ((pShowMsg->type == TSDB_MGMT_TABLE_TABLE || pShowMsg->type == TSDB_MGMT_TABLE_METRIC) && pCmd->payloadLen != 0) {
|
||||
if ((pShowMsg->type == TSDB_MGMT_TABLE_TABLE || pShowMsg->type == TSDB_MGMT_TABLE_METRIC || pShowMsg->type == TSDB_MGMT_TABLE_VNODES ) && pCmd->payloadLen != 0) {
|
||||
// only show tables support wildcard query
|
||||
pShowMsg->payloadLen = htons(pCmd->payloadLen);
|
||||
memcpy(pShowMsg->payload, payload, pCmd->payloadLen);
|
||||
|
@ -3453,31 +3453,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void doDecompressPayload(SSqlCmd *pCmd, SSqlRes *pRes, int16_t compressed) {
|
||||
if (compressed && pRes->numOfRows > 0) {
|
||||
SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;
|
||||
|
||||
int32_t numOfTotalCols = pCmd->fieldsInfo.numOfOutputCols + pCmd->fieldsInfo.numOfHiddenCols;
|
||||
int32_t rowSize = pCmd->fieldsInfo.pOffset[numOfTotalCols - 1] + pCmd->fieldsInfo.pFields[numOfTotalCols - 1].bytes;
|
||||
|
||||
// TODO handle the OOM problem
|
||||
char * buf = malloc(rowSize * pRes->numOfRows);
|
||||
|
||||
int32_t payloadSize = pRes->rspLen - 1 - sizeof(SRetrieveMeterRsp);
|
||||
assert(payloadSize > 0);
|
||||
|
||||
int32_t decompressedSize = tsDecompressString(pRetrieve->data, payloadSize, 1, buf, rowSize * pRes->numOfRows, 0, 0, 0);
|
||||
assert(decompressedSize == rowSize * pRes->numOfRows);
|
||||
|
||||
pRes->pRsp = realloc(pRes->pRsp, pRes->rspLen - payloadSize + decompressedSize);
|
||||
memcpy(pRes->pRsp + sizeof(SRetrieveMeterRsp), buf, decompressedSize);
|
||||
|
||||
free(buf);
|
||||
}
|
||||
|
||||
pRes->data = ((SRetrieveMeterRsp *)pRes->pRsp)->data;
|
||||
}
|
||||
|
||||
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
@ -3490,9 +3465,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
|
|||
pRes->offset = htobe64(pRetrieve->offset);
|
||||
|
||||
pRes->useconds = htobe64(pRetrieve->useconds);
|
||||
pRetrieve->compress = htons(pRetrieve->compress);
|
||||
|
||||
doDecompressPayload(pCmd, pRes, pRetrieve->compress);
|
||||
pRes->data = pRetrieve->data;
|
||||
|
||||
tscSetResultPointer(pCmd, pRes);
|
||||
pRes->row = 0;
|
||||
|
@ -3652,7 +3625,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
|
|||
*/
|
||||
if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
|
||||
if (pMeterMetaInfo->pMeterMeta) {
|
||||
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql,
|
||||
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql,
|
||||
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
|
||||
}
|
||||
tscWaitingForCreateTable(&pSql->cmd);
|
||||
|
|
|
@ -246,7 +246,12 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
|
|||
tscDoQuery(pSql);
|
||||
}
|
||||
|
||||
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
|
||||
if (pRes->code == TSDB_CODE_SUCCESS) {
|
||||
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
|
||||
} else {
|
||||
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
|
||||
}
|
||||
|
||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||
tscFreeSqlObjPartial(pSql);
|
||||
}
|
||||
|
@ -266,8 +271,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
|
|||
|
||||
size_t sqlLen = strlen(sqlstr);
|
||||
if (sqlLen > TSDB_MAX_SQL_LEN) {
|
||||
tscError("%p sql too long", pSql);
|
||||
pRes->code = TSDB_CODE_INVALID_SQL;
|
||||
pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
|
||||
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
|
||||
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
|
@ -276,8 +282,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
|
|||
void *sql = realloc(pSql->sqlstr, sqlLen + 1);
|
||||
if (sql == NULL) {
|
||||
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
tscError("%p failed to malloc sql string buffer", pSql);
|
||||
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
|
||||
tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno));
|
||||
|
||||
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
|
@ -777,9 +784,9 @@ int taos_errno(TAOS *taos) {
|
|||
}
|
||||
|
||||
char *taos_errstr(TAOS *taos) {
|
||||
STscObj * pObj = (STscObj *)taos;
|
||||
unsigned char code;
|
||||
char temp[256] = {0};
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
uint8_t code;
|
||||
// char temp[256] = {0};
|
||||
|
||||
if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode];
|
||||
|
||||
|
@ -788,9 +795,10 @@ char *taos_errstr(TAOS *taos) {
|
|||
else
|
||||
code = pObj->pSql->res.code;
|
||||
|
||||
// for invalid sql, additional information is attached to explain why the sql is invalid
|
||||
if (code == TSDB_CODE_INVALID_SQL) {
|
||||
snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
|
||||
strcpy(pObj->pSql->cmd.payload, temp);
|
||||
// snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
|
||||
// strcpy(pObj->pSql->cmd.payload, temp);
|
||||
return pObj->pSql->cmd.payload;
|
||||
} else {
|
||||
return tsError[code];
|
||||
|
|
|
@ -1294,8 +1294,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
|
|||
|
||||
// re-build the whole name string
|
||||
if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
|
||||
// first part do not have quote
|
||||
// do nothing
|
||||
// first part do not have quote do nothing
|
||||
} else {
|
||||
pStr[firstPartLen] = TS_PATH_DELIMITER[0];
|
||||
memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
|
||||
|
@ -1842,5 +1841,30 @@ bool tscIsUpdateQuery(STscObj* pObj) {
|
|||
SSqlCmd* pCmd = &pObj->pSql->cmd;
|
||||
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
|
||||
TSDB_SQL_USE_DB == pCmd->command) ? 1 : 0;
|
||||
|
||||
}
|
||||
|
||||
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql) {
|
||||
const char *msgFormat1 = "invalid SQL: %s";
|
||||
const char *msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)";
|
||||
const char *msgFormat3 = "invalid SQL: syntax error near \"%s\"";
|
||||
|
||||
const int32_t BACKWARD_CHAR_STEP = 0;
|
||||
|
||||
if (sql == NULL) {
|
||||
assert(additionalInfo != NULL);
|
||||
sprintf(msg, msgFormat1, additionalInfo);
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
char buf[64] = {0}; // only extract part of sql string
|
||||
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
|
||||
|
||||
if (additionalInfo != NULL) {
|
||||
sprintf(msg, msgFormat2, buf, additionalInfo);
|
||||
} else {
|
||||
sprintf(msg, msgFormat3, buf); // no additional information for invalid sql error
|
||||
}
|
||||
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
|
|
@ -12,15 +12,25 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package taosSql
|
||||
|
||||
/*
|
||||
#cgo CFLAGS : -I/usr/include
|
||||
#include <stdlib.h>
|
||||
#cgo LDFLAGS: -L/usr/lib -ltaos
|
||||
void taosSetAllocMode(int mode, const char* path, _Bool autoDump);
|
||||
void taosDumpMemoryLeak();
|
||||
*/
|
||||
import "C"
|
||||
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// Returns the bool value of the input.
|
||||
|
@ -398,3 +408,15 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
|
|||
}
|
||||
|
||||
|
||||
/******************************************************************************
|
||||
* Utils for C memory issues debugging *
|
||||
******************************************************************************/
|
||||
func SetAllocMode(mode int32, path string) {
|
||||
cpath := C.CString(path)
|
||||
defer C.free(unsafe.Pointer(cpath))
|
||||
C.taosSetAllocMode(C.int(mode), cpath, false)
|
||||
}
|
||||
|
||||
func DumpMemoryLeak() {
|
||||
C.taosDumpMemoryLeak()
|
||||
}
|
||||
|
|
|
@ -73,6 +73,9 @@ cmd ::= SHOW CONFIGS. { setDCLSQLElems(pInfo, SHOW_CONFIGS, 0); }
|
|||
cmd ::= SHOW SCORES. { setDCLSQLElems(pInfo, SHOW_SCORES, 0); }
|
||||
cmd ::= SHOW GRANTS. { setDCLSQLElems(pInfo, SHOW_GRANTS, 0); }
|
||||
|
||||
cmd ::= SHOW VNODES. { setDCLSQLElems(pInfo, SHOW_VNODES, 0); }
|
||||
cmd ::= SHOW VNODES IPTOKEN(X). { setDCLSQLElems(pInfo, SHOW_VNODES, 1, &X); }
|
||||
|
||||
%type dbPrefix {SSQLToken}
|
||||
dbPrefix(A) ::=. {A.n = 0;}
|
||||
dbPrefix(A) ::= ids(X) DOT. {A = X; }
|
||||
|
@ -658,4 +661,4 @@ cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X
|
|||
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
|
||||
LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
|
||||
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
|
||||
SPREAD TWA INTERP LAST_ROW NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE NULL.
|
||||
SPREAD TWA INTERP LAST_ROW NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE NULL.
|
||||
|
|
|
@ -158,6 +158,7 @@ enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_CONNS,
|
||||
TSDB_MGMT_TABLE_SCORES,
|
||||
TSDB_MGMT_TABLE_GRANTS,
|
||||
TSDB_MGMT_TABLE_VNODES,
|
||||
TSDB_MGMT_TABLE_MAX,
|
||||
};
|
||||
|
||||
|
@ -224,7 +225,7 @@ typedef struct {
|
|||
char meterId[TSDB_UNI_LEN];
|
||||
uint16_t port; // for UDP only
|
||||
char empty[1];
|
||||
char msgType;
|
||||
uint8_t msgType;
|
||||
int32_t msgLen;
|
||||
uint8_t content[0];
|
||||
} STaosHeader;
|
||||
|
@ -567,7 +568,6 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t numOfRows;
|
||||
int16_t precision;
|
||||
int16_t compress;
|
||||
int64_t offset; // updated offset value for multi-vnode projection query
|
||||
int64_t useconds;
|
||||
char data[];
|
||||
|
|
|
@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option);
|
|||
#define TSDB_CFG_OPTION_LEN 24
|
||||
#define TSDB_CFG_VALUE_LEN 41
|
||||
|
||||
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "unistd.h"
|
||||
#include "os.h"
|
||||
#include "tutil.h"
|
||||
#include "tglobalcfg.h"
|
||||
|
|
|
@ -72,145 +72,144 @@
|
|||
#define TK_CONFIGS 54
|
||||
#define TK_SCORES 55
|
||||
#define TK_GRANTS 56
|
||||
#define TK_DOT 57
|
||||
#define TK_TABLES 58
|
||||
#define TK_STABLES 59
|
||||
#define TK_VGROUPS 60
|
||||
#define TK_DROP 61
|
||||
#define TK_TABLE 62
|
||||
#define TK_DATABASE 63
|
||||
#define TK_DNODE 64
|
||||
#define TK_IPTOKEN 65
|
||||
#define TK_USER 66
|
||||
#define TK_ACCOUNT 67
|
||||
#define TK_USE 68
|
||||
#define TK_DESCRIBE 69
|
||||
#define TK_ALTER 70
|
||||
#define TK_PASS 71
|
||||
#define TK_PRIVILEGE 72
|
||||
#define TK_LOCAL 73
|
||||
#define TK_IF 74
|
||||
#define TK_EXISTS 75
|
||||
#define TK_CREATE 76
|
||||
#define TK_PPS 77
|
||||
#define TK_TSERIES 78
|
||||
#define TK_DBS 79
|
||||
#define TK_STORAGE 80
|
||||
#define TK_QTIME 81
|
||||
#define TK_CONNS 82
|
||||
#define TK_STATE 83
|
||||
#define TK_KEEP 84
|
||||
#define TK_CACHE 85
|
||||
#define TK_REPLICA 86
|
||||
#define TK_DAYS 87
|
||||
#define TK_ROWS 88
|
||||
#define TK_ABLOCKS 89
|
||||
#define TK_TBLOCKS 90
|
||||
#define TK_CTIME 91
|
||||
#define TK_CLOG 92
|
||||
#define TK_COMP 93
|
||||
#define TK_PRECISION 94
|
||||
#define TK_LP 95
|
||||
#define TK_RP 96
|
||||
#define TK_TAGS 97
|
||||
#define TK_USING 98
|
||||
#define TK_AS 99
|
||||
#define TK_COMMA 100
|
||||
#define TK_NULL 101
|
||||
#define TK_SELECT 102
|
||||
#define TK_FROM 103
|
||||
#define TK_VARIABLE 104
|
||||
#define TK_INTERVAL 105
|
||||
#define TK_FILL 106
|
||||
#define TK_SLIDING 107
|
||||
#define TK_ORDER 108
|
||||
#define TK_BY 109
|
||||
#define TK_ASC 110
|
||||
#define TK_DESC 111
|
||||
#define TK_GROUP 112
|
||||
#define TK_HAVING 113
|
||||
#define TK_LIMIT 114
|
||||
#define TK_OFFSET 115
|
||||
#define TK_SLIMIT 116
|
||||
#define TK_SOFFSET 117
|
||||
#define TK_WHERE 118
|
||||
#define TK_NOW 119
|
||||
#define TK_INSERT 120
|
||||
#define TK_INTO 121
|
||||
#define TK_VALUES 122
|
||||
#define TK_RESET 123
|
||||
#define TK_QUERY 124
|
||||
#define TK_ADD 125
|
||||
#define TK_COLUMN 126
|
||||
#define TK_TAG 127
|
||||
#define TK_CHANGE 128
|
||||
#define TK_SET 129
|
||||
#define TK_KILL 130
|
||||
#define TK_CONNECTION 131
|
||||
#define TK_COLON 132
|
||||
#define TK_STREAM 133
|
||||
#define TK_ABORT 134
|
||||
#define TK_AFTER 135
|
||||
#define TK_ATTACH 136
|
||||
#define TK_BEFORE 137
|
||||
#define TK_BEGIN 138
|
||||
#define TK_CASCADE 139
|
||||
#define TK_CLUSTER 140
|
||||
#define TK_CONFLICT 141
|
||||
#define TK_COPY 142
|
||||
#define TK_DEFERRED 143
|
||||
#define TK_DELIMITERS 144
|
||||
#define TK_DETACH 145
|
||||
#define TK_EACH 146
|
||||
#define TK_END 147
|
||||
#define TK_EXPLAIN 148
|
||||
#define TK_FAIL 149
|
||||
#define TK_FOR 150
|
||||
#define TK_IGNORE 151
|
||||
#define TK_IMMEDIATE 152
|
||||
#define TK_INITIALLY 153
|
||||
#define TK_INSTEAD 154
|
||||
#define TK_MATCH 155
|
||||
#define TK_KEY 156
|
||||
#define TK_OF 157
|
||||
#define TK_RAISE 158
|
||||
#define TK_REPLACE 159
|
||||
#define TK_RESTRICT 160
|
||||
#define TK_ROW 161
|
||||
#define TK_STATEMENT 162
|
||||
#define TK_TRIGGER 163
|
||||
#define TK_VIEW 164
|
||||
#define TK_ALL 165
|
||||
#define TK_COUNT 166
|
||||
#define TK_SUM 167
|
||||
#define TK_AVG 168
|
||||
#define TK_MIN 169
|
||||
#define TK_MAX 170
|
||||
#define TK_FIRST 171
|
||||
#define TK_LAST 172
|
||||
#define TK_TOP 173
|
||||
#define TK_BOTTOM 174
|
||||
#define TK_STDDEV 175
|
||||
#define TK_PERCENTILE 176
|
||||
#define TK_APERCENTILE 177
|
||||
#define TK_LEASTSQUARES 178
|
||||
#define TK_HISTOGRAM 179
|
||||
#define TK_DIFF 180
|
||||
#define TK_SPREAD 181
|
||||
#define TK_TWA 182
|
||||
#define TK_INTERP 183
|
||||
#define TK_LAST_ROW 184
|
||||
#define TK_SEMI 185
|
||||
#define TK_NONE 186
|
||||
#define TK_PREV 187
|
||||
#define TK_LINEAR 188
|
||||
#define TK_IMPORT 189
|
||||
#define TK_METRIC 190
|
||||
#define TK_TBNAME 191
|
||||
#define TK_JOIN 192
|
||||
#define TK_METRICS 193
|
||||
#define TK_STABLE 194
|
||||
#define TK_VNODES 57
|
||||
#define TK_IPTOKEN 58
|
||||
#define TK_DOT 59
|
||||
#define TK_TABLES 60
|
||||
#define TK_STABLES 61
|
||||
#define TK_VGROUPS 62
|
||||
#define TK_DROP 63
|
||||
#define TK_TABLE 64
|
||||
#define TK_DATABASE 65
|
||||
#define TK_DNODE 66
|
||||
#define TK_USER 67
|
||||
#define TK_ACCOUNT 68
|
||||
#define TK_USE 69
|
||||
#define TK_DESCRIBE 70
|
||||
#define TK_ALTER 71
|
||||
#define TK_PASS 72
|
||||
#define TK_PRIVILEGE 73
|
||||
#define TK_LOCAL 74
|
||||
#define TK_IF 75
|
||||
#define TK_EXISTS 76
|
||||
#define TK_CREATE 77
|
||||
#define TK_PPS 78
|
||||
#define TK_TSERIES 79
|
||||
#define TK_DBS 80
|
||||
#define TK_STORAGE 81
|
||||
#define TK_QTIME 82
|
||||
#define TK_CONNS 83
|
||||
#define TK_STATE 84
|
||||
#define TK_KEEP 85
|
||||
#define TK_CACHE 86
|
||||
#define TK_REPLICA 87
|
||||
#define TK_DAYS 88
|
||||
#define TK_ROWS 89
|
||||
#define TK_ABLOCKS 90
|
||||
#define TK_TBLOCKS 91
|
||||
#define TK_CTIME 92
|
||||
#define TK_CLOG 93
|
||||
#define TK_COMP 94
|
||||
#define TK_PRECISION 95
|
||||
#define TK_LP 96
|
||||
#define TK_RP 97
|
||||
#define TK_TAGS 98
|
||||
#define TK_USING 99
|
||||
#define TK_AS 100
|
||||
#define TK_COMMA 101
|
||||
#define TK_NULL 102
|
||||
#define TK_SELECT 103
|
||||
#define TK_FROM 104
|
||||
#define TK_VARIABLE 105
|
||||
#define TK_INTERVAL 106
|
||||
#define TK_FILL 107
|
||||
#define TK_SLIDING 108
|
||||
#define TK_ORDER 109
|
||||
#define TK_BY 110
|
||||
#define TK_ASC 111
|
||||
#define TK_DESC 112
|
||||
#define TK_GROUP 113
|
||||
#define TK_HAVING 114
|
||||
#define TK_LIMIT 115
|
||||
#define TK_OFFSET 116
|
||||
#define TK_SLIMIT 117
|
||||
#define TK_SOFFSET 118
|
||||
#define TK_WHERE 119
|
||||
#define TK_NOW 120
|
||||
#define TK_INSERT 121
|
||||
#define TK_INTO 122
|
||||
#define TK_VALUES 123
|
||||
#define TK_RESET 124
|
||||
#define TK_QUERY 125
|
||||
#define TK_ADD 126
|
||||
#define TK_COLUMN 127
|
||||
#define TK_TAG 128
|
||||
#define TK_CHANGE 129
|
||||
#define TK_SET 130
|
||||
#define TK_KILL 131
|
||||
#define TK_CONNECTION 132
|
||||
#define TK_COLON 133
|
||||
#define TK_STREAM 134
|
||||
#define TK_ABORT 135
|
||||
#define TK_AFTER 136
|
||||
#define TK_ATTACH 137
|
||||
#define TK_BEFORE 138
|
||||
#define TK_BEGIN 139
|
||||
#define TK_CASCADE 140
|
||||
#define TK_CLUSTER 141
|
||||
#define TK_CONFLICT 142
|
||||
#define TK_COPY 143
|
||||
#define TK_DEFERRED 144
|
||||
#define TK_DELIMITERS 145
|
||||
#define TK_DETACH 146
|
||||
#define TK_EACH 147
|
||||
#define TK_END 148
|
||||
#define TK_EXPLAIN 149
|
||||
#define TK_FAIL 150
|
||||
#define TK_FOR 151
|
||||
#define TK_IGNORE 152
|
||||
#define TK_IMMEDIATE 153
|
||||
#define TK_INITIALLY 154
|
||||
#define TK_INSTEAD 155
|
||||
#define TK_MATCH 156
|
||||
#define TK_KEY 157
|
||||
#define TK_OF 158
|
||||
#define TK_RAISE 159
|
||||
#define TK_REPLACE 160
|
||||
#define TK_RESTRICT 161
|
||||
#define TK_ROW 162
|
||||
#define TK_STATEMENT 163
|
||||
#define TK_TRIGGER 164
|
||||
#define TK_VIEW 165
|
||||
#define TK_ALL 166
|
||||
#define TK_COUNT 167
|
||||
#define TK_SUM 168
|
||||
#define TK_AVG 169
|
||||
#define TK_MIN 170
|
||||
#define TK_MAX 171
|
||||
#define TK_FIRST 172
|
||||
#define TK_LAST 173
|
||||
#define TK_TOP 174
|
||||
#define TK_BOTTOM 175
|
||||
#define TK_STDDEV 176
|
||||
#define TK_PERCENTILE 177
|
||||
#define TK_APERCENTILE 178
|
||||
#define TK_LEASTSQUARES 179
|
||||
#define TK_HISTOGRAM 180
|
||||
#define TK_DIFF 181
|
||||
#define TK_SPREAD 182
|
||||
#define TK_TWA 183
|
||||
#define TK_INTERP 184
|
||||
#define TK_LAST_ROW 185
|
||||
#define TK_SEMI 186
|
||||
#define TK_NONE 187
|
||||
#define TK_PREV 188
|
||||
#define TK_LINEAR 189
|
||||
#define TK_IMPORT 190
|
||||
#define TK_METRIC 191
|
||||
#define TK_TBNAME 192
|
||||
#define TK_JOIN 193
|
||||
#define TK_METRICS 194
|
||||
#define TK_STABLE 195
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
|
|
|
@ -187,18 +187,35 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, unsigned int inLen, cha
|
|||
|
||||
char *taosIpStr(uint32_t ipInt);
|
||||
|
||||
#ifdef _TAOS_MEM_TEST_
|
||||
// Use during test to simulate the success and failure scenarios of memory allocation
|
||||
extern void* taos_malloc(unsigned int size, char* _func);
|
||||
extern void* taos_calloc(unsigned int num, unsigned int size, char* _func);
|
||||
extern void* taos_realloc(void* ptr, unsigned int size, char* _func);
|
||||
extern void taos_free(void* ptr);
|
||||
#define malloc(size) taos_malloc(size, __FUNCTION__)
|
||||
#define calloc(num, size) taos_calloc(num, size, __FUNCTION__)
|
||||
#define realloc(ptr, size) taos_realloc(ptr, size, __FUNCTION__)
|
||||
#define free(ptr) taos_free(ptr)
|
||||
#endif
|
||||
#define TAOS_ALLOC_MODE_DEFAULT 0
|
||||
#define TAOS_ALLOC_MODE_RANDOM_FAIL 1
|
||||
#define TAOS_ALLOC_MODE_DETECT_LEAK 2
|
||||
void taosSetAllocMode(int mode, const char* path, bool autoDump);
|
||||
void taosDumpMemoryLeak();
|
||||
|
||||
#ifdef TAOS_MEM_CHECK
|
||||
|
||||
void * taos_malloc(size_t size, const char *file, uint32_t line);
|
||||
void * taos_calloc(size_t num, size_t size, const char *file, uint32_t line);
|
||||
void * taos_realloc(void *ptr, size_t size, const char *file, uint32_t line);
|
||||
void taos_free(void *ptr, const char *file, uint32_t line);
|
||||
char * taos_strdup(const char *str, const char *file, uint32_t line);
|
||||
char * taos_strndup(const char *str, size_t size, const char *file, uint32_t line);
|
||||
ssize_t taos_getline(char **lineptr, size_t *n, FILE *stream, const char *file, uint32_t line);
|
||||
|
||||
#ifndef TAOS_MEM_CHECK_IMPL
|
||||
|
||||
#define malloc(size) taos_malloc(size, __FILE__, __LINE__)
|
||||
#define calloc(num, size) taos_calloc(num, size, __FILE__, __LINE__)
|
||||
#define realloc(ptr, size) taos_realloc(ptr, size, __FILE__, __LINE__)
|
||||
#define free(ptr) taos_free(ptr, __FILE__, __LINE__)
|
||||
#define strdup(str) taos_strdup(str, __FILE__, __LINE__)
|
||||
#define strndup(str, size) taos_strndup(str, size, __FILE__, __LINE__)
|
||||
#define getline(lineptr, n, stream) taos_getline(lineptr, n, stream, __FILE__, __LINE__)
|
||||
|
||||
#endif // TAOS_MEM_CHECK_IMPL
|
||||
|
||||
#endif // TAOS_MEM_CHECK
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "shellCommand.h"
|
||||
#include "ttime.h"
|
||||
#include "tutil.h"
|
||||
#include <regex.h>
|
||||
|
||||
/**************** Global variables ****************/
|
||||
#ifdef WINDOWS
|
||||
|
|
|
@ -81,6 +81,7 @@ bool taosGetProcMemory(float *memoryUsedMB) {
|
|||
char * line = NULL;
|
||||
while (!feof(fp)) {
|
||||
tfree(line);
|
||||
len = 0;
|
||||
getline(&line, &len, fp);
|
||||
if (line == NULL) {
|
||||
break;
|
||||
|
@ -137,7 +138,7 @@ bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
size_t len;
|
||||
size_t len = 0;
|
||||
char * line = NULL;
|
||||
getline(&line, &len, fp);
|
||||
if (line == NULL) {
|
||||
|
@ -409,6 +410,7 @@ bool taosGetCardInfo(int64_t *bytes) {
|
|||
|
||||
while (!feof(fp)) {
|
||||
tfree(line);
|
||||
len = 0;
|
||||
getline(&line, &len, fp);
|
||||
if (line == NULL) {
|
||||
break;
|
||||
|
@ -480,6 +482,7 @@ bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
|
|||
|
||||
while (!feof(fp)) {
|
||||
tfree(line);
|
||||
len = 0;
|
||||
getline(&line, &len, fp);
|
||||
if (line == NULL) {
|
||||
break;
|
||||
|
|
|
@ -16,20 +16,30 @@
|
|||
#ifndef TDENGINE_PLATFORM_WINDOWS_H
|
||||
#define TDENGINE_PLATFORM_WINDOWS_H
|
||||
|
||||
#include <io.h>
|
||||
#include <stdio.h>
|
||||
#include <signal.h>
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include <pthread.h>
|
||||
#include <assert.h>
|
||||
#include <ctype.h>
|
||||
#include <direct.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <float.h>
|
||||
#include <locale.h>
|
||||
#include <intrin.h>
|
||||
#include <io.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
#include <semaphore.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <time.h>
|
||||
#include "winsock2.h"
|
||||
#include <WS2tcpip.h>
|
||||
#include <assert.h>
|
||||
#include <math.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <intrin.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -366,6 +376,8 @@ int fsendfile(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count);
|
|||
|
||||
char *strndup(const char *s, size_t n);
|
||||
|
||||
void taosSetCoreDump();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -394,4 +394,6 @@ char *strndup(const char *s, size_t n) {
|
|||
memcpy(r, s, len);
|
||||
r[len] = 0;
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
void taosSetCoreDump() {}
|
|
@ -14,7 +14,6 @@
|
|||
*/
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "shash.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tidpool.h"
|
||||
|
@ -30,6 +29,7 @@
|
|||
#include "ttimer.h"
|
||||
#include "tudp.h"
|
||||
#include "tutil.h"
|
||||
#include "lz4.h"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
|
||||
|
||||
|
@ -50,8 +50,7 @@ typedef struct {
|
|||
char encrypt;
|
||||
uint8_t secret[TSDB_KEY_LEN];
|
||||
uint8_t ckey[TSDB_KEY_LEN];
|
||||
|
||||
uint16_t localPort; // for UDP only
|
||||
uint16_t localPort; // for UDP only
|
||||
uint32_t peerUid;
|
||||
uint32_t peerIp; // peer IP
|
||||
uint16_t peerPort; // peer port
|
||||
|
@ -66,7 +65,7 @@ typedef struct {
|
|||
void * chandle; // handle passed by TCP/UDP connection layer
|
||||
void * ahandle; // handle returned by upper app layter
|
||||
int retry;
|
||||
int tretry; // total retry
|
||||
int tretry; // total retry
|
||||
void * pTimer;
|
||||
void * pIdleTimer;
|
||||
char * pRspMsg;
|
||||
|
@ -79,7 +78,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int sessions;
|
||||
void * qhandle; // for scheduler
|
||||
void * qhandle; // for scheduler
|
||||
SRpcConn * connList;
|
||||
void * idPool;
|
||||
void * tmrCtrl;
|
||||
|
@ -94,11 +93,11 @@ typedef struct rpc_server {
|
|||
int mask;
|
||||
int numOfChanns;
|
||||
int numOfThreads;
|
||||
int idMgmt; // ID management method
|
||||
int idMgmt; // ID management method
|
||||
int type;
|
||||
int idleTime; // milliseconds;
|
||||
int noFree; // do not free the request msg when rsp is received
|
||||
int index; // for UDP server, next thread for new connection
|
||||
int idleTime; // milliseconds;
|
||||
int noFree; // do not free the request msg when rsp is received
|
||||
int index; // for UDP server, next thread for new connection
|
||||
uint16_t localPort;
|
||||
char label[12];
|
||||
void *(*fp)(char *, void *ahandle, void *thandle);
|
||||
|
@ -107,8 +106,7 @@ typedef struct rpc_server {
|
|||
SRpcChann *channList;
|
||||
} STaosRpc;
|
||||
|
||||
|
||||
int tsRpcProgressTime = 10; // milliseocnds
|
||||
int tsRpcProgressTime = 10; // milliseocnds
|
||||
|
||||
// not configurable
|
||||
int tsRpcMaxRetry;
|
||||
|
@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg);
|
|||
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
|
||||
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
|
||||
|
||||
static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
|
||||
STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
|
||||
int32_t overhead = sizeof(int32_t) * 2;
|
||||
int32_t finalLen = 0;
|
||||
|
||||
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
|
||||
return contLen;
|
||||
}
|
||||
|
||||
char *buf = malloc (contLen + overhead + 8); // 16 extra bytes
|
||||
if (buf == NULL) {
|
||||
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
|
||||
return contLen;
|
||||
}
|
||||
|
||||
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
|
||||
|
||||
/*
|
||||
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
||||
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
||||
*/
|
||||
if (compLen < contLen - overhead) {
|
||||
//tDump(pCont, contLen);
|
||||
int32_t *pLen = (int32_t *)pCont;
|
||||
|
||||
*pLen = 0; // first 4 bytes must be zero
|
||||
pLen = (int32_t *)(pCont + sizeof(int32_t));
|
||||
|
||||
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
|
||||
memcpy(pCont + overhead, buf, compLen);
|
||||
|
||||
pHeader->comp = 1;
|
||||
tTrace("compress rpc msg, before:%lld, after:%lld", contLen, compLen);
|
||||
|
||||
finalLen = compLen + overhead;
|
||||
//tDump(pCont, contLen);
|
||||
} else {
|
||||
finalLen = contLen;
|
||||
}
|
||||
|
||||
free(buf);
|
||||
return finalLen;
|
||||
}
|
||||
|
||||
static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) {
|
||||
int overhead = sizeof(int32_t) * 2;
|
||||
|
||||
if (pHeader->comp == 0) {
|
||||
pSchedMsg->msg = (char *)(&(pHeader->destId));
|
||||
return pHeader;
|
||||
}
|
||||
|
||||
// decompress the content
|
||||
assert(GET_INT32_VAL(pHeader->content) == 0);
|
||||
|
||||
// contLen is original message length before compression applied
|
||||
int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t)));
|
||||
|
||||
// prepare the temporary buffer to decompress message
|
||||
char *buf = malloc(sizeof(STaosHeader) + contLen);
|
||||
|
||||
//tDump(pHeader->content, msgLen);
|
||||
|
||||
if (buf) {
|
||||
int32_t originalLen = LZ4_decompress_safe(pHeader->content + overhead, buf + sizeof(STaosHeader),
|
||||
msgLen - overhead, contLen);
|
||||
|
||||
memcpy(buf, pHeader, sizeof(STaosHeader));
|
||||
free(pHeader); // free the compressed message buffer
|
||||
|
||||
STaosHeader* pNewHeader = (STaosHeader *) buf;
|
||||
pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg);
|
||||
assert(originalLen == contLen);
|
||||
|
||||
pSchedMsg->msg = (char *)(&(pNewHeader->destId));
|
||||
//tDump(pHeader->content, contLen);
|
||||
return pNewHeader;
|
||||
} else {
|
||||
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
|
||||
pSchedMsg->msg = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
char *taosBuildReqHeader(void *param, char type, char *msg) {
|
||||
STaosHeader *pHeader;
|
||||
SRpcConn * pConn = (SRpcConn *)param;
|
||||
|
@ -151,7 +232,9 @@ char *taosBuildReqHeader(void *param, char type, char *msg) {
|
|||
}
|
||||
|
||||
pHeader = (STaosHeader *)(msg + sizeof(SMsgNode));
|
||||
memset(pHeader, 0, sizeof(STaosHeader));
|
||||
pHeader->version = 1;
|
||||
pHeader->comp = 0;
|
||||
pHeader->msgType = type;
|
||||
pHeader->spi = 0;
|
||||
pHeader->tcp = 0;
|
||||
|
@ -1074,8 +1157,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
|
|||
if (code != 0) {
|
||||
// parsing error
|
||||
|
||||
if (pHeader->msgType & 1) {
|
||||
if (pHeader->msgType & 1U) {
|
||||
memset(pReply, 0, sizeof(pReply));
|
||||
|
||||
msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
|
||||
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
|
||||
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
|
||||
|
@ -1090,17 +1174,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
|
|||
// parsing OK
|
||||
|
||||
// internal communication is based on TAOS protocol, a trick here to make it efficient
|
||||
pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg);
|
||||
if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest);
|
||||
if (pHeader->spi) msgLen -= sizeof(STaosDigest);
|
||||
msgLen -= (int)sizeof(STaosHeader);
|
||||
pHeader->msgLen = msgLen + (int)sizeof(SIntMsg);
|
||||
|
||||
if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
|
||||
if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
|
||||
schedMsg.msg = NULL; // connection shall be closed
|
||||
} else {
|
||||
schedMsg.msg = (char *)(&(pHeader->destId));
|
||||
// memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen);
|
||||
pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen);
|
||||
}
|
||||
|
||||
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
|
||||
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) {
|
||||
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
|
||||
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
|
||||
}
|
||||
|
@ -1132,9 +1216,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
|
|||
pChann = pServer->channList + pConn->chann;
|
||||
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
|
||||
msg = (char *)pHeader;
|
||||
msgLen = contLen + (int32_t)sizeof(STaosHeader);
|
||||
|
||||
if ((pHeader->msgType & 1) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
|
||||
if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
|
||||
|
||||
contLen = taosCompressRpcMsg(pCont, contLen);
|
||||
|
||||
msgLen = contLen + (int32_t)sizeof(STaosHeader);
|
||||
|
||||
if (pConn->spi) {
|
||||
// add auth part
|
||||
|
@ -1151,7 +1238,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
|
|||
pthread_mutex_lock(&pChann->mutex);
|
||||
msgType = pHeader->msgType;
|
||||
|
||||
if ((msgType & 1) == 0) {
|
||||
if ((msgType & 1U) == 0) {
|
||||
// response
|
||||
pConn->inType = 0;
|
||||
tfree(pConn->pRspMsg);
|
||||
|
|
|
@ -346,10 +346,41 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
|
|||
int real_size = 0;
|
||||
/* char action = SDB_TYPE_INSERT; */
|
||||
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) {
|
||||
sdbError("sdb tables is null");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row))
|
||||
if (sdbGetRow(handle, row)) return -1;
|
||||
if (sdbGetRow(handle, row)) {
|
||||
if (strcmp(pTable->name, "mnode") == 0) {
|
||||
/*
|
||||
* The first mnode created when the system just start, so the insert action may failed
|
||||
* see sdbPeer.c : sdbInitPeers
|
||||
*/
|
||||
pTable->id++;
|
||||
sdbVersion++;
|
||||
sdbPrint("table:%s, record:%s already exist, think it successed, sdbVersion:%ld id:%d",
|
||||
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id);
|
||||
return 0;
|
||||
} else {
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEYTYPE_STRING:
|
||||
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, (char *)row, sdbVersion, pTable->id);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
|
||||
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id);
|
||||
break;
|
||||
case SDB_KEYTYPE_AUTO:
|
||||
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, *(int32_t *)row, sdbVersion, pTable->id);
|
||||
break;
|
||||
default:
|
||||
sdbError("table:%s, failed to insert record:%s sdbVersion:%ld id:%d", pTable->name, sdbVersion, pTable->id);
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
|
||||
SRowHead *rowHead = (SRowHead *)malloc(total_size);
|
||||
|
@ -408,24 +439,26 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
|
|||
pTable->numOfRows++;
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEYTYPE_STRING:
|
||||
sdbTrace(
|
||||
"table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
|
||||
sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32:
|
||||
case SDB_KEYTYPE_AUTO:
|
||||
sdbTrace(
|
||||
"table:%s, a record is inserted:%d, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
sdbTrace("table:%s, a record is inserted:%d, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
break;
|
||||
default:
|
||||
sdbTrace(
|
||||
"table:%s, a record is inserted, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
sdbTrace("table:%s, a record is inserted, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
|
||||
pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
|
||||
break;
|
||||
}
|
||||
|
||||
id = rowMeta.id;
|
||||
} else {
|
||||
sdbError("table:%s, failed to insert record", pTable->name);
|
||||
}
|
||||
|
||||
tfree(rowHead);
|
||||
|
@ -509,15 +542,16 @@ int sdbDeleteRow(void *handle, void *row) {
|
|||
sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow);
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEYTYPE_STRING:
|
||||
sdbTrace(
|
||||
"table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
|
||||
sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32:
|
||||
case SDB_KEYTYPE_AUTO:
|
||||
sdbTrace(
|
||||
"table:%s, a record is deleted:%d, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
break;
|
||||
default:
|
||||
sdbTrace("table:%s, a record is deleted, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
|
@ -553,7 +587,24 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
|
|||
if (pTable == NULL || row == NULL) return -1;
|
||||
pMeta = sdbGetRowMeta(handle, row);
|
||||
if (pMeta == NULL) {
|
||||
sdbTrace("table:%s, record is not there, update failed", pTable->name);
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEYTYPE_STRING:
|
||||
sdbError("table:%s, failed to update record:%s, record is not there, sdbVersion:%ld id:%d",
|
||||
pTable->name, (char *) row, sdbVersion, pTable->id);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
|
||||
sdbError("table:%s, failed to update record:%s record is not there, sdbVersion:%ld id:%d",
|
||||
pTable->name, taosIpStr(*(int32_t *) row), sdbVersion, pTable->id);
|
||||
break;
|
||||
case SDB_KEYTYPE_AUTO:
|
||||
sdbError("table:%s, failed to update record:F%s record is not there, sdbVersion:%ld id:%d",
|
||||
pTable->name, *(int32_t *) row, sdbVersion, pTable->id);
|
||||
break;
|
||||
default:
|
||||
sdbError("table:%s, failed to update record:%s record is not there, sdbVersion:%ld id:%d",
|
||||
pTable->name, sdbVersion, pTable->id);
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -610,15 +661,16 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
|
|||
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEYTYPE_STRING:
|
||||
sdbTrace(
|
||||
"table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
sdbTrace("table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
|
||||
sdbTrace("table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows);
|
||||
break;
|
||||
case SDB_KEYTYPE_UINT32:
|
||||
case SDB_KEYTYPE_AUTO:
|
||||
sdbTrace(
|
||||
"table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
sdbTrace("table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d",
|
||||
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
|
||||
break;
|
||||
default:
|
||||
sdbTrace("table:%s, a record is updated, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, sdbVersion,
|
||||
|
|
|
@ -410,6 +410,9 @@ int mgmtRetrieveScores(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
|||
int grantGetGrantsMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int grantRetrieveGrants(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
||||
|
||||
int mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
||||
|
||||
// dnode balance api
|
||||
int mgmtInitBalance();
|
||||
void mgmtCleanupBalance();
|
||||
|
|
|
@ -353,7 +353,7 @@ bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg);
|
|||
|
||||
int32_t vnodeGetResultSize(void *handle, int32_t *numOfRows);
|
||||
|
||||
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size);
|
||||
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows);
|
||||
|
||||
int64_t vnodeGetOffsetVal(void *thandle);
|
||||
|
||||
|
|
|
@ -61,6 +61,20 @@ int main(int argc, char *argv[]) {
|
|||
return 0;
|
||||
} else if (strcmp(argv[i], "-k") == 0) {
|
||||
dnodeParseParameterK();
|
||||
#ifdef TAOS_MEM_CHECK
|
||||
} else if (strcmp(argv[i], "--alloc-random-fail") == 0) {
|
||||
if ((i < argc - 1) && (argv[i+1][0] != '-')) {
|
||||
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true);
|
||||
} else {
|
||||
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true);
|
||||
}
|
||||
} else if (strcmp(argv[i], "--detect-mem-leak") == 0) {
|
||||
if ((i < argc - 1) && (argv[i+1][0] != '-')) {
|
||||
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true);
|
||||
} else {
|
||||
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -389,3 +389,121 @@ int mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
|
|||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
int mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
||||
int cols = 0;
|
||||
|
||||
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
|
||||
|
||||
SSchema *pSchema = tsGetSchema(pMeta);
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "vnode");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "vgid");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 12;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "status");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 12;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sync status");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htons(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
|
||||
// TODO: if other thread drop dnode ????
|
||||
SDnodeObj *pDnode = NULL;
|
||||
if (pShow->payloadLen > 0 ) {
|
||||
uint32_t ip = ip2uint(pShow->payload);
|
||||
pDnode = mgmtGetDnode(ip);
|
||||
if (NULL == pDnode) {
|
||||
return TSDB_CODE_NODE_OFFLINE;
|
||||
}
|
||||
|
||||
pShow->numOfRows = pDnode->openVnodes;
|
||||
pShow->pNode = pDnode;
|
||||
|
||||
} else {
|
||||
while (true) {
|
||||
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
pShow->numOfRows += pDnode->openVnodes;
|
||||
|
||||
if (0 == pShow->numOfRows) return TSDB_CODE_NODE_OFFLINE;
|
||||
}
|
||||
|
||||
pShow->pNode = NULL;
|
||||
}
|
||||
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
|
||||
int numOfRows = 0;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
char * pWrite;
|
||||
int cols = 0;
|
||||
char ipstr[20];
|
||||
|
||||
if (0 == rows) return 0;
|
||||
|
||||
if (pShow->payloadLen) {
|
||||
// output the vnodes info of the designated dnode. And output all vnodes of this dnode, instead of rows (max 100)
|
||||
pDnode = (SDnodeObj *)(pShow->pNode);
|
||||
if (pDnode != NULL) {
|
||||
SVnodeLoad* pVnode;
|
||||
for (int i = 0 ; i < TSDB_MAX_VNODES; i++) {
|
||||
pVnode = &pDnode->vload[i];
|
||||
if (0 == pVnode->vgId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(uint32_t *)pWrite = pVnode->vnode;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(uint32_t *)pWrite = pVnode->vgId;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, taosGetVnodeStatusStr(pVnode->status));
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, taosGetVnodeSyncStatusStr(pVnode->syncStatus));
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: output all vnodes of all dnodes
|
||||
numOfRows = 0;
|
||||
}
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -519,10 +519,8 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter = mgmtGetMeter(pCreate->meterId);
|
||||
if (pMeter) {
|
||||
if (pCreate->igExists) {
|
||||
mError("table:%s, igExists is true", pCreate->meterId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("table:%s, table is already exist", pCreate->meterId);
|
||||
return TSDB_CODE_TABLE_ALREADY_EXIST;
|
||||
}
|
||||
}
|
||||
|
@ -675,7 +673,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
|
||||
// send create message to the selected vnode servers
|
||||
if (pCreate->numOfTags == 0) {
|
||||
mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d",
|
||||
mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
|
||||
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode);
|
||||
|
||||
grantAddTimeSeries(pMeter->numOfColumns - 1);
|
||||
|
|
|
@ -788,12 +788,14 @@ int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = {
|
|||
mgmtGetAcctMeta, mgmtGetUserMeta, mgmtGetDbMeta, mgmtGetMeterMeta, mgmtGetDnodeMeta,
|
||||
mgmtGetMnodeMeta, mgmtGetVgroupMeta, mgmtGetMetricMeta, mgmtGetModuleMeta, mgmtGetQueryMeta,
|
||||
mgmtGetStreamMeta, mgmtGetConfigMeta, mgmtGetConnsMeta, mgmtGetScoresMeta, grantGetGrantsMeta,
|
||||
mgmtGetVnodeMeta,
|
||||
};
|
||||
|
||||
int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = {
|
||||
mgmtRetrieveAccts, mgmtRetrieveUsers, mgmtRetrieveDbs, mgmtRetrieveMeters, mgmtRetrieveDnodes,
|
||||
mgmtRetrieveMnodes, mgmtRetrieveVgroups, mgmtRetrieveMetrics, mgmtRetrieveModules, mgmtRetrieveQueries,
|
||||
mgmtRetrieveStreams, mgmtRetrieveConfigs, mgmtRetrieveConns, mgmtRetrieveScores, grantRetrieveGrants,
|
||||
mgmtRetrieveVnodes,
|
||||
};
|
||||
|
||||
int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||
|
|
|
@ -114,6 +114,7 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam
|
|||
|
||||
char *path = vnodeGetDataDir(vnode, fileId);
|
||||
if (path == NULL) {
|
||||
dError("vid:%d, fileId:%d, failed to get dataDir", vnode, fileId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -2952,11 +2952,11 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles
|
|||
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
|
||||
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
|
||||
|
||||
if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
|
||||
pVnodeFiles->dataFileSize = fstat.st_size;
|
||||
|
||||
if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
|
||||
pVnodeFiles->lastFileSize = fstat.st_size;
|
||||
// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
|
||||
// pVnodeFiles->dataFileSize = fstat.st_size;
|
||||
//
|
||||
// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
|
||||
// pVnodeFiles->lastFileSize = fstat.st_size;
|
||||
|
||||
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
|
||||
/* enforce kernel to preload data when the file is mapping */
|
||||
|
@ -6943,36 +6943,18 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
|
|||
return numOfRes;
|
||||
}
|
||||
|
||||
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int32_t *size) {
|
||||
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
|
||||
SMeterObj *pObj = pQInfo->pObj;
|
||||
SQuery * pQuery = &pQInfo->query;
|
||||
|
||||
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
|
||||
int32_t dataSize = pQInfo->query.rowSize * numOfRows;
|
||||
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
|
||||
|
||||
// for metric query, bufIndex always be 0.
|
||||
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
|
||||
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
|
||||
|
||||
if (dataSize >= tsCompressMsgSize && tsCompressMsgSize > 0) {
|
||||
char *compBuf = malloc((size_t)dataSize);
|
||||
|
||||
// for metric query, bufIndex always be 0.
|
||||
char *d = compBuf;
|
||||
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
|
||||
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
|
||||
|
||||
memmove(d, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
|
||||
d += bytes * numOfRows;
|
||||
}
|
||||
|
||||
*size = tsCompressString(compBuf, dataSize, 1, data, dataSize + EXTRA_BYTES, 0, 0, 0);
|
||||
|
||||
dTrace("QInfo:%p compress rsp msg, before:%d, after:%d", pQInfo, dataSize, *size);
|
||||
free(compBuf);
|
||||
} else { // for metric query, bufIndex always be 0.
|
||||
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
|
||||
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
|
||||
|
||||
memmove(data, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
|
||||
data += bytes * numOfRows;
|
||||
}
|
||||
memmove(data, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
|
||||
data += bytes * numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6987,7 +6969,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
|||
* @param numOfRows the number of rows that are not returned in current retrieve
|
||||
* @return
|
||||
*/
|
||||
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size) {
|
||||
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) {
|
||||
SQInfo *pQInfo = (SQInfo *)handle;
|
||||
SQuery *pQuery = &pQInfo->query;
|
||||
|
||||
|
@ -7000,7 +6982,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
|
|||
// make sure file exist
|
||||
if (VALIDFD(fd)) {
|
||||
size_t s = lseek(fd, 0, SEEK_END);
|
||||
dTrace("QInfo:%p ts comp data return, file:%s, size:%ld", pQInfo, pQuery->sdata[0]->data, size);
|
||||
dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s);
|
||||
|
||||
lseek(fd, 0, SEEK_SET);
|
||||
read(fd, data, s);
|
||||
|
@ -7012,7 +6994,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
|
|||
pQuery->sdata[0]->data, strerror(errno));
|
||||
}
|
||||
} else {
|
||||
doCopyQueryResultToMsg(pQInfo, numOfRows, data, size);
|
||||
doCopyQueryResultToMsg(pQInfo, numOfRows, data);
|
||||
}
|
||||
|
||||
return numOfRows;
|
||||
|
|
|
@ -483,13 +483,9 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) {
|
|||
}
|
||||
|
||||
tfree(pQuery->pGroupbyExpr);
|
||||
|
||||
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
|
||||
|
||||
/*
|
||||
* destory signature, in order to avoid the query process pass the object
|
||||
* safety check
|
||||
*/
|
||||
//destroy signature, in order to avoid the query process pass the object safety check
|
||||
memset(pQInfo, 0, sizeof(SQInfo));
|
||||
tfree(pQInfo);
|
||||
}
|
||||
|
@ -854,7 +850,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
|
|||
// the remained number of retrieved rows, not the interpolated result
|
||||
int numOfRows = pQInfo->pointsRead - pQInfo->pointsReturned;
|
||||
|
||||
int32_t numOfFinal = vnodeCopyQueryResultToMsg(pQInfo, data, numOfRows, size);
|
||||
int32_t numOfFinal = vnodeCopyQueryResultToMsg(pQInfo, data, numOfRows);
|
||||
pQInfo->pointsReturned += numOfFinal;
|
||||
|
||||
dTrace("QInfo:%p %d are returned, totalReturned:%d totalRead:%d", pQInfo, numOfFinal, pQInfo->pointsReturned,
|
||||
|
@ -866,12 +862,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
|
|||
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
|
||||
|
||||
/*
|
||||
* If SQInfo has been released, the value of signature cannot be equalled to
|
||||
* the address of pQInfo, since in release function, the original value has
|
||||
* been
|
||||
* destroyed. However, this memory area may be reused by another function.
|
||||
* It may be 0 or any value, but it is rarely still be equalled to the address
|
||||
* of SQInfo.
|
||||
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
|
||||
* since in release function, the original value has been destroyed. However, this memory area may be reused
|
||||
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
|
||||
*/
|
||||
if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
|
||||
dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
|
||||
|
|
|
@ -99,28 +99,32 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
}
|
||||
}
|
||||
|
||||
// if ( vnodeList[vnode].status != TSDB_STATUS_MASTER && pMsg->msgType != TSDB_MSG_TYPE_RETRIEVE ) {
|
||||
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
|
||||
|
||||
#ifdef CLUSTER
|
||||
if (vnodeList[vnode].vnodeStatus != TSDB_VN_STATUS_MASTER) {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].vnodeStatus);
|
||||
} else {
|
||||
#endif
|
||||
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
|
||||
|
||||
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
|
||||
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
|
||||
vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
|
||||
vnodeProcessRetrieveRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
|
||||
vnodeProcessShellSubmitRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else {
|
||||
dError("%s is not processed", taosMsg[pMsg->msgType]);
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell query msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
#ifdef CLUSTER
|
||||
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
|
||||
vnodeProcessRetrieveRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell retrieve msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) {
|
||||
vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
} else {
|
||||
dError("%s is not processed", taosMsg[pMsg->msgType]);
|
||||
}
|
||||
#endif
|
||||
|
||||
return pObj;
|
||||
}
|
||||
|
@ -461,11 +465,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
|
|||
pMsg = pRsp->data;
|
||||
|
||||
if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) {
|
||||
int32_t oldSize = size;
|
||||
vnodeSaveQueryResult((void *)(pRetrieve->qhandle), pRsp->data, &size);
|
||||
if (oldSize > size) {
|
||||
pRsp->compress = htons(1); // denote that the response msg is compressed
|
||||
}
|
||||
}
|
||||
|
||||
pMsg += size;
|
||||
|
@ -587,6 +587,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
|
|||
int32_t sversion = htonl(pBlocks->sversion);
|
||||
|
||||
if (pSubmit->import) {
|
||||
dTrace("start to import data");
|
||||
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
|
||||
sversion, &numOfPoints, now);
|
||||
pObj->numOfTotalPoints += numOfPoints;
|
||||
|
|
|
@ -37,7 +37,6 @@ ELSEIF (TD_WINDOWS_64)
|
|||
LIST(APPEND SRC ./src/ihash.c)
|
||||
LIST(APPEND SRC ./src/lz4.c)
|
||||
LIST(APPEND SRC ./src/shash.c)
|
||||
LIST(APPEND SRC ./src/sql.c)
|
||||
LIST(APPEND SRC ./src/tbase64.c)
|
||||
LIST(APPEND SRC ./src/tcache.c)
|
||||
LIST(APPEND SRC ./src/tcompression.c)
|
||||
|
@ -59,8 +58,6 @@ ELSEIF (TD_WINDOWS_64)
|
|||
LIST(APPEND SRC ./src/tskiplist.c)
|
||||
LIST(APPEND SRC ./src/tsocket.c)
|
||||
LIST(APPEND SRC ./src/tstatus.c)
|
||||
LIST(APPEND SRC ./src/tstoken.c)
|
||||
LIST(APPEND SRC ./src/tstoken.c)
|
||||
LIST(APPEND SRC ./src/tstrbuild.c)
|
||||
LIST(APPEND SRC ./src/ttime.c)
|
||||
LIST(APPEND SRC ./src/ttimer.c)
|
||||
|
|
|
@ -644,6 +644,7 @@ static void doInitGlobalConfig() {
|
|||
tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING,
|
||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT,
|
||||
0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE);
|
||||
|
||||
// socket type, udp by default
|
||||
tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING,
|
||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
|
||||
|
|
|
@ -16,63 +16,462 @@
|
|||
#include "os.h"
|
||||
#include "tlog.h"
|
||||
|
||||
extern int32_t taosGetTimestampSec();
|
||||
static int32_t startTime = 0;
|
||||
static int64_t m_curLimit = 100*1024;
|
||||
#define TAOS_MEM_CHECK_IMPL
|
||||
#include "tutil.h"
|
||||
|
||||
bool isMallocMem(unsigned int size, char* _func) {
|
||||
if (0 == startTime) {
|
||||
startTime = taosGetTimestampSec();
|
||||
return true;
|
||||
} else {
|
||||
int32_t currentTime = taosGetTimestampSec();
|
||||
if (currentTime - startTime < 10) return true;
|
||||
|
||||
#ifdef TAOS_MEM_CHECK
|
||||
|
||||
static int allocMode = TAOS_ALLOC_MODE_DEFAULT;
|
||||
static FILE* fpAllocLog = NULL;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// memory allocator which fails randomly
|
||||
|
||||
extern int32_t taosGetTimestampSec();
|
||||
static int32_t startTime = INT32_MAX;;
|
||||
|
||||
static bool random_alloc_fail(size_t size, const char* file, uint32_t line) {
|
||||
if (taosGetTimestampSec() < startTime) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (size > m_curLimit) {
|
||||
if (3 == rand() % 20) {
|
||||
pTrace("====no alloc mem in func: %s, size:%d", _func, size);
|
||||
return false;
|
||||
}
|
||||
if (size < 100 * (size_t)1024) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (rand() % 20 != 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fpAllocLog != NULL) {
|
||||
fprintf(fpAllocLog, "%s:%d: memory allocation of %zu bytes will fail.\n", file, line, size);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void* taos_malloc(unsigned int size, char* _func) {
|
||||
static void* malloc_random(size_t size, const char* file, uint32_t line) {
|
||||
return random_alloc_fail(size, file, line) ? NULL : malloc(size);
|
||||
}
|
||||
|
||||
if (false == isMallocMem(size, _func)) {
|
||||
static void* calloc_random(size_t num, size_t size, const char* file, uint32_t line) {
|
||||
return random_alloc_fail(num * size, file, line) ? NULL : calloc(num, size);
|
||||
}
|
||||
|
||||
static void* realloc_random(void* ptr, size_t size, const char* file, uint32_t line) {
|
||||
return random_alloc_fail(size, file, line) ? NULL : realloc(ptr, size);
|
||||
}
|
||||
|
||||
static char* strdup_random(const char* str, const char* file, uint32_t line) {
|
||||
size_t len = strlen(str);
|
||||
return random_alloc_fail(len + 1, file, line) ? NULL : strdup(str);
|
||||
}
|
||||
|
||||
static char* strndup_random(const char* str, size_t size, const char* file, uint32_t line) {
|
||||
size_t len = strlen(str);
|
||||
if (len > size) {
|
||||
len = size;
|
||||
}
|
||||
return random_alloc_fail(len + 1, file, line) ? NULL : strndup(str, len);
|
||||
}
|
||||
|
||||
static ssize_t getline_random(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
|
||||
return random_alloc_fail(*n, file, line) ? -1 : getline(lineptr, n, stream);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// memory allocator with leak detection
|
||||
|
||||
#define MEMBLK_MAGIC 0x55AA
|
||||
|
||||
typedef struct SMemBlock {
|
||||
const char* file;
|
||||
uint16_t line;
|
||||
uint16_t magic;
|
||||
uint32_t size;
|
||||
struct SMemBlock* prev;
|
||||
struct SMemBlock* next;
|
||||
// TODO: need pading in 32bit platform
|
||||
char data[0];
|
||||
} SMemBlock;
|
||||
|
||||
static SMemBlock *blocks = NULL;
|
||||
static uintptr_t lock = 0;
|
||||
|
||||
static void add_mem_block(SMemBlock* blk) {
|
||||
blk->prev = NULL;
|
||||
while (atomic_val_compare_exchange_ptr(&lock, 0, 1) != 0);
|
||||
blk->next = blocks;
|
||||
if (blocks != NULL) {
|
||||
blocks->prev = blk;
|
||||
}
|
||||
blocks = blk;
|
||||
atomic_store_ptr(&lock, 0);
|
||||
}
|
||||
|
||||
static void remove_mem_block(SMemBlock* blk) {
|
||||
while (atomic_val_compare_exchange_ptr(&lock, 0, 1) != 0);
|
||||
|
||||
if (blocks == blk) {
|
||||
blocks = blk->next;
|
||||
}
|
||||
if (blk->prev != NULL) {
|
||||
blk->prev->next = blk->next;
|
||||
}
|
||||
if (blk->next != NULL) {
|
||||
blk->next->prev = blk->prev;
|
||||
}
|
||||
|
||||
atomic_store_ptr(&lock, 0);
|
||||
|
||||
blk->prev = NULL;
|
||||
blk->next = NULL;
|
||||
}
|
||||
|
||||
static void free_detect_leak(void* ptr, const char* file, uint32_t line) {
|
||||
if (ptr == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SMemBlock* blk = (SMemBlock*)(((char*)ptr) - sizeof(SMemBlock));
|
||||
if (blk->magic != MEMBLK_MAGIC) {
|
||||
if (fpAllocLog != NULL) {
|
||||
fprintf(fpAllocLog, "%s:%d: memory is allocated by default allocator.\n", file, line);
|
||||
}
|
||||
free(ptr);
|
||||
return;
|
||||
}
|
||||
|
||||
remove_mem_block(blk);
|
||||
free(blk);
|
||||
}
|
||||
|
||||
static void* malloc_detect_leak(size_t size, const char* file, uint32_t line) {
|
||||
if (size == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *p = NULL;
|
||||
p = malloc(size);
|
||||
|
||||
SMemBlock *blk = (SMemBlock*)malloc(size + sizeof(SMemBlock));
|
||||
if (blk == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (line > UINT16_MAX && fpAllocLog != NULL) {
|
||||
fprintf(fpAllocLog, "%s:%d: line number too large.\n", file, line);
|
||||
}
|
||||
|
||||
if (size > UINT32_MAX && fpAllocLog != NULL) {
|
||||
fprintf(fpAllocLog, "%s:%d: size too large: %zu.\n", file, line, size);
|
||||
}
|
||||
|
||||
blk->file = file;
|
||||
blk->line = (uint16_t)line;
|
||||
blk->magic = MEMBLK_MAGIC;
|
||||
blk->size = size;
|
||||
add_mem_block(blk);
|
||||
|
||||
return blk->data;
|
||||
}
|
||||
|
||||
static void* calloc_detect_leak(size_t num, size_t size, const char* file, uint32_t line) {
|
||||
size *= num;
|
||||
void* p = malloc_detect_leak(size, file, line);
|
||||
if (p != NULL) {
|
||||
memset(p, 0, size);
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
void* taos_calloc(unsigned int num, unsigned int size, char* _func) {
|
||||
|
||||
if (false == isMallocMem(size, _func)) {
|
||||
static void* realloc_detect_leak(void* ptr, size_t size, const char* file, uint32_t line) {
|
||||
if (size == 0) {
|
||||
free_detect_leak(ptr, file, line);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *p = NULL;
|
||||
p = calloc(num, size);
|
||||
|
||||
if (ptr == NULL) {
|
||||
return malloc_detect_leak(size, file, line);
|
||||
}
|
||||
|
||||
SMemBlock* blk = ((char*)ptr) - sizeof(SMemBlock);
|
||||
if (blk->magic != MEMBLK_MAGIC) {
|
||||
if (fpAllocLog != NULL) {
|
||||
fprintf(fpAllocLog, "%s:%d: memory is allocated by default allocator.\n", file, line);
|
||||
}
|
||||
return realloc(ptr, size);
|
||||
}
|
||||
|
||||
remove_mem_block(blk);
|
||||
|
||||
void* p = realloc(blk, size + sizeof(SMemBlock));
|
||||
if (p == NULL) {
|
||||
add_mem_block(blk);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (size > UINT32_MAX && fpAllocLog != NULL) {
|
||||
fprintf(fpAllocLog, "%s:%d: size too large: %zu.\n", file, line, size);
|
||||
}
|
||||
|
||||
blk = (SMemBlock*)p;
|
||||
blk->size = size;
|
||||
add_mem_block(blk);
|
||||
return blk->data;
|
||||
}
|
||||
|
||||
static char* strdup_detect_leak(const char* str, const char* file, uint32_t line) {
|
||||
size_t len = strlen(str);
|
||||
char *p = malloc_detect_leak(len + 1, file, line);
|
||||
if (p != NULL) {
|
||||
memcpy(p, str, len);
|
||||
p[len] = 0;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
void* taos_realloc(void* ptr, unsigned int size, char* _func) {
|
||||
|
||||
if (false == isMallocMem(size, _func)) {
|
||||
return NULL;
|
||||
static char* strndup_detect_leak(const char* str, size_t size, const char* file, uint32_t line) {
|
||||
size_t len = strlen(str);
|
||||
if (len > size) {
|
||||
len = size;
|
||||
}
|
||||
char *p = malloc_detect_leak(len + 1, file, line);
|
||||
if (p != NULL) {
|
||||
memcpy(p, str, len);
|
||||
p[len] = 0;
|
||||
}
|
||||
|
||||
void *p = NULL;
|
||||
p = realloc(ptr, size);
|
||||
return p;
|
||||
}
|
||||
|
||||
void taos_free(void* ptr) {
|
||||
free(ptr);
|
||||
static ssize_t getline_detect_leak(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
|
||||
char* buf = NULL;
|
||||
size_t bufSize = 0;
|
||||
ssize_t size = getline(&buf, &bufSize, stream);
|
||||
if (size != -1) {
|
||||
if (*n < size + 1) {
|
||||
void* p = realloc_detect_leak(*lineptr, size + 1, file, line);
|
||||
if (p == NULL) {
|
||||
free(buf);
|
||||
return -1;
|
||||
}
|
||||
*lineptr = (char*)p;
|
||||
*n = size + 1;
|
||||
}
|
||||
memcpy(*lineptr, buf, size + 1);
|
||||
}
|
||||
|
||||
free(buf);
|
||||
return size;
|
||||
}
|
||||
|
||||
static void dump_memory_leak() {
|
||||
const char* hex = "0123456789ABCDEF";
|
||||
const char* fmt = ":%d: addr=%p, size=%d, content(first 16 bytes)=";
|
||||
size_t numOfBlk = 0, totalSize = 0;
|
||||
|
||||
if (fpAllocLog == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
fputs("memory blocks allocated but not freed before exit:\n", fpAllocLog);
|
||||
|
||||
while (atomic_val_compare_exchange_ptr(&lock, 0, 1) != 0);
|
||||
|
||||
for (SMemBlock* blk = blocks; blk != NULL; blk = blk->next) {
|
||||
++numOfBlk;
|
||||
totalSize += blk->size;
|
||||
|
||||
fputs(blk->file, fpAllocLog);
|
||||
fprintf(fpAllocLog, fmt, blk->line, blk->data, blk->size);
|
||||
|
||||
char sep = '\'';
|
||||
size_t size = blk->size > 16 ? 16 : blk->size;
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
uint8_t c = (uint8_t)(blk->data[i]);
|
||||
fputc(sep, fpAllocLog);
|
||||
sep = ' ';
|
||||
fputc(hex[c >> 4], fpAllocLog);
|
||||
fputc(hex[c & 0x0f], fpAllocLog);
|
||||
}
|
||||
|
||||
fputs("'\n", fpAllocLog);
|
||||
}
|
||||
|
||||
atomic_store_ptr(&lock, 0);
|
||||
|
||||
fprintf(fpAllocLog, "\nnumber of blocks: %zu, total bytes: %zu\n", numOfBlk, totalSize);
|
||||
fflush(fpAllocLog);
|
||||
}
|
||||
|
||||
static void dump_memory_leak_on_sig(int sig) {
|
||||
fprintf(fpAllocLog, "signal %d received.\n", sig);
|
||||
|
||||
// restore default signal handler
|
||||
struct sigaction act = {0};
|
||||
act.sa_handler = SIG_DFL;
|
||||
sigaction(sig, &act, NULL);
|
||||
|
||||
dump_memory_leak();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// interface functions
|
||||
|
||||
void* taos_malloc(size_t size, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return malloc(size);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return malloc_random(size, file, line);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return malloc_detect_leak(size, file, line);
|
||||
}
|
||||
return malloc(size);
|
||||
}
|
||||
|
||||
void* taos_calloc(size_t num, size_t size, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return calloc(num, size);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return calloc_random(num, size, file, line);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return calloc_detect_leak(num, size, file, line);
|
||||
}
|
||||
return calloc(num, size);
|
||||
}
|
||||
|
||||
void* taos_realloc(void* ptr, size_t size, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return realloc(ptr, size);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return realloc_random(ptr, size, file, line);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return realloc_detect_leak(ptr, size, file, line);
|
||||
}
|
||||
return realloc(ptr, size);
|
||||
}
|
||||
|
||||
void taos_free(void* ptr, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return free(ptr);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return free(ptr);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return free_detect_leak(ptr, file, line);
|
||||
}
|
||||
return free(ptr);
|
||||
}
|
||||
|
||||
char* taos_strdup(const char* str, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return strdup(str);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return strdup_random(str, file, line);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return strdup_detect_leak(str, file, line);
|
||||
}
|
||||
return strdup(str);
|
||||
}
|
||||
|
||||
char* taos_strndup(const char* str, size_t size, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return strndup(str, size);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return strndup_random(str, size, file, line);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return strndup_detect_leak(str, size, file, line);
|
||||
}
|
||||
return strndup(str, size);
|
||||
}
|
||||
|
||||
ssize_t taos_getline(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
|
||||
switch (allocMode) {
|
||||
case TAOS_ALLOC_MODE_DEFAULT:
|
||||
return getline(lineptr, n, stream);
|
||||
|
||||
case TAOS_ALLOC_MODE_RANDOM_FAIL:
|
||||
return getline_random(lineptr, n, stream, file, line);
|
||||
|
||||
case TAOS_ALLOC_MODE_DETECT_LEAK:
|
||||
return getline_detect_leak(lineptr, n, stream, file, line);
|
||||
}
|
||||
return getline(lineptr, n, stream);
|
||||
}
|
||||
|
||||
static void close_alloc_log() {
|
||||
if (fpAllocLog != NULL) {
|
||||
if (fpAllocLog != stdout) {
|
||||
fclose(fpAllocLog);
|
||||
}
|
||||
fpAllocLog = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void taosSetAllocMode(int mode, const char* path, bool autoDump) {
|
||||
assert(mode >= TAOS_ALLOC_MODE_DEFAULT);
|
||||
assert(mode <= TAOS_ALLOC_MODE_DETECT_LEAK);
|
||||
|
||||
if (fpAllocLog != NULL || allocMode != TAOS_ALLOC_MODE_DEFAULT) {
|
||||
printf("memory allocation mode can only be set once.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
if (path == NULL || path[0] == 0) {
|
||||
fpAllocLog = stdout;
|
||||
} else if ((fpAllocLog = fopen(path, "w")) != NULL) {
|
||||
atexit(close_alloc_log);
|
||||
} else {
|
||||
printf("failed to open memory allocation log file '%s', errno=%d\n", path, errno);
|
||||
return;
|
||||
}
|
||||
|
||||
allocMode = mode;
|
||||
|
||||
if (mode == TAOS_ALLOC_MODE_RANDOM_FAIL) {
|
||||
startTime = taosGetTimestampSec() + 10;
|
||||
return;
|
||||
}
|
||||
|
||||
if (autoDump && mode == TAOS_ALLOC_MODE_DETECT_LEAK) {
|
||||
atexit(dump_memory_leak);
|
||||
|
||||
struct sigaction act = {0};
|
||||
act.sa_handler = dump_memory_leak_on_sig;
|
||||
sigaction(SIGFPE, &act, NULL);
|
||||
sigaction(SIGSEGV, &act, NULL);
|
||||
sigaction(SIGILL, &act, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void taosDumpMemoryLeak() {
|
||||
dump_memory_leak();
|
||||
close_alloc_log();
|
||||
}
|
||||
|
||||
#else // 'TAOS_MEM_CHECK' not defined
|
||||
|
||||
void taosSetAllocMode(int mode, const char* path, bool autoDump) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
void taosDumpMemoryLeak() {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
#endif // TAOS_MEM_CHECK
|
||||
|
|
|
@ -224,6 +224,7 @@ static SKeyword keywordTable[] = {
|
|||
{"METRICS", TK_METRICS},
|
||||
{"STABLE", TK_STABLE},
|
||||
{"FILE", TK_FILE},
|
||||
{"VNODES", TK_VNODES},
|
||||
};
|
||||
|
||||
/* This is the hash table */
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
char version[64] = "1.6.4.0";
|
||||
char compatible_version[64] = "1.6.1.0";
|
||||
char gitinfo[128] = "d04354a8ac2f7dd9ba521d755e5d484a203783d9";
|
||||
char buildinfo[512] = "Built by root at 2019-11-11 10:23";
|
||||
char gitinfo[128] = "b6e308866e315483915f4c42a2717547ed0b9d36";
|
||||
char buildinfo[512] = "Built by ubuntu at 2019-11-26 21:56";
|
||||
|
|
Loading…
Reference in New Issue