Merge branch 'develop' into test/testcase

liuyq-617 2021-03-17 13:06:12 +08:00
commit cbee31ead8
36 changed files with 1126 additions and 462 deletions

View File

@ -394,7 +394,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
TSKEY k = *(TSKEY *)start;
if (k == 0) {
if (k == INT64_MIN) {
if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
return -1;
} else if (pDataBlocks->tsSource == -1) {
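A hedged reading of the change above: 0 is itself a valid timestamp (the Unix epoch), so it presumably cannot double as the "no timestamp supplied" marker, whereas INT64_MIN can. A tiny illustration (the sentinel semantics are an assumption; only the comparison change is shown in this hunk):

  TSKEY k1 = 0;          /* 1970-01-01 00:00:00 UTC, a legitimate row timestamp */
  TSKEY k2 = INT64_MIN;  /* presumed sentinel for "timestamp not provided"      */
  /* before this change k1 was treated as missing; now only k2 is */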

View File

@ -520,7 +520,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle);
} else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
@ -528,12 +528,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle);
}
} else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle);
}
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);

View File

@ -41,41 +41,46 @@ static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, co
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
switch(type) {
case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:{
int8_t* p = (int8_t*) dest;
int8_t* pSrc = (int8_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
return;
}
case TSDB_DATA_TYPE_SMALLINT: {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:{
int16_t* p = (int16_t*) dest;
int16_t* pSrc = (int16_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
return;
}
case TSDB_DATA_TYPE_INT: {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: {
int32_t* p = (int32_t*) dest;
int32_t* pSrc = (int32_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
return;
}
case TSDB_DATA_TYPE_BIGINT: {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: {
int64_t* p = (int64_t*) dest;
int64_t* pSrc = (int64_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
return;
}
case TSDB_DATA_TYPE_FLOAT: {
float* p = (float*) dest;
@ -84,7 +89,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
return;
}
case TSDB_DATA_TYPE_DOUBLE: {
double* p = (double*) dest;
@ -93,7 +98,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
return;
}
default: assert(0);
}
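A note on the merged cases above: the reversal depends only on the element width, so each unsigned type can safely share the branch of its signed counterpart. A width-based sketch (hypothetical helper, not part of this commit) that behaves the same way:

  #include <stdint.h>
  #include <string.h>

  // reverse numOfRows fixed-width elements from src into dest
  static void reverseCopyByWidth(char *dest, const char *src, size_t width, int32_t numOfRows) {
    for (int32_t i = 0; i < numOfRows; ++i) {
      memcpy(dest + (size_t)i * width, src + (size_t)(numOfRows - i - 1) * width, width);
    }
  }
  // e.g. TSDB_DATA_TYPE_INT and TSDB_DATA_TYPE_UINT both reverse with width == sizeof(int32_t)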

View File

@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else {
if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0);
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
}
if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp);

View File

@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t *qId);
/**
@ -88,9 +88,10 @@ void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
#ifdef __cplusplus
}
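Putting the new declarations together, here is a condensed sketch of the registration flow as it appears in the vnode read path later in this diff (names taken from that hunk, error handling omitted):

  uint64_t qId = 0;
  qinfo_t pQInfo = NULL;
  // createQInfoImpl assigns a monotonically increasing id and returns it through qId
  code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);

  // the qinfo cache is now keyed by qId rather than by the QInfo pointer
  void **handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo);
  pRsp->qhandle = htobe64(qId);   // the client is handed qId, not a raw pointer

  // on fetch, the id sent back by the client is checked against the cached QInfo
  void **h = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
  if (h == NULL || !checkQIdEqual(*h, pRetrieve->qhandle)) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
  }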

View File

@ -79,6 +79,9 @@ typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
// reset version
typedef int32_t (*FResetVersion)(int32_t vgId, uint64_t fver);
typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
@ -96,6 +99,7 @@ typedef struct {
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FResetVersion resetVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
} SSyncInfo;
@ -108,8 +112,8 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo *);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force);
void syncRecover(int64_t rid); // recover from other nodes:
int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
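A condensed sketch of how the new force flag is derived and consumed, taken from the dnode and vnode write paths elsewhere in this diff; with force set, the forward/confirm handshake happens even when the quorum is 1:

  // vnodeProcessWrite: non-SUBMIT messages are forced
  bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
  int32_t syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);

  // dnodeProcessVWriteQueue: the same rule when confirming a forwarded write
  if (qtype == TAOS_QTYPE_FWD) {
    vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
  }

  // inside the sync module, the response is only built if quorum > 1 or force is set
  if (pPeer && (pNode->quorum > 1 || force)) { ... }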

View File

@ -65,6 +65,7 @@ void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
uint64_t walGetVersion(twalh);
void walResetVersion(twalh, uint64_t newVer);
#ifdef __cplusplus
}

View File

@ -78,7 +78,7 @@ void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
// vnodeSync
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
// vnodeRead
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);

View File

@ -36,6 +36,7 @@ ELSEIF (TD_DARWIN)
LIST(APPEND SRC ./src/shellDarwin.c)
LIST(APPEND SRC ./src/shellCommand.c)
LIST(APPEND SRC ./src/shellImport.c)
LIST(APPEND SRC ./src/shellCheck.c)
ADD_EXECUTABLE(shell ${SRC})
# linking with dylib
TARGET_LINK_LIBRARIES(shell taos)

View File

@ -51,6 +51,7 @@ typedef struct SShellArguments {
char file[TSDB_FILENAME_LEN];
char dir[TSDB_FILENAME_LEN];
int threadNum;
int check;
char* commands;
int abort;
int port;
@ -71,6 +72,7 @@ void read_history();
void write_history();
void source_file(TAOS* con, char* fptr);
void source_dir(TAOS* con, SShellArguments* args);
void shellCheck(TAOS* con, SShellArguments* args);
void get_history_path(char* history);
void cleanup_handler(void* arg);
void exitShell();

View File

@ -0,0 +1,199 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE
#define _XOPEN_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "shell.h"
#include "shellCommand.h"
#include "tglobal.h"
#include "tutil.h"
#define SHELL_SQL_LEN 1024
static int32_t tbNum = 0;
static int32_t tbMallocNum = 0;
static char ** tbNames = NULL;
static int32_t checkedNum = 0;
static int32_t errorNum = 0;
typedef struct {
pthread_t threadID;
int threadIndex;
int totalThreads;
void * taos;
char * db;
} ShellThreadObj;
static int32_t shellUseDb(TAOS *con, char *db) {
if (db == NULL) {
fprintf(stdout, "no dbname input\n");
return -1;
}
char sql[SHELL_SQL_LEN] = {0};
snprintf(sql, SHELL_SQL_LEN, "use %s", db);
TAOS_RES *pSql = taos_query(con, sql);
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql));
}
taos_free_result(pSql);
return code;
}
static int32_t shellShowTables(TAOS *con, char *db) {
char sql[SHELL_SQL_LEN] = {0};
snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db);
TAOS_RES *pSql = taos_query(con, sql);
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
} else {
TAOS_ROW row;
while ((row = taos_fetch_row(pSql))) {
int32_t tbIndex = tbNum++;
if (tbMallocNum < tbNum) {
tbMallocNum = (tbMallocNum * 2 + 1);
tbNames = realloc(tbNames, tbMallocNum * sizeof(char *));
if (tbNames == NULL) {
fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
break;
}
}
tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN);
strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN);
if (tbIndex % 100000 == 0 && tbIndex != 0) {
fprintf(stdout, "%d tablenames fetched\n", tbIndex);
}
}
}
taos_free_result(pSql);
fprintf(stdout, "total %d tablenames fetched, over\n", tbNum);
return code;
}
static void shellFreeTbnames() {
for (int32_t i = 0; i < tbNum; ++i) {
free(tbNames[i]);
}
free(tbNames);
}
static void *shellCheckThreadFp(void *arg) {
ShellThreadObj *pThread = (ShellThreadObj *)arg;
int32_t interval = tbNum / pThread->totalThreads + 1;
int32_t start = pThread->threadIndex * interval;
int32_t end = (pThread->threadIndex + 1) * interval;
if (end > tbNum) end = tbNum + 1;
char file[32] = {0};
snprintf(file, 32, "tb%d.txt", pThread->threadIndex);
FILE *fp = fopen(file, "w");
if (!fp) {
fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
return NULL;
}
char sql[SHELL_SQL_LEN];
for (int32_t t = start; t < end; ++t) {
char *tbname = tbNames[t];
if (tbname == NULL) break;
snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname);
TAOS_RES *pSql = taos_query(pThread->taos, sql);
int32_t code = taos_errno(pSql);
if (code != 0) {
int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname);
fwrite(sql, 1, len, fp);
atomic_add_fetch_32(&errorNum, 1);
}
int32_t cnum = atomic_add_fetch_32(&checkedNum, 1);
if (cnum % 5000 == 0 && cnum != 0) {
fprintf(stdout, "%d tables checked\n", cnum);
}
taos_free_result(pSql);
}
fsync(fileno(fp));
fclose(fp);
return NULL;
}
static void shellRunCheckThreads(TAOS *con, SShellArguments *args) {
pthread_attr_t thattr;
ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj));
for (int t = 0; t < args->threadNum; ++t) {
ShellThreadObj *pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = args->threadNum;
pThread->taos = con;
pThread->db = args->database;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) {
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
exit(0);
}
}
for (int t = 0; t < args->threadNum; ++t) {
pthread_join(threadObj[t].threadID, NULL);
}
for (int t = 0; t < args->threadNum; ++t) {
taos_close(threadObj[t].taos);
}
free(threadObj);
}
void shellCheck(TAOS *con, SShellArguments *args) {
int64_t start = taosGetTimestampMs();
if (shellUseDb(con, args->database) != 0) {
shellFreeTbnames();
return;
}
if (shellShowTables(con, args->database) != 0) {
shellFreeTbnames();
return;
}
fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, args->threadNum);
shellRunCheckThreads(con, args);
int64_t end = taosGetTimestampMs();
fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum,
(end - start) / 1000.0);
}

View File

@ -121,6 +121,12 @@ TAOS *shellInit(SShellArguments *args) {
taos_close(con);
exit(EXIT_SUCCESS);
}
if (args->check != 0) {
shellCheck(con, args);
taos_close(con);
exit(EXIT_SUCCESS);
}
#endif
return con;
@ -412,7 +418,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt < 0 && ms != 0) {
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
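Why the condition changed: assuming the usual split just above this hunk, where tt holds whole seconds and ms the sub-second remainder from C integer division (which truncates toward zero), a value just before the epoch such as val = -1 ms yields tt == 0 and ms == -1. The old test tt < 0 never fired for that case, so the borrow was skipped; the new test tt <= 0 && ms < 0 handles it:

  // hypothetical walk-through for millisecond precision (the split itself is not shown in this hunk)
  int64_t val = -1;          // 1 ms before the epoch
  time_t  tt  = val / 1000;  // 0, because C division truncates toward zero
  int64_t ms  = val % 1000;  // -1
  if (tt <= 0 && ms < 0) {   // new condition: true
    tt--;                    // -1  -> 1969-12-31 23:59:59
    ms += 1000;              // 999 -> formatted as ".999"
  }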

View File

@ -45,6 +45,7 @@ static struct argp_option options[] = {
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"check", 'k', "CHECK", 0, "Check tables."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."},
@ -130,6 +131,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return -1;
}
break;
case 'k':
arguments->check = atoi(arg);
break;
case 'd':
arguments->database = arg;
break;
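A brief usage note tying this option to the shellCheck() hook added earlier in this diff (the database name below is only an example): starting the shell with a non-zero -k value plus a database, for instance

  taos -d db -k 1 -T 4

runs shellCheck() from shellInit() and then exits. Each worker thread issues "select * from <table> limit 1" for its share of the table list and, for every table that fails, appends a repair statement of the form "drop table <db>.<table>;" to a per-thread file named tb<threadIndex>.txt.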

View File

@ -41,8 +41,7 @@
"insert_mode": "taosc",
"insert_rows": 100000,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100,
"rows_per_tbl": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,

View File

@ -23,7 +23,6 @@
#ifdef LINUX
#include <argp.h>
#include <assert.h>
#include <inttypes.h>
#ifndef _ALPINE
#include <error.h>
@ -39,11 +38,11 @@
#include <wordexp.h>
#include <regex.h>
#else
#include <assert.h>
#include <regex.h>
#include <stdio.h>
#endif
#include <assert.h>
#include <stdlib.h>
#include "cJSON.h"
@ -221,7 +220,6 @@ typedef struct SSuperTable_S {
int childTblOffset;
int multiThreadWriteOneTbl; // 0: no, 1: yes
int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl
int rowsPerTbl; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision
@ -396,6 +394,8 @@ typedef struct SThreadInfo_S {
uint64_t et;
int64_t lastTs;
// sample data
int samplePos;
// statistics
int64_t totalInsertRows;
int64_t totalAffectedRows;
@ -1126,8 +1126,6 @@ static int printfInsertMeta() {
}else {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
}
printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
printf(" rowsPerTbl: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].rowsPerTbl);
printf(" disorderRange: \033[33m%d\033[0m\n",
@ -1287,7 +1285,6 @@ static void printfInsertMetaToFile(FILE* fp) {
}else {
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
}
fprintf(fp, " numberOfTblInOneSql: %d\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
@ -2335,7 +2332,8 @@ static int createDatabases() {
" fsync %d", g_Dbs.db[i].dbCfg.fsync);
}
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", strlen("us")))) {
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
}
@ -2351,14 +2349,17 @@ static int createDatabases() {
debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName);
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
ret = createSuperTable(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
} else {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS;
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j]);
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j]);
}
if (0 != ret) {
@ -2715,6 +2716,8 @@ static int readSampleFromCsvFileToMem(
continue;
}
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, superTblInfo->lenOfOneRow, getRows);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
getRows++;
@ -3426,24 +3429,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER;
}
cJSON* numberOfTblInOneSql = cJSON_GetObjectItem(stbInfo, "number_of_tbl_in_one_sql");
if (numberOfTblInOneSql && numberOfTblInOneSql->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = numberOfTblInOneSql->valueint;
} else if (!numberOfTblInOneSql) {
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = 0;
} else {
printf("ERROR: failed to read json, numberOfTblInOneSql not found\n");
goto PARSE_OVER;
}
cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl");
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint;
} else if (!rowsPerTbl) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = 1;
g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 means interlace mode; the max value must be less than or equal to num_of_records_per_req
} else {
printf("ERROR: failed to read json, rowsPerTbl not found\n");
fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n");
goto PARSE_OVER;
}
@ -3901,7 +3893,7 @@ PARSE_OVER:
return ret;
}
void prepareSampleData() {
static void prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
@ -3915,7 +3907,7 @@ void prepareSampleData() {
}
}
void postFreeResource() {
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
@ -3942,16 +3934,18 @@ void postFreeResource() {
static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int* sampleUsePos) {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
int ret = readSampleFromCsvFileToMem(superTblInfo);
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
*/
*sampleUsePos = 0;
}
int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
@ -3967,9 +3961,11 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN);
printf("binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return (-1);
}
@ -3981,15 +3977,24 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
rand_string(buf, stbInfo->columns[i].dataLen);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf);
tmfree(buf);
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "int", 3)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_int());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bigint", 6)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "float", 5)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_float());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "double", 6)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_double());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "smallint", 8)) {
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"int", 3)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%d, ", rand_int());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"bigint", 6)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"float", 5)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%f, ", rand_float());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"double", 6)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%f, ", rand_double());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"smallint", 8)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint());
@ -4009,255 +4014,6 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
return dataLen;
}
static void syncWriteForNumberOfTblInOneSql(
threadInfo *winfo, char* sampleDataBuf) {
SSuperTable* superTblInfo = winfo->superTblInfo;
int samplePos = 0;
//printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id);
int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen+1, 1);
if (NULL == buffer) {
printf("========calloc size[ %d ] fail!\n", superTblInfo->maxSqlLen);
return;
}
int32_t numberOfTblInOneSql = superTblInfo->numberOfTblInOneSql;
int32_t tbls = winfo->end_table_id - winfo->start_table_id + 1;
if (numberOfTblInOneSql > tbls) {
numberOfTblInOneSql = tbls;
}
uint64_t time_counter = winfo->start_time;
int sampleUsePos;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int64_t st = 0;
int64_t et = 0xffffffff;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
for (int i = 0; i < insertRows;) {
int32_t tbl_id = 0;
for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) {
int64_t start_time = 0;
int inserted = i;
for (int k = 0; k < g_args.num_of_RPR;) {
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer;
int32_t end_tbl_id = tableSeq + numberOfTblInOneSql;
if (end_tbl_id > winfo->end_table_id) {
end_tbl_id = winfo->end_table_id+1;
}
for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) {
sampleUsePos = samplePos;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo, tbl_id % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
goto free_and_statistics;
}
if (0 == len) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d using %s.%s tags %s values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s%d using %s.%s tags %s values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
}
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
if (0 == len) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s values ",
winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s values ",
winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
}
} else { // pre-create child table
if (0 == len) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s%d values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id);
}
}
start_time = time_counter;
for (int j = 0; j < superTblInfo->rowsPerTbl;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep,
superTblInfo,
&sampleUsePos);
if (retLen < 0) {
goto free_and_statistics;
}
} else if (0 == strncasecmp(
superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = start_time - taosRandom() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
} else {
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep,
superTblInfo);
}
if (retLen < 0) {
goto free_and_statistics;
}
}
len += retLen;
//inserted++;
j++;
winfo->totalInsertRows++;
if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tableSeq = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
superTblInfo->lenOfOneRow);
goto send_to_server;
}
}
}
tableSeq = tbl_id;
inserted += superTblInfo->rowsPerTbl;
send_to_server:
if (insert_interval) {
st = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000)) {
int sleep_time = insert_interval - (et -st);
printf("sleep: %d ms insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
}
if (0 == strncasecmp(superTblInfo->insertMode,
"taosc",
strlen("taosc"))) {
//printf("multi table===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(
winfo->taos, buffer, INSERT_TYPE);
if (0 < affectedRows) {
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo->totalAffectedRows += affectedRows;
} else {
fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows);
goto free_and_statistics;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
winfo->totalInsertRows,
winfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
//int64_t t2 = taosGetTimestampMs();
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
} else {
//int64_t t1 = taosGetTimestampMs();
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
//int64_t t2 = taosGetTimestampMs();
//printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if (0 != retCode) {
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
goto free_and_statistics;
}
}
if (insert_interval) {
et = taosGetTimestampUs();
}
break;
}
if (tableSeq > winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos;
}
i = inserted;
time_counter = start_time;
}
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
free_and_statistics:
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows);
return;
}
int32_t generateData(char *res, char **data_type,
int num_of_cols, int64_t timestamp, int lenOfBinary) {
memset(res, 0, MAX_DATA_SIZE);
@ -4319,26 +4075,23 @@ int32_t generateData(char *res, char **data_type,
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
char* sampleDataBuf = NULL;
// each thread read sample data from csv file
if (0 == strncasecmp(superTblInfo->dataSource,
"sample",
strlen("sample"))) {
sampleDataBuf = calloc(
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
if (sampleDataBuf == NULL) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
}
superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
fprintf(stderr, "read sample from csv file failed.\n");
tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
}
return 0;
@ -4392,7 +4145,6 @@ static int generateDataBuffer(int32_t tableSeq,
assert(buffer != NULL);
char *pChildTblName;
int childTblCount;
pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (NULL == pChildTblName) {
@ -4400,14 +4152,12 @@ static int generateDataBuffer(int32_t tableSeq,
return -1;
}
if (superTblInfo && (superTblInfo->childTblOffset > 0)) {
// select tbname from stb limit 1 offset tableSeq
getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos,
pThreadInfo->db_name, superTblInfo->sTblName,
&pChildTblName, &childTblCount,
1, tableSeq);
if (superTblInfo && (superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d",
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d",
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
}
@ -4467,7 +4217,7 @@ static int generateDataBuffer(int32_t tableSeq,
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) {
if (superTblInfo) {
int retLen = 0;
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
@ -4488,27 +4238,26 @@ static int generateDataBuffer(int32_t tableSeq,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d);
} else {
retLen = generateRowData(
} else {
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo);
}
}
}
if (retLen < 0) {
free(pChildTblName);
return -1;
}
if (retLen < 0) {
free(pChildTblName);
return -1;
}
len += retLen;
len += retLen;
if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite
k++;
break;
}
}
if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite
k++;
break;
}
} else {
int rand_num = taosRandom() % 100;
char data[MAX_DATA_SIZE];
@ -4529,14 +4278,13 @@ static int generateDataBuffer(int32_t tableSeq,
}
pstr += sprintf(pstr, " %s", data);
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long
k++;
break;
}
}
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%p\n", __func__, __LINE__, len, k, buffer);
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer);
k++;
startFrom ++;
@ -4572,20 +4320,6 @@ static void* syncWrite(void *sarg) {
return NULL;
}
if (superTblInfo) {
if (0 != prepareSampleDataForSTable(superTblInfo))
return NULL;
if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf);
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return NULL;
}
}
int samplePos = 0;
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
@ -4598,7 +4332,7 @@ static void* syncWrite(void *sarg) {
winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0;
int sampleUsePos;
winfo->samplePos = 0;
for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id;
tableSeq ++) {
@ -4612,10 +4346,8 @@ static void* syncWrite(void *sarg) {
st = taosGetTimestampUs();
}
sampleUsePos = samplePos;
int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows,
i, start_time, &sampleUsePos);
i, start_time, &(winfo->samplePos));
if (generated > 0)
i += generated;
else
@ -4662,16 +4394,12 @@ static void* syncWrite(void *sarg) {
if ((tableSeq == winfo->end_table_id) && superTblInfo &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
samplePos = sampleUsePos;
printf("%s() LN%d samplePos=%d\n", __func__, __LINE__, winfo->samplePos);
}
} // tableSeq
free_and_statistics_2:
tmfree(buffer);
if (superTblInfo) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
}
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
@ -4842,6 +4570,45 @@ static void startMultiThreadInsertData(int threads, char* db_name,
else
last = 0;
// read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n");
exit(-1);
}
}
if (superTblInfo && (superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
TAOS* taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
fprintf(stderr, "connect to server fail , reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
superTblInfo->childTblName = (char*)calloc(1,
superTblInfo->childTblLimit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!");
taos_close(taos);
exit(-1);
}
int childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
superTblInfo->childTblLimit,
superTblInfo->childTblOffset);
taos_close(taos);
}
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;

View File

@ -680,7 +680,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
if (pRow != NULL) {
// forward to peers
pRow->processedCount = 0;
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC);
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
if (syncCode <= 0) pRow->processedCount = 1;
if (syncCode < 0) {
@ -700,7 +700,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC);
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
// from wal or forward msg, row not created, should add into hash
if (action == SDB_ACTION_INSERT) {
@ -1119,7 +1119,7 @@ static void *sdbWorkerFp(void *pWorker) {
sdbConfirmForward(1, pRow, pRow->code);
} else {
if (qtype == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code);
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code, false);
}
sdbFreeFromQueue(pRow);
}

View File

@ -308,6 +308,7 @@ enum {
typedef struct SQInfo {
void* signature;
uint64_t qId;
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
@ -429,7 +430,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId);
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);

View File

@ -98,6 +98,9 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
uint64_t queryHandleId = 0;
int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
@ -4173,8 +4176,10 @@ static SSDataBlock* doTableScan(void* param) {
assert(ret);
}
pResultRowInfo->curIndex = 0;
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
if (pResultRowInfo->size > 0) {
pResultRowInfo->curIndex = 0;
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
}
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey);
@ -6109,9 +6114,13 @@ void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) {
pResultInfo->total = 0;
}
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery,
char* sql) {
char* sql, uint64_t *qId) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
@ -6252,7 +6261,9 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo;
_cleanup_qinfo:

View File

@ -68,7 +68,7 @@ void freeParam(SQueryParam *param) {
tfree(param->prevResult);
}
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) {
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t *qId) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
@ -158,7 +158,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql);
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId);
param.sql = NULL;
param.pExprs = NULL;
@ -472,7 +472,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
if (pMgmt == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
@ -492,8 +492,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
} else {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
pthread_mutex_unlock(&pQueryMgmt->lock);

View File

@ -117,6 +117,7 @@ typedef struct SSyncNode {
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FResetVersion resetVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
pthread_mutex_t mutex;

View File

@ -56,7 +56,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
static void syncStartCheckPeerConn(SSyncPeer *pPeer);
@ -182,6 +182,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode->startSyncFileFp = pInfo->startSyncFileFp;
pNode->stopSyncFileFp = pInfo->stopSyncFileFp;
pNode->getVersionFp = pInfo->getVersionFp;
pNode->resetVersionFp = pInfo->resetVersionFp;
pNode->sendFileFp = pInfo->sendFileFp;
pNode->recvFileFp = pInfo->recvFileFp;
@ -377,24 +378,24 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
return 0;
}
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) {
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype, bool force) {
if (rid <= 0) return 0;
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype, force);
syncReleaseNode(pNode);
return code;
}
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) {
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
if (pPeer && (pNode->quorum > 1 || force)) {
SFwdRsp rsp;
syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
@ -1413,7 +1414,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
syncReleaseNode(pNode);
}
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force) {
SSyncPeer *pPeer;
SSyncHead *pSyncHead;
SWalHead * pWalHead = data;
@ -1457,7 +1458,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) {
if ((pNode->quorum > 1 || force) && code == 0) {
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
if (code >= 0) code = 1;
}

View File

@ -238,6 +238,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
(*pNode->stopSyncFileFp)(pNode->vgId, fversion);
nodeVersion = fversion;
if (pNode->resetVersionFp) (*pNode->resetVersionFp)(pNode->vgId, fversion);
sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);
uint64_t wver = 0;

View File

@ -30,8 +30,9 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion);
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
#ifdef __cplusplus
}

View File

@ -305,6 +305,7 @@ int32_t vnodeOpen(int32_t vgId) {
syncInfo.startSyncFileFp = vnodeStartSyncFile;
syncInfo.stopSyncFileFp = vnodeStopSyncFile;
syncInfo.getVersionFp = vnodeGetVersion;
syncInfo.resetVersionFp = vnodeResetVersion;
syncInfo.sendFileFp = tsdbSyncSend;
syncInfo.recvFileFp = tsdbSyncRecv;
syncInfo.pTsdb = pVnode->tsdb;

View File

@ -247,7 +247,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) {
qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
uint64_t qId = 0;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
@ -259,22 +260,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo);
if (handle == NULL) { // failed to register qhandle
pRsp->code = terrno;
terrno = 0;
vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
vError("vgId:%d, QInfo:%"PRIu64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo,
tstrerror(pRsp->code));
qDestroyQueryInfo(pQInfo); // destroy it directly
return pRsp->code;
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t)pQInfo);
pRsp->qhandle = htobe64(qId);
}
if (handle != NULL &&
vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
vError("vgId:%d, QInfo:%"PRIu64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
pRead->rpcHandle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
@ -285,7 +286,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
if (handle != NULL) {
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
vTrace("vgId:%d, QInfo:%"PRIu64 "-%p, dnode query msg disposed, create qhandle and returns to app", vgId, qId, *handle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle);
if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code;
@ -349,7 +350,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle,
pRetrieve->free, pRead->rpcHandle);
memset(pRet, 0, sizeof(SRspRet));
@ -360,19 +361,19 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (handle == NULL) {
code = terrno;
terrno = TSDB_CODE_SUCCESS;
} else if ((*handle) != (void *)pRetrieve->qhandle) {
} else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
}
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
@ -383,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle);
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);

View File

@ -158,7 +158,23 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
return code;
}
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code);
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while reset version", vgId);
return -1;
}
pVnode->fversion = fver;
pVnode->version = fver;
walResetVersion(pVnode->wal, fver);
vDebug("vgId:%d, version reset to %" PRIu64, vgId, fver);
vnodeRelease(pVnode);
return 0;
}
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code, force);
}

View File

@ -89,7 +89,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype);
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
if (syncCode < 0) return syncCode;
// write into WAL

View File

@ -446,3 +446,16 @@ uint64_t walGetVersion(twalh param) {
return pWal->version;
}
// The wal version on the slave (dnode1) must be reset.
// After the data files are recovered from the peer (dnode2), the new file version on dnode1 may become smaller than before.
// If the wal version is not reset, some new wal records cannot be written on dnode1, so fversion and the records in the wal file may become inconsistent.
// If dnode2 then goes down, dnode1 switches to master; once dnode2 restarts and restores data from dnode1, data loss occurs.
void walResetVersion(twalh param, uint64_t newVer) {
SWal *pWal = param;
if (pWal == 0) return;
wDebug("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
pWal->version = newVer;
}
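Condensed from the sync-restore and vnode hunks earlier in this diff, this is the chain that actually performs the reset on the slave once file-level recovery finishes (error handling omitted):

  // syncRestoreDataStepByStep, after data files have been synced from the peer
  (*pNode->stopSyncFileFp)(pNode->vgId, fversion);
  nodeVersion = fversion;
  if (pNode->resetVersionFp) (*pNode->resetVersionFp)(pNode->vgId, fversion);

  // vnodeResetVersion is registered as resetVersionFp in vnodeOpen
  pVnode->fversion = fver;
  pVnode->version  = fver;
  walResetVersion(pVnode->wal, fver);   // pulls pWal->version back to fver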

View File

@ -44,7 +44,6 @@
"childtable_offset": 33,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,

View File

@ -0,0 +1,3 @@
1
2
3

View File

@ -0,0 +1,39 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"confirm_parameter_prompt": "no",
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes"
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 20,
"childtable_limit": 10,
"childtable_offset": 0,
"childtable_prefix": "t_",
"auto_create_table": "no",
"data_source": "sample",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 20,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048000,
"timestamp_step": 1000,
"start_timestamp": "2020-1-1 00:00:00",
"sample_format": "csv",
"sample_file": "./tools/sampledata.csv",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT", "count":1}]
}]
}]
}

View File

@ -59,6 +59,15 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 33000)
os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath)
tdSql.execute("reset query cache")
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 100)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 20000)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)

View File

@ -0,0 +1,68 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("%staosdemo -f tools/taosdemo-sampledata.json" % binPath)
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 20)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 200)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -1,112 +1,58 @@
run general/parser/alter.sim
sleep 100
run general/parser/alter1.sim
sleep 100
run general/parser/alter_stable.sim
sleep 100
run general/parser/auto_create_tb.sim
sleep 100
run general/parser/auto_create_tb_drop_tb.sim
sleep 100
run general/parser/col_arithmetic_operation.sim
sleep 100
run general/parser/columnValue.sim
sleep 100
run general/parser/commit.sim
sleep 100
run general/parser/create_db.sim
sleep 100
run general/parser/create_mt.sim
sleep 100
run general/parser/create_tb.sim
sleep 100
run general/parser/dbtbnameValidate.sim
sleep 100
run general/parser/fill.sim
sleep 100
run general/parser/fill_stb.sim
sleep 100
#run general/parser/fill_us.sim #
sleep 100
run general/parser/first_last.sim
sleep 100
run general/parser/import_commit1.sim
sleep 100
run general/parser/import_commit2.sim
sleep 100
run general/parser/import_commit3.sim
sleep 100
#run general/parser/import_file.sim
sleep 100
run general/parser/insert_tb.sim
sleep 100
run general/parser/tags_dynamically_specifiy.sim
sleep 100
run general/parser/interp.sim
sleep 100
run general/parser/lastrow.sim
sleep 100
run general/parser/limit.sim
sleep 100
run general/parser/limit1.sim
sleep 100
run general/parser/limit1_tblocks100.sim
sleep 100
run general/parser/limit2.sim
sleep 100
run general/parser/mixed_blocks.sim
sleep 100
run general/parser/nchar.sim
sleep 100
run general/parser/null_char.sim
sleep 100
run general/parser/selectResNum.sim
sleep 100
run general/parser/select_across_vnodes.sim
sleep 100
run general/parser/select_from_cache_disk.sim
sleep 100
run general/parser/set_tag_vals.sim
sleep 100
run general/parser/single_row_in_tb.sim
sleep 100
run general/parser/slimit.sim
sleep 100
run general/parser/slimit1.sim
sleep 100
run general/parser/slimit_alter_tags.sim
sleep 100
run general/parser/tbnameIn.sim
sleep 100
run general/parser/slimit_alter_tags.sim # persistent failed
sleep 100
run general/parser/join.sim
sleep 100
run general/parser/join_multivnode.sim
sleep 100
run general/parser/join_manyblocks.sim
run general/parser/projection_limit_offset.sim
sleep 100
run general/parser/select_with_tags.sim
sleep 100
run general/parser/groupby.sim
sleep 100
run general/parser/tags_filter.sim
sleep 100
run general/parser/topbot.sim
sleep 100
run general/parser/union.sim
sleep 100
run general/parser/constCol.sim
sleep 100
run general/parser/where.sim
sleep 100
run general/parser/timestamp.sim
sleep 100
run general/parser/sliding.sim
sleep 100
run general/parser/function.sim
sleep 100
run general/parser/stableOp.sim
sleep 100
run general/parser/slimit_alter_tags.sim

View File

@ -350,5 +350,13 @@ if $rows != 0 then
return -1
endi
print ==========================>td-3318
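# TD-3318 regression check: the only row inserted into tu has b = 'abc', so the
# b <> 'abc' filter matches nothing and the windowed stddev query below is
# expected to return zero rows.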
sql create table tu(ts timestamp, k int, b binary(12))
sql insert into tu values(now, 1, 'abc')
sql select stddev(k) from tu where b <>'abc' interval(1s)
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,556 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c role -v 1
system sh/cfg.sh -n dnode2 -c role -v 2
system sh/cfg.sh -n dnode3 -c role -v 2
system sh/cfg.sh -n dnode4 -c role -v 2
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1, only deploy mnode
system sh/exec.sh -n dnode1 -s start
sql connect
print ============== step2: start dnode2/dnode3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sql create dnode $hostname2
sql create dnode $hostname3
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step2
endi
if $data4_2 != ready then
goto step2
endi
if $data4_3 != ready then
goto step2
endi
sleep 1000
print ============== step3
sql create database db replica 2
sql use db
sql create table stb (ts timestamp, c1 int, c2 int) tags(t1 int)
sql create table t1 using stb tags(1)
sql insert into t1 values(1577980800000, 1, 5)
sql insert into t1 values(1577980800001, 2, 4)
sql insert into t1 values(1577980800002, 3, 3)
sql insert into t1 values(1577980800003, 4, 2)
sql insert into t1 values(1577980800004, 5, 1)
sql show db.vgroups
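# Assumed `show vgroups` column layout for the checks below: $data04/$data06 are
# the dnode ids hosting the two replicas (dnode3 and dnode2), and $data05/$data07
# are their respective roles (master/slave/offline).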
if $data04 != 3 then
return -1
endi
if $data06 != 2 then
return -1
endi
if $data05 != master then
return -1
endi
if $data07 != slave then
return -1
endi
sql select * from t1
if $rows != 5 then
return -1
endi
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
system sh/exec.sh -n dnode3 -s stop -x SIGKILL
print ============== step4
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
$x = 0
step4:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step4
endi
if $data4_2 != ready then
goto step4
endi
if $data4_3 != ready then
goto step4
endi
sql show db.vgroups
if $data04 != 3 then
goto step4
endi
if $data06 != 2 then
goto step4
endi
if $data05 != master then
goto step4
endi
if $data07 != slave then
goto step4
endi
sql create table t2 using stb tags(1)
sql insert into t2 values(1577980800000, 1, 5)
sql insert into t2 values(1577980800001, 2, 4)
sql insert into t2 values(1577980800002, 3, 3)
sql insert into t2 values(1577980800003, 4, 2)
sql insert into t2 values(1577980800004, 5, 1)
sql select * from t2
if $rows != 5 then
return -1
endi
print ============== step5
system sh/exec.sh -n dnode3 -s stop -x SIGKILL
$x = 0
step5:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step5
endi
if $data4_2 != ready then
goto step5
endi
if $data4_3 != offline then
goto step5
endi
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql show db.vgroups
if $data04 != 3 then
goto step5
endi
if $data06 != 2 then
goto step5
endi
if $data05 != offline then
goto step5
endi
if $data07 != master then
goto step5
endi
print ============== step6
sql create table t3 using stb tags(1)
sql insert into t3 values(1577980800000, 1, 5)
sql insert into t3 values(1577980800001, 2, 4)
sql insert into t3 values(1577980800002, 3, 3)
sql insert into t3 values(1577980800003, 4, 2)
sql insert into t3 values(1577980800004, 5, 1)
sql insert into t3 values(1577980800010, 11, 5)
sql insert into t3 values(1577980800011, 12, 4)
sql insert into t3 values(1577980800012, 13, 3)
sql insert into t3 values(1577980800013, 14, 2)
sql insert into t3 values(1577980800014, 15, 1)
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
system sh/exec.sh -n dnode3 -s start
$x = 0
step6:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step6
endi
if $data4_2 != ready then
goto step6
endi
if $data4_3 != ready then
goto step6
endi
sql show db.vgroups
if $data04 != 3 then
goto step6
endi
if $data06 != 2 then
goto step6
endi
if $data05 != slave then
goto step6
endi
if $data07 != master then
goto step6
endi
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
print ============== step7
sql create table t4 using stb tags(1)
sql insert into t4 values(1577980800000, 1, 5)
sql insert into t4 values(1577980800001, 2, 4)
sql insert into t4 values(1577980800002, 3, 3)
sql insert into t4 values(1577980800003, 4, 2)
sql insert into t4 values(1577980800004, 5, 1)
sql insert into t4 values(1577980800010, 11, 5)
sql insert into t4 values(1577980800011, 12, 4)
sql insert into t4 values(1577980800012, 13, 3)
sql insert into t4 values(1577980800013, 14, 2)
sql insert into t4 values(1577980800014, 15, 1)
sql insert into t4 values(1577980800020, 21, 5)
sql insert into t4 values(1577980800021, 22, 4)
sql insert into t4 values(1577980800022, 23, 3)
sql insert into t4 values(1577980800023, 24, 2)
sql insert into t4 values(1577980800024, 25, 1)
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
sql select * from t4
if $rows != 15 then
return -1
endi
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
$x = 0
step7:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step7
endi
if $data4_2 != offline then
goto step7
endi
if $data4_3 != ready then
goto step7
endi
sql show db.vgroups
if $data04 != 3 then
goto step7
endi
if $data06 != 2 then
goto step7
endi
if $data05 != master then
goto step7
endi
if $data07 != offline then
goto step7
endi
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
sql select * from t4
if $rows != 15 then
return -1
endi
print ============== step8
sql create table t5 using stb tags(1)
sql insert into t5 values(1577980800000, 1, 5)
sql insert into t5 values(1577980800001, 2, 4)
sql insert into t5 values(1577980800002, 3, 3)
sql insert into t5 values(1577980800003, 4, 2)
sql insert into t5 values(1577980800004, 5, 1)
sql insert into t5 values(1577980800010, 11, 5)
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
sql select * from t4
if $rows != 15 then
return -1
endi
sql select * from t5
if $rows != 6 then
return -1
endi
system sh/exec.sh -n dnode2 -s start
$x = 0
step8:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step8
endi
if $data4_2 != ready then
goto step8
endi
if $data4_3 != ready then
goto step8
endi
sql show db.vgroups
if $data04 != 3 then
goto step8
endi
if $data06 != 2 then
goto step8
endi
if $data05 != master then
goto step8
endi
if $data07 != slave then
goto step8
endi
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
sql select * from t4
if $rows != 15 then
return -1
endi
sql select * from t5
if $rows != 6 then
return -1
endi
print ============== step9
sql create table t6 using stb tags(1)
sql insert into t6 values(1577980800000, 1, 5)
sql insert into t6 values(1577980800001, 2, 4)
sql insert into t6 values(1577980800002, 3, 3)
sql insert into t6 values(1577980800003, 4, 2)
sql insert into t6 values(1577980800004, 5, 1)
sql insert into t6 values(1577980800010, 11, 5)
sql insert into t6 values(1577980800011, 12, 4)
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
sql select * from t4
if $rows != 15 then
return -1
endi
sql select * from t5
if $rows != 6 then
return -1
endi
sql select * from t6
if $rows != 7 then
return -1
endi
system sh/exec.sh -n dnode3 -s stop -x SIGKILL
$x = 0
step9:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
if $data4_1 != ready then
goto step9
endi
if $data4_2 != ready then
goto step9
endi
if $data4_3 != offline then
goto step9
endi
print ============== 2
sql show db.vgroups
if $data04 != 3 then
goto step9
endi
if $data06 != 2 then
goto step9
endi
if $data05 != offline then
goto step9
endi
if $data07 != master then
goto step9
endi
print ============== 3
sql select * from t1
if $rows != 5 then
return -1
endi
sql select * from t2
if $rows != 5 then
return -1
endi
sql select * from t3
if $rows != 10 then
return -1
endi
sql select * from t4
if $rows != 15 then
return -1
endi
sql select * from t5
if $rows != 6 then
return -1
endi
sql select * from t6
if $rows != 7 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop
system sh/exec.sh -n dnode3 -s stop