[td-225] merge develop

This commit is contained in commit 315ce829e5.
@@ -114,6 +114,25 @@ mkdir -p ${install_dir}/examples
examples_dir="${top_dir}/tests/examples"
cp -r ${examples_dir}/c ${install_dir}/examples
+if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
+  if [ -d ${examples_dir}/JDBC/connectionPools/target ]; then
+    rm -rf ${examples_dir}/JDBC/connectionPools/target
+  fi
+  if [ -d ${examples_dir}/JDBC/JDBCDemo/target ]; then
+    rm -rf ${examples_dir}/JDBC/JDBCDemo/target
+  fi
+  if [ -d ${examples_dir}/JDBC/mybatisplus-demo/target ]; then
+    rm -rf ${examples_dir}/JDBC/mybatisplus-demo/target
+  fi
+  if [ -d ${examples_dir}/JDBC/springbootdemo/target ]; then
+    rm -rf ${examples_dir}/JDBC/springbootdemo/target
+  fi
+  if [ -d ${examples_dir}/JDBC/SpringJdbcTemplate/target ]; then
+    rm -rf ${examples_dir}/JDBC/SpringJdbcTemplate/target
+  fi
+  if [ -d ${examples_dir}/JDBC/taosdemo/target ]; then
+    rm -rf ${examples_dir}/JDBC/taosdemo/target
+  fi

cp -r ${examples_dir}/JDBC ${install_dir}/examples
cp -r ${examples_dir}/matlab ${install_dir}/examples
cp -r ${examples_dir}/python ${install_dir}/examples
@@ -1,6 +1,5 @@
name: tdengine
base: core18

version: '2.1.1.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
@@ -7421,6 +7421,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "too many tables in from clause";
const char* msg3 = "start(end) time of query range required or time range too large";
+// const char* msg5 = "too many columns in selection clause";
+// const char* msg6 = "too many tables in from clause";
+// const char* msg7 = "invalid table alias name";
+// const char* msg8 = "alias name too long";
+const char* msg9 = "only tag query not compatible with normal column filter";

int32_t code = TSDB_CODE_SUCCESS;
@@ -7540,6 +7545,20 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
    }
  }

+  if (tscQueryTags(pQueryInfo)) {
+    SExprInfo* pExpr1 = tscSqlExprGet(pQueryInfo, 0);
+
+    if (pExpr1->base.functionId != TSDB_FUNC_TID_TAG) {
+      int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryInfo->colList);
+      for (int32_t i = 0; i < numOfCols; ++i) {
+        SColumn* pCols = taosArrayGetP(pQueryInfo->colList, i);
+        if (pCols->info.flist.numOfFilters > 0) {
+          return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
+        }
+      }
+    }
+  }
+
  // parse the having clause in the first place
  if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
      TSDB_CODE_SUCCESS) {
@@ -1902,8 +1902,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
  doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true);
  doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);

-  tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid,
-      tNameGetTableName(&pTableMetaInfo->name));
+  tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
+      pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns,
+      pTableMeta->tableInfo.numOfTags);

  free(pTableMeta);
  return TSDB_CODE_SUCCESS;
@@ -1919,13 +1920,13 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
  SVgroupsInfo* pVgroupInfo = calloc(1, vgroupsz);
  assert(pVgroupInfo != NULL);

-  pVgroupInfo->numOfVgroups = pVgroupMsg->numOfVgroups;
-  if (pVgroupInfo->numOfVgroups <= 0) {
-    tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", id);
+  pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
+  if (pInfo->vgroupList->numOfVgroups <= 0) {
+    tscDebug("0x%" PRIx64 " empty vgroup info, no corresponding tables for stable", pSql->self);
  } else {
-    for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) {
+    for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
      // just init, no need to lock
-      SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j];
+      SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j];

      SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
      vmsg->vgId = htonl(vmsg->vgId);
@@ -627,6 +627,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
  if (pSql->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
    tscFreeSqlObj(pSql);
    free(pStream);
    return NULL;
  }
@@ -215,7 +215,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
  taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
}

//TODO refactor: extract table list name not simply from the sql
static SArray* getTableList( SSqlObj* pSql ) {
  const char* p = strstr( pSql->sqlstr, " from " );
  assert(p != NULL); // we are sure this is a 'select' statement
@@ -219,6 +219,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing")
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state")
+#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0515) //"Database is closing")

// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
@@ -470,6 +470,7 @@ typedef struct SThreadInfo_S {

  // seq of query or subscribe
  uint64_t querySeq;   // sequence number of sql command
  TAOS_SUB* tsub;

} threadInfo;
@@ -929,6 +930,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
    && strcasecmp(argv[i], "BIGINT")
    && strcasecmp(argv[i], "DOUBLE")
    && strcasecmp(argv[i], "BINARY")
+   && strcasecmp(argv[i], "TIMESTAMP")
    && strcasecmp(argv[i], "NCHAR")) {
  printHelp();
  errorPrint("%s", "-b: Invalid data_type!\n");
@@ -950,6 +952,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
    && strcasecmp(token, "BIGINT")
    && strcasecmp(token, "DOUBLE")
    && strcasecmp(token, "BINARY")
+   && strcasecmp(token, "TIMESTAMP")
    && strcasecmp(token, "NCHAR")) {
  printHelp();
  free(g_dupstr);
@@ -1132,11 +1135,11 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
    }
  }

-  verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
  if (code != 0) {
    if (!quiet) {
+     debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
-     errorPrint("Failed to execute %s, command length: %d, reason: %s\n",
-         command, (int)strlen(command), taos_errstr(res));
+     errorPrint("Failed to execute %s, reason: %s\n",
+         command, taos_errstr(res));
    }
    taos_free_result(res);
    //taos_close(taos);
@@ -2732,8 +2735,6 @@ static int createSuperTable(
  snprintf(command, BUFFER_SIZE,
      "create table if not exists %s.%s (ts timestamp%s) tags %s",
      dbName, superTbl->sTblName, cols, tags);
- verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);

  if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
    errorPrint( "create supertable %s failed!\n\n",
        superTbl->sTblName);
@@ -2756,7 +2757,6 @@ static int createDatabasesAndStables() {
  for (int i = 0; i < g_Dbs.dbCount; i++) {
    if (g_Dbs.db[i].drop) {
      sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
-     verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
      if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
        taos_close(taos);
        return -1;
@@ -2829,7 +2829,6 @@ static int createDatabasesAndStables() {
      " precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
  }

- debugPrint("%s() %d command: %s\n", __func__, __LINE__, command);
  if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
    taos_close(taos);
    errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
@@ -2846,8 +2845,6 @@ static int createDatabasesAndStables() {
  for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
    sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
        g_Dbs.db[i].superTbls[j].sTblName);
-   verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);

    ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);

    if ((ret != 0) || (g_Dbs.db[i].drop)) {
@@ -2951,7 +2948,6 @@ static void* createTable(void *sarg)
  }

  len = 0;
- verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
  if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)){
    errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
    free(buffer);
@@ -2967,7 +2963,6 @@ static void* createTable(void *sarg)
  }

  if (0 != len) {
-   verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
    if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)) {
      errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
    }
@@ -2978,7 +2973,7 @@ static void* createTable(void *sarg)
}

static int startMultiThreadCreateChildTable(
-    char* cols, int threads, uint64_t startFrom, int64_t ntables,
+    char* cols, int threads, uint64_t tableFrom, int64_t ntables,
    char* db_name, SSuperTable* superTblInfo) {

  pthread_t *pids = malloc(threads * sizeof(pthread_t));
@@ -3022,10 +3017,10 @@ static int startMultiThreadCreateChildTable(
      return -1;
    }

-   pThreadInfo->start_table_from = startFrom;
+   pThreadInfo->start_table_from = tableFrom;
    pThreadInfo->ntables = i<b?a+1:a;
-   pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
-   startFrom = pThreadInfo->end_table_to + 1;
+   pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
+   tableFrom = pThreadInfo->end_table_to + 1;
    pThreadInfo->use_metric = true;
    pThreadInfo->cols = cols;
    pThreadInfo->minDelay = UINT64_MAX;
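Note: the `i<b?a+1:a` assignments above split `ntables` tables across `threads` workers: `a = ntables / threads` is the base share, the first `b = ntables % threads` threads take one extra table, and `tableFrom` advances past each thread's `end_table_to`. A standalone sketch of the same partitioning (the struct and function names here are illustrative, not from the source):

    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative stand-in for the per-thread bookkeeping in taosdemo. */
    typedef struct {
        uint64_t start_table_from;
        uint64_t end_table_to;
        int64_t  ntables;
    } ThreadRange;

    static void partitionTables(int64_t ntables, int threads, ThreadRange *r) {
        int64_t a = ntables / threads;   /* base share per thread */
        int64_t b = ntables % threads;   /* first b threads get one extra */
        uint64_t tableFrom = 0;
        for (int i = 0; i < threads; i++) {
            r[i].ntables = (i < b) ? a + 1 : a;
            r[i].start_table_from = tableFrom;
            r[i].end_table_to = (i < b) ? tableFrom + a : tableFrom + a - 1;
            tableFrom = r[i].end_table_to + 1;  /* next thread starts right after */
        }
    }

    int main(void) {
        ThreadRange r[4];
        partitionTables(10, 4, r);  /* 10 tables over 4 threads: 3, 3, 2, 2 */
        for (int i = 0; i < 4; i++)
            printf("thread %d: [%llu, %llu]\n", i,
                   (unsigned long long)r[i].start_table_from,
                   (unsigned long long)r[i].end_table_to);
        return 0;
    }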
@@ -3063,15 +3058,15 @@ static void createChildTables() {

  verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
      g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
- uint64_t startFrom = 0;
+ uint64_t tableFrom = 0;
  g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;

  verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
-     __func__, __LINE__, g_totalChildTables, startFrom);
+     __func__, __LINE__, g_totalChildTables, tableFrom);
  startMultiThreadCreateChildTable(
      g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
      g_Dbs.threadCountByCreateTbl,
-     startFrom,
+     tableFrom,
      g_Dbs.db[i].superTbls[j].childTblCount,
      g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
}
@@ -4699,8 +4694,8 @@ static int getRowDataFromSample(

static int64_t generateStbRowData(
    SSuperTable* stbInfo,
-    char* recBuf, int64_t timestamp
-    ) {
+    char* recBuf, int64_t timestamp)
+{
  int64_t dataLen = 0;
  char *pstr = recBuf;
  int64_t maxLen = MAX_DATA_SIZE;
@@ -4728,23 +4723,23 @@ static int64_t generateStbRowData(
    dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf);
    tmfree(buf);
  } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
-     "INT", 3)) {
+     "INT", strlen("INT"))) {
    dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
        "%d,", rand_int());
  } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
-     "BIGINT", 6)) {
+     "BIGINT", strlen("BIGINT"))) {
    dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
        "%"PRId64",", rand_bigint());
  } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
-     "FLOAT", 5)) {
+     "FLOAT", strlen("FLOAT"))) {
    dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
        "%f,", rand_float());
  } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
-     "DOUBLE", 6)) {
+     "DOUBLE", strlen("DOUBLE"))) {
    dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
        "%f,", rand_double());
  } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
-     "SMALLINT", 8)) {
+     "SMALLINT", strlen("SMALLINT"))) {
    dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
        "%d,", rand_smallint());
  } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
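Note: replacing magic lengths such as `"INT", 3` with `"INT", strlen("INT")` does not change behavior (compilers fold `strlen` of a string literal to a constant); it only keeps the length in sync with the literal. `strncasecmp` still matches by prefix, so a hypothetical exact matcher (not part of this commit) would add a terminator check:

    #include <string.h>
    #include <strings.h>   /* strncasecmp on POSIX */
    #include <stdbool.h>

    /* Case-insensitive exact match: prefix-compare over the literal's
     * length, then require the candidate to end there. */
    static bool isType(const char *s, const char *lit) {
        size_t n = strlen(lit);
        return (0 == strncasecmp(s, lit, n)) && (s[n] == '\0');
    }
    /* isType("int", "INT") -> true; isType("interval", "INT") -> false,
     * where strncasecmp(s, "INT", 3) alone would accept both. */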
@@ -4800,6 +4795,8 @@ static int64_t generateData(char *recBuf, char **data_type,
    pstr += sprintf(pstr, ",%d", rand_int());
  } else if (strcasecmp(data_type[i % c], "BIGINT") == 0) {
    pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
+ } else if (strcasecmp(data_type[i % c], "TIMESTAMP") == 0) {
+   pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
  } else if (strcasecmp(data_type[i % c], "FLOAT") == 0) {
    pstr += sprintf(pstr, ",%10.4f", rand_float());
  } else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) {
@@ -4930,7 +4927,7 @@ static void getTableName(char *pTblName,
static int64_t generateDataTailWithoutStb(
    uint32_t batch, char* buffer,
    int64_t remainderBufLen, int64_t insertRows,
-    uint64_t startFrom, int64_t startTime,
+    uint64_t recordFrom, int64_t startTime,
    /* int64_t *pSamplePos, */int64_t *dataLen) {

  uint64_t len = 0;
@@ -4966,9 +4963,9 @@ static int64_t generateDataTailWithoutStb(
  verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
      __func__, __LINE__, len, k, buffer);

- startFrom ++;
+ recordFrom ++;

- if (startFrom >= insertRows) {
+ if (recordFrom >= insertRows) {
    break;
  }
}
@@ -4997,7 +4994,7 @@ static int32_t generateStbDataTail(
    SSuperTable* superTblInfo,
    uint32_t batch, char* buffer,
    int64_t remainderBufLen, int64_t insertRows,
-    uint64_t startFrom, int64_t startTime,
+    uint64_t recordFrom, int64_t startTime,
    int64_t *pSamplePos, int64_t *dataLen) {
  uint64_t len = 0;
@@ -5046,9 +5043,9 @@ static int32_t generateStbDataTail(
  verbosePrint("%s() LN%d len=%"PRIu64" k=%ud \nbuffer=%s\n",
      __func__, __LINE__, len, k, buffer);

- startFrom ++;
+ recordFrom ++;

- if (startFrom >= insertRows) {
+ if (recordFrom >= insertRows) {
    break;
  }
}
@@ -5245,6 +5242,7 @@ static int64_t generateInterlaceDataWithoutStb(
static int32_t prepareStbStmt(SSuperTable *stbInfo,
    TAOS_STMT *stmt,
    char *tableName, uint32_t batch, uint64_t insertRows,
+   uint64_t recordFrom,
    int64_t startTime, char *buffer)
{
  uint32_t k;
@@ -5264,7 +5262,7 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,
    return ret;
  }

- void *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
+ char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
  if (bindArray == NULL) {
    errorPrint("Failed to allocate %d bind params\n", batch);
    return -1;
@@ -5276,33 +5274,177 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,
  } else {
    tsRand = false;
  }
  for (k = 0; k < batch; k++) {
  for (k = 0; k < batch;) {
    /* columnCount + 1 (ts) */
    for (int i = 0; i <= stbInfo->columnCount; i ++) {
      TAOS_BIND *bind = (TAOS_BIND *)bindArray + (sizeof(TAOS_BIND) * i);
      if (i == 0) {
    char data[MAX_DATA_SIZE];
    memset(data, 0, MAX_DATA_SIZE);

    char *ptr = data;
    TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);

    int64_t *bind_ts;

    bind_ts = (int64_t *)ptr;
    bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
    int64_t ts;
    if (tsRand) {
      ts = startTime + getTSRandTail(
      *bind_ts = startTime + getTSRandTail(
          stbInfo->timeStampStep, k,
          stbInfo->disorderRatio,
          stbInfo->disorderRange);
    } else {
      ts = startTime + stbInfo->timeStampStep * k;
      *bind_ts = startTime + stbInfo->timeStampStep * k;
    }
    bind->buffer = &ts;
    bind->buffer_length = sizeof(int64_t);
    bind->buffer = bind_ts;
    bind->length = &bind->buffer_length;
    bind->is_null = NULL;

    ptr += bind->buffer_length;

    for (int i = 0; i < stbInfo->columnCount; i ++) {
      bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
      if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "BINARY", strlen("BINARY"))) {
        if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
          errorPrint( "binary length overflow, max size:%u\n",
              (uint32_t)TSDB_MAX_BINARY_LEN);
          return -1;
        }
        char *bind_binary = (char *)ptr;
        rand_string(bind_binary, stbInfo->columns[i].dataLen);

        bind->buffer_type = TSDB_DATA_TYPE_BINARY;
        bind->buffer_length = stbInfo->columns[i].dataLen;
        bind->buffer = bind_binary;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;

        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "NCHAR", strlen("NCHAR"))) {
        if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
          errorPrint( "nchar length overflow, max size:%u\n",
              (uint32_t)TSDB_MAX_BINARY_LEN);
          return -1;
        }
        char *bind_nchar = (char *)ptr;
        rand_string(bind_nchar, stbInfo->columns[i].dataLen);

        bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
        bind->buffer_length = strlen(bind_nchar);
        bind->buffer = bind_nchar;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;

        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "INT", strlen("INT"))) {
        int32_t *bind_int = (int32_t *)ptr;

        *bind_int = rand_int();
        bind->buffer_type = TSDB_DATA_TYPE_INT;
        bind->buffer_length = sizeof(int32_t);
        bind->buffer = bind_int;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;

        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "BIGINT", strlen("BIGINT"))) {
        int64_t *bind_bigint = (int64_t *)ptr;

        *bind_bigint = rand_bigint();
        bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
        bind->buffer_length = sizeof(int64_t);
        bind->buffer = bind_bigint;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;
        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "FLOAT", strlen("FLOAT"))) {
        float *bind_float = (float *)ptr;

        *bind_float = rand_float();
        bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
        bind->buffer_length = sizeof(float);
        bind->buffer = bind_float;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;
        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "DOUBLE", strlen("DOUBLE"))) {
        double *bind_double = (double *)ptr;

        *bind_double = rand_double();
        bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
        bind->buffer_length = sizeof(double);
        bind->buffer = bind_double;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;
        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "SMALLINT", strlen("SMALLINT"))) {
        int16_t *bind_smallint = (int16_t *)ptr;

        *bind_smallint = rand_smallint();
        bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
        bind->buffer_length = sizeof(int16_t);
        bind->buffer = bind_smallint;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;
        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "TINYINT", strlen("TINYINT"))) {
        int8_t *bind_tinyint = (int8_t *)ptr;

        *bind_tinyint = rand_tinyint();
        bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
        bind->buffer_length = sizeof(int8_t);
        bind->buffer = bind_tinyint;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;
        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "BOOL", strlen("BOOL"))) {
        int8_t *bind_bool = (int8_t *)ptr;

        *bind_bool = rand_bool();
        bind->buffer_type = TSDB_DATA_TYPE_BOOL;
        bind->buffer_length = sizeof(int8_t);
        bind->buffer = bind_bool;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;

        ptr += bind->buffer_length;
      } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
          "TIMESTAMP", strlen("TIMESTAMP"))) {
        int64_t *bind_ts2 = (int64_t *)ptr;

        *bind_ts2 = rand_bigint();
        bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
        bind->buffer_length = sizeof(int64_t);
        bind->buffer = bind_ts2;
        bind->length = &bind->buffer_length;
        bind->is_null = NULL;

        ptr += bind->buffer_length;
      } else {

        errorPrint( "No support data type: %s\n",
            stbInfo->columns[i].dataType);
        return -1;
      }
    }
    taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
    // if msg > 3MB, break
    }

    taos_stmt_bind_param(stmt, bindArray);
    taos_stmt_add_batch(stmt);

    k++;
    recordFrom ++;
    if (recordFrom >= insertRows) {
      break;
    }
  }

  return k;
}
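Note: the key fix in this rewrite is about lifetimes: the old code pointed `bind->buffer` at a stack-local `ts` that every slot reused, while the new code writes each value into a per-row scratch buffer and advances `ptr` by `buffer_length`, so every `TAOS_BIND` keeps its own stable storage until `taos_stmt_bind_param()` runs. A minimal sketch of that layout for the timestamp plus one INT column (it reuses `stmt`, `stbInfo`, `k`, `startTime`, `rand_int` and `MAX_DATA_SIZE` from the surrounding code, so it is not standalone):

    char bindArray[2 * sizeof(TAOS_BIND)];  /* slot 0: ts, slot 1: one column */
    char data[MAX_DATA_SIZE];               /* per-row values live here, not in reused stack slots */
    char *ptr = data;

    TAOS_BIND *bind = (TAOS_BIND *)bindArray;            /* timestamp slot */
    int64_t *bind_ts = (int64_t *)ptr;
    *bind_ts = startTime + stbInfo->timeStampStep * k;
    bind->buffer_type   = TSDB_DATA_TYPE_TIMESTAMP;
    bind->buffer_length = sizeof(int64_t);
    bind->buffer        = bind_ts;
    bind->length        = &bind->buffer_length;
    bind->is_null       = NULL;
    ptr += bind->buffer_length;

    bind = (TAOS_BIND *)(bindArray + sizeof(TAOS_BIND)); /* INT column slot */
    int32_t *bind_int = (int32_t *)ptr;
    *bind_int = rand_int();
    bind->buffer_type   = TSDB_DATA_TYPE_INT;
    bind->buffer_length = sizeof(int32_t);
    bind->buffer        = bind_int;
    bind->length        = &bind->buffer_length;
    bind->is_null       = NULL;

    taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
    taos_stmt_add_batch(stmt);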
@@ -5312,7 +5454,7 @@ static int32_t generateStbProgressiveData(
    int64_t tableSeq,
    char *dbName, char *buffer,
    int64_t insertRows,
-    uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
+    uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos,
    int64_t *pRemainderBufLen)
{
  assert(buffer != NULL);
@@ -5335,7 +5477,7 @@ static int32_t generateStbProgressiveData(

  return generateStbDataTail(superTblInfo,
      g_args.num_of_RPR, pstr, *pRemainderBufLen,
-     insertRows, startFrom,
+     insertRows, recordFrom,
      startTime,
      pSamplePos, &dataLen);
}
@@ -5350,7 +5492,7 @@ static int64_t generateProgressiveDataWithoutStb(
    /* int64_t tableSeq, */
    threadInfo *pThreadInfo, char *buffer,
    int64_t insertRows,
-    uint64_t startFrom, int64_t startTime, /*int64_t *pSamplePos, */
+    uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */
    int64_t *pRemainderBufLen)
{
  assert(buffer != NULL);
@@ -5371,7 +5513,7 @@ static int64_t generateProgressiveDataWithoutStb(
  int64_t dataLen;

  return generateDataTailWithoutStb(
-     g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
+     g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
      startTime,
      /*pSamplePos, */&dataLen);
}
@@ -5685,7 +5827,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
  generated = prepareStbStmt(superTblInfo,
      pThreadInfo->stmt,
      tableName, g_args.num_of_RPR,
-     insertRows, start_time, pstr);
+     insertRows, i, start_time, pstr);
} else {
  generated = generateStbProgressiveData(
      superTblInfo,
@@ -5973,7 +6115,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
  }

  int64_t ntables = 0;
- uint64_t startFrom;
+ uint64_t tableFrom;

  if (superTblInfo) {
    int64_t limit;
@@ -6000,7 +6142,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
  }

  ntables = limit;
- startFrom = offset;
+ tableFrom = offset;

  if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
      && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit )
@@ -6032,7 +6174,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
      offset);
} else {
  ntables = g_args.num_of_tables;
- startFrom = 0;
+ tableFrom = 0;
}

taos_close(taos0);
@@ -6109,10 +6251,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
/* if ((NULL == superTblInfo)
    || (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
- pThreadInfo->start_table_from = startFrom;
+ pThreadInfo->start_table_from = tableFrom;
  pThreadInfo->ntables = i<b?a+1:a;
- pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
- startFrom = pThreadInfo->end_table_to + 1;
+ pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
+ tableFrom = pThreadInfo->end_table_to + 1;
/* } else {
    pThreadInfo->start_table_from = 0;
    pThreadInfo->ntables = superTblInfo->childTblCount;
@@ -6700,7 +6842,6 @@ static int queryTestProcess() {

  char sqlStr[MAX_TB_NAME_SIZE*2];
  sprintf(sqlStr, "use %s", g_queryInfo.dbName);
- verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
  if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
    taos_close(taos);
    free(infos);
@@ -6752,15 +6893,15 @@ static int queryTestProcess() {
    b = ntables % threads;
  }

- uint64_t startFrom = 0;
+ uint64_t tableFrom = 0;
  for (int i = 0; i < threads; i++) {
    threadInfo *pThreadInfo = infosOfSub + i;
    pThreadInfo->threadID = i;

-   pThreadInfo->start_table_from = startFrom;
+   pThreadInfo->start_table_from = tableFrom;
    pThreadInfo->ntables = i<b?a+1:a;
-   pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
-   startFrom = pThreadInfo->end_table_to + 1;
+   pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
+   tableFrom = pThreadInfo->end_table_to + 1;
    pThreadInfo->taos = NULL;  // TODO: workaround to use separate taos connection;
    pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
  }
@@ -7024,7 +7165,6 @@ static void *specifiedSubscribe(void *sarg) {

  char sqlStr[MAX_TB_NAME_SIZE*2];
  sprintf(sqlStr, "use %s", g_queryInfo.dbName);
- debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
  if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
    taos_close(pThreadInfo->taos);
    return NULL;
@@ -7210,17 +7350,17 @@ static int subscribeTestProcess() {
  }

  for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
-   uint64_t startFrom = 0;
+   uint64_t tableFrom = 0;
    for (int j = 0; j < threads; j++) {
      uint64_t seq = i * threads + j;
      threadInfo *pThreadInfo = infosOfStable + seq;
      pThreadInfo->threadID = seq;
      pThreadInfo->querySeq = i;

-     pThreadInfo->start_table_from = startFrom;
+     pThreadInfo->start_table_from = tableFrom;
      pThreadInfo->ntables = j<b?a+1:a;
-     pThreadInfo->end_table_to = j<b?startFrom+a:startFrom+a-1;
-     startFrom = pThreadInfo->end_table_to + 1;
+     pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
+     tableFrom = pThreadInfo->end_table_to + 1;
      pThreadInfo->taos = NULL;  // TODO: workaround to use separate taos connection;
      pthread_create(pidsOfStable + seq,
          NULL, superSubscribe, pThreadInfo);
@@ -7452,7 +7592,6 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
  }

  memcpy(cmd + cmd_len, line, read_len);
- verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
  if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) {
    errorPrint("%s() LN%d, queryDbExec %s failed!\n",
        __func__, __LINE__, cmd);
@@ -719,13 +719,13 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
  if (action == SDB_ACTION_INSERT) {
    return sdbPerformInsertAction(pHead, pTable);
  } else if (action == SDB_ACTION_DELETE) {
-   if (qtype == TAOS_QTYPE_FWD) {
+   //if (qtype == TAOS_QTYPE_FWD) {
      // Drop database/stable may take a long time and cause a timeout, so we confirm first then reput it into queue
-     sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused);
-     return TSDB_CODE_SUCCESS;
-   } else {
+     // sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused);
+     // return TSDB_CODE_SUCCESS;
+   //} else {
      return sdbPerformDeleteAction(pHead, pTable);
-   }
+   //}
  } else if (action == SDB_ACTION_UPDATE) {
    return sdbPerformUpdateAction(pHead, pTable);
  } else {
@@ -1194,8 +1194,8 @@ static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagN

static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
  SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
- mLInfo("msg:%p, app:%p stable %s, add tag result:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
-     tstrerror(code));
+ mLInfo("msg:%p, app:%p stable %s, add tag result:%s, numOfTags:%d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
+     tstrerror(code), pStable->numOfTags);

  return code;
}
@@ -121,7 +121,7 @@ static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) {
  SVgObj *pVgroup = pRow->pObj;

  if (pVgroup->pDb == NULL) {
-   mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName);
+   mError("vgId:%d, db:%s is not exist while delete from hash", pVgroup->vgId, pVgroup->dbName);
    return TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
@@ -709,7 +709,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
}

static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
- int32_t onlineNum = 0;
+ int32_t onlineNum = 0, arbOnlineNum = 0;
  int32_t masterIndex = -1;
  int32_t replica = pNode->replica;
@@ -723,13 +723,15 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
  SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
  if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
    onlineNum++;
+   ++arbOnlineNum;
    replica = pNode->replica + 1;
  }

  if (onlineNum <= replica * 0.5) {
    if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
-     if (nodeRole == TAOS_SYNC_ROLE_MASTER && onlineNum == replica * 0.5 && onlineNum >= 1) {
+     if (nodeRole == TAOS_SYNC_ROLE_MASTER && onlineNum == replica * 0.5 && ((replica > 2 && onlineNum - arbOnlineNum > 1) || pNode->replica < 3)) {
        sInfo("vgId:%d, self keep work as master, online:%d replica:%d", pNode->vgId, onlineNum, replica);
        masterIndex = pNode->selfIndex;
      } else {
        nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
        sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
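Note: with the arbitrator counted into both `onlineNum` and the separate `arbOnlineNum`, a master seeing an exact half-online split keeps its role only when at least one surviving online peer is a data replica (for replica > 2), or when the configured replica count is below 3. The predicate in isolation (a sketch; names mirror the diff):

    #include <stdbool.h>

    /* Sketch of the keep-master test from syncCheckMaster(): onlineNum
     * includes the arbitrator, arbOnlineNum counts only the arbitrator,
     * and replica may have been bumped by one when the arbitrator is up. */
    static bool keepMasterOnHalfSplit(bool isMaster, int onlineNum, int arbOnlineNum,
                                      int replica, int configuredReplica) {
        /* replica * 0.5 is a floating-point compare, as in the source:
         * only an exact half split of an even replica count satisfies it. */
        return isMaster
            && onlineNum == replica * 0.5
            && ((replica > 2 && onlineNum - arbOnlineNum > 1) || configuredReplica < 3);
    }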
@@ -1002,6 +1004,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
  if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
    // nodeVersion = pHead->version;
    code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
+   syncConfirmForward(pNode->rid, pHead->version, code, false);
  } else {
    if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
      code = syncSaveIntoBuffer(pPeer, pHead);
@@ -1404,7 +1407,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
  pthread_mutex_lock(&pNode->mutex);
  for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
    SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
-   if (ABS(time - pFwdInfo->time) < 2000) break;
+   if (ABS(time - pFwdInfo->time) < 10000) break;

    sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
        pFwdInfo->version, time, pFwdInfo->time);
@@ -613,7 +613,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {

  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
- taosTrashcanEmpty(pCacheObj, true);
+ taosTrashcanEmpty(pCacheObj, false);

  __cache_lock_destroy(pCacheObj);
@@ -454,7 +454,11 @@ void vnodeDestroy(SVnodeObj *pVnode) {
  }

  if (pVnode->tsdb) {
-   code = tsdbCloseRepo(pVnode->tsdb, 1);
+   // the deleted vnode does not need to commit, so as to speed up the deletion
+   int toCommit = 1;
+   if (pVnode->dropped) toCommit = 0;
+
+   code = tsdbCloseRepo(pVnode->tsdb, toCommit);
    pVnode->tsdb = NULL;
  }
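Note: a dropped vnode's data is about to be deleted anyway, so flushing it on close is wasted work; gating the commit on `pVnode->dropped` speeds up deletion. The same gate as a one-liner (sketch only, equivalent to the diff above):

    /* Skip the final commit for a vnode that is being dropped. */
    int toCommit = pVnode->dropped ? 0 : 1;
    code = tsdbCloseRepo(pVnode->tsdb, toCommit);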
@@ -126,11 +126,16 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
}

void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
- void *pVnode = vnodeAcquire(vgId);
+ SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while confirm forward", vgId);
  }

+ if (code == TSDB_CODE_SYN_CONFIRM_EXPIRED && pVnode->status == TAOS_VN_STATUS_CLOSING) {
+   vDebug("vgId:%d, db:%s, vnode is closing while confirm forward", vgId, pVnode->db);
+   code = TSDB_CODE_VND_IS_CLOSING;
+ }
+
  dnodeSendRpcVWriteRsp(pVnode, wparam, code);
  vnodeRelease(pVnode);
}
@@ -0,0 +1,309 @@
def pre_test(){

    sh '''
    sudo rmtaos||echo 'no taosd installed'
    '''
    sh '''
    cd ${WKC}
    git reset --hard
    git checkout $BRANCH_NAME
    git pull
    git submodule update
    cd ${WK}
    git reset --hard
    git checkout $BRANCH_NAME
    git pull
    export TZ=Asia/Harbin
    date
    rm -rf ${WK}/debug
    mkdir debug
    cd debug
    cmake .. > /dev/null
    make > /dev/null
    make install > /dev/null
    pip3 install ${WKC}/src/connector/python/linux/python3/
    '''
    return 1
}
pipeline {
    agent none
    environment{

        WK = '/var/lib/jenkins/workspace/TDinternal'
        WKC= '/var/lib/jenkins/workspace/TDinternal/community'
    }

    stages {
        stage('Parallel test stage') {
            parallel {
                stage('pytest') {
                    agent{label 'slam1'}
                    steps {
                        pre_test()
                        sh '''
                        cd ${WKC}/tests
                        find pytest -name '*'sql|xargs rm -rf
                        ./test-all.sh pytest
                        date'''
                    }
                }
                stage('test_b1') {
                    agent{label 'slam2'}
                    steps {
                        pre_test()

                        sh '''
                        cd ${WKC}/tests
                        ./test-all.sh b1
                        date'''

                    }
                }

                stage('test_crash_gen') {
                    agent{label "slam3"}
                    steps {
                        pre_test()
                        sh '''
                        cd ${WKC}/tests/pytest
                        '''
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/pytest
                            ./crash_gen.sh -a -p -t 4 -s 2000
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/pytest
                            rm -rf /var/lib/taos/*
                            rm -rf /var/log/taos/*
                            ./handle_crash_gen_val_log.sh
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/pytest
                            rm -rf /var/lib/taos/*
                            rm -rf /var/log/taos/*
                            ./handle_taosd_val_log.sh
                            '''
                        }

                        sh'''
                        systemctl start taosd
                        sleep 10
                        '''
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/gotest
                            bash batchtest.sh
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/examples/python/PYTHONConnectorChecker
                            python3 PythonChecker.py
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/examples/JDBC/JDBCDemo/
                            mvn clean package assembly:single -DskipTests >/dev/null
                            java -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/src/connector/jdbc
                            mvn clean package -Dmaven.test.skip=true >/dev/null
                            cd ${WKC}/tests/examples/JDBC/JDBCDemo/
                            java --class-path=../../../../src/connector/jdbc/target:$JAVA_HOME/jre/lib/ext -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cp -rf ${WKC}/tests/examples/nodejs ${JENKINS_HOME}/workspace/
                            cd ${JENKINS_HOME}/workspace/nodejs
                            node nodejsChecker.js host=localhost
                            '''
                        }
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${JENKINS_HOME}/workspace/C#NET/src/CheckC#
                            dotnet run
                            '''
                        }
                        sh '''
                        systemctl stop taosd
                        cd ${WKC}/tests
                        ./test-all.sh b2
                        date
                        '''
                        sh '''
                        cd ${WKC}/tests
                        ./test-all.sh full unit
                        date'''
                    }
                }

                stage('test_valgrind') {
                    agent{label "slam4"}

                    steps {
                        pre_test()
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WKC}/tests/pytest
                            nohup taosd >/dev/null &
                            sleep 10
                            python3 concurrent_inquiry.py -c 1

                            '''
                        }
                        sh '''
                        cd ${WKC}/tests
                        ./test-all.sh full jdbc
                        date'''
                        sh '''
                        cd ${WKC}/tests/pytest
                        ./valgrind-test.sh 2>&1 > mem-error-out.log
                        ./handle_val_log.sh

                        date
                        cd ${WKC}/tests
                        ./test-all.sh b3
                        date'''
                        sh '''
                        date
                        cd ${WKC}/tests
                        ./test-all.sh full example
                        date'''
                    }
                }

                stage('arm64_build'){
                    agent{label 'arm64'}
                    steps{
                        sh '''
                        cd ${WK}
                        git fetch
                        git checkout develop
                        git pull
                        cd ${WKC}
                        git fetch
                        git checkout develop
                        git pull
                        git submodule update
                        cd ${WKC}/packaging
                        ./release.sh -v cluster -c aarch64 -n 2.0.0.0 -m 2.0.0.0

                        '''
                    }
                }
                stage('arm32_build'){
                    agent{label 'arm32'}
                    steps{
                        catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
                            sh '''
                            cd ${WK}
                            git fetch
                            git checkout develop
                            git pull
                            cd ${WKC}
                            git fetch
                            git checkout develop
                            git pull
                            git submodule update
                            cd ${WKC}/packaging
                            ./release.sh -v cluster -c aarch32 -n 2.0.0.0 -m 2.0.0.0

                            '''
                        }

                    }
                }
            }
        }

    }
    post {
        success {
            emailext (
                subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
                body: '''<!DOCTYPE html>
                <html>
                <head>
                <meta charset="UTF-8">
                </head>
                <body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
                    <table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
                        <tr>
                            <td><br />
                                <b><font color="#0B610B"><font size="6">构建信息</font></font></b>
                                <hr size="2" width="100%" align="center" /></td>
                        </tr>
                        <tr>
                            <td>
                                <ul>
                                <div style="font-size:18px">
                                    <li>构建名称>>分支:${PROJECT_NAME}</li>
                                    <li>构建结果:<span style="color:green"> Successful </span></li>
                                    <li>构建编号:${BUILD_NUMBER}</li>
                                    <li>触发用户:${CAUSE}</li>
                                    <li>变更概要:${CHANGES}</li>
                                    <li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
                                    <li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
                                    <li>变更集:${JELLY_SCRIPT}</li>
                                </div>
                                </ul>
                            </td>
                        </tr>
                    </table></font>
                </body>
                </html>''',
                to: "yqliu@taosdata.com,pxiao@taosdata.com",
                from: "support@taosdata.com"
            )
        }
        failure {
            emailext (
                subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
                body: '''<!DOCTYPE html>
                <html>
                <head>
                <meta charset="UTF-8">
                </head>
                <body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
                    <table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
                        <tr>
                            <td><br />
                                <b><font color="#0B610B"><font size="6">构建信息</font></font></b>
                                <hr size="2" width="100%" align="center" /></td>
                        </tr>
                        <tr>
                            <td>
                                <ul>
                                <div style="font-size:18px">
                                    <li>构建名称>>分支:${PROJECT_NAME}</li>
                                    <li>构建结果:<span style="color:green"> Successful </span></li>
                                    <li>构建编号:${BUILD_NUMBER}</li>
                                    <li>触发用户:${CAUSE}</li>
                                    <li>变更概要:${CHANGES}</li>
                                    <li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
                                    <li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
                                    <li>变更集:${JELLY_SCRIPT}</li>
                                </div>
                                </ul>
                            </td>
                        </tr>
                    </table></font>
                </body>
                </html>''',
                to: "yqliu@taosdata.com,pxiao@taosdata.com",
                from: "support@taosdata.com"
            )
        }
    }
}
@@ -64,18 +64,25 @@ function runQueryPerfTest {
[ -f $PERFORMANCE_TEST_REPORT ] && rm $PERFORMANCE_TEST_REPORT
nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/taosperf/ > /dev/null 2>&1 &
echoInfo "Wait TDengine to start"
-sleep 300
+sleep 60
echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest

python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT

mkdir -p /var/lib/perf/
mkdir -p /var/log/perf/
rm -rf /var/lib/perf/*
rm -rf /var/log/perf/*
nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/perf/ > /dev/null 2>&1 &
echoInfo "Wait TDengine to start"
sleep 10
echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest

python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT

python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT

#python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT

}
@@ -22,7 +22,7 @@ from queue import Queue, Empty
from .shared.config import Config
from .shared.db import DbTarget, DbConn
from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
-from .shared.types import DirPath
+from .shared.types import DirPath, IpcStream

# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
# from crash_gen.db import DbConn, DbTarget
@@ -177,13 +177,12 @@ quorum 2
        return "127.0.0.1"

    def getServiceCmdLine(self): # to start the instance
-       cmdLine = []
        if Config.getConfig().track_memory_leaks:
            Logging.info("Invoking VALGRIND on service...")
-           cmdLine = ['valgrind', '--leak-check=yes']
+           return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
        else:
            # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
-           cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
-       return cmdLine
+           return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()

    def _getDnodes(self, dbc):
        dbc.query("show dnodes")
@@ -281,16 +280,16 @@ class TdeSubProcess:
        return '[TdeSubProc: pid = {}, status = {}]'.format(
            self.getPid(), self.getStatus() )

-   def getStdOut(self) -> BinaryIO :
+   def getIpcStdOut(self) -> IpcStream :
        if self._popen.universal_newlines : # alias of text_mode
            raise CrashGenError("We need binary mode for STDOUT IPC")
        # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
-       return typing.cast(BinaryIO, self._popen.stdout)
+       return typing.cast(IpcStream, self._popen.stdout)

-   def getStdErr(self) -> BinaryIO :
+   def getIpcStdErr(self) -> IpcStream :
        if self._popen.universal_newlines : # alias of text_mode
            raise CrashGenError("We need binary mode for STDERR IPC")
-       return typing.cast(BinaryIO, self._popen.stderr)
+       return typing.cast(IpcStream, self._popen.stderr)

    # Now it's always running, since we matched the life cycle
    # def isRunning(self):
@@ -302,11 +301,6 @@ class TdeSubProcess:
    def _start(self, cmdLine) -> Popen :
        ON_POSIX = 'posix' in sys.builtin_module_names

-       # Sanity check
-       # if self.subProcess: # already there
-       #     raise RuntimeError("Corrupt process state")
-
-
        # Prepare environment variables for coverage information
        # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
        myEnv = os.environ.copy()
@@ -314,9 +308,8 @@ class TdeSubProcess:

        # print(myEnv)
        # print("Starting TDengine with env: ", myEnv.items())
        # print("Starting TDengine via Shell: {}".format(cmdLineStr))
        print("Starting TDengine: {}".format(cmdLine))

        # useShell = True # Needed to pass environments into it
        return Popen(
            ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
            shell=True, # Always use shell, since we need to pass ENV vars
@@ -732,19 +725,19 @@ class ServiceManagerThread:
        self._ipcQueue = Queue() # type: Queue
        self._thread = threading.Thread( # First thread captures server OUTPUT
            target=self.svcOutputReader,
-           args=(subProc.getStdOut(), self._ipcQueue, logDir))
+           args=(subProc.getIpcStdOut(), self._ipcQueue, logDir))
        self._thread.daemon = True # thread dies with the program
        self._thread.start()
        time.sleep(0.01)
        if not self._thread.is_alive(): # What happened?
-           Logging.info("Failed to started process to monitor STDOUT")
+           Logging.info("Failed to start process to monitor STDOUT")
            self.stop()
            raise CrashGenError("Failed to start thread to monitor STDOUT")
        Logging.info("Successfully started process to monitor STDOUT")

        self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
            target=self.svcErrorReader,
-           args=(subProc.getStdErr(), self._ipcQueue, logDir))
+           args=(subProc.getIpcStdErr(), self._ipcQueue, logDir))
        self._thread2.daemon = True # thread dies with the program
        self._thread2.start()
        time.sleep(0.01)
@@ -887,14 +880,19 @@ class ServiceManagerThread:
            print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
            return None

-   def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
+   def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str
            ) -> Generator[TextChunk, None, None]:
        '''
-       Take an input stream with binary data, produced a generator of decoded
-       "text chunks", and also save the original binary data in a log file.
+       Take an input stream with binary data (likely from Popen), produced a generator of decoded
+       "text chunks".
+
+       Side effect: it also save the original binary data in a log file.
        '''
        os.makedirs(logDir, exist_ok=True)
        logF = open(os.path.join(logDir, logFile), 'wb')
+       if logF is None:
+           Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile))
+           return
        for bChunk in iter(streamIn.readline, b''):
            logF.write(bChunk) # Write to log file immediately
            tChunk = self._decodeBinaryChunk(bChunk) # decode
@@ -902,14 +900,14 @@ class ServiceManagerThread:
            yield tChunk # TODO: split into actual text lines

        # At the end...
-       streamIn.close() # Close the stream
-       logF.close() # Close the output file
+       streamIn.close() # Close the incoming stream
+       logF.close() # Close the log file

-   def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
+   def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str):
        '''
        The infinite routine that processes the STDOUT stream for the sub process being managed.

-       :param stdOut: the IO stream object used to fetch the data from
+       :param ipcStdOut: the IO stream object used to fetch the data from
        :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
        :param logDir: where we should dump a verbatim output file
        '''
@@ -917,7 +915,7 @@ class ServiceManagerThread:
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        # print("This is the svcOutput Reader...")
        # stdOut.readline() # Skip the first output? TODO: remove?
-       for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') :
+       for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') :
            queue.put(tChunk) # tChunk garanteed not to be None
            self._printProgress("_i")
@@ -940,12 +938,12 @@ class ServiceManagerThread:
            Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
            self.setStatus(Status.STATUS_STOPPED)

-   def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
+   def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str):
        # os.makedirs(logDir, exist_ok=True)
        # logFile = os.path.join(logDir,'stderr.log')
        # fErr = open(logFile, 'wb')
        # for line in iter(err.readline, b''):
-       for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') :
+       for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') :
            queue.put(tChunk) # tChunk garanteed not to be None
            # fErr.write(line)
            Logging.info("TDengine STDERR: {}".format(tChunk))
@@ -1,4 +1,4 @@
-from typing import Any, List, Dict, NewType
+from typing import Any, BinaryIO, List, Dict, NewType
from enum import Enum

DirPath = NewType('DirPath', str)
@@ -26,3 +26,5 @@ class TdDataType(Enum):

TdColumns = Dict[str, TdDataType]
TdTags = Dict[str, TdDataType]
+
+IpcStream = NewType('IpcStream', BinaryIO)
@@ -183,7 +183,7 @@ python3 ./test.py -f stable/query_after_reset.py
# perfbenchmark
python3 ./test.py -f perfbenchmark/bug3433.py
#python3 ./test.py -f perfbenchmark/bug3589.py
+python3 ./test.py -f perfbenchmark/taosdemoInsert.py

#query
python3 ./test.py -f query/filter.py
|
@ -31,7 +31,7 @@ class insertFromCSVPerformace:
|
|||
self.host = "127.0.0.1"
|
||||
self.user = "root"
|
||||
self.password = "taosdata"
|
||||
self.config = "/etc/taosperf"
|
||||
self.config = "/etc/perf"
|
||||
self.conn = taos.connect(
|
||||
self.host,
|
||||
self.user,
|
||||
|
|
|
@ -0,0 +1,387 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import taos
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import argparse
|
||||
import subprocess
|
||||
import datetime
|
||||
import re
|
||||
|
||||
|
||||
from multiprocessing import cpu_count
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.dnodes import TDDnode
|
||||
|
||||
class Taosdemo:
|
||||
def __init__(self, clearCache, dbName, keep):
|
||||
self.clearCache = clearCache
|
||||
self.dbname = dbName
|
||||
self.drop = "yes"
|
||||
self.keep = keep
|
||||
self.host = "127.0.0.1"
|
||||
self.user = "root"
|
||||
self.password = "taosdata"
|
||||
# self.config = "/etc/taosperf"
|
||||
# self.conn = taos.connect(
|
||||
# self.host,
|
||||
# self.user,
|
||||
# self.password,
|
||||
# self.config)
|
||||
|
||||
# env config
|
||||
def getBuildPath(self) -> str:
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/debug/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def getExeToolsDir(self) -> str:
|
||||
self.debugdir = self.getBuildPath() + "/debug/build/bin"
|
||||
return self.debugdir
|
||||
|
||||
def getCfgDir(self) -> str:
|
||||
self.config = self.getBuildPath() + "/sim/dnode1/cfg"
|
||||
return self.config
|
||||
|
||||
# taodemo insert file config
|
||||
def dbinfocfg(self) -> dict:
|
||||
return {
|
||||
"name": self.dbname,
|
||||
"drop": self.drop,
|
||||
"replica": 1,
|
||||
"days": 10,
|
||||
"cache": 16,
|
||||
"blocks": 8,
|
||||
"precision": "ms",
|
||||
"keep": self.keep,
|
||||
"minRows": 100,
|
||||
"maxRows": 4096,
|
||||
"comp": 2,
|
||||
"walLevel": 1,
|
||||
"cachelast": 0,
|
||||
"quorum": 1,
|
||||
"fsync": 3000,
|
||||
"update": 0
|
||||
}
|
||||
|
||||
def type_check(func):
|
||||
def wrapper(self, **kwargs):
|
||||
num_types = ["int", "float", "bigint", "tinyint", "smallint", "double"]
|
||||
str_types = ["binary", "nchar"]
|
||||
for k ,v in kwargs.items():
|
||||
if k.lower() not in num_types and k.lower() not in str_types:
|
||||
return f"args {k} type error, not allowed"
|
||||
elif not isinstance(v, (int, list, tuple)):
|
||||
return f"value {v} type error, not allowed"
|
||||
elif k.lower() in num_types and not isinstance(v, int):
|
||||
return f"arg {v} takes 1 positional argument must be type int "
|
||||
elif isinstance(v, (list,tuple)) and len(v) > 2:
|
||||
return f"arg {v} takes from 1 to 2 positional arguments but more than 2 were given "
|
||||
elif isinstance(v,(list,tuple)) and [ False for _ in v if not isinstance(_, int) ]:
|
||||
return f"arg {v} takes from 1 to 2 positional arguments must be type int "
|
||||
else:
|
||||
pass
|
||||
return func(self, **kwargs)
|
||||
return wrapper
|
||||
|
||||
    @type_check
    def column_tag_count(self, **column_tag) -> list:
        init_column_tag = []
        for k, v in column_tag.items():
            if re.search(k, "int, float, bigint, tinyint, smallint, double", re.IGNORECASE):
                init_column_tag.append({"type": k, "count": v})
            elif re.search(k, "binary, nchar", re.IGNORECASE):
                # string types default to a length of 8 unless one is given
                if isinstance(v, int):
                    init_column_tag.append({"type": k, "count": v, "len": 8})
                elif len(v) == 1:
                    init_column_tag.append({"type": k, "count": v[0], "len": 8})
                else:
                    init_column_tag.append({"type": k, "count": v[0], "len": v[1]})
        return init_column_tag

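    # A quick sketch of the decorator and builder above; the argument values
    # are illustrative, not taken from any test:
    #   self.column_tag_count(INT=1, BINARY=(2, 16))
    #   -> [{"type": "INT", "count": 1}, {"type": "BINARY", "count": 2, "len": 16}]
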
    def stbcfg(self, stb: str, child_tab_count: int, rows: int, prechildtab: str, columns: dict, tags: dict) -> dict:
        return {
            "name": stb,
            "child_table_exists": "no",
            "childtable_count": child_tab_count,
            "childtable_prefix": prechildtab,
            "auto_create_table": "no",
            "batch_create_tbl_num": 10,
            "data_source": "rand",
            "insert_mode": "taosc",
            "insert_rows": rows,
            "childtable_limit": 0,
            "childtable_offset": 0,
            "rows_per_tbl": 1,
            "max_sql_len": 65480,
            "disorder_ratio": 0,
            "disorder_range": 1000,
            "timestamp_step": 10,
            "start_timestamp": f"{datetime.datetime.now():%F %X}",
            "sample_format": "csv",
            "sample_file": "./sample.csv",
            "tags_file": "",
            "columns": self.column_tag_count(**columns),
            "tags": self.column_tag_count(**tags)
        }

    def schemecfg(self, intcount=1, floatcount=0, bcount=0, tcount=0, scount=0, doublecount=0, binarycount=0, ncharcount=0):
        return {
            "INT": intcount,
            "FLOAT": floatcount,
            "BIGINT": bcount,
            "TINYINT": tcount,
            "SMALLINT": scount,
            "DOUBLE": doublecount,
            "BINARY": binarycount,
            "NCHAR": ncharcount
        }

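    # Note: the dict produced by schemecfg() is exactly what stbcfg() splats
    # into column_tag_count(**columns) / (**tags), so its keys must stay
    # valid type names.
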
    def insertcfg(self, db: dict, stbs: list) -> dict:
        return {
            "filetype": "insert",
            "cfgdir": self.config,
            "host": self.host,
            "port": 6030,
            "user": self.user,
            "password": self.password,
            "thread_count": cpu_count(),
            "thread_count_create_tbl": cpu_count(),
            "result_file": "/tmp/insert_res.txt",
            "confirm_parameter_prompt": "no",
            "insert_interval": 0,
            "num_of_records_per_req": 100,
            "max_sql_len": 1024000,
            "databases": [{
                "dbinfo": db,
                "super_tables": stbs
            }]
        }

    def createinsertfile(self, db: dict, stbs: list) -> str:
        date = datetime.datetime.now()
        file_create_table = f"/tmp/insert_{date:%F-%H%M}.json"

        with open(file_create_table, 'w') as f:
            json.dump(self.insertcfg(db, stbs), f)

        return file_create_table

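    # How the builders compose, as a sketch (argument values are illustrative):
    #   db   = self.dbinfocfg()
    #   stb  = self.stbcfg("stb1", 2, 10000, "t1", {"INT": 1}, {"INT": 1})
    #   path = self.createinsertfile(db, [stb])   # writes /tmp/insert_<timestamp>.json
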
    # taosdemo query file config
    def querysqls(self, sql: str) -> list:
        return [{"sql": sql, "result": ""}]

    def querycfg(self, sql: str) -> dict:
        return {
            "filetype": "query",
            "cfgdir": self.config,
            "host": self.host,
            "port": 6030,
            "user": self.user,
            "password": self.password,
            # "yes" makes taosdemo ask for confirmation; taosdemotable()
            # pipes `yes |` into query runs to answer it automatically
            "confirm_parameter_prompt": "yes",
            "query_times": 10,
            "query_mode": "taosc",
            "databases": self.dbname,
            "specified_table_query": {
                "query_interval": 0,
                "concurrent": cpu_count(),
                "sqls": self.querysqls(sql)
            }
        }

    def createqueryfile(self, sql: str) -> str:
        date = datetime.datetime.now()
        file_query_table = f"/tmp/query_{date:%F-%H%M}.json"

        with open(file_query_table, "w") as f:
            json.dump(self.querycfg(sql), f)

        return file_query_table

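    # Likewise for queries (illustrative SQL):
    #   self.createqueryfile("select count(*) from stb1")   # writes /tmp/query_<timestamp>.json
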
    # Execute taosdemo, and delete temporary files when finished
    def taosdemotable(self, filepath: str, resultfile="/dev/null"):
        taosdemopath = self.getBuildPath() + "/debug/build/bin"
        with open(filepath, "r") as f:
            filetype = json.load(f)["filetype"]
        if filetype == "insert":
            taosdemo_table_cmd = f"{taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
        else:
            # query configs prompt for confirmation, so feed a stream of "y"
            taosdemo_table_cmd = f"yes | {taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
        try:
            _ = subprocess.check_output(taosdemo_table_cmd, shell=True).decode("utf-8")
        except subprocess.CalledProcessError as e:
            _ = e.output

    def droptmpfile(self, filepath: str):
        drop_file_cmd = f"[ -f {filepath} ] && rm -f {filepath}"
        try:
            _ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8")
        except subprocess.CalledProcessError as e:
            _ = e.output

    # TODO: complete the TD-4153 data insertion and client-request performance queries.
    def td4153insert(self):

        tdLog.printNoPrefix("========== start to create table and insert data ==========")
        self.dbname = "td4153"
        db = self.dbinfocfg()
        stblist = []

        columntype = self.schemecfg(intcount=1, ncharcount=100)
        tagtype = self.schemecfg(intcount=1)
        stbname = "stb1"
        prechild = "t1"
        stable = self.stbcfg(
            stb=stbname,
            prechildtab=prechild,
            child_tab_count=2,
            rows=10000,
            columns=columntype,
            tags=tagtype
        )
        stblist.append(stable)
        insertfile = self.createinsertfile(db=db, stbs=stblist)

        # sample system metrics with nmon every 5 seconds while inserting
        nmon_file = f"/tmp/insert_{datetime.datetime.now():%F-%H%M}.nmon"
        cmd = f"nmon -s5 -F {nmon_file} -m /tmp/"
        try:
            _ = subprocess.check_output(cmd, shell=True).decode("utf-8")
        except subprocess.CalledProcessError as e:
            _ = e.output

        self.taosdemotable(insertfile)
        self.droptmpfile(insertfile)
        self.droptmpfile("/tmp/insert_res.txt")

        # To avoid piling up performance files, the nmon file is deleted here;
        # the deletion can be skipped during an actual test run.
        self.droptmpfile(nmon_file)

        cmd = "ps -ef | grep -w nmon | grep -v grep | awk '{print $2}'"
        try:
            time.sleep(10)
            _ = subprocess.check_output(cmd, shell=True).decode("utf-8")
        except BaseException as e:
            raise e

    def td4153query(self):
        tdLog.printNoPrefix("========== start to query operation ==========")

        sqls = {
            "select_all": "select * from stb1",
            "select_join": "select * from t10, t11 where t10.ts=t11.ts"
        }
        for type, sql in sqls.items():
            result_file = f"/tmp/queryResult_{type}.log"
            query_file = self.createqueryfile(sql)
            try:
                self.taosdemotable(query_file, resultfile=result_file)
            except subprocess.CalledProcessError as e:
                out_put = e.output
            if result_file:
                print(f"execute the {type.split('_')[1]} sql, the sql is: {sql}")
                # pull the max/min/average "Spent ... s" timings out of the result log
                max_sql_time_cmd = f'''
                grep -o 'Spent.*s' {result_file} | awk 'NR==1{{max=$2;next}}{{max=max>$2?max:$2}}END{{print "Max=",max,"s"}}'
                '''
                max_sql_time = subprocess.check_output(max_sql_time_cmd, shell=True).decode("UTF-8")
                print(f"{type.split('_')[1]} rows sql time : {max_sql_time}")

                min_sql_time_cmd = f'''
                grep -o 'Spent.*s' {result_file} | awk 'NR==1{{min=$2;next}}{{min=min<$2?min:$2}}END{{print "Min=",min,"s"}}'
                '''
                min_sql_time = subprocess.check_output(min_sql_time_cmd, shell=True).decode("UTF-8")
                print(f"{type.split('_')[1]} rows sql time : {min_sql_time}")

                avg_sql_time_cmd = f'''
                grep -o 'Spent.*s' {result_file} | awk '{{sum+=$2}}END{{print "Average=",sum/NR,"s"}}'
                '''
                avg_sql_time = subprocess.check_output(avg_sql_time_cmd, shell=True).decode("UTF-8")
                print(f"{type.split('_')[1]} rows sql time : {avg_sql_time}")

            self.droptmpfile(query_file)
            self.droptmpfile(result_file)

        drop_query_tmp_file_cmd = r"find ./ -name 'querySystemInfo-*' -type f -exec rm {} \;"
        try:
            _ = subprocess.check_output(drop_query_tmp_file_cmd, shell=True).decode("utf-8")
        except subprocess.CalledProcessError as e:
            _ = e.output

    def td4153(self):
        self.td4153insert()
        self.td4153query()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r',
        '--remove-cache',
        action='store_true',
        default=False,
        help='clear cache before query (default: False)')
    parser.add_argument(
        '-d',
        '--database-name',
        action='store',
        default='db',
        type=str,
        help='Database name to be created (default: db)')
    parser.add_argument(
        '-k',
        '--keep-time',
        action='store',
        default=3650,
        type=int,
        help='Database keep parameter (default: 3650)')

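    # Example invocation, assuming this script is saved as taosdemoPerformance.py
    # (the file name is not fixed anywhere in the file):
    #   python3 taosdemoPerformance.py -r -d td4153 -k 3650
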
    args = parser.parse_args()
    taosdemo = Taosdemo(args.remove_cache, args.database_name, args.keep_time)
    # taosdemo.conn = taos.connect(
    #     taosdemo.host,
    #     taosdemo.user,
    #     taosdemo.password,
    #     taosdemo.config
    # )

    debugdir = taosdemo.getExeToolsDir()
    cfgdir = taosdemo.getCfgDir()
    # start taosd in the background with the sim config
    cmd = f"{debugdir}/taosd -c {cfgdir} >/dev/null 2>&1 &"
    try:
        _ = subprocess.check_output(cmd, shell=True).decode("utf-8")
    except subprocess.CalledProcessError as e:
        _ = e.output

    if taosdemo.clearCache:
        # drop the OS page cache; requires root permission
        subprocess.check_output("echo 3 > /proc/sys/vm/drop_caches", shell=True).decode("utf-8")

    taosdemo.td4153()

@@ -24,7 +24,7 @@ class taosdemoPerformace:
        self.host = "127.0.0.1"
        self.user = "root"
        self.password = "taosdata"
        self.config = "/etc/taosperf"
        self.config = "/etc/perf"
        self.conn = taos.connect(
            self.host,
            self.user,

@@ -77,7 +77,7 @@ class taosdemoPerformace:

        insert_data = {
            "filetype": "insert",
            "cfgdir": "/etc/taosperf",
            "cfgdir": "/etc/perf",
            "host": "127.0.0.1",
            "port": 6030,
            "user": "root",

@@ -887,10 +887,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
#valid sql: select first(c1), last(c2), count(*) from select_tags_mt0 group by tbname, t1;
#valid sql: select first(c1), tbname, t1 from select_tags_mt0 group by t2;

print ==================================>TD-4231
sql_error select t1,tbname from select_tags_mt0 where c1<0
sql_error select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')

sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');

sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
# this sql is valid: select first(c1), t2 from select_tags_mt0 group by tbname;
#valid sql: select first(c1), t2 from select_tags_mt0 group by tbname;

#sql select first(ts), tbname from select_tags_mt0 group by tbname;
#sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;

@@ -158,7 +158,7 @@ if $dnode4Vtatus != offline then
  sleep 2000
  goto wait_dnode4_vgroup_offline
endi
if $dnode3Vtatus != master then
if $dnode3Vtatus != unsynced then
  sleep 2000
  goto wait_dnode4_vgroup_offline
endi