Merge branch 'develop' into feature/crash_gen

This commit is contained in:
Steven Li 2020-06-08 00:20:48 -07:00
commit a75646bbcf
66 changed files with 896 additions and 414 deletions

View File

@ -111,7 +111,7 @@ matrix:
description: TDengine description: TDengine
# Where email notification of build analysis results will be sent # Where email notification of build analysis results will be sent
notification_email: sdsang@taosdata.com notification_email: sdsang@taosdata.com, slguan@taosdata.com
# Commands to prepare for build_command # Commands to prepare for build_command
# ** likely specific to your build ** # ** likely specific to your build **

View File

@ -162,18 +162,18 @@ done
# output the version info to the buildinfo file. # output the version info to the buildinfo file.
build_time=$(date +"%F %R") build_time=$(date +"%F %R")
echo "char version[64] = \"${version}\";" > ${versioninfo} echo "char version[12] = \"${version}\";" > ${versioninfo}
echo "char compatible_version[64] = \"${compatible_version}\";" >> ${versioninfo} echo "char compatible_version[12] = \"${compatible_version}\";" >> ${versioninfo}
echo "char gitinfo[128] = \"$(git rev-parse --verify HEAD)\";" >> ${versioninfo} echo "char gitinfo[48] = \"$(git rev-parse --verify HEAD)\";" >> ${versioninfo}
if [ "$verMode" != "cluster" ]; then if [ "$verMode" != "cluster" ]; then
echo "char gitinfoOfInternal[128] = \"\";" >> ${versioninfo} echo "char gitinfoOfInternal[48] = \"\";" >> ${versioninfo}
else else
enterprise_dir="${top_dir}/../enterprise" enterprise_dir="${top_dir}/../enterprise"
cd ${enterprise_dir} cd ${enterprise_dir}
echo "char gitinfoOfInternal[128] = \"$(git rev-parse --verify HEAD)\";" >> ${versioninfo} echo "char gitinfoOfInternal[48] = \"$(git rev-parse --verify HEAD)\";" >> ${versioninfo}
cd ${curr_dir} cd ${curr_dir}
fi fi
echo "char buildinfo[512] = \"Built by ${USER} at ${build_time}\";" >> ${versioninfo} echo "char buildinfo[64] = \"Built by ${USER} at ${build_time}\";" >> ${versioninfo}
echo "" >> ${versioninfo} echo "" >> ${versioninfo}
tmp_version=$(echo $version | tr -s "." "_") tmp_version=$(echo $version | tr -s "." "_")
if [ "$verMode" == "cluster" ]; then if [ "$verMode" == "cluster" ]; then

View File

@ -404,6 +404,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, void **taos); void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);

View File

@ -40,39 +40,38 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { int doAsyncParseSql(SSqlObj* pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
pSql->signature = pSql; if (code != TSDB_CODE_SUCCESS) {
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
sem_init(&pSql->rspSem, 0, 0);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("failed to malloc payload"); tscError("failed to malloc payload");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return; return code;
}
// todo check for OOM problem
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
free(pCmd->payload);
return;
} }
pRes->qhandle = 0; pRes->qhandle = 0;
pRes->numOfRows = 1; pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sqlstr);
tscDump("%p SQL: %s", pSql, pSql->sqlstr); tscDump("%p SQL: %s", pSql, pSql->sqlstr);
return tsParseSql(pSql, true);
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return;
}
strtolower(pSql->sqlstr, sqlstr);
int32_t code = tsParseSql(pSql, true); int32_t code = doAsyncParseSql(pSql);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -518,15 +517,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pSql->pStream) { if (pSql->pStream) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/* if (!pSql->cmd.parseFinished) {
* NOTE: tsParseSql(pSql, false);
* transfer the sql function for super table query before get meter/metric meta, sem_post(&pSql->rspSem);
* since in callback functions, only tscProcessSql(pStream->pSql) is executed! }
*/ return;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else { } else {
tscTrace("%p get tableMeta successfully", pSql); tscTrace("%p get tableMeta successfully", pSql);
} }

View File

@ -515,8 +515,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (ret != 0) { if (ret != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
} }
pCmd->parseFinished = 1;
return TSDB_CODE_SUCCESS; // do not build query message here return TSDB_CODE_SUCCESS; // do not build query message here
} }

View File

@ -19,6 +19,7 @@
#include "tscLog.h" #include "tscLog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h" #include "tsched.h"
#include "tcache.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
@ -77,30 +78,23 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
int code = tscGetTableMeta(pSql, pTableMetaInfo); int code = tscGetTableMeta(pSql, pTableMetaInfo);
pSql->res.code = code; pSql->res.code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0); code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code; pSql->res.code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
} }
tscTansformSQLFuncForSTableQuery(pQueryInfo);
// failed to get meter/metric meta, retry in 10sec. // failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime);
return;
} else {
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscDoQuery(pStream->pSql);
tscIncStreamExecutionCount(pStream);
} }
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscProcessSql(pStream->pSql);
tscIncStreamExecutionCount(pStream);
} }
static void tscProcessStreamTimer(void *handle, void *tmrId) { static void tscProcessStreamTimer(void *handle, void *tmrId) {
@ -147,7 +141,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay); retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearTableMetaInfo(pTableMetaInfo, true); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), true);
tfree(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return; return;
@ -259,7 +254,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->numOfRes); pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated // release the metric/meter meta information reference, so data in cache can be updated
tscClearTableMetaInfo(pTableMetaInfo, false);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
tfree(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
} }
@ -480,57 +477,40 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
setErrorInfo(pSql, TSDB_CODE_TSC_OUT_OF_MEMORY, NULL);
return NULL; return NULL;
} }
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
setErrorInfo(pSql, ret, NULL);
free(pSql);
return NULL;
}
pSql->sqlstr = strdup(sqlstr);
if (pSql->sqlstr == NULL) {
setErrorInfo(pSql, TSDB_CODE_TSC_OUT_OF_MEMORY, NULL);
tfree(pSql);
return NULL;
}
tsem_init(&pSql->rspSem, 0, 0);
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
tscResetSqlCmdObj(&pSql->cmd);
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
setErrorInfo(pSql, ret, NULL);
tscError("%p open stream failed, sql:%s, code:%d", pSql, sqlstr, TSDB_CODE_TSC_OUT_OF_MEMORY);
tscFreeSqlObj(pSql);
return NULL;
}
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, pRes->code, pCmd->payload);
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
} }
pSql->pStream = pStream;
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pStream == NULL) { if (pSql->sqlstr == NULL) {
setErrorInfo(pSql, TSDB_CODE_TSC_OUT_OF_MEMORY, NULL); tscError("%p failed to malloc sql string buffer", pSql);
tscFreeSqlObj(pSql);
return NULL;;
}
strtolower(pSql->sqlstr, sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
int32_t code = doAsyncParseSql(pSql);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
sem_wait(&pSql->rspSem);
}
if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
@ -550,13 +530,13 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->ctime = taosGetTimestamp(pStream->precision); pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey; pStream->etime = pQueryInfo->window.ekey;
pSql->pStream = pStream;
tscAddIntoStreamList(pStream); tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream); tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime); pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime);
int64_t starttime = tscGetLaunchTimestamp(pStream); int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,

View File

@ -1153,7 +1153,7 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
for (int32_t j = 0; j < numOfFilters; ++j) { for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) { if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1; size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len); pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len); memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);

View File

@ -145,6 +145,7 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema); void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL); ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;

View File

@ -3724,9 +3724,9 @@
} }
}, },
"websocket-extensions": { "websocket-extensions": {
"version": "0.1.3", "version": "0.1.4",
"resolved": "https://registry.npmjs.org/websocket-extensions/-/websocket-extensions-0.1.3.tgz", "resolved": "https://registry.npmjs.org/websocket-extensions/-/websocket-extensions-0.1.4.tgz",
"integrity": "sha512-nqHUnMXmBzT0w570r2JpJxfiSD1IzoI+HGVdd3aZ0yNi3ngvQ4jv1dtHt5VGxfI2yj5yqImPhOK4vmIh2xMbGg==", "integrity": "sha512-OqedPIGOfsDlo31UNwYbCFMSaO9m9G/0faIHj5/dZFDMFqPTcx6UwqyOy3COEaEOg/9VsGIpdqn62W5KhoKSpg==",
"dev": true "dev": true
}, },
"whatwg-encoding": { "whatwg-encoding": {

View File

@ -2839,8 +2839,9 @@ websocket-driver@>=0.5.1:
websocket-extensions ">=0.1.1" websocket-extensions ">=0.1.1"
websocket-extensions@>=0.1.1: websocket-extensions@>=0.1.1:
version "0.1.3" version "0.1.4"
resolved "https://registry.yarnpkg.com/websocket-extensions/-/websocket-extensions-0.1.3.tgz#5d2ff22977003ec687a4b87073dfbbac146ccf29" resolved "https://registry.yarnpkg.com/websocket-extensions/-/websocket-extensions-0.1.4.tgz#7f8473bc839dfd87608adb95d7eb075211578a42"
integrity sha512-OqedPIGOfsDlo31UNwYbCFMSaO9m9G/0faIHj5/dZFDMFqPTcx6UwqyOy3COEaEOg/9VsGIpdqn62W5KhoKSpg==
whatwg-encoding@^1.0.1: whatwg-encoding@^1.0.1:
version "1.0.3" version "1.0.3"

View File

@ -38,6 +38,7 @@ typedef struct {
int vgId; int vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
void *ahandle; void *ahandle;
int num; // number of continuous streams int num; // number of continuous streams
@ -48,7 +49,8 @@ typedef struct {
} SCqContext; } SCqContext;
typedef struct SCqObj { typedef struct SCqObj {
int tid; // table ID uint64_t uid;
int32_t tid; // table ID
int rowSize; // bytes of a row int rowSize; // bytes of a row
char * sqlStr; // SQL string char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array STSchema * pSchema; // pointer to schema array
@ -73,6 +75,14 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
strcpy(pContext->user, pCfg->user); strcpy(pContext->user, pCfg->user);
strcpy(pContext->pass, pCfg->pass); strcpy(pContext->pass, pCfg->pass);
const char* db = pCfg->db;
for (const char* p = db; *p != 0; p++) {
if (*p == '.') {
db = p + 1;
break;
}
}
strcpy(pContext->db, db);
pContext->vgId = pCfg->vgId; pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite; pContext->cqWrite = pCfg->cqWrite;
pContext->ahandle = ahandle; pContext->ahandle = ahandle;
@ -153,17 +163,19 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) { void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) {
SCqContext *pContext = handle; SCqContext *pContext = handle;
SCqObj *pObj = calloc(sizeof(SCqObj), 1); SCqObj *pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL; if (pObj == NULL) return NULL;
pObj->uid = uid;
pObj->tid = tid; pObj->tid = tid;
pObj->sqlStr = malloc(strlen(sqlStr)+1); pObj->sqlStr = malloc(strlen(sqlStr)+1);
strcpy(pObj->sqlStr, sqlStr); strcpy(pObj->sqlStr, sqlStr);
pObj->pSchema = tdDupSchema(pSchema); pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = schemaTLen(pSchema);
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
@ -207,16 +219,16 @@ void cqDrop(void *handle) {
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
return;
} }
return;
} }
int64_t lastKey = 0; int64_t lastKey = 0;
pObj->pContext = pContext;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pObj->pStream) { if (pObj->pStream) {
pContext->num++; pContext->num++;
@ -229,29 +241,49 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param; SCqObj *pObj = (SCqObj *)param;
SCqContext *pContext = pObj->pContext; SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return; if (pObj->pStream == NULL) return;
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
// construct data int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
char *buffer = calloc(size, 1); char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer; SWalHead *pHead = (SWalHead *)buffer;
pHead->msgType = TSDB_MSG_TYPE_SUBMIT; SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
pHead->len = size - sizeof(SWalHead);
SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
// to do: fill in the SSubmitMsg structure
pSubmit->numOfBlocks = 1;
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
// to do: fill in the SSubmitBlk strucuture
pBlk->tid = pObj->tid;
SDataRow trow = (SDataRow)pBlk->data;
tdInitDataRow(trow, pSchema);
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i;
char* val = (char*)row[i];
if (IS_VAR_DATA_TYPE(c->type)) {
val -= sizeof(VarDataLenT);
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
pBlk->len = htonl(dataRowLen(trow));
pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid);
pBlk->numOfRows = htons(1);
pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow);
pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(pHead->len);
pMsg->length = pMsg->header.contLen;
pMsg->numOfBlocks = htonl(1);
pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
free(buffer);
} }

View File

@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
} }
tdFreeSchema(pSchema); tdFreeSchema(pSchema);

View File

@ -22,6 +22,7 @@ extern "C" {
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeStartModules(); void dnodeStartModules();
void dnodeStartStream();
void dnodeCleanupModules(); void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus); void dnodeProcessModuleStatus(uint32_t moduleStatus);

View File

@ -124,6 +124,7 @@ int32_t dnodeInitSystem() {
dnodeStartModules(); dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dnodeStartStream();
dPrint("TDengine is initialized successfully"); dPrint("TDengine is initialized successfully");

View File

@ -260,11 +260,27 @@ static int32_t dnodeOpenVnodes() {
} }
free(vnodeList); free(vnodeList);
dPrint("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed); dPrint("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES];
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dPrint("Get dnode list failed");
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeStartStream(vnodeList[i]);
}
dPrint("streams started");
}
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes; int32_t numOfVnodes;

View File

@ -113,6 +113,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
} }
int32_t dnodeInitClient() { int32_t dnodeInitClient() {
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
@ -123,7 +124,7 @@ int32_t dnodeInitClient() {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "t"; rpcInit.user = "t";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.secret = "secret"; rpcInit.secret = secret;
tsDnodeClientRpc = rpcOpen(&rpcInit); tsDnodeClientRpc = rpcOpen(&rpcInit);
if (tsDnodeClientRpc == NULL) { if (tsDnodeClientRpc == NULL) {

View File

@ -192,13 +192,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) { if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead); dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
code = TSDB_CODE_SUCCESS;
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len, .contLen = pRead->rspRet.len,
.code = pRead->rspRet.code, .code = code,
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
@ -216,7 +217,7 @@ static void *dnodeProcessReadQueue(void *param) {
break; break;
} }
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);

View File

@ -216,7 +216,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }

View File

@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_COUNTRY_LEN 20 #define TSDB_COUNTRY_LEN 20
#define TSDB_LOCALE_LEN 64 #define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 64 #define TSDB_TIMEZONE_LEN 64
#define TSDB_LABEL_LEN 8
#define TSDB_FQDN_LEN 128 #define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6) #define TSDB_EP_LEN (TSDB_FQDN_LEN+6)

View File

@ -27,6 +27,7 @@ typedef struct {
int vgId; int vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN + 1];
FCqWrite cqWrite; FCqWrite cqWrite;
} SCqCfg; } SCqCfg;
@ -41,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle); void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ // cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema); void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle); void cqDrop(void *handle);

View File

@ -43,7 +43,7 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status); int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
void *(*configFunc)(int32_t vgId, int32_t sid); void *(*configFunc)(int32_t vgId, int32_t sid);
} STsdbAppH; } STsdbAppH;
@ -118,6 +118,7 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
void tsdbStartStream(TsdbRepoT *repo);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);

View File

@ -30,7 +30,6 @@ typedef enum _VN_STATUS {
typedef struct { typedef struct {
int len; int len;
int code;
void *rsp; void *rsp;
void *qhandle; //used by query and retrieve msg void *qhandle; //used by query and retrieve msg
} SRspRet; } SRspRet;
@ -38,6 +37,7 @@ typedef struct {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeStartStream(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);

View File

@ -264,13 +264,13 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
// not thread safe, need optimized // not thread safe, need optimized
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) { int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) {
pConn->numOfQueries = htonl(pHBMsg->numOfQueries); pConn->numOfQueries = htonl(pHBMsg->numOfQueries);
if (pConn->numOfQueries > 0) { if (pConn->numOfQueries > 0 && pConn->numOfQueries < 20) {
pConn->pQueries = calloc(sizeof(SQueryDesc), pConn->numOfQueries); pConn->pQueries = calloc(sizeof(SQueryDesc), pConn->numOfQueries);
memcpy(pConn->pQueries, pHBMsg->pData, pConn->numOfQueries * sizeof(SQueryDesc)); memcpy(pConn->pQueries, pHBMsg->pData, pConn->numOfQueries * sizeof(SQueryDesc));
} }
pConn->numOfStreams = htonl(pHBMsg->numOfStreams); pConn->numOfStreams = htonl(pHBMsg->numOfStreams);
if (pConn->numOfStreams > 0) { if (pConn->numOfStreams > 0 && pConn->numOfStreams < 20) {
pConn->pStreams = calloc(sizeof(SStreamDesc), pConn->numOfStreams); pConn->pStreams = calloc(sizeof(SStreamDesc), pConn->numOfStreams);
memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc), memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc),
pConn->numOfStreams * sizeof(SStreamDesc)); pConn->numOfStreams * sizeof(SStreamDesc));

View File

@ -47,7 +47,7 @@ typedef struct {
uint16_t localPort; uint16_t localPort;
int8_t connType; int8_t connType;
int index; // for UDP server only, round robin for multiple threads int index; // for UDP server only, round robin for multiple threads
char label[12]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
@ -88,7 +88,7 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[50];// debug info: label + pConn + ahandle char info[48];// debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
@ -805,16 +805,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pConn == NULL) { if (pConn == NULL) {
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
return NULL; return NULL;
} else { }
if (rpcIsReq(pHead->msgType)) {
pConn->ahandle = (void *)pHead->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
}
}
rpcLockConn(pConn); rpcLockConn(pConn);
sid = pConn->sid;
if (rpcIsReq(pHead->msgType)) {
pConn->ahandle = (void *)pHead->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
}
sid = pConn->sid;
pConn->chandle = pRecv->chandle; pConn->chandle = pRecv->chandle;
pConn->peerIp = pRecv->ip; pConn->peerIp = pRecv->ip;
pConn->peerPort = pRecv->port; pConn->peerPort = pRecv->port;
@ -847,10 +847,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn == NULL) return;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s, link is broken", pConn->info); tTrace("%s, link is broken", pConn->info);
// pConn->chandle = NULL;
rpcLockConn(pConn);
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
@ -871,7 +872,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
(*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg);
*/ */
} }
rpcUnlockConn(pConn);
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
@ -885,7 +887,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
// underlying UDP layer does not know it is server or client // underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType; pRecv->connType = pRecv->connType | pRpc->connType;
if (pRecv->ip == 0 && pConn) { if (pRecv->ip == 0) {
rpcProcessBrokenLink(pConn); rpcProcessBrokenLink(pConn);
return NULL; return NULL;
} }

View File

@ -16,6 +16,8 @@
#include "os.h" #include "os.h"
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h"
#include "taoserror.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcTcp.h" #include "rpcTcp.h"
@ -26,8 +28,9 @@
typedef struct SFdObj { typedef struct SFdObj {
void *signature; void *signature;
int fd; // TCP socket FD int fd; // TCP socket FD
void *thandle; // handle from upper layer, like TAOS int closedByApp; // 1: already closed by App
void *thandle; // handle from upper layer, like TAOS
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
@ -44,7 +47,7 @@ typedef struct SThreadObj {
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[12]; char label[TSDB_LABEL_LEN];
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket); void *(*processData)(SRecvInfo *pPacket);
} SThreadObj; } SThreadObj;
@ -53,7 +56,7 @@ typedef struct {
int fd; int fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char label[12]; char label[TSDB_LABEL_LEN];
int numOfThreads; int numOfThreads;
void * shandle; void * shandle;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
@ -71,6 +74,13 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
if (pServerObj == NULL) {
tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
pServerObj->thread = 0;
pServerObj->ip = ip; pServerObj->ip = ip;
pServerObj->port = port; pServerObj->port = port;
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
@ -79,13 +89,20 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
if (pServerObj->pThreadObj == NULL) { if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
} }
int code = 0; int code = 0;
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pThreadObj = pServerObj->pThreadObj; pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->pollFd = -1;
pThreadObj->thread = 0;
pThreadObj->processData = fp; pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
@ -93,23 +110,22 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
code = pthread_mutex_init(&(pThreadObj->mutex), NULL); code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) { if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;; break;;
} }
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1; code = -1;
break; break;
} }
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
@ -118,47 +134,47 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
if (code == 0) { if (code == 0) {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
} }
} }
if (code != 0) { if (code != 0) {
free(pServerObj->pThreadObj); taosCleanUpTcpServer(pServerObj);
free(pServerObj);
pServerObj = NULL; pServerObj = NULL;
} else { } else {
tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
} }
pthread_attr_destroy(&thattr);
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true; pThreadObj->stop = true;
eventfd_t fd = -1;
// signal the thread to stop, try graceful method first, if (pThreadObj->thread && pThreadObj->pollFd >=0) {
// and use pthread_cancel when failed // signal the thread to stop, try graceful method first,
struct epoll_event event = { .events = EPOLLIN }; // and use pthread_cancel when failed
eventfd_t fd = eventfd(1, 0); struct epoll_event event = { .events = EPOLLIN };
if (fd == -1) { fd = eventfd(1, 0);
tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno)); if (fd == -1) {
pthread_cancel(pThreadObj->thread); // failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno)); pthread_cancel(pThreadObj->thread);
pthread_cancel(pThreadObj->thread); } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
pthread_cancel(pThreadObj->thread);
}
} }
pthread_join(pThreadObj->thread, NULL); if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL);
close(pThreadObj->pollFd); if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd);
if (fd != -1) { if (fd != -1) close(fd);
close(fd);
}
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead; SFdObj *pFdObj = pThreadObj->pHead;
@ -173,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) {
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
pthread_join(pServerObj->thread, NULL);
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
@ -211,6 +226,7 @@ static void* taosAcceptTcpConnection(void *arg) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break; break;
} }
tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno)); tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
continue; continue;
} }
@ -254,6 +270,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
@ -261,6 +278,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label); tError("%s failed to create TCP client epoll", label);
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
@ -273,6 +291,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if (code != 0) { if (code != 0) {
close(pThreadObj->pollFd); close(pThreadObj->pollFd);
free(pThreadObj); free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
return NULL; return NULL;
} }
@ -287,7 +306,7 @@ void taosCleanUpTcpClient(void *chandle) {
if (pThreadObj == NULL) return; if (pThreadObj == NULL) return;
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
tTrace (":%s, all connections are cleaned up", pThreadObj->label); tTrace ("%s, all connections are cleaned up", pThreadObj->label);
tfree(pThreadObj); tfree(pThreadObj);
} }
@ -318,7 +337,9 @@ void taosCloseTcpConnection(void *chandle) {
SFdObj *pFdObj = chandle; SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
taosFreeFdObj(pFdObj); pFdObj->thandle = NULL;
pFdObj->closedByApp = 1;
shutdown(pFdObj->fd, SHUT_WR);
} }
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
@ -334,7 +355,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) { if (pFdObj->closedByApp == 0) {
shutdown(pFdObj->fd, SHUT_WR);
SRecvInfo recvInfo; SRecvInfo recvInfo;
recvInfo.msg = NULL; recvInfo.msg = NULL;
recvInfo.msgLen = 0; recvInfo.msgLen = 0;
@ -345,9 +368,59 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
recvInfo.chandle = NULL; recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP; recvInfo.connType = RPC_CONN_TCP;
(*(pThreadObj->processData))(&recvInfo); (*(pThreadObj->processData))(&recvInfo);
} else { }
taosFreeFdObj(pFdObj);
taosFreeFdObj(pFdObj);
}
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SRpcHead rpcHead;
int32_t msgLen, leftLen, retLen, headLen;
char *buffer, *msg;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
return -1;
} }
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) {
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1;
}
msg = buffer + tsRpcOverhead;
leftLen = msgLen - headLen;
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d",
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
free(buffer);
return -1;
}
memcpy(msg, &rpcHead, sizeof(SRpcHead));
pInfo->msg = msg;
pInfo->msgLen = msgLen;
pInfo->ip = pFdObj->ip;
pInfo->port = pFdObj->port;
pInfo->shandle = pThreadObj->shandle;
pInfo->thandle = pFdObj->thandle;;
pInfo->chandle = pFdObj;
pInfo->connType = RPC_CONN_TCP;
if (pFdObj->closedByApp) {
free(buffer);
return -1;
}
return 0;
} }
#define maxEvents 10 #define maxEvents 10
@ -357,7 +430,6 @@ static void *taosProcessTcpData(void *param) {
SFdObj *pFdObj; SFdObj *pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
SRpcHead rpcHead;
while (1) { while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
@ -376,51 +448,23 @@ static void *taosProcessTcpData(void *param) {
continue; continue;
} }
if (events[i].events & EPOLLRDHUP) {
tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle);
taosReportBrokenLink(pFdObj);
continue;
}
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle); tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
if (headLen != sizeof(SRpcHead)) { shutdown(pFdObj->fd, SHUT_WR);
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
taosReportBrokenLink(pFdObj);
continue; continue;
} }
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) {
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
taosReportBrokenLink(pFdObj);
continue;
}
char *msg = buffer + tsRpcOverhead;
int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d",
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
taosReportBrokenLink(pFdObj);
tfree(buffer);
continue;
}
// tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
} }
@ -433,16 +477,20 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
struct epoll_event event; struct epoll_event event;
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
if (pFdObj == NULL) return NULL; if (pFdObj == NULL) {
return NULL;
}
pFdObj->closedByApp = 0;
pFdObj->fd = fd; pFdObj->fd = fd;
pFdObj->pThreadObj = pThreadObj; pFdObj->pThreadObj = pThreadObj;
pFdObj->signature = pFdObj; pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pFdObj; event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tfree(pFdObj); tfree(pFdObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
@ -475,13 +523,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
taosCloseSocket(pFdObj->fd); taosCloseSocket(pFdObj->fd);
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p, TCP thread:%d, number of FDs is negative!!!", tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
// remove from the FdObject list
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
} else { } else {

View File

@ -18,6 +18,7 @@
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcUdp.h" #include "rpcUdp.h"
#include "rpcHead.h" #include "rpcHead.h"
@ -33,7 +34,7 @@ typedef struct {
int fd; int fd;
uint16_t port; // peer port uint16_t port; // peer port
uint16_t localPort; // local port uint16_t localPort; // local port
char label[12]; // copy from udpConnSet; char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
void *hash; void *hash;
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
@ -49,7 +50,7 @@ typedef struct {
uint16_t port; // local Port uint16_t port; // local Port
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[12]; char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket); void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[]; SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
} }
struct sockaddr_in sin; struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) { addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port); pConn->localPort = (uint16_t)ntohs(sin.sin_port);

View File

@ -473,6 +473,18 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable); return TSDB_GET_TABLE_LAST_KEY(pTable);
} }
void tsdbStartStream(TsdbRepoT *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
}
STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
// TODO // TODO
return NULL; return NULL;

View File

@ -150,7 +150,6 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
void tsdbOrgMeta(void *pHandle) { void tsdbOrgMeta(void *pHandle) {
STsdbMeta *pMeta = (STsdbMeta *)pHandle; STsdbMeta *pMeta = (STsdbMeta *)pHandle;
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
for (int i = 1; i < pMeta->maxTables; i++) { for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
@ -158,13 +157,6 @@ void tsdbOrgMeta(void *pHandle) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
} }
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
} }
/** /**
@ -683,7 +675,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
if (pTable->type == TSDB_STREAM_TABLE && addIdx) { if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
} }
pMeta->nTables++; pMeta->nTables++;

View File

@ -14,6 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include "taosdef.h"
#include "tulog.h" #include "tulog.h"
#include "tsched.h" #include "tsched.h"
#include "ttimer.h" #include "ttimer.h"
@ -21,7 +22,7 @@
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. #define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
typedef struct { typedef struct {
char label[16]; char label[TSDB_LABEL_LEN];
tsem_t emptySem; tsem_t emptySem;
tsem_t fullSem; tsem_t fullSem;
pthread_mutex_t queueMutex; pthread_mutex_t queueMutex;

View File

@ -24,25 +24,37 @@ int taosGetFqdn(char *fqdn) {
hostname[1023] = '\0'; hostname[1023] = '\0';
gethostname(hostname, 1023); gethostname(hostname, 1023);
struct hostent* h; struct addrinfo hints = {0};
h = gethostbyname(hostname); struct addrinfo *result = NULL;
if (h != NULL) {
strcpy(fqdn, h->h_name); hints.ai_flags = AI_CANONNAME;
getaddrinfo(hostname, NULL, &hints, &result);
if (result) {
strcpy(fqdn, result->ai_canonname);
freeaddrinfo(result);
} else { } else {
uError("failed to get host name(%s)", strerror(errno));
code = -1; code = -1;
} }
// to do: free the resources
// free(h);
return code; return code;
} }
uint32_t taosGetIpFromFqdn(const char *fqdn) { uint32_t taosGetIpFromFqdn(const char *fqdn) {
struct hostent * record = gethostbyname(fqdn); struct addrinfo hints = {0};
if(record == NULL) return -1; struct addrinfo *result = NULL;
return ((struct in_addr *)record->h_addr)->s_addr;
getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr *sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in*)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
freeaddrinfo(result);
return ip;
} else {
return -1;
}
} }
// Function converting an IP address string to an unsigned int. // Function converting an IP address string to an unsigned int.

View File

@ -1,7 +1,7 @@
char version[64] = "2.0.0.0"; char version[12] = "2.0.0.0";
char compatible_version[64] = "2.0.0.0"; char compatible_version[12] = "2.0.0.0";
char gitinfo[128] = "3264067e97300c84caa61ac909d548c9ca56de6b"; char gitinfo[48] = "3264067e97300c84caa61ac909d548c9ca56de6b";
char gitinfoOfInternal[128] = "da88f4a2474737d1f9c76adcf0ff7fd0975e7342"; char gitinfoOfInternal[48] = "da88f4a2474737d1f9c76adcf0ff7fd0975e7342";
char buildinfo[512] = "Built by root at 2020-04-01 14:38"; char buildinfo[64] = "Built by root at 2020-04-01 14:38";
void libtaos_1_6_5_4_Linux_x64() {}; void libtaos_1_6_5_4_Linux_x64() {};

View File

@ -208,8 +208,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
SCqCfg cqCfg = {0}; SCqCfg cqCfg = {0};
sprintf(cqCfg.user, "root"); sprintf(cqCfg.user, "_root");
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue; cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
@ -277,6 +278,15 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeStartStream(int32_t vnode) {
SVnodeObj* pVnode = vnodeAccquireVnode(vnode);
if (pVnode != NULL) {
tsdbStartStream(pVnode->tsdb);
vnodeRelease(pVnode);
}
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) return 0; if (ppVnode == NULL || *ppVnode == NULL) return 0;

View File

@ -39,15 +39,21 @@ void vnodeInitReadFp(void) {
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessReadMsgFp[msgType] == NULL) if (vnodeProcessReadMsgFp[msgType] == NULL) {
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) {
vTrace("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vTrace("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_RPC_NOT_READY;
}
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
} }
@ -60,11 +66,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
if (contLen != 0) { if (contLen != 0) {
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->qhandle = htobe64((uint64_t) (pQInfo));
pRsp->code = pRet->code; pRsp->code = code;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
@ -74,9 +80,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
assert(pCont != NULL); assert(pCont != NULL);
pQInfo = pCont; pQInfo = pCont;
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
vTrace("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
} }
if (pQInfo != NULL) { if (pQInfo != NULL) {
vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo);
qTableQuery(pQInfo); // do execute query qTableQuery(pQInfo); // do execute query
} }
@ -88,18 +96,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo); int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO //TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { } else {
// todo check code and handle error in build result set // todo check code and handle error in build result set
pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo; pRet->qhandle = pQInfo;

View File

@ -2,7 +2,7 @@
reset reset
set terminal png set terminal png
set title "Performance Test Report" font ",20" set title filename font ",20"
set ylabel "Time in Seconds" set ylabel "Time in Seconds"

View File

@ -16,9 +16,26 @@ function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") &&
function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } # function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } #
function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } # function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } #
function set-Wal { function setMaxConnections {
echo "/etc/taos/taos.cfg maxConnection will be set to $1"
hasText=`grep "maxConnections" /etc/taos/taos.cfg`
if [[ -z "$hasText" ]]; then
echo "maxConnections $1" >> /etc/taos/taos.cfg
else
sed -i 's/^maxConnections.*$/maxConnections '"$1"'/g' /etc/taos/taos.cfg
fi
}
function setWal {
echo "/etc/taos/taos.cfg walLevel will be set to $1" echo "/etc/taos/taos.cfg walLevel will be set to $1"
sed -i 's/^walLevel.*$/walLevel '"$1"'/g' /etc/taos/taos.cfg
hasText=`grep "walLevel" /etc/taos/taos.cfg`
if [[ -z "$hasText" ]]; then
echo "walLevel $1" >> /etc/taos/taos.cfg
else
sed -i 's/^walLevel.*$/walLevel '"$1"'/g' /etc/taos/taos.cfg
fi
} }
function collectSysInfo { function collectSysInfo {
@ -70,15 +87,25 @@ function sendReport {
mimebody="MIME-Version: 1.0\nContent-Type: text/html; charset=utf-8\n" mimebody="MIME-Version: 1.0\nContent-Type: text/html; charset=utf-8\n"
echo -e "to: ${receiver}\nsubject: Perf test report ${today}, commit ID: ${LOCAL_COMMIT}\n" | \ echo -e "to: ${receiver}\nsubject: Perf test report ${today}, commit ID: ${LOCAL_COMMIT}\n" | \
(cat - && uuencode perftest-1d-$today.log perftest-1d-$today.log)| \ (cat - && uuencode perftest-1d-wal1-$today.log perftest-1d-wal1-$today.log)| \
(cat - && uuencode perftest-1d-report.csv perftest-1d-report-$today.csv) | \ (cat - && uuencode perftest-1d-wal1-report.csv perftest-1d-wal1-report-$today.csv) | \
(cat - && uuencode perftest-1d-report.png perftest-1d-report-$today.png) | \ (cat - && uuencode perftest-1d-wal1-report.png perftest-1d-wal1-report-$today.png) | \
(cat - && uuencode perftest-13d-$today.log perftest-13d-$today.log)| \ (cat - && uuencode perftest-13d-wal1-$today.log perftest-13d-wal1-$today.log)| \
(cat - && uuencode perftest-13d-report.csv perftest-13d-report-$today.csv) | \ (cat - && uuencode perftest-13d-wal1-report.csv perftest-13d-wal1-report-$today.csv) | \
(cat - && uuencode perftest-13d-report.png perftest-13d-report-$today.png) | \ (cat - && uuencode perftest-13d-wal1-report.png perftest-13d-wal1-report-$today.png) | \
(cat - && uuencode taosdemo-$today.log taosdemo-$today.log) | \ (cat - && uuencode taosdemo-wal1-$today.log taosdemo-wal1-$today.log) | \
(cat - && uuencode taosdemo-report.csv taosdemo-report-$today.csv) | \ (cat - && uuencode taosdemo-wal1-report.csv taosdemo-wal1-report-$today.csv) | \
(cat - && uuencode taosdemo-report.png taosdemo-report-$today.png) | \ (cat - && uuencode taosdemo-rps-wal1-report.csv taosdemo-rps-wal1-report-$today.csv) | \
(cat - && uuencode taosdemo-wal1-report.png taosdemo-wal1-report-$today.png) | \
(cat - && uuencode perftest-1d-wal2-$today.log perftest-1d-wal2-$today.log)| \
(cat - && uuencode perftest-1d-wal2-report.csv perftest-1d-wal2-report-$today.csv) | \
(cat - && uuencode perftest-1d-wal2-report.png perftest-1d-wal2-report-$today.png) | \
(cat - && uuencode perftest-13d-wal2-$today.log perftest-13d-wal2-$today.log)| \
(cat - && uuencode perftest-13d-wal2-report.csv perftest-13d-wal2-report-$today.csv) | \
(cat - && uuencode perftest-13d-wal2-report.png perftest-13d-wal2-report-$today.png) | \
(cat - && uuencode taosdemo-wal2-$today.log taosdemo-wal2-$today.log) | \
(cat - && uuencode taosdemo-wal2-report.csv taosdemo-wal2-report-$today.csv) | \
(cat - && uuencode taosdemo-rps-wal2-report.csv taosdemo-rps-wal2-report-$today.csv) | \
(cat - && uuencode sysinfo.log sysinfo.txt) | \ (cat - && uuencode sysinfo.log sysinfo.txt) | \
(cat - && uuencode taos.cfg taos-cfg-$today.txt) | \ (cat - && uuencode taos.cfg taos-cfg-$today.txt) | \
ssmtp "${receiver}" ssmtp "${receiver}"
@ -91,17 +118,34 @@ echo -e "cron-ran-at-${today}" >> cron.log
echoInfo "Build TDengine" echoInfo "Build TDengine"
buildTDengine buildTDengine
set-Wal "2" ############################
setMaxConnections 100
############################
setWal "2"
cd /root cd /root
./perftest-tsdb-compare-1d.sh ./perftest-tsdb-compare-1d.sh "wal2"
cd /root cd /root
./perftest-tsdb-compare-13d.sh ./perftest-tsdb-compare-13d.sh "wal2"
cd /root cd /root
./perftest-taosdemo.sh ./perftest-taosdemo.sh "wal2"
#############################
setWal "1"
cd /root
./perftest-tsdb-compare-1d.sh "wal1"
cd /root
./perftest-tsdb-compare-13d.sh "wal1"
cd /root
./perftest-taosdemo.sh "wal1"
#############################
collectSysInfo collectSysInfo
echoInfo "Send Report" echoInfo "Send Report"

View File

@ -1,20 +1,20 @@
#!/bin/bash #!/bin/bash
# Coloured Echoes # # Coloured Echoes
function red_echo { echo -e "\033[31m$@\033[0m"; } # function red_echo { echo -e "\033[31m$@\033[0m"; }
function green_echo { echo -e "\033[32m$@\033[0m"; } # function green_echo { echo -e "\033[32m$@\033[0m"; }
function yellow_echo { echo -e "\033[33m$@\033[0m"; } # function yellow_echo { echo -e "\033[33m$@\033[0m"; }
function white_echo { echo -e "\033[1;37m$@\033[0m"; } # function white_echo { echo -e "\033[1;37m$@\033[0m"; }
# Coloured Printfs # # Coloured Printfs
function red_printf { printf "\033[31m$@\033[0m"; } # function red_printf { printf "\033[31m$@\033[0m"; }
function green_printf { printf "\033[32m$@\033[0m"; } # function green_printf { printf "\033[32m$@\033[0m"; }
function yellow_printf { printf "\033[33m$@\033[0m"; } # function yellow_printf { printf "\033[33m$@\033[0m"; }
function white_printf { printf "\033[1;37m$@\033[0m"; } # function white_printf { printf "\033[1;37m$@\033[0m"; }
# Debugging Outputs # # Debugging Outputs
function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; } # function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; }
function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; } # function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; }
function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } # function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; }
function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } # function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; }
function restartTaosd { function restartTaosd {
systemctl stop taosd systemctl stop taosd
@ -32,39 +32,57 @@ function runCreateTableOnly {
echoInfo "Restart Taosd" echoInfo "Restart Taosd"
restartTaosd restartTaosd
/usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo -n 0 2>&1 | tee taosdemo-$today.log" /usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo -n 0 2>&1 | tee taosdemo-$1-$today.log"
demoTableOnly=`grep "Total:" totaltime.out|awk '{print $2}'` demoCreateTableOnly=`grep "Total:" totaltime.out|awk '{print $2}'`
}
function runDeleteTableOnly {
echoInfo "Restart Taosd"
restartTaosd
/usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo -t 0 -D 1 2>&1 | tee taosdemo-$1-$today.log"
demoDeleteTableOnly=`grep "Total:" totaltime.out|awk '{print $2}'`
} }
function runCreateTableThenInsert { function runCreateTableThenInsert {
echoInfo "Restart Taosd" echoInfo "Restart Taosd"
restartTaosd restartTaosd
/usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo 2>&1 | tee -a taosdemo-$today.log" /usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo 2>&1 | tee -a taosdemo-$1-$today.log"
demoTableAndInsert=`grep "Total:" totaltime.out|awk '{print $2}'` demoTableAndInsert=`grep "Total:" totaltime.out|awk '{print $2}'`
demoRPS=`grep "records\/second" taosdemo-$today.log | tail -n1 | awk '{print $13}'` demoRPS=`grep "records\/second" taosdemo-$1-$today.log | tail -n1 | awk '{print $13}'`
} }
function generateTaosdemoPlot { function generateTaosdemoPlot {
echo "${today}, demoTableOnly: ${demoTableOnly}, demoTableAndInsert: ${demoTableAndInsert}" | tee -a taosdemo-$today.log echo "${today} $1, demoCreateTableOnly: ${demoCreateTableOnly}, demoDeleteTableOnly: ${demoDeleteTableOnly}, demoTableAndInsert: ${demoTableAndInsert}" | tee -a taosdemo-$today.log
echo "${today}, ${demoTableOnly}, ${demoTableAndInsert}, ${demoRPS}" >> taosdemo-report.csv echo "${today}, ${demoCreateTableOnly}, ${demoDeleteTableOnly}, ${demoTableAndInsert}">> taosdemo-$1-report.csv
echo "${today}, ${demoRPS}" >> taosdemo-rps-$1-report.csv
csvLines=`cat taosdemo-report.csv | wc -l` csvLines=`cat taosdemo-$1-report.csv | wc -l`
if [ "$csvLines" -gt "10" ]; then if [ "$csvLines" -gt "10" ]; then
sed -i '1d' taosdemo-report.csv sed -i '1d' taosdemo-$1-report.csv
fi fi
gnuplot -p taosdemo-csv2png.gnuplot csvLines=`cat taosdemo-rps-$1-report.csv | wc -l`
if [ "$csvLines" -gt "10" ]; then
sed -i '1d' taosdemo-rps-$1-report.csv
fi
gnuplot -e "filename='taosdemo-$1-report'" -p taosdemo-csv2png.gnuplot
gnuplot -e "filename='taosdemo-rps-$1-report'" -p taosdemo-rps-csv2png.gnuplot
} }
today=`date +"%Y%m%d"` today=`date +"%Y%m%d"`
cd /root cd /root
echoInfo "Test Create Table Only " echoInfo "Test Create Table Only "
runCreateTableOnly runCreateTableOnly $1
echoInfo "Test Create Table then Insert data" echoInfo "Test Create Table then Insert data"
runCreateTableThenInsert runDeleteTableOnly $1
echoInfo "Test Create Table then Insert data"
runCreateTableThenInsert $1
echoInfo "Generate plot for taosdemo" echoInfo "Generate plot for taosdemo"
generateTaosdemoPlot generateTaosdemoPlot $1
echoInfo "End of TaosDemo Test" echoInfo "End of TaosDemo Test"

View File

@ -33,26 +33,26 @@ function runPerfTest13d {
restartTaosd restartTaosd
cd /home/taos/tliu/timeseriesdatabase-comparisons/build/tsdbcompare cd /home/taos/tliu/timeseriesdatabase-comparisons/build/tsdbcompare
./runreal-13d-csv.sh 2>&1 | tee /root/perftest-13d-$today.log ./runreal-13d-csv.sh $1 2>&1 | tee /root/perftest-13d-$1-$today.log
} }
function generatePerfPlot13d { function generatePerfPlot13d {
cd /root cd /root
csvLines=`cat perftest-13d-report.csv | wc -l` csvLines=`cat perftest-13d-$1-report.csv | wc -l`
if [ "$csvLines" -gt "10" ]; then if [ "$csvLines" -gt "10" ]; then
sed -i '1d' perftest-13d-report.csv sed -i '1d' perftest-13d-$1-report.csv
fi fi
gnuplot -e "filename='perftest-13d-report'" -p perftest-csv2png.gnuplot gnuplot -e "filename='perftest-13d-$1-report'" -p perftest-csv2png.gnuplot
} }
today=`date +"%Y%m%d"` today=`date +"%Y%m%d"`
cd /root cd /root
echoInfo "run Performance Test with 13 days data" echoInfo "run Performance Test with 13 days data"
runPerfTest13d runPerfTest13d $1
echoInfo "Generate plot of 13 days data" echoInfo "Generate plot of 13 days data"
generatePerfPlot13d generatePerfPlot13d $1
echoInfo "End of TSDB-Compare 13-days-data Test" echoInfo "End of TSDB-Compare 13-days-data Test"

View File

@ -33,26 +33,26 @@ function runPerfTest1d {
restartTaosd restartTaosd
cd /home/taos/tliu/timeseriesdatabase-comparisons/build/tsdbcompare cd /home/taos/tliu/timeseriesdatabase-comparisons/build/tsdbcompare
./runreal-1d-csv.sh 2>&1 | tee /root/perftest-1d-$today.log ./runreal-1d-csv.sh $1 2>&1 | tee /root/perftest-1d-$1-$today.log
} }
function generatePerfPlot1d { function generatePerfPlot1d {
cd /root cd /root
csvLines=`cat perftest-1d-report.csv | wc -l` csvLines=`cat perftest-1d-$1-report.csv | wc -l`
if [ "$csvLines" -gt "10" ]; then if [ "$csvLines" -gt "10" ]; then
sed -i '2d' perftest-1d-report.csv sed -i '2d' perftest-1d-$1-report.csv
fi fi
gnuplot -e "filename='perftest-1d-report'" -p perftest-csv2png.gnuplot gnuplot -e "filename='perftest-1d-$1-report'" -p perftest-csv2png.gnuplot
} }
today=`date +"%Y%m%d"` today=`date +"%Y%m%d"`
cd /root cd /root
echoInfo "run Performance Test with 1 day data" echoInfo "run Performance Test with 1 day data"
runPerfTest1d runPerfTest1d $1
echoInfo "Generate plot of 1 day data" echoInfo "Generate plot of 1 day data"
generatePerfPlot1d generatePerfPlot1d $1
echoInfo "End of TSDB-Compare 1-day-data Test" echoInfo "End of TSDB-Compare 1-day-data Test"

View File

@ -143,7 +143,7 @@ echo "------------------------------------------------------"
echo echo
today=`date +"%Y%m%d"` today=`date +"%Y%m%d"`
echo "${today}, ${TDWTM}, ${TDQ1}, ${TDQ2}, ${TDQ3}, ${TDQ4}" >> /root/perftest-13d-report.csv echo "${today}, ${TDWTM}, ${TDQ1}, ${TDQ2}, ${TDQ3}, ${TDQ4}" >> /root/perftest-13d-$1-report.csv
#bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086" #bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086"
#bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020" #bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020"

View File

@ -143,7 +143,7 @@ echo "------------------------------------------------------"
echo echo
today=`date +"%Y%m%d"` today=`date +"%Y%m%d"`
echo "${today}, ${TDWTM}, ${TDQ1}, ${TDQ2}, ${TDQ3}, ${TDQ4}" >> /root/perftest-1d-report.csv echo "${today}, ${TDWTM}, ${TDQ1}, ${TDQ2}, ${TDQ3}, ${TDQ4}" >> /root/perftest-1d-$1-report.csv
#bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086" #bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086"
#bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020" #bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020"

View File

@ -2,7 +2,7 @@
reset reset
set terminal png set terminal png
set title "TaosDemo Performance Report" font ",20" set title filename font ",20"
set ylabel "Time in Seconds" set ylabel "Time in Seconds"
@ -14,13 +14,13 @@ set xlabel "Date"
set style data linespoints set style data linespoints
set terminal pngcairo size 1024,768 enhanced font 'Segoe UI, 10' set terminal pngcairo size 1024,768 enhanced font 'Segoe UI, 10'
set output 'taosdemo-report.png' set output filename . '.png'
set datafile separator ',' set datafile separator ','
set key reverse Left outside set key reverse Left outside
set grid set grid
plot 'taosdemo-report.csv' using 1:2 title "Create 10,000 Table", \ plot filename . '.csv' using 1:2 title "Create 10,000 Tables", \
"" using 1:3 title "Create 10,000 Table and Insert 100,000 data", \ "" using 1:3 title "Delete 10,000 Tables", \
"" using 1:4 title "Request Per Second of Insert 100,000 data" "" using 1:4 title "Create 10,000 Tables and Insert 100,000 records"

View File

@ -0,0 +1,131 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tbNum = 10
rowNum = 20
tdSql.prepare()
tdLog.info("===== step1 =====")
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
for i in range(tbNum):
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
for j in range(rowNum):
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
time.sleep(0.1)
tdLog.info("===== step2 =====")
tdSql.query("select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step3 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step4 =====")
tdSql.execute("drop table s0")
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdLog.info("===== step5 =====")
tdSql.error("select * from s0")
tdLog.info("===== step6 =====")
time.sleep(0.1)
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step7 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step8 =====")
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.checkData(0, 1, rowNum * tbNum)
tdSql.checkData(0, 2, rowNum * tbNum)
tdSql.checkData(0, 3, rowNum * tbNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step9 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, rowNum * tbNum)
tdSql.checkData(0, 2, rowNum * tbNum)
tdSql.checkData(0, 3, rowNum * tbNum)
tdLog.info("===== step10 =====")
tdSql.execute("drop table s1")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step11 =====")
tdSql.error("select * from s1")
tdLog.info("===== step12 =====")
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step13 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, rowNum * tbNum)
tdSql.checkData(0, 2, rowNum * tbNum)
tdSql.checkData(0, 3, rowNum * tbNum)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,122 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tbNum = 10
rowNum = 20
totalNum = tbNum * rowNum
tdSql.prepare()
tdLog.info("===== step1 =====")
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
for i in range(tbNum):
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
for j in range(rowNum):
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
time.sleep(0.1)
tdLog.info("===== step2 =====")
tdSql.query("select count(col1) from tb0 interval(1d)")
tdSql.checkData(0, 1, rowNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdSql.execute("create table s0 as select count(col1) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step3 =====")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdLog.info("===== step4 =====")
tdSql.execute("drop table s0")
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdLog.info("===== step5 =====")
tdSql.error("select * from s0")
tdLog.info("===== step6 =====")
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step7 =====")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step8 =====")
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.checkData(0, 1, totalNum)
tdSql.checkData(0, 2, totalNum)
tdSql.checkData(0, 3, totalNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step9 =====")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, totalNum)
tdSql.checkData(0, 2, totalNum)
tdSql.checkData(0, 3, totalNum)
tdLog.info("===== step10 =====")
tdSql.execute("drop table s1")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step11 =====")
tdSql.error("select * from s1")
tdLog.info("===== step12 =====")
tdSql.execute("create table s1 as select count(col1) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step13 =====")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, totalNum)
#tdSql.checkData(0, 2, None)
#tdSql.checkData(0, 3, None)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

2
tests/script/bug.sim Normal file
View File

@ -0,0 +1,2 @@
run general/parser/projection_limit_offset.sim
run general/parser/limit2.sim

View File

@ -25,8 +25,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591200000000 + $count
sql insert into $tb values( now+ $ms , 1, 0, $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -46,8 +46,8 @@ sql create table $tb (ts timestamp, f float, d double, str binary(256))
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591286400000 + $count
sql insert into $tb values( now+ $ms , $count , $count ,'it is a string') sql insert into $tb values( $ms , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -75,8 +75,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591372800000 + $count
sql insert into $tb values( now+ $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw

View File

@ -26,8 +26,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591200000000 + $count
sql insert into $tb values( now+ $ms , 1, 0, $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -48,8 +48,8 @@ sql create table $tb (ts timestamp, f float, d double, str binary(256))
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591286400000 + $count
sql insert into $tb values( now+ $ms , $count , $count ,'it is a string') sql insert into $tb values( $ms , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -70,8 +70,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591372800000 + $count
sql insert into $tb values( now+ $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw

View File

@ -26,8 +26,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591200000000 + $count
sql insert into $tb values( now+ $ms , 1, 0, $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -48,8 +48,8 @@ sql create table $tb (ts timestamp, f float, d double, str binary(256))
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591286400000 + $count
sql insert into $tb values( now+ $ms , $count , $count ,'it is a string') sql insert into $tb values( $ms , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -70,8 +70,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591372800000 + $count
sql insert into $tb values( now+ $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw

View File

@ -25,8 +25,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591200000000 + $count
sql insert into $tb values( now+ $ms , 1, 0, $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -47,8 +47,8 @@ sql create table $tb (ts timestamp, f float, d double, str binary(256))
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591286400000 + $count
sql insert into $tb values( now+ $ms , $count , $count ,'it is a string') sql insert into $tb values( $ms , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw
@ -69,8 +69,8 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0 $count = 0
while $count < $N while $count < $N
$ms = $count . a $ms = 1591372800000 + $count
sql insert into $tb values( now+ $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string') sql insert into $tb values( $ms , 1 , 0 , $count , $count , $count , $count , $count ,'it is a string')
$count = $count + 1 $count = $count + 1
endw endw

View File

@ -66,7 +66,7 @@ endi
system_content curl 127.0.0.1:6020/grafana/login/xx/xx/ system_content curl 127.0.0.1:6020/grafana/login/xx/xx/
print 3-> $system_content print 3-> $system_content
if $system_content != @{"status":"error","code":1000,"desc":"invalid user"}@ then if $system_content != @{"status":"error","code":1000,"desc":"mnode invalid user"}@ then
return -1 return -1
endi endi
@ -78,7 +78,7 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:6020/grafana/login/1/root/1/ system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:6020/grafana/login/1/root/1/
print 5-> $system_content print 5-> $system_content
if $system_content != @{"status":"error","code":1000,"desc":"invalid user"}@ then if $system_content != @{"status":"error","code":1000,"desc":"mnode invalid user"}@ then
return -1 return -1
endi endi

View File

@ -93,7 +93,7 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'create database d1' 127.0.0.1:6020/rest/sql system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'create database d1' 127.0.0.1:6020/rest/sql
print 13-> $system_content print 13-> $system_content
if $system_content != @{"status":"error","code":1000,"desc":"database aleady exist"}@ then if $system_content != @{"status":"error","code":1000,"desc":"mnode database aleady exist"}@ then
return -1 return -1
endi endi
@ -126,7 +126,7 @@ endi
#18 #18
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' show tables;' 127.0.0.1:6020/rest/sql system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' show tables;' 127.0.0.1:6020/rest/sql
print 18-> $system_content print 18-> $system_content
if $system_content != @{"status":"error","code":1000,"desc":"db not selected"}@ then if $system_content != @{"status":"error","code":1000,"desc":"mnode db not selected"}@ then
return -1 return -1
endi endi
@ -147,7 +147,7 @@ print =============== step3 - db
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' select * from d1.t1;' 127.0.0.1:6020/rest/sql system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' select * from d1.t1;' 127.0.0.1:6020/rest/sql
print 21-> $system_content print 21-> $system_content
if $system_content != @{"status":"error","code":1000,"desc":"invalid table id"}@ then if $system_content != @{"status":"error","code":1000,"desc":"mnode invalid table id"}@ then
return -1 return -1
endi endi

View File

@ -69,6 +69,8 @@ print ====== tables created
print ================== restart server to commit data into disk print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
return
sleep 3000 sleep 3000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
print ================== server restart completed print ================== server restart completed

View File

@ -205,8 +205,8 @@ if $data01 != 20 then
endi endi
print =============== step21 print =============== step21
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step22 print =============== step22
$st = $stPrefix . c1 $st = $stPrefix . c1

View File

@ -76,20 +76,20 @@ endw
sql drop table $mt sql drop table $mt
print =============== step4 print =============== step4
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step5 print =============== step5
$st = $stPrefix . c3 $st = $stPrefix . c3
sql select * from $st sql select * from $st
print ===> select * from $st print ===> select * from $st
print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $data01 != NULL then if $data01 != null then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi

View File

@ -187,8 +187,8 @@ $st = $stPrefix . as
#sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7, avg(tbcol) as a8, sum(tbcol) as a9, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7 from $mt where ts < now + 4m interval(1d) #sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7, avg(tbcol) as a8, sum(tbcol) as a9, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7 from $mt where ts < now + 4m interval(1d)
print =============== step9 print =============== step9
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step10 print =============== step10
$st = $stPrefix . c3 $st = $stPrefix . c3

View File

@ -163,8 +163,8 @@ $st = $stPrefix . as
sql create table $st as select count(tbcol) as c from $mt interval(1d) sql create table $st as select count(tbcol) as c from $mt interval(1d)
print =============== step13 print =============== step13
print sleep 22 seconds print sleep 120 seconds
sleep 32000 sleep 120000
print =============== step14 print =============== step14
$st = $stPrefix . c1 $st = $stPrefix . c1

View File

@ -1,12 +1,12 @@
system sh/stop_dnodes.sh #system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 #system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0 #system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 10 #system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 10
system sh/exec.sh -n dnode1 -s start #system sh/exec.sh -n dnode1 -s start
sleep 3000 #sleep 3000
sql connect sql connect
print ======================== dnode1 start print ======================== dnode1 start
@ -56,14 +56,14 @@ print $data00 $data01 $data02 $data03
sql create table $st as select count(*), count(tbcol), count(tbcol2) from $mt interval(10s) sql create table $st as select count(*), count(tbcol), count(tbcol2) from $mt interval(10s)
print =============== step3 print =============== step3
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step4 print =============== step4
sql select * from $st sql select * from $st
print $st ==> $rows1 $data00 $data01 $data02 $data03 print $st ==> $rows1 $data00 $data01 $data02 $data03
if $data13 >= 51 then if $data03 >= 51 then
return -1 return -1
endi endi
@ -90,8 +90,8 @@ while $i < $tbNum
endw endw
print =============== step6 print =============== step6
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step7 print =============== step7

View File

@ -73,8 +73,8 @@ print =============== step3
sql create table $stt as select count(*) from $tb interval(1d) sql create table $stt as select count(*) from $tb interval(1d)
sql create table $stm as select count(*) from $mt interval(1d) sql create table $stm as select count(*) from $mt interval(1d)
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $stt sql select * from $stt
print select count(*) from $stt ===> $data00 $data01 print select count(*) from $stt ===> $data00 $data01
@ -152,8 +152,8 @@ print =============== step8
sql create table $stt as select count(*) from $tb interval(1d) sql create table $stt as select count(*) from $tb interval(1d)
sql create table $stm as select count(*) from $mt interval(1d) sql create table $stm as select count(*) from $mt interval(1d)
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $stt sql select * from $stt
sleep 1000 sleep 1000

View File

@ -78,8 +78,8 @@ if $rows != 11 then
endi endi
print =============== step3 print =============== step3
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
@ -112,8 +112,8 @@ if $rows != 11 then
endi endi
print =============== step7 print =============== step7
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
@ -155,8 +155,8 @@ if $rows != 12 then
endi endi
print =============== step9 print =============== step9
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 200 then if $data01 != 200 then
@ -190,8 +190,8 @@ if $rows != 12 then
endi endi
print =============== step13 print =============== step13
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 200 then if $data01 != 200 then

View File

@ -72,8 +72,8 @@ if $rows != 11 then
endi endi
print =============== step3 print =============== step3
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
@ -100,8 +100,8 @@ if $rows != 11 then
endi endi
print =============== step7 print =============== step7
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
@ -143,8 +143,8 @@ if $rows != 12 then
endi endi
print =============== step9 print =============== step9
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 $data02, $data03 print select * from $st => $data01 $data02, $data03
if $data01 != 200 then if $data01 != 200 then
@ -178,17 +178,17 @@ if $rows != 12 then
endi endi
print =============== step13 print =============== step13
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 $data02, $data03 print select * from $st => $data01 $data02, $data03
if $data01 != 200 then if $data01 != 200 then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi

View File

@ -79,8 +79,8 @@ $st = $stPrefix . c3
sql create table $st as select count(tbcol2) from $tb interval(1d) sql create table $st as select count(tbcol2) from $tb interval(1d)
print =============== step5 print =============== step5
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step6 print =============== step6
$st = $stPrefix . c1 $st = $stPrefix . c1
@ -173,8 +173,8 @@ $st = $stPrefix . c3
sql create table $st as select count(*), count(tbcol), count(tbcol2) from $tb interval(1d) sql create table $st as select count(*), count(tbcol), count(tbcol2) from $tb interval(1d)
print =============== step10 print =============== step10
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step11 print =============== step11
#$st = $stPrefix . c3 #$st = $stPrefix . c3

View File

@ -79,8 +79,8 @@ sleep 1000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
print =============== step4 print =============== step4
print sleep 23 seconds print sleep 120 seconds
sleep 23000 sleep 120000
print =============== step5 print =============== step5
$i = 1 $i = 1

View File

@ -214,8 +214,8 @@ sql select count(tbcol) from $tb where ts < now + 4m interval(1d) group by tgcol
step20: step20:
print =============== step21 print =============== step21
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step22 print =============== step22
$st = $stPrefix . c1 $st = $stPrefix . c1

View File

@ -71,20 +71,20 @@ print =============== step3
sql drop table $tb sql drop table $tb
print =============== step4 print =============== step4
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step5 print =============== step5
$st = $stPrefix . c3 $st = $stPrefix . c3
sql select * from $st sql select * from $st
print ===> select * from $st print ===> select * from $st
print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $data01 != NULL then if $data01 != null then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi

View File

@ -191,8 +191,8 @@ $st = $stPrefix . as
#sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, stddev(tbcol) as a7, percentile(tbcol, 1) as a8, count(tbcol) as a9, leastsquares(tbcol, 1, 1) as a10 from $tb where ts < now + 4m interval(1d) #sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, stddev(tbcol) as a7, percentile(tbcol, 1) as a8, count(tbcol) as a9, leastsquares(tbcol, 1, 1) as a10 from $tb where ts < now + 4m interval(1d)
print =============== step10 print =============== step10
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step11 print =============== step11
$st = $stPrefix . c3 $st = $stPrefix . c3

View File

@ -196,8 +196,8 @@ $st = $stPrefix . as
sql create table $st as select count(tbcol) as c from $tb interval(1d) sql create table $st as select count(tbcol) as c from $tb interval(1d)
print =============== step16 print =============== step16
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step17 print =============== step17
$st = $stPrefix . c1 $st = $stPrefix . c1

View File

@ -0,0 +1,48 @@
cd ../../../debug; cmake ..
cd ../../../debug; make
#./test.sh -f general/parser/lastrow.sim
#./test.sh -f general/parser/nchar.sim
#./test.sh -f general/parser/limit.sim
#./test.sh -f general/parser/limit1.sim
#./test.sh -f general/parser/limit1_tblocks100.sim
#./test.sh -f general/parser/binary_escapeCharacter.sim
#./test.sh -f general/parser/projection_limit_offset.sim
#./test.sh -f general/parser/limit2.sim
#./test.sh -f general/stable/metrics.sim
#./test.sh -f general/table/date.sim
#./test.sh -f unique/big/balance.sim
#./test.sh -f unique/column/replica3.sim
#./test.sh -f unique/db/commit.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete.sim
#./test.sh -f unique/db/replica_add12.sim
#./test.sh -f unique/db/replica_add13.sim
#./test.sh -f unique/vnode/replica3_basic.sim
#./test.sh -f unique/dnode/balance1.sim
#./test.sh -f unique/dnode/balance2.sim
#./test.sh -f unique/dnode/balance3.sim
#./test.sh -f unique/cluster/balance1.sim
#./test.sh -f unique/cluster/balance2.sim
#./test.sh -f unique/cluster/balance3.sim