Merge branch 'develop' into coverity_scan

This commit is contained in:
Ping Xiao 2020-09-06 19:24:41 +08:00
commit b2c3e10385
46 changed files with 1876 additions and 1554 deletions

View File

@ -13,6 +13,7 @@ ENDIF ()
SET(TD_ACCOUNT FALSE)
SET(TD_ADMIN FALSE)
SET(TD_GRANT FALSE)
SET(TD_TSDB_PLUGINS FALSE)
SET(TD_COVER FALSE)
SET(TD_MEM_CHECK FALSE)

View File

@ -13,6 +13,10 @@ IF (TD_GRANT)
ADD_DEFINITIONS(-D_GRANT)
ENDIF ()
IF (TD_TSDB_PLUGINS)
ADD_DEFINITIONS(-D_TSDB_PLUGINS)
ENDIF ()
IF (TD_GODLL)
ADD_DEFINITIONS(-D_TD_GO_DLL_)
ENDIF ()

View File

@ -114,6 +114,9 @@ ELSEIF (${OSTYPE} MATCHES "Ningsi80")
MESSAGE(STATUS "input osType: Ningsi80")
ELSEIF (${OSTYPE} MATCHES "Linux")
MESSAGE(STATUS "input osType: Linux")
ELSEIF (${OSTYPE} MATCHES "Alpine")
MESSAGE(STATUS "input osType: Alpine")
SET(TD_APLHINE TRUE)
ELSE ()
MESSAGE(STATUS "input osType unknown: " ${OSTYPE})
ENDIF ()
ENDIF ()

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.2.0")
SET(TD_VER_NUMBER "2.0.3.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -82,13 +82,23 @@ TDengine缺省的时间戳是毫秒精度但通过修改配置参数enableMic
```
删除数据库。所包含的全部数据表将被删除,谨慎使用
- **修改数据库参数**
```mysql
ALTER DATABASE db_name COMP 2;
```
修改数据库文件压缩标志位有效数字为012. 0表示不压缩1表示一阶段压缩2表示两阶段压缩。修改后可以使用show databases命令查看是否修改成功
```mysql
ALTER DATABASE db_name REPLICA 2;
```
修改数据库副本数有效副本数为1到3。在集群中使用副本数必须小于dnode的数目。修改后可以使用show databases命令查看是否修改成功
- **显示系统所有数据库**
```mysql
SHOW DATABASES;
```
## 表管理
- **创建数据表**

View File

@ -47,7 +47,7 @@ Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
因为TDengine具有很好的水平扩展能力根据总量再根据单个物理机或虚拟机的资源就可以轻松决定需要购置多少台物理机或虚拟机了。
具体计算公式,请参见页面<a href='https://www.taosdata.com/config/config.html'>资源估算方法</a>
**立即计算CPU、内存、存储请参见<a href='https://www.taosdata.com/config/config.html'>资源估算方法</a>**
## 容错和灾备

View File

@ -1,6 +1,5 @@
#!/bin/bash
set -x
$1
docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "."
docker login -u tdengine -p $2 #replace the docker registry username and password
docker push tdengine/tdengine:$1

View File

@ -18,41 +18,18 @@ apps:
- network
- network-bind
- system-observe
- systemfiles
taos:
command: taoswrapper.sh
plugs:
- network
- system-observe
- systemfiles
- historyfile
taosdemo:
command: usr/bin/taosdemo
plugs:
- network
plugs:
historyfile:
interface: personal-files
read:
- $HOME/.taos_history
write:
- $HOME/.taos_history
systemfiles:
interface: system-files
read:
- /etc/taos
- /var/lib/taos
- /var/log/taos
- /tmp
write:
- /var/log/taos
- /var/lib/taos
- /tmp
parts:
script:
plugin: dump
@ -115,8 +92,3 @@ layout:
bind: $SNAP_DATA/var/log/taos
/etc/taos:
bind: $SNAP_DATA/etc/taos
hooks:
install:
plugs: [systemfiles, historyfile]

View File

@ -348,6 +348,7 @@ typedef struct SSqlObj {
void * pStream;
void * pSubscription;
char * sqlstr;
char parseRetry;
char retry;
char maxRetry;
SRpcEpSet epSet;

View File

@ -43,6 +43,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->parseRetry= 0;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp;
pSql->fetchFp = fp;

View File

@ -1335,13 +1335,13 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql);
if (sqlstr == NULL || pSql->retry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
if (sqlstr == NULL || pSql->parseRetry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
free(sqlstr);
} else {
tscResetSqlCmdObj(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->retry++;
pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
ret = tsParseInsertSql(pSql);
}
@ -1349,18 +1349,14 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
} else {
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->retry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
tscResetSqlCmdObj(pCmd, true);
pSql->retry++;
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
}
SQLInfoDestroy(&SQLInfo);
}
if (ret == TSDB_CODE_SUCCESS) {
pSql->retry = 0;
}
/*
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
* so do NOT use pRes->code to determine if the getTableMeta function

View File

@ -5088,7 +5088,7 @@ static int32_t setTimePrecision(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCreateDBIn
}
static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->maxTables = htonl(pCreateDb->maxTablesPerVnode);
pMsg->maxTables = htonl(-1); // max tables can not be set anymore
pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize);
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);

View File

@ -406,7 +406,7 @@ int doProcessSql(SSqlObj *pSql) {
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code;
tscQueueAsyncRes(pSql);
return pRes->code;
return code;
}
return TSDB_CODE_SUCCESS;

View File

@ -718,6 +718,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
return TSDB_CODE_SUCCESS;
}
// TODO: all subqueries should be freed correctly before close this connection.
void tscCloseTscObj(STscObj* pObj) {
assert(pObj != NULL);
@ -727,6 +728,7 @@ void tscCloseTscObj(STscObj* pObj) {
if (pObj->pDnodeConn != NULL) {
rpcClose(pObj->pDnodeConn);
pObj->pDnodeConn = NULL;
}
tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, pObj->pDnodeConn);

View File

@ -38,6 +38,10 @@ extern "C" {
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_FILE 0x1
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
void *appH;
@ -80,6 +84,7 @@ int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo);
// --------- TSDB TABLE DEFINITION
typedef struct {

View File

@ -101,127 +101,126 @@
#define TK_CONNS 83
#define TK_STATE 84
#define TK_KEEP 85
#define TK_MAXTABLES 86
#define TK_CACHE 87
#define TK_REPLICA 88
#define TK_QUORUM 89
#define TK_DAYS 90
#define TK_MINROWS 91
#define TK_MAXROWS 92
#define TK_BLOCKS 93
#define TK_CTIME 94
#define TK_WAL 95
#define TK_FSYNC 96
#define TK_COMP 97
#define TK_PRECISION 98
#define TK_LP 99
#define TK_RP 100
#define TK_TAGS 101
#define TK_USING 102
#define TK_AS 103
#define TK_COMMA 104
#define TK_NULL 105
#define TK_SELECT 106
#define TK_UNION 107
#define TK_ALL 108
#define TK_FROM 109
#define TK_VARIABLE 110
#define TK_INTERVAL 111
#define TK_FILL 112
#define TK_SLIDING 113
#define TK_ORDER 114
#define TK_BY 115
#define TK_ASC 116
#define TK_DESC 117
#define TK_GROUP 118
#define TK_HAVING 119
#define TK_LIMIT 120
#define TK_OFFSET 121
#define TK_SLIMIT 122
#define TK_SOFFSET 123
#define TK_WHERE 124
#define TK_NOW 125
#define TK_RESET 126
#define TK_QUERY 127
#define TK_ADD 128
#define TK_COLUMN 129
#define TK_TAG 130
#define TK_CHANGE 131
#define TK_SET 132
#define TK_KILL 133
#define TK_CONNECTION 134
#define TK_STREAM 135
#define TK_COLON 136
#define TK_ABORT 137
#define TK_AFTER 138
#define TK_ATTACH 139
#define TK_BEFORE 140
#define TK_BEGIN 141
#define TK_CASCADE 142
#define TK_CLUSTER 143
#define TK_CONFLICT 144
#define TK_COPY 145
#define TK_DEFERRED 146
#define TK_DELIMITERS 147
#define TK_DETACH 148
#define TK_EACH 149
#define TK_END 150
#define TK_EXPLAIN 151
#define TK_FAIL 152
#define TK_FOR 153
#define TK_IGNORE 154
#define TK_IMMEDIATE 155
#define TK_INITIALLY 156
#define TK_INSTEAD 157
#define TK_MATCH 158
#define TK_KEY 159
#define TK_OF 160
#define TK_RAISE 161
#define TK_REPLACE 162
#define TK_RESTRICT 163
#define TK_ROW 164
#define TK_STATEMENT 165
#define TK_TRIGGER 166
#define TK_VIEW 167
#define TK_COUNT 168
#define TK_SUM 169
#define TK_AVG 170
#define TK_MIN 171
#define TK_MAX 172
#define TK_FIRST 173
#define TK_LAST 174
#define TK_TOP 175
#define TK_BOTTOM 176
#define TK_STDDEV 177
#define TK_PERCENTILE 178
#define TK_APERCENTILE 179
#define TK_LEASTSQUARES 180
#define TK_HISTOGRAM 181
#define TK_DIFF 182
#define TK_SPREAD 183
#define TK_TWA 184
#define TK_INTERP 185
#define TK_LAST_ROW 186
#define TK_RATE 187
#define TK_IRATE 188
#define TK_SUM_RATE 189
#define TK_SUM_IRATE 190
#define TK_AVG_RATE 191
#define TK_AVG_IRATE 192
#define TK_TBID 193
#define TK_SEMI 194
#define TK_NONE 195
#define TK_PREV 196
#define TK_LINEAR 197
#define TK_IMPORT 198
#define TK_METRIC 199
#define TK_TBNAME 200
#define TK_JOIN 201
#define TK_METRICS 202
#define TK_STABLE 203
#define TK_INSERT 204
#define TK_INTO 205
#define TK_VALUES 206
#define TK_CACHE 86
#define TK_REPLICA 87
#define TK_QUORUM 88
#define TK_DAYS 89
#define TK_MINROWS 90
#define TK_MAXROWS 91
#define TK_BLOCKS 92
#define TK_CTIME 93
#define TK_WAL 94
#define TK_FSYNC 95
#define TK_COMP 96
#define TK_PRECISION 97
#define TK_LP 98
#define TK_RP 99
#define TK_TAGS 100
#define TK_USING 101
#define TK_AS 102
#define TK_COMMA 103
#define TK_NULL 104
#define TK_SELECT 105
#define TK_UNION 106
#define TK_ALL 107
#define TK_FROM 108
#define TK_VARIABLE 109
#define TK_INTERVAL 110
#define TK_FILL 111
#define TK_SLIDING 112
#define TK_ORDER 113
#define TK_BY 114
#define TK_ASC 115
#define TK_DESC 116
#define TK_GROUP 117
#define TK_HAVING 118
#define TK_LIMIT 119
#define TK_OFFSET 120
#define TK_SLIMIT 121
#define TK_SOFFSET 122
#define TK_WHERE 123
#define TK_NOW 124
#define TK_RESET 125
#define TK_QUERY 126
#define TK_ADD 127
#define TK_COLUMN 128
#define TK_TAG 129
#define TK_CHANGE 130
#define TK_SET 131
#define TK_KILL 132
#define TK_CONNECTION 133
#define TK_STREAM 134
#define TK_COLON 135
#define TK_ABORT 136
#define TK_AFTER 137
#define TK_ATTACH 138
#define TK_BEFORE 139
#define TK_BEGIN 140
#define TK_CASCADE 141
#define TK_CLUSTER 142
#define TK_CONFLICT 143
#define TK_COPY 144
#define TK_DEFERRED 145
#define TK_DELIMITERS 146
#define TK_DETACH 147
#define TK_EACH 148
#define TK_END 149
#define TK_EXPLAIN 150
#define TK_FAIL 151
#define TK_FOR 152
#define TK_IGNORE 153
#define TK_IMMEDIATE 154
#define TK_INITIALLY 155
#define TK_INSTEAD 156
#define TK_MATCH 157
#define TK_KEY 158
#define TK_OF 159
#define TK_RAISE 160
#define TK_REPLACE 161
#define TK_RESTRICT 162
#define TK_ROW 163
#define TK_STATEMENT 164
#define TK_TRIGGER 165
#define TK_VIEW 166
#define TK_COUNT 167
#define TK_SUM 168
#define TK_AVG 169
#define TK_MIN 170
#define TK_MAX 171
#define TK_FIRST 172
#define TK_LAST 173
#define TK_TOP 174
#define TK_BOTTOM 175
#define TK_STDDEV 176
#define TK_PERCENTILE 177
#define TK_APERCENTILE 178
#define TK_LEASTSQUARES 179
#define TK_HISTOGRAM 180
#define TK_DIFF 181
#define TK_SPREAD 182
#define TK_TWA 183
#define TK_INTERP 184
#define TK_LAST_ROW 185
#define TK_RATE 186
#define TK_IRATE 187
#define TK_SUM_RATE 188
#define TK_SUM_IRATE 189
#define TK_AVG_RATE 190
#define TK_AVG_IRATE 191
#define TK_TBID 192
#define TK_SEMI 193
#define TK_NONE 194
#define TK_PREV 195
#define TK_LINEAR 196
#define TK_IMPORT 197
#define TK_METRIC 198
#define TK_TBNAME 199
#define TK_JOIN 200
#define TK_METRICS 201
#define TK_STABLE 202
#define TK_INSERT 203
#define TK_INTO 204
#define TK_VALUES 205
#define TK_SPACE 300

View File

@ -409,7 +409,7 @@ void set_terminal_mode() {
}
}
void get_history_path(char *history) { sprintf(history, "%s/%s", getpwuid(getuid())->pw_dir, HISTORY_FILE); }
void get_history_path(char *history) { sprintf(history, "%s/%s", getenv("HOME"), HISTORY_FILE); }
void clearScreen(int ecmd_pos, int cursor_pos) {
struct winsize w;

View File

@ -36,7 +36,7 @@ extern "C" {
#include "osLinux32.h"
#endif
#ifdef _TD_ALPINE
#ifdef _ALPINE
#include "osAlpine.h"
#endif

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_LINUX64_H
#define TDENGINE_OS_LINUX64_H
#ifndef TDENGINE_OS_ALPINE_H
#define TDENGINE_OS_ALPINE_H
#ifdef __cplusplus
extern "C" {

View File

@ -75,7 +75,9 @@ extern "C" {
#include <fcntl.h>
#include <sys/utsname.h>
#include <sys/resource.h>
#ifndef _ALPINE
#include <error.h>
#endif
#include <linux/sysctl.h>
#ifdef __cplusplus

View File

@ -579,7 +579,11 @@ void taosSetCoreDump() {
struct rlimit rlim;
struct rlimit rlim_new;
if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
#ifndef _ALPINE
uInfo("the old unlimited para: rlim_cur=%" PRIu64 ", rlim_max=%" PRIu64, rlim.rlim_cur, rlim.rlim_max);
#else
uInfo("the old unlimited para: rlim_cur=%llu, rlim_max=%llu", rlim.rlim_cur, rlim.rlim_max);
#endif
rlim_new.rlim_cur = RLIM_INFINITY;
rlim_new.rlim_max = RLIM_INFINITY;
if (setrlimit(RLIMIT_CORE, &rlim_new) != 0) {
@ -591,7 +595,11 @@ void taosSetCoreDump() {
}
if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
#ifndef _ALPINE
uInfo("the new unlimited para: rlim_cur=%" PRIu64 ", rlim_max=%" PRIu64, rlim.rlim_cur, rlim.rlim_max);
#else
uInfo("the new unlimited para: rlim_cur=%llu, rlim_max=%llu", rlim.rlim_cur, rlim.rlim_max);
#endif
}
#ifndef _TD_ARM_
@ -659,4 +667,4 @@ bool taosGetSystemUid(char *uid) {
return false;
}
#endif
#endif

View File

@ -217,7 +217,6 @@ acct_optr(Y) ::= pps(C) tseries(D) storage(P) streams(F) qtime(Q) dbs(E) users(K
%destructor keep {tVariantListDestroy($$);}
keep(Y) ::= KEEP tagitemlist(X). { Y = X; }
tables(Y) ::= MAXTABLES INTEGER(X). { Y = X; }
cache(Y) ::= CACHE INTEGER(X). { Y = X; }
replica(Y) ::= REPLICA INTEGER(X). { Y = X; }
quorum(Y) ::= QUORUM INTEGER(X). { Y = X; }
@ -234,7 +233,6 @@ prec(Y) ::= PRECISION STRING(X). { Y = X; }
%type db_optr {SCreateDBInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
db_optr(Y) ::= db_optr(Z) tables(X). { Y = Z; Y.maxTablesPerVnode = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) cache(X). { Y = Z; Y.cacheBlockSize = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) replica(X). { Y = Z; Y.replica = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) quorum(X). { Y = Z; Y.quorum = strtol(X.z, NULL, 10); }
@ -254,7 +252,6 @@ alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);}
alter_db_optr(Y) ::= alter_db_optr(Z) replica(X). { Y = Z; Y.replica = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) quorum(X). { Y = Z; Y.quorum = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) tables(X). { Y = Z; Y.maxTablesPerVnode = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }

View File

@ -121,7 +121,6 @@ static SKeyword keywordTable[] = {
{"MINROWS", TK_MINROWS},
{"MAXROWS", TK_MAXROWS},
{"BLOCKS", TK_BLOCKS},
{"MAXTABLES", TK_MAXTABLES},
{"CACHE", TK_CACHE},
{"CTIME", TK_CTIME},
{"WAL", TK_WAL},

File diff suppressed because it is too large Load Diff

View File

@ -45,6 +45,8 @@ extern int tsdbDebugFlag;
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax)))
// NOTE: Any file format change must increase this version number by 1
// Also, implement the convert function
#define TSDB_FILE_VERSION ((uint32_t)0)
@ -318,6 +320,16 @@ typedef struct {
void* compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper;
// ------------------ tsdbScan.c
typedef struct {
SFileGroup fGroup;
int numOfIdx;
SCompIdx* pCompIdx;
SCompInfo* pCompInfo;
void* pBuf;
FILE* tLogStream;
} STsdbScanHandle;
// Operations
// ------------------ tsdbMeta.c
#define TSDB_INIT_NTABLES 1024
@ -475,6 +487,7 @@ int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
@ -513,7 +526,10 @@ int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int tsdbWriteCompInfo(SRWHelper* pHelper);
int tsdbWriteCompIdx(SRWHelper* pHelper);
int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SCompIdx** ppCompIdx, int* numOfIdx);
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfoImpl(SFile* pFile, SCompIdx* pIdx, SCompInfo** ppCompInfo);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
@ -537,7 +553,7 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char* tsdbGetMetaFileName(char* rootDir);
void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname);
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
@ -546,6 +562,16 @@ STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
// ------------------ tsdbScan.c
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid);
STsdbScanHandle* tsdbNewScanHandle();
void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream);
int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid);
int tsdbScanSCompIdx(STsdbScanHandle* pScanHandle);
int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx);
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle);
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -302,7 +302,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1;
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
if (access(pFile->fname, F_OK) == 0) {
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname);
@ -424,33 +424,57 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
}
}
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
uint32_t version = 0;
STsdbFileInfo info = {0};
int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
int fd = open(fname, O_RDONLY);
if (fd < 0) goto _err;
if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
tsdbError("failed to lseek file %s to start since %s", pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTRead(fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err;
if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE,
strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) goto _err;
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("file %s header part is corrupted with failed checksum", pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
void *pBuf = (void *)buf;
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &info);
pBuf = taosDecodeFixedU32(pBuf, version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
off_t offset = lseek(fd, 0, SEEK_END);
return 0;
}
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
uint32_t version = 0;
SFile file;
SFile * pFile = &file;
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err;
off_t offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) goto _err;
close(fd);
tsdbCloseFile(pFile);
*magic = info.magic;
*magic = pFile->info.magic;
*size = offset;
return;
_err:
if (fd >= 0) close(fd);
tsdbCloseFile(pFile);
*magic = TSDB_FILE_INIT_MAGIC;
*size = 0;
}
@ -458,34 +482,23 @@ _err:
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
char buf[512] = "\0";
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
if (tsdbLoadFileHeader(pFile, &version) < 0) {
tsdbError("vgId:%d failed to load file %s header part since %s", REPO_ID(pRepo), pFile->fname, tstrerror(terrno));
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("vgId:%d file %s head part is corrupted", REPO_ID(pRepo), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
void *pBuf = buf;
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
if (pFile->info.size == TSDB_FILE_HEAD_SIZE) {
pFile->info.size = lseek(pFile->fd, 0, SEEK_END);
}
if (version != TSDB_FILE_VERSION) {
// TODO: deal with error
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
}
@ -529,6 +542,7 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) {
memset(&pFGroup->files[type].info, 0, sizeof(STsdbFileInfo));
pFGroup->files[type].info.magic = TSDB_FILE_INIT_MAGIC;
pFGroup->state = 1;
pRepo->state = TSDB_STATE_BAD_FILE;
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
}
}

View File

@ -142,7 +142,6 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
}
tsdbStartStream(pRepo);
// pRepo->state = TSDB_REPO_STATE_ACTIVE;
tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo));
@ -341,6 +340,10 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
*compStorage = pRepo->stat.compStorage;
}
int tsdbGetState(TSDB_REPO_T *repo) {
return ((STsdbRepo *)repo)->state;
}
// ----------------- INTERNAL FUNCTIONS -----------------
char *tsdbGetMetaFileName(char *rootDir) {
int tlen = (int)(strlen(rootDir) + strlen(TSDB_META_FILE_NAME) + 2);
@ -354,8 +357,8 @@ char *tsdbGetMetaFileName(char *rootDir) {
return fname;
}
void tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type, char *fname) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]);
void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]);
}
int tsdbLockRepo(STsdbRepo *pRepo) {
@ -661,6 +664,8 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
goto _err;
}
pRepo->state = TSDB_STATE_OK;
int code = pthread_mutex_init(&pRepo->mutex, NULL);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);

View File

@ -102,7 +102,8 @@ void tsdbResetHelper(SRWHelper *pHelper) {
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
ASSERT(pHelper != NULL && pGroup != NULL);
SFile *pFile = NULL;
SFile * pFile = NULL;
STsdbRepo *pRepo = pHelper->pRepo;
// Clear the helper object
tsdbResetHelper(pHelper);
@ -112,8 +113,10 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files
pHelper->files.fGroup = *pGroup;
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
helperNewLastF(pHelper)->fname);
}
// Open the files
@ -443,10 +446,64 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return 0;
}
int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) {
const char *prefixMsg = "failed to load SCompIdx part";
if (lseek(pFile->fd, offset, SEEK_SET) < 0) {
tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, pFile->fname, offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTRead(pFile->fd, buffer, len) < len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len,
strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buffer, len)) {
tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, pFile->fname, offset, len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
return 0;
}
int tsdbDecodeSCompIdxImpl(void *buffer, uint32_t len, SCompIdx **ppCompIdx, int *numOfIdx) {
int nIdx = 0;
void *pPtr = buffer;
while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) {
size_t tlen = taosTSizeof(*ppCompIdx);
if (tlen < sizeof(SCompIdx) * (nIdx + 1)) {
*ppCompIdx = (SCompIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2);
if (*ppCompIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
pPtr = tsdbDecodeSCompIdx(pPtr, &((*ppCompIdx)[nIdx]));
if (pPtr == NULL) {
tsdbError("failed to decode SCompIdx part, idx:%d", nIdx);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
nIdx++;
ASSERT(nIdx == 1 || (*ppCompIdx)[nIdx - 1].tid > (*ppCompIdx)[nIdx - 2].tid);
ASSERT(POINTER_DISTANCE(pPtr, buffer) <= (int)(len - sizeof(TSCKSUM)));
}
*numOfIdx = nIdx;
return 0;
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);
SFile *pFile = helperHeadF(pHelper);
int fd = pFile->fd;
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
// If not load from file, just load it in object
@ -456,54 +513,18 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return -1;
}
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
// Load SCompIdx binary from file
if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) {
return -1;
}
if (taosTRead(fd, (void *)(pHelper->pBuffer), pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
// Decode the SCompIdx part
if (tsdbDecodeSCompIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray),
&(pHelper->idxH.numOfIdx)) < 0) {
tsdbError("vgId:%d failed to decode SCompIdx part from file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
tstrerror(errno));
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
// Decode it
pHelper->idxH.numOfIdx = 0;
void *ptr = pHelper->pBuffer;
while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (int)(pFile->info.len - sizeof(TSCKSUM))) {
size_t tlen = taosTSizeof(pHelper->idxH.pIdxArray);
pHelper->idxH.numOfIdx++;
if (tlen < pHelper->idxH.numOfIdx * sizeof(SCompIdx)) {
pHelper->idxH.pIdxArray = (SCompIdx *)taosTRealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2);
if (pHelper->idxH.pIdxArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
ptr = tsdbDecodeSCompIdx(ptr, &(pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1]));
if (ptr == NULL) {
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
ASSERT(pHelper->idxH.numOfIdx == 1 || pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1].tid >
pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 2].tid);
ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= (int)(pFile->info.len - sizeof(TSCKSUM)));
}
}
}
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
@ -515,36 +536,49 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return 0;
}
int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
const char *prefixMsg = "failed to load SCompInfo/SCompBlock part";
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) {
tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
*ppCompInfo = taosTRealloc((void *)(*ppCompInfo), pIdx->len);
if (*ppCompInfo == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (taosTRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(*ppCompInfo), pIdx->len)) {
tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, pFile->fname, pIdx->offset, pIdx->len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
return 0;
}
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
SCompIdx *pIdx = &(pHelper->curCompIdx);
int fd = helperHeadF(pHelper)->fd;
SFile *pFile = helperHeadF(pHelper);
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) {
ASSERT(pIdx->uid == pHelper->tableInfo.uid);
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pHelper->pCompInfo = taosTRealloc((void *)pHelper->pCompInfo, pIdx->len);
if (taosTRead(fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
helperHeadF(pHelper)->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo),
helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (tsdbLoadCompInfoImpl(pFile, pIdx, &(pHelper->pCompInfo)) < 0) return -1;
ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid);
}

View File

@ -734,6 +734,11 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
return TSDB_CODE_SUCCESS;
}
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
@ -742,11 +747,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
assert(cur->pos >= 0 && cur->pos <= binfo.rows);
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo);
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
@ -785,14 +790,32 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
*/
assert(pQueryHandle->outputCapacity >= binfo.rows);
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
pCheckInfo->lastKey = cur->lastKey;
if ((cur->pos == 0 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->pos == (binfo.rows - 1) && (!ASCENDING_TRAVERSE(pQueryHandle->order)))) {
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
cur->lastKey = binfo.window.ekey + 1;
cur->pos = binfo.rows;
} else {
cur->lastKey = binfo.window.skey - 1;
cur->pos = -1;
}
} else { // partially copy to dest buffer
int32_t endPos = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.rows - 1): 0;
copyAllRemainRowsFromFileBlock(pQueryHandle, pCheckInfo, &binfo, endPos);
cur->mixBlock = true;
}
assert(cur->blockCompleted);
tsdbDebug("create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %p",
cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pQueryHandle);
}
return code;
@ -823,6 +846,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows - 1);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
@ -842,6 +866,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
}
@ -912,7 +937,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
return midPos;
}
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
char* pData = NULL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
@ -1137,6 +1162,47 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
}
}
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
int32_t pos = cur->pos;
int32_t start = cur->pos;
int32_t end = endPos;
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(start >= end);
SWAP(start, end, int32_t);
}
assert(pQueryHandle->outputCapacity >= (end - start + 1));
int32_t numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, start, end);
// the time window should always be ascending order: skey <= ekey
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
cur->mixBlock = (start > 0 && end < pBlockInfo->rows - 1);
cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = true;
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
pos = endPos + step;
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pQueryHandle);
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p",
pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
}
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
@ -1179,37 +1245,13 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0;
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
// no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos;
int32_t end = endPos;
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(start, end, int32_t);
}
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
// the time window should always be right order: skey <= ekey
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
cur->lastKey = tsArray[endPos];
pos += (end - start + 1) * step;
cur->blockCompleted =
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pQueryHandle);
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p",
pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
copyAllRemainRowsFromFileBlock(pQueryHandle, pCheckInfo, &blockInfo, endPos);
return;
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL;
@ -1261,7 +1303,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
@ -1285,7 +1327,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t start = -1, end = -1;
getQualifiedRowsPos(pQueryHandle, pos, endPos, numOfRows, &start, &end);
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
@ -1658,7 +1700,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
}
} else {
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo);
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0;
@ -2156,7 +2198,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
}
// todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

36
src/tsdb/src/tsdbScan.c Normal file
View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
#ifndef _TSDB_PLUGINS
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
STsdbScanHandle* tsdbNewScanHandle() { return NULL; }
void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream) {}
int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
int tsdbScanSCompIdx(STsdbScanHandle* pScanHandle) { return 0; }
int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; }
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; }
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle) {}
#endif

View File

@ -110,7 +110,17 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
*/
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void(*fp)(void*));
/**
* apply the udf before return the result
* @param pHashObj
* @param key
* @param keyLen
* @param fp
* @param d
* @param dsize
* @return
*/
void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize);
/**
* remove item with the specified key

View File

@ -42,7 +42,7 @@ typedef struct SCacheDataNode {
uint64_t signature;
struct STrashElem *pTNodeHeader; // point to trash node head
uint16_t keySize: 15; // max key size: 32kb
bool inTrashCan: 1;// denote if it is in trash or not
bool inTrashcan: 1;// denote if it is in trash or not
uint32_t size; // allocated size for current SCacheDataNode
T_REF_DECLARE()
char *key;

View File

@ -255,10 +255,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetCB(pHashObj, key, keyLen, NULL);
return taosHashGetCB(pHashObj, key, keyLen, NULL, NULL, 0);
}
void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *)) {
void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) {
if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) {
return NULL;
}
@ -273,7 +273,6 @@ void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
return NULL;
}
@ -297,7 +296,11 @@ void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*f
fp(pNode->data);
}
data = pNode->data;
if (d != NULL) {
memcpy(d, pNode->data, dsize);
} else {
data = pNode->data;
}
}
if (pHashObj->type == HASH_ENTRY_LOCK) {

View File

@ -90,7 +90,6 @@ static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->signature != (uint64_t)pNode) {
uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
assert(0);
return;
}
@ -110,7 +109,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t) pElem->pData) {
uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
@ -224,7 +223,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
taosTFree(p);
} else {
taosAddToTrash(pCacheObj, p);
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p);
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
}
}
@ -265,17 +264,14 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
return NULL;
}
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn);
if (ptNode != NULL) {
assert ((*ptNode) != NULL && (int64_t) ((*ptNode)->data) != 0x40);
}
SCacheDataNode* ptNode = NULL;
taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*));
void* pData = (ptNode != NULL)? (*ptNode)->data:NULL;
assert((int64_t)pData != 0x40);
void* pData = (ptNode != NULL)? ptNode->data:NULL;
if (pData != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(*ptNode));
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
@ -292,7 +288,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
if (ptNode->signature != (uint64_t)ptNode) {
uError("key: %p the data from cache is invalid", ptNode);
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL;
}
@ -311,7 +307,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
if (ptNode->signature != (uint64_t)ptNode) {
uError("key: %p the data from cache is invalid", ptNode);
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL;
}
@ -334,16 +330,16 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) {
uError("%p, release invalid cache data", pNode);
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
return;
}
*data = NULL;
// note: extend lifespan before dec ref count
bool inTrashCan = pNode->inTrashCan;
bool inTrashcan = pNode->inTrashcan;
if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
}
@ -354,7 +350,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
char* d = pNode->data;
int32_t ref = T_REF_VAL_GET(pNode);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d, intrash:%d", pCacheObj->name, key, d, ref - 1, inTrashCan);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
/*
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
@ -363,17 +359,25 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing.
*/
if (inTrashCan) {
ref = T_REF_DEC(pNode);
if (inTrashcan) {
ref = T_REF_VAL_GET(pNode);
if (ref == 0) {
if (ref == 1) {
// If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
// destroyed by refresh worker if decrease ref count before removing it from linked-list.
assert(pNode->pTNodeHeader->pData == pNode);
__cache_wr_lock(pCacheObj);
doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
__cache_unlock(pCacheObj);
ref = T_REF_DEC(pNode);
assert(ref == 0);
doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
} else {
ref = T_REF_DEC(pNode);
assert(ref >= 0);
}
} else {
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
@ -413,7 +417,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
}
} else {
uDebug("cache:%s, key:%p, %p has been removed from hash table by other thread already, refcnt:%d",
uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d",
pCacheObj->name, pNode->key, pNode->data, ref);
}
}
@ -424,7 +428,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
char* p = pNode->data;
int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, key, p, ref, inTrashCan);
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
}
}
@ -495,7 +499,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
}
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->inTrashCan) { /* node is already in trash */
if (pNode->inTrashcan) { /* node is already in trash */
assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
return;
}
@ -503,7 +507,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode;
pElem->prev = NULL;
pNode->inTrashCan = true;
pNode->inTrashcan = true;
pNode->pTNodeHeader = pElem;
__cache_wr_lock(pCacheObj);
@ -525,7 +529,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
if (pCacheObj->numOfElemsInTrash == 0) {
if (pCacheObj->pTrash != NULL) {
uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
}
pCacheObj->pTrash = NULL;
@ -542,7 +546,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data,
uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;

View File

@ -253,7 +253,7 @@ void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...)
ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self());
ptm->tm_sec, (int)timeSecs.tv_usec, (unsigned long int)pthread_self());
#endif
va_start(argpointer, format);
len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer);

View File

@ -26,7 +26,7 @@ var c1 = conn.cursor();
// c1.execute(query) will execute the query
// Let's create a database named db
try {
c1.execute('create database db;');
c1.execute('create database if not exists db;');
}
catch(err) {
conn.close();

View File

@ -22,8 +22,9 @@ var c1 = conn.cursor();
// c1.query(query) will return a TaosQuery object, of which then we can execute. The execute function then returns a promise
// Let's create a database named db
try {
var query = c1.query('create database db;');
query.execute();
c1.execute('create database if not exists db;');
//var query = c1.query('create database if not exists db;');
//query.execute();
}
catch(err) {
conn.close();
@ -71,6 +72,28 @@ catch (err) {
throw err;
}
Date.prototype.Format = function(fmt){
var o = {
'M+': this.getMonth() + 1,
'd+': this.getDate(),
'H+': this.getHours(),
'm+': this.getMinutes(),
's+': this.getSeconds(),
'S+': this.getMilliseconds()
};
if (/(y+)/.test(fmt)) {
fmt = fmt.replace(RegExp.$1, (this.getFullYear() + '').substr(4 - RegExp.$1.length));
}
for (var k in o) {
if (new RegExp('(' + k + ')').test(fmt)) {
fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (('00' + o[k]).substr(String(o[k]).length)));
}
}
return fmt;
}
// Let's try to insert some random generated data to test with
// We will use the bind function of the TaosQuery object to easily bind values to question marks in the query
// For Timestamps, a normal Datetime object or TaosTimestamp or milliseconds can be passed in through the bind function
@ -79,17 +102,21 @@ let interval = 1000;
try {
for (let i = 0; i < 1000; i++) {
stime.setMilliseconds(stime.getMilliseconds() + interval);
//console.log(stime.Format('yyyy-MM-dd HH:mm:ss.SSS'));
let insertData = [stime,
parseInt(Math.random()*100),
parseInt(Math.random()*300),
parseFloat(Math.random()*10 + 30),
"\"random note!\""];
"Note"];
//c1.execute('insert into db.weather values(' + insertData.join(',') + ' );');
var query = c1.query('insert into db.weather values(?, ?, ?, ?, ?);').bind(insertData);
query.execute();
//var query = c1.query('insert into db.weather values(?, ?, ?, ?, ?);').bind(insertData);
//query.execute();
c1.execute('insert into db.weather values(\"'+stime.Format('yyyy-MM-dd HH:mm:ss.SSS')+'\",'+parseInt(Math.random() * 100)+','+parseInt(Math.random() * 300)+','+parseFloat(Math.random()*10 + 30)+',"Note");');
}
}
catch (err) {
}catch (err) {
conn.close();
throw err;
}
@ -98,7 +125,8 @@ catch (err) {
var retrievedData;
try {
c1.query('select * from db.weather limit 5 offset 100;', true).then(function(result){
result.pretty();
//result.pretty();
console.log('=========>'+JSON.stringify(result));
// Neat!
});

View File

@ -74,21 +74,36 @@ function runTest {
runGeneralCaseOneByOne jenkins/basic.txt
totalSuccess=`grep 'success' $TDENGINE_COVERAGE_REPORT | wc -l`
sed -i "1i\SIM cases test result" $TDENGINE_COVERAGE_REPORT
totalSuccess=`grep 'success' $TDENGINE_COVERAGE_REPORT | wc -l`
if [ "$totalSuccess" -gt "0" ]; then
echo -e "\n${GREEN} ### Total $totalSuccess coverage test case(s) succeed! ### ${NC}" | tee -a $TDENGINE_COVERAGE_REPORT
sed -i -e "2i\ ### Total $totalSuccess SIM test case(s) succeed! ###" $TDENGINE_COVERAGE_REPORT
fi
totalFailed=`grep 'failed\|fault' $TDENGINE_COVERAGE_REPORT | wc -l`
if [ "$totalFailed" -ne "0" ]; then
echo -e "${RED} ### Total $totalFailed coverage test case(s) failed! ### ${NC}\n" | tee -a $TDENGINE_COVERAGE_REPORT
# exit $totalPyFailed
sed -i '3i\### Total $totalFailed SIM test case(s) failed! ###' $TDENGINE_COVERAGE_REPORT
else
sed -i '3i\\n' $TDENGINE_COVERAGE_REPORT
fi
cd $TDENGINE_DIR/tests
rm -rf ../sim
./test-all.sh full python | tee -a $TDENGINE_COVERAGE_REPORT
sed -i "4i\Python cases test result" $TDENGINE_COVERAGE_REPORT
totalPySuccess=`grep 'python case(s) succeed!' $TDENGINE_COVERAGE_REPORT | awk '{print $4}'`
if [ "$totalPySuccess" -gt "0" ]; then
sed -i -e "5i\ ### Total $totalPySuccess Python test case(s) succeed! ###" $TDENGINE_COVERAGE_REPORT
fi
totalPyFailed=`grep 'python case(s) failed!' $TDENGINE_COVERAGE_REPORT | awk '{print $4}'`
if [ -z $totalPyFailed ]; then
sed -i '6i\\n' $TDENGINE_COVERAGE_REPORT
else
sed -i '6i\### Total $totalPyFailed Python test case(s) failed! ###' $TDENGINE_COVERAGE_REPORT
fi
# Test Connector
stopTaosd

View File

@ -1,6 +1,7 @@
#!/bin/bash
WORK_DIR=/mnt/root
TDENGINE_DIR=/root/TDengine
walLevel=`grep "^walLevel" /etc/taos/taos.cfg | awk '{print $2}'`
if [[ "$walLevel" -eq "2" ]]; then
@ -71,9 +72,17 @@ function runCreateTableThenInsert {
restartTaosd
/usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo 2>&1 | tee -a taosdemo-$walPostfix-$today.log"
demoTableAndInsert=`grep "Total:" totaltime.out|awk '{print $2}'`
demoRPS=`grep "records\/second" taosdemo-$walPostfix-$today.log | tail -n1 | awk '{print $13}'`
}
demoTableAndInsert=`grep "Total:" totaltime.out|awk '{print $2}'`
demoRPS=`grep "records\/second" taosdemo-$walPostfix-$today.log | tail -n1 | awk '{print $13}'`
}
function queryPerformance {
echoInfo "Restart Taosd"
restartTaosd
cd $TDENGINE_DIR/tests/pytest
python3 query/queryPerformance.py
}
function generateTaosdemoPlot {
echo "${today} $walPostfix, demoCreateTableOnly: ${demoCreateTableOnly}, demoDeleteTableOnly: ${demoDeleteTableOnly}, demoTableAndInsert: ${demoTableAndInsert}" | tee -a taosdemo-$today.log
@ -101,13 +110,16 @@ today=`date +"%Y%m%d"`
cd $WORK_DIR
echoInfo "Test Create Table Only "
runCreateTableOnly
echoInfo "Test Create Table then Insert data"
echoInfo "Test Delete Table Only"
runDeleteTableOnly
echoInfo "Test Create Table then Insert data"
runCreateTableThenInsert
echoInfo "Query Performance for 10 Billion Records"
queryPerformance
echoInfo "Generate plot for taosdemo"
generateTaosdemoPlot
tar czf $WORK_DIR/taos-log-taosdemo-$today.tar.gz $logDir/*
echoInfo "End of TaosDemo Test" | tee -a $WORK_DIR/cron.log

View File

@ -0,0 +1,82 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
import taos
import time
class taosdemoQueryPerformace:
def initConnection(self):
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taos"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def query(self):
cursor = self.conn.cursor()
cursor.execute("use test")
totalTime = 0
for i in range(100):
startTime = time.time()
cursor.execute("select count(*) from test.meters")
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
startTime = time.time()
cursor.execute("select count(*) from test.meters where loc='beijing'")
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters where loc='beijing' %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters where areaid=10")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.meters where areaid=10 %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.t10 interval(10s)")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.t10 interval(10s) %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
startTime = time.time()
cursor.execute("select last_row(*) from meters")
totalTime += time.time() - startTime
print("query time for: select last_row(*) from meters %f seconds" % (totalTime / 100))
if __name__ == '__main__':
perftest = taosdemoQueryPerformace()
perftest.initConnection()
perftest.query()

View File

@ -1,6 +1,7 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1
system sh/cfg.sh -n dnode1 -c ctime -v 30
system sh/exec.sh -n dnode1 -s start
sleep 3000
@ -21,7 +22,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db maxrows 200 cache 2 maxTables 4
sql create database $db maxrows 200 cache 2
print ====== create tables
sql use $db

View File

@ -23,7 +23,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)

View File

@ -22,7 +22,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)

View File

@ -15,7 +15,7 @@ $db = $dbPrefix . $i
$tb = $tbPrefix . $i
print =================== step 1
sql create database $db maxTables 4
sql create database $db
sql use $db
sql show vgroups
if $rows != 0 then