Merge branch 'develop' into feature/query
This commit is contained in:
commit
73075efb6c
|
@ -64,4 +64,5 @@ CMakeError.log
|
|||
/out/isenseconfig/WSL-Clang-Debug
|
||||
/out/isenseconfig/WSL-GCC-Debug
|
||||
/test/cfg
|
||||
/src/.vs
|
||||
/src/.vs
|
||||
*.o
|
||||
|
|
|
@ -301,7 +301,9 @@ void tscSaveSubscriptionProgress(void* sub) {
|
|||
char path[256];
|
||||
sprintf(path, "%s/subscribe", tsDataDir);
|
||||
if (access(path, 0) != 0) {
|
||||
mkdir(path, 0777);
|
||||
if (mkdir(path, 0777) != 0 && errno != EEXIST) {
|
||||
tscError("failed to create subscribe dir: %s", path);
|
||||
}
|
||||
}
|
||||
|
||||
sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic);
|
||||
|
|
|
@ -80,9 +80,8 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn) {
|
|||
}
|
||||
|
||||
void taos_init_imp() {
|
||||
char temp[128];
|
||||
struct stat dirstat;
|
||||
|
||||
char temp[128];
|
||||
|
||||
errno = TSDB_CODE_SUCCESS;
|
||||
srand(taosGetTimestampSec());
|
||||
deltaToUtcInitOnce();
|
||||
|
@ -94,7 +93,9 @@ void taos_init_imp() {
|
|||
taosReadGlobalLogCfg();
|
||||
|
||||
// For log directory
|
||||
if (stat(tsLogDir, &dirstat) < 0) mkdir(tsLogDir, 0755);
|
||||
if (mkdir(tsLogDir, 0755) != 0 && errno != EEXIST) {
|
||||
printf("failed to create log dir:%s\n", tsLogDir);
|
||||
}
|
||||
|
||||
sprintf(temp, "%s/taoslog", tsLogDir);
|
||||
if (taosInitLog(temp, tsNumOfLogLines, 10) < 0) {
|
||||
|
@ -200,7 +201,7 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
|
|||
tscPrint("set shellActivityTimer:%d", tsShellActivityTimer);
|
||||
} else {
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr,
|
||||
tsCfgStatusStr[cfg->cfgStatus], (int32_t *)cfg->ptr);
|
||||
tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
|
||||
}
|
||||
break;
|
||||
|
||||
|
|
|
@ -137,7 +137,7 @@ class TDengineCursor(object):
|
|||
else:
|
||||
raise ProgrammingError(
|
||||
CTaosInterface.errStr(
|
||||
self._result ))
|
||||
self._result ), errno)
|
||||
|
||||
def executemany(self, operation, seq_of_parameters):
|
||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||
|
|
|
@ -139,7 +139,7 @@ class TDengineCursor(object):
|
|||
else:
|
||||
raise ProgrammingError(
|
||||
CTaosInterface.errStr(
|
||||
self._result ))
|
||||
self._result), errno)
|
||||
|
||||
def executemany(self, operation, seq_of_parameters):
|
||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||
|
|
|
@ -117,7 +117,7 @@ class TDengineCursor(object):
|
|||
self._fields = CTaosInterface.useResult(self._result)
|
||||
return self._handle_result()
|
||||
else:
|
||||
raise ProgrammingError(CTaosInterface.errStr(self._result))
|
||||
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
|
||||
|
||||
def executemany(self, operation, seq_of_parameters):
|
||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||
|
|
|
@ -117,7 +117,7 @@ class TDengineCursor(object):
|
|||
self._fields = CTaosInterface.useResult(self._result )
|
||||
return self._handle_result()
|
||||
else:
|
||||
raise ProgrammingError(CTaosInterface.errStr(self._result ))
|
||||
raise ProgrammingError(CTaosInterface.errStr(self._result ), errno)
|
||||
|
||||
def executemany(self, operation, seq_of_parameters):
|
||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||
|
|
|
@ -92,6 +92,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
// Initialize the system
|
||||
if (dnodeInitSystem() < 0) {
|
||||
syslog(LOG_ERR, "Error initialize TDengine system");
|
||||
dPrint("Failed to start TDengine, please check the log at:%s", tsLogDir);
|
||||
closelog();
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
|
|
@ -136,16 +136,17 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USERS, 0, 0x0355, "mnode too
|
|||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TABLE_ALREADY_EXIST, 0, 0x0360, "mnode table already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, 0, 0x0361, "mnode invalid table id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, 0, 0x0362, "mnode invalid table type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, 0, 0x0363, "mnode too many tags")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TABLES, 0, 0x0364, "mnode too many tables")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, 0, 0x0365, "mnode not enough time series")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, 0, 0x0366, "mnode no super table") // operation only available for super table
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, 0, 0x0367, "mnode column name too long")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, 0, 0x0368, "mnode tag already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, 0, 0x0369, "mnode tag not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_ALREAY_EXIST, 0, 0x036A, "mnode field already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_NOT_EXIST, 0, 0x036B, "mnode field not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, 0, 0x0362, "mnode invalid table name")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, 0, 0x0363, "mnode invalid table type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, 0, 0x0364, "mnode too many tags")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TABLES, 0, 0x0365, "mnode too many tables")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, 0, 0x0366, "mnode not enough time series")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, 0, 0x0367, "mnode no super table") // operation only available for super table
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, 0, 0x0368, "mnode column name too long")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, 0, 0x0369, "mnode tag already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, 0, 0x036A, "mnode tag not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_ALREAY_EXIST, 0, 0x036B, "mnode field already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_NOT_EXIST, 0, 0x036C, "mnode field not exist")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, 0, 0x0380, "mnode db not selected")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, 0, 0x0381, "mnode database aleady exist")
|
||||
|
|
|
@ -727,10 +727,6 @@ void read_history() {
|
|||
char f_history[TSDB_FILENAME_LEN];
|
||||
get_history_path(f_history);
|
||||
|
||||
if (access(f_history, R_OK) == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
FILE *f = fopen(f_history, "r");
|
||||
if (f == NULL) {
|
||||
fprintf(stderr, "Opening file %s\n", f_history);
|
||||
|
@ -809,14 +805,6 @@ void source_file(TAOS *con, char *fptr) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (access(fname, R_OK) != 0) {
|
||||
fprintf(stderr, "ERROR: file %s is not readable\n", fptr);
|
||||
|
||||
wordfree(&full_path);
|
||||
free(cmd);
|
||||
return;
|
||||
}
|
||||
|
||||
FILE *f = fopen(fname, "r");
|
||||
if (f == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
|
||||
|
|
|
@ -148,7 +148,11 @@ static void shellSourceFile(TAOS *con, char *fptr) {
|
|||
}
|
||||
|
||||
char *fname = full_path.we_wordv[0];
|
||||
|
||||
if (fname == NULL) {
|
||||
fprintf(stderr, "ERROR: invalid filename\n");
|
||||
return;
|
||||
}
|
||||
|
||||
if (access(fname, F_OK) != 0) {
|
||||
fprintf(stderr, "ERROR: file %s is not exist\n", fptr);
|
||||
|
||||
|
@ -169,6 +173,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
|
|||
if (f == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
|
||||
wordfree(&full_path);
|
||||
free(cmd);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ extern char configDir[];
|
|||
#define MAX_DATA_SIZE 1024
|
||||
#define MAX_NUM_DATATYPE 8
|
||||
#define OPT_ABORT 1 /* –abort */
|
||||
#define STRING_LEN 512
|
||||
|
||||
/* The options we understand. */
|
||||
static struct argp_option options[] = {
|
||||
|
@ -380,10 +381,11 @@ int main(int argc, char *argv[]) {
|
|||
bool insert_only = arguments.insert_only;
|
||||
char **data_type = arguments.datatype;
|
||||
int count_data_type = 0;
|
||||
char dataString[512];
|
||||
char dataString[STRING_LEN];
|
||||
bool do_aggreFunc = true;
|
||||
|
||||
memset(dataString, 0, 512);
|
||||
memset(dataString, 0, STRING_LEN);
|
||||
int len = 0;
|
||||
|
||||
if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) {
|
||||
do_aggreFunc = false;
|
||||
|
@ -392,8 +394,8 @@ int main(int argc, char *argv[]) {
|
|||
if (strcasecmp(data_type[count_data_type], "") == 0) {
|
||||
break;
|
||||
}
|
||||
strcat(dataString, data_type[count_data_type]);
|
||||
strcat(dataString, " ");
|
||||
|
||||
len += snprintf(dataString + len, STRING_LEN - len, "%s ", data_type[count_data_type]);
|
||||
}
|
||||
|
||||
FILE *fp = fopen(arguments.output_file, "a");
|
||||
|
@ -473,32 +475,29 @@ int main(int argc, char *argv[]) {
|
|||
sprintf(command, "create database %s;", db_name);
|
||||
taos_query(taos, command);
|
||||
|
||||
char cols[512] = "\0";
|
||||
char cols[STRING_LEN] = "\0";
|
||||
int colIndex = 0;
|
||||
len = 0;
|
||||
|
||||
for (; colIndex < ncols_per_record - 1; colIndex++) {
|
||||
if (strcasecmp(data_type[colIndex % count_data_type], "BINARY") != 0) {
|
||||
sprintf(command, ",f%d %s", colIndex + 1, data_type[colIndex % count_data_type]);
|
||||
strcat(cols, command);
|
||||
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s", colIndex + 1, data_type[colIndex % count_data_type]);
|
||||
} else {
|
||||
sprintf(command, ",f%d %s(%d)", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
|
||||
strcat(cols, command);
|
||||
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d)", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
|
||||
}
|
||||
}
|
||||
|
||||
if (strcasecmp(data_type[colIndex % count_data_type], "BINARY") != 0) {
|
||||
sprintf(command, ",f%d %s)", colIndex + 1, data_type[colIndex % count_data_type]);
|
||||
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s)", colIndex + 1, data_type[colIndex % count_data_type]);
|
||||
} else {
|
||||
sprintf(command, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
|
||||
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
|
||||
}
|
||||
|
||||
strcat(cols, command);
|
||||
|
||||
if (!use_metric) {
|
||||
/* Create all the tables; */
|
||||
printf("Creating %d table(s)......\n", ntables);
|
||||
for (int i = 0; i < ntables; i++) {
|
||||
sprintf(command, "create table %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
|
||||
queryDB(taos, command);
|
||||
}
|
||||
|
||||
|
@ -508,7 +507,7 @@ int main(int argc, char *argv[]) {
|
|||
} else {
|
||||
/* Create metric table */
|
||||
printf("Creating meters super table...\n");
|
||||
sprintf(command, "create table %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
|
||||
queryDB(taos, command);
|
||||
printf("meters created!\n");
|
||||
|
||||
|
@ -522,10 +521,10 @@ int main(int argc, char *argv[]) {
|
|||
j = i % 10;
|
||||
}
|
||||
if (j % 2 == 0) {
|
||||
sprintf(command, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j,"shanghai");
|
||||
} else {
|
||||
sprintf(command, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j,"beijing");
|
||||
}
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai");
|
||||
} else {
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing");
|
||||
}
|
||||
queryDB(taos, command);
|
||||
}
|
||||
|
||||
|
|
|
@ -117,8 +117,8 @@ typedef struct {
|
|||
} SDbInfo;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_NAME_LEN + 1];
|
||||
char metric[TSDB_TABLE_NAME_LEN + 1];
|
||||
char name[TSDB_TABLE_NAME_LEN];
|
||||
char metric[TSDB_TABLE_NAME_LEN];
|
||||
} STableRecord;
|
||||
|
||||
typedef struct {
|
||||
|
@ -646,10 +646,9 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
|
|||
taosDumpTable(tableRecord.name, tableRecord.metric, arguments, fp);
|
||||
}
|
||||
|
||||
tclose(fd);
|
||||
remove(".table.tmp");
|
||||
close(fd);
|
||||
|
||||
return 0;
|
||||
return remove(".table.tmp");
|
||||
}
|
||||
|
||||
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp) {
|
||||
|
@ -871,7 +870,7 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
|
|||
int fd = -1;
|
||||
STableRecord tableRecord;
|
||||
|
||||
strcpy(tableRecord.metric, metric);
|
||||
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
sprintf(command, "select tbname from %s", metric);
|
||||
result = taos_query(taos, command);
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
|
||||
void * tsAcctSdb = NULL;
|
||||
static int32_t tsAcctUpdateSize;
|
||||
static void mnodeCreateRootAcct();
|
||||
static int32_t mnodeCreateRootAcct();
|
||||
|
||||
static int32_t mnodeAcctActionDestroy(SSdbOper *pOper) {
|
||||
SAcctObj *pAcct = pOper->pObj;
|
||||
|
@ -79,7 +79,11 @@ static int32_t mnodeAcctActionDecode(SSdbOper *pOper) {
|
|||
|
||||
static int32_t mnodeAcctActionRestored() {
|
||||
if (dnodeIsFirstDeploy()) {
|
||||
mnodeCreateRootAcct();
|
||||
int32_t code = mnodeCreateRootAcct();
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to create root account, reason:%s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
acctInit();
|
||||
|
@ -161,9 +165,9 @@ void mnodeDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
|
|||
mnodeDecAcctRef(pAcct);
|
||||
}
|
||||
|
||||
static void mnodeCreateRootAcct() {
|
||||
static int32_t mnodeCreateRootAcct() {
|
||||
int32_t numOfAccts = sdbGetNumOfRows(tsAcctSdb);
|
||||
if (numOfAccts != 0) return;
|
||||
if (numOfAccts != 0) return TSDB_CODE_SUCCESS;
|
||||
|
||||
SAcctObj *pAcct = malloc(sizeof(SAcctObj));
|
||||
memset(pAcct, 0, sizeof(SAcctObj));
|
||||
|
@ -190,7 +194,8 @@ static void mnodeCreateRootAcct() {
|
|||
.table = tsAcctSdb,
|
||||
.pObj = pAcct,
|
||||
};
|
||||
sdbInsertRow(&oper);
|
||||
|
||||
return sdbInsertRow(&oper);
|
||||
}
|
||||
|
||||
#ifndef _ACCT
|
||||
|
|
|
@ -88,9 +88,9 @@ int32_t mnodeStartSystem() {
|
|||
}
|
||||
|
||||
mPrint("starting to initialize mnode ...");
|
||||
struct stat dirstat;
|
||||
if (stat(tsMnodeDir, &dirstat) < 0) {
|
||||
mkdir(tsMnodeDir, 0755);
|
||||
if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) {
|
||||
mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
dnodeAllocateMnodeWqueue();
|
||||
|
|
|
@ -316,7 +316,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
|
||||
strcpy(pConnectRsp->serverVersion, version);
|
||||
memcpy(pConnectRsp->serverVersion, version, TSDB_VERSION_LEN);
|
||||
pConnectRsp->writeAuth = pUser->writeAuth;
|
||||
pConnectRsp->superAuth = pUser->superAuth;
|
||||
|
||||
|
|
|
@ -714,7 +714,7 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("table:%s, failed to drop table, table not exist", pDrop->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -742,7 +742,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
if (pMsg->pTable == NULL) {
|
||||
if (!pInfo->createFlag) {
|
||||
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
} else {
|
||||
mTrace("table:%s, failed to get table meta, start auto create table ", pInfo->tableId);
|
||||
return mnodeAutoCreateChildTable(pMsg);
|
||||
|
@ -779,7 +779,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
if (pStable->schema == NULL) {
|
||||
free(pStable);
|
||||
mError("table:%s, failed to create, no schema input", pCreate->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
|
||||
|
||||
|
@ -1340,7 +1340,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
if (pRsp->numOfTables != numOfTable) {
|
||||
rpcFreeCont(pRsp);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
} else {
|
||||
pRsp->numOfTables = htonl(pRsp->numOfTables);
|
||||
pMsg->rpcRsp.rsp = pRsp;
|
||||
|
@ -1452,7 +1452,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
|
|||
if (pSuperTable == NULL) {
|
||||
mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name);
|
||||
mnodeDestroyChildTable(pTable);
|
||||
terrno = TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
return NULL;
|
||||
}
|
||||
mnodeDecTableRef(pSuperTable);
|
||||
|
@ -2212,7 +2212,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
|
|||
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableId);
|
||||
if (pMsg->pTable == NULL) {
|
||||
mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
||||
pAlter->type = htons(pAlter->type);
|
||||
|
|
|
@ -383,11 +383,11 @@ int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
|||
pTable = mnodeGetTable(pShow->payload);
|
||||
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
|
||||
mnodeDecTableRef(pTable);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
mnodeDecTableRef(pTable);
|
||||
pVgroup = mnodeGetVgroup(((SChildTableObj*)pTable)->vgId);
|
||||
if (NULL == pVgroup) return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
if (NULL == pVgroup) return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
|
||||
} else {
|
||||
|
|
|
@ -229,7 +229,7 @@ static void taosGetSystemLocale() { // get and set default locale
|
|||
uError("can't get locale from system, set it to en_US.UTF-8");
|
||||
strcpy(tsLocale, "en_US.UTF-8");
|
||||
} else {
|
||||
strncpy(tsLocale, locale, tListLen(tsLocale));
|
||||
tstrncpy(tsLocale, locale, tListLen(tsLocale));
|
||||
uError("locale not configured, set to system default:%s", tsLocale);
|
||||
}
|
||||
}
|
||||
|
@ -331,66 +331,7 @@ bool taosGetDisk() {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool taosGetCardName(char *ip, char *name) {
|
||||
struct ifaddrs *ifaddr, *ifa;
|
||||
int family, s;
|
||||
char host[NI_MAXHOST];
|
||||
bool ret = false;
|
||||
|
||||
if (getifaddrs(&ifaddr) == -1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Walk through linked list, maintaining head pointer so we can free list
|
||||
* later */
|
||||
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
|
||||
if (ifa->ifa_addr == NULL) continue;
|
||||
|
||||
family = ifa->ifa_addr->sa_family;
|
||||
if (family != AF_INET) {
|
||||
continue;
|
||||
}
|
||||
|
||||
s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), host,
|
||||
NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
|
||||
if (s != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (strcmp(host, "127.0.0.1") == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: the ip not config
|
||||
// if (strcmp(host, ip) == 0) {
|
||||
strcpy(name, ifa->ifa_name);
|
||||
ret = true;
|
||||
// }
|
||||
}
|
||||
|
||||
freeifaddrs(ifaddr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool taosGetCardInfo(int64_t *bytes) {
|
||||
static char tsPublicCard[1000] = {0};
|
||||
static char tsPrivateIp[40];
|
||||
|
||||
if (tsPublicCard[0] == 0) {
|
||||
if (!taosGetCardName(tsPrivateIp, tsPublicCard)) {
|
||||
uError("can't get card name from ip:%s", tsPrivateIp);
|
||||
return false;
|
||||
}
|
||||
int cardNameLen = (int)strlen(tsPublicCard);
|
||||
for (int i = 0; i < cardNameLen; ++i) {
|
||||
if (tsPublicCard[i] == ':') {
|
||||
tsPublicCard[i] = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// uTrace("card name of public ip:%s is %s", tsPublicIp, tsPublicCard);
|
||||
}
|
||||
|
||||
FILE *fp = fopen(tsSysNetFile, "r");
|
||||
if (fp == NULL) {
|
||||
uError("open file:%s failed", tsSysNetFile);
|
||||
|
@ -403,6 +344,7 @@ static bool taosGetCardInfo(int64_t *bytes) {
|
|||
|
||||
size_t len;
|
||||
char * line = NULL;
|
||||
*bytes = 0;
|
||||
|
||||
while (!feof(fp)) {
|
||||
tfree(line);
|
||||
|
@ -411,23 +353,20 @@ static bool taosGetCardInfo(int64_t *bytes) {
|
|||
if (line == NULL) {
|
||||
break;
|
||||
}
|
||||
if (strstr(line, tsPublicCard) != NULL) {
|
||||
break;
|
||||
if (strstr(line, "lo:") != NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
sscanf(line,
|
||||
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
|
||||
nouse0, &rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &tbytes, &tpackets);
|
||||
*bytes += (rbytes + tbytes);
|
||||
}
|
||||
if (line != NULL) {
|
||||
sscanf(line, "%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, nouse0, &rbytes, &rpackts, &nouse1, &nouse2, &nouse3,
|
||||
&nouse4, &nouse5, &nouse6, &tbytes, &tpackets);
|
||||
*bytes = rbytes + tbytes;
|
||||
tfree(line);
|
||||
fclose(fp);
|
||||
return true;
|
||||
} else {
|
||||
uWarn("can't get card:%s info from device:%s", tsPublicCard, tsSysNetFile);
|
||||
*bytes = 0;
|
||||
fclose(fp);
|
||||
return false;
|
||||
}
|
||||
|
||||
tfree(line);
|
||||
fclose(fp);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool taosGetBandSpeed(float *bandSpeedKb) {
|
||||
|
@ -443,13 +382,15 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
|
|||
if (lastTime == 0 || lastBytes == 0) {
|
||||
lastTime = curTime;
|
||||
lastBytes = curBytes;
|
||||
return false;
|
||||
*bandSpeedKb = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (lastTime >= curTime || lastBytes > curBytes) {
|
||||
lastTime = curTime;
|
||||
lastBytes = curBytes;
|
||||
return false;
|
||||
*bandSpeedKb = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb
|
||||
|
|
|
@ -22,9 +22,27 @@
|
|||
#include "taosdef.h"
|
||||
|
||||
static HttpDecodeMethod gcDecodeMethod = {"grafana", gcProcessRequest};
|
||||
static HttpEncodeMethod gcHeartBeatMethod = {NULL, gcSendHeartBeatResp, NULL, NULL, NULL, NULL, NULL, NULL};
|
||||
static HttpEncodeMethod gcHeartBeatMethod = {
|
||||
.startJsonFp = NULL,
|
||||
.stopJsonFp = gcSendHeartBeatResp,
|
||||
.buildQueryJsonFp = NULL,
|
||||
.buildAffectRowJsonFp = NULL,
|
||||
.initJsonFp = NULL,
|
||||
.cleanJsonFp = NULL,
|
||||
.checkFinishedFp = NULL,
|
||||
.setNextCmdFp = NULL
|
||||
};
|
||||
|
||||
static HttpEncodeMethod gcQueryMethod = {
|
||||
NULL, gcStopQueryJson, gcBuildQueryJson, NULL, gcInitQueryJson, gcCleanQueryJson, NULL, NULL};
|
||||
.startJsonFp = NULL,
|
||||
.stopJsonFp = gcStopQueryJson,
|
||||
.buildQueryJsonFp = gcBuildQueryJson,
|
||||
.buildAffectRowJsonFp = NULL,
|
||||
.initJsonFp = gcInitQueryJson,
|
||||
.cleanJsonFp = gcCleanQueryJson,
|
||||
.checkFinishedFp = NULL,
|
||||
.setNextCmdFp = NULL
|
||||
};
|
||||
|
||||
void gcInitHandle(HttpServer* pServer) { httpAddMethod(pServer, &gcDecodeMethod); }
|
||||
|
||||
|
|
|
@ -445,7 +445,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) {
|
|||
httpJsonItemToken(buf);
|
||||
if (code == TSDB_CODE_MND_DB_NOT_SELECTED) {
|
||||
httpJsonPair(buf, "desc", 4, "failed to create database", 23);
|
||||
} else if (code == TSDB_CODE_MND_INVALID_TABLE_ID) {
|
||||
} else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
|
||||
httpJsonPair(buf, "desc", 4, "failed to create table", 22);
|
||||
} else
|
||||
httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code)));
|
||||
|
|
|
@ -87,7 +87,7 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
|
|||
}
|
||||
|
||||
if (code < 0) {
|
||||
if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code >= 0 ? 0 : code)) {
|
||||
if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, -code)) {
|
||||
singleCmd->code = code;
|
||||
httpTrace("context:%p, fd:%d, ip:%s, user:%s, process pos jump to:%d, last code:%s, last sql:%s",
|
||||
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos + 1, tstrerror(code), sql);
|
||||
|
|
|
@ -32,12 +32,12 @@ bool httpCheckUsedbSql(char *sql) {
|
|||
|
||||
void httpTimeToString(time_t t, char *buf, int buflen) {
|
||||
memset(buf, 0, (size_t)buflen);
|
||||
char ts[30] = {0};
|
||||
char ts[32] = {0};
|
||||
|
||||
struct tm *ptm;
|
||||
time_t tt = t / 1000;
|
||||
ptm = localtime(&tt);
|
||||
strftime(ts, 64, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
strftime(ts, 31, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
sprintf(buf, "%s.%03ld", ts, t % 1000);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,11 +22,37 @@
|
|||
static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest};
|
||||
static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest};
|
||||
static HttpEncodeMethod restEncodeSqlTimestampMethod = {
|
||||
restStartSqlJson, restStopSqlJson, restBuildSqlTimestampJson, restBuildSqlAffectRowsJson, NULL, NULL, NULL, NULL};
|
||||
.startJsonFp = restStartSqlJson,
|
||||
.stopJsonFp = restStopSqlJson,
|
||||
.buildQueryJsonFp = restBuildSqlTimestampJson,
|
||||
.buildAffectRowJsonFp = restBuildSqlAffectRowsJson,
|
||||
.initJsonFp = NULL,
|
||||
.cleanJsonFp = NULL,
|
||||
.checkFinishedFp = NULL,
|
||||
.setNextCmdFp = NULL
|
||||
};
|
||||
|
||||
static HttpEncodeMethod restEncodeSqlLocalTimeStringMethod = {
|
||||
restStartSqlJson, restStopSqlJson, restBuildSqlLocalTimeStringJson, restBuildSqlAffectRowsJson, NULL, NULL, NULL, NULL};
|
||||
.startJsonFp = restStartSqlJson,
|
||||
.stopJsonFp = restStopSqlJson,
|
||||
.buildQueryJsonFp = restBuildSqlLocalTimeStringJson,
|
||||
.buildAffectRowJsonFp = restBuildSqlAffectRowsJson,
|
||||
.initJsonFp = NULL,
|
||||
.cleanJsonFp = NULL,
|
||||
.checkFinishedFp = NULL,
|
||||
.setNextCmdFp = NULL
|
||||
};
|
||||
|
||||
static HttpEncodeMethod restEncodeSqlUtcTimeStringMethod = {
|
||||
restStartSqlJson, restStopSqlJson, restBuildSqlUtcTimeStringJson, restBuildSqlAffectRowsJson, NULL, NULL, NULL, NULL};
|
||||
.startJsonFp = restStartSqlJson,
|
||||
.stopJsonFp = restStopSqlJson,
|
||||
.buildQueryJsonFp = restBuildSqlUtcTimeStringJson,
|
||||
.buildAffectRowJsonFp = restBuildSqlAffectRowsJson,
|
||||
.initJsonFp = NULL,
|
||||
.cleanJsonFp = NULL,
|
||||
.checkFinishedFp = NULL,
|
||||
.setNextCmdFp = NULL
|
||||
};
|
||||
|
||||
void restInitHandle(HttpServer* pServer) {
|
||||
httpAddMethod(pServer, &restDecodeMethod);
|
||||
|
|
|
@ -62,9 +62,16 @@
|
|||
#define TG_MAX_SORT_TAG_SIZE 20
|
||||
|
||||
static HttpDecodeMethod tgDecodeMethod = {"telegraf", tgProcessRquest};
|
||||
static HttpEncodeMethod tgQueryMethod = {tgStartQueryJson, tgStopQueryJson, NULL,
|
||||
tgBuildSqlAffectRowsJson, tgInitQueryJson, tgCleanQueryJson,
|
||||
tgCheckFinished, tgSetNextCmd};
|
||||
static HttpEncodeMethod tgQueryMethod = {
|
||||
.startJsonFp = tgStartQueryJson,
|
||||
.stopJsonFp = tgStopQueryJson,
|
||||
.buildQueryJsonFp = NULL,
|
||||
.buildAffectRowJsonFp = tgBuildSqlAffectRowsJson,
|
||||
.initJsonFp = tgInitQueryJson,
|
||||
.cleanJsonFp = tgCleanQueryJson,
|
||||
.checkFinishedFp = tgCheckFinished,
|
||||
.setNextCmdFp = tgSetNextCmd
|
||||
};
|
||||
|
||||
static const char DEFAULT_TELEGRAF_CFG[] =
|
||||
"{\"metrics\":["
|
||||
|
@ -303,7 +310,7 @@ bool tgGetUserFromUrl(HttpContext *pContext) {
|
|||
return false;
|
||||
}
|
||||
|
||||
strcpy(pContext->user, pParser->path[TG_USER_URL_POS].pos);
|
||||
tstrncpy(pContext->user, pParser->path[TG_USER_URL_POS].pos, TSDB_USER_LEN);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -313,7 +320,7 @@ bool tgGetPassFromUrl(HttpContext *pContext) {
|
|||
return false;
|
||||
}
|
||||
|
||||
strcpy(pContext->pass, pParser->path[TG_PASS_URL_POS].pos);
|
||||
tstrncpy(pContext->pass, pParser->path[TG_PASS_URL_POS].pos, TSDB_PASSWORD_LEN);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -111,7 +111,7 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) {
|
|||
pContext->ipstr);
|
||||
return false;
|
||||
}
|
||||
} else if (code == TSDB_CODE_MND_INVALID_TABLE_ID) {
|
||||
} else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
|
||||
cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED;
|
||||
if (multiCmds->cmds[multiCmds->pos - 1].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) {
|
||||
multiCmds->pos = (int16_t)(multiCmds->pos - 2);
|
||||
|
|
|
@ -67,7 +67,7 @@ static void *taosProcessTcpData(void *param);
|
|||
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
||||
static void taosFreeFdObj(SFdObj *pFdObj);
|
||||
static void taosReportBrokenLink(SFdObj *pFdObj);
|
||||
static void* taosAcceptTcpConnection(void *arg);
|
||||
static void *taosAcceptTcpConnection(void *arg);
|
||||
|
||||
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
||||
SServerObj *pServerObj;
|
||||
|
@ -80,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pServerObj->fd = -1;
|
||||
pServerObj->thread = 0;
|
||||
pServerObj->ip = ip;
|
||||
pServerObj->port = port;
|
||||
|
@ -99,6 +100,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
// initialize parameters in case it may encounter error later
|
||||
pThreadObj = pServerObj->pThreadObj;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
pThreadObj->pollFd = -1;
|
||||
|
@ -106,18 +108,21 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
pThreadObj->processData = fp;
|
||||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||
pThreadObj->shandle = shandle;
|
||||
pThreadObj++;
|
||||
}
|
||||
|
||||
// initialize mutex, thread, fd which may fail
|
||||
pThreadObj = pServerObj->pThreadObj;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||
if (code < 0) {
|
||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
break;;
|
||||
}
|
||||
|
||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||
if (pThreadObj->pollFd < 0) {
|
||||
tError("%s failed to create TCP epoll", label);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
@ -125,7 +130,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||
if (code != 0) {
|
||||
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -133,15 +137,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
pThreadObj++;
|
||||
}
|
||||
|
||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||
if (pServerObj->fd < 0) code = -1;
|
||||
|
||||
if (code == 0) {
|
||||
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
|
||||
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosCleanUpTcpServer(pServerObj);
|
||||
pServerObj = NULL;
|
||||
} else {
|
||||
|
@ -204,7 +211,7 @@ void taosCleanUpTcpServer(void *handle) {
|
|||
tfree(pServerObj);
|
||||
}
|
||||
|
||||
static void* taosAcceptTcpConnection(void *arg) {
|
||||
static void *taosAcceptTcpConnection(void *arg) {
|
||||
int connFd = -1;
|
||||
struct sockaddr_in caddr;
|
||||
int threadId = 0;
|
||||
|
@ -212,10 +219,6 @@ static void* taosAcceptTcpConnection(void *arg) {
|
|||
SServerObj *pServerObj;
|
||||
|
||||
pServerObj = (SServerObj *)arg;
|
||||
|
||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||
if (pServerObj->fd < 0) return NULL;
|
||||
|
||||
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||
|
||||
while (1) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "taosdef.h"
|
||||
#include "taoserror.h"
|
||||
#include "rpcLog.h"
|
||||
#include "rpcUdp.h"
|
||||
#include "rpcHead.h"
|
||||
|
@ -65,6 +66,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
|||
pSet = (SUdpConnSet *)malloc((size_t)size);
|
||||
if (pSet == NULL) {
|
||||
tError("%s failed to allocate UdpConn", label);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -73,30 +75,34 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
|||
pSet->port = port;
|
||||
pSet->shandle = shandle;
|
||||
pSet->fp = fp;
|
||||
pSet->threads = threads;
|
||||
tstrncpy(pSet->label, label, sizeof(pSet->label));
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
int i;
|
||||
uint16_t ownPort;
|
||||
for (int i = 0; i < threads; ++i) {
|
||||
for (i = 0; i < threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
ownPort = (port ? port + i : 0);
|
||||
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
||||
if (pConn->fd < 0) {
|
||||
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
|
||||
if (NULL == pConn->buffer) {
|
||||
tError("%s failed to malloc recv buffer", label);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
struct sockaddr_in sin;
|
||||
unsigned int addrlen = sizeof(sin);
|
||||
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
|
||||
addrlen == sizeof(sin)) {
|
||||
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 &&
|
||||
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
|
||||
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
||||
}
|
||||
|
||||
|
@ -107,23 +113,22 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
|||
pConn->pSet = pSet;
|
||||
pConn->signature = pConn;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
||||
pthread_attr_destroy(&thAttr);
|
||||
if (code != 0) {
|
||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
||||
taosCloseSocket(pConn->fd);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
++pSet->threads;
|
||||
}
|
||||
|
||||
tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads);
|
||||
pthread_attr_destroy(&thAttr);
|
||||
|
||||
if (i != threads) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tTrace("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
|
||||
return pSet;
|
||||
}
|
||||
|
||||
|
@ -136,16 +141,17 @@ void taosCleanUpUdpConnection(void *handle) {
|
|||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
pConn->signature = NULL;
|
||||
|
||||
// shutdown to signal the thread to exit
|
||||
shutdown(pConn->fd, SHUT_RD);
|
||||
if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD);
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
pthread_join(pConn->thread, NULL);
|
||||
free(pConn->buffer);
|
||||
taosCloseSocket(pConn->fd);
|
||||
tTrace("chandle:%p is closed", pConn);
|
||||
if (pConn->thread) pthread_join(pConn->thread, NULL);
|
||||
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
|
||||
tfree(pConn->buffer);
|
||||
tTrace("UDP chandle:%p is closed", pConn);
|
||||
}
|
||||
|
||||
tfree(pSet);
|
||||
|
@ -159,7 +165,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
|
|||
SUdpConn *pConn = pSet->udpConn + pSet->index;
|
||||
pConn->port = port;
|
||||
|
||||
tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip, pConn->localPort);
|
||||
tTrace("%s UDP connection is setup, ip:%x:%hu", pConn->label, ip, port);
|
||||
|
||||
return pConn;
|
||||
}
|
||||
|
|
|
@ -74,14 +74,16 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
|||
STable *tsdbDecodeTable(void *cont, int contLen) {
|
||||
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
||||
if (pTable == NULL) return NULL;
|
||||
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
|
||||
if (pTable->schema == NULL) {
|
||||
free(pTable);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *ptr = cont;
|
||||
T_READ_MEMBER(ptr, int8_t, pTable->type);
|
||||
if (pTable->type != TSDB_CHILD_TABLE) {
|
||||
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
|
||||
if (pTable->schema == NULL) {
|
||||
free(pTable);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
int len = *(int *)ptr;
|
||||
ptr = (char *)ptr + sizeof(int);
|
||||
pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1);
|
||||
|
@ -620,7 +622,10 @@ static int tsdbFreeTable(STable *pTable) {
|
|||
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
kvRowFree(pTable->tagVal);
|
||||
} else {
|
||||
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
|
||||
if (pTable->schema) {
|
||||
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
|
||||
free(pTable->schema);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||
|
|
|
@ -764,8 +764,8 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
|||
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
|
||||
bool isLast, bool isSuperBlock) {
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows &&
|
||||
rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
|
||||
ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);
|
||||
|
||||
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
|
||||
int64_t offset = 0;
|
||||
|
@ -905,14 +905,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
|
||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
|
||||
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
||||
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
|
||||
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
|
||||
(pHelper->files.nLastF.fd) < 0) {
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
|
||||
goto _err;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else {
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows);
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
|
||||
// Merge
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
|
||||
// Write
|
||||
|
@ -936,21 +937,21 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// Key must overlap with the block
|
||||
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
|
||||
|
||||
TSKEY keyLimit =
|
||||
(blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
|
||||
TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;
|
||||
|
||||
// rows1: number of rows must merge in this block
|
||||
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
|
||||
// rows2: max nuber of rows the block can have more
|
||||
// rows2: max number of rows the block can have more
|
||||
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
|
||||
// rows3: number of rows between this block and the next block
|
||||
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
|
||||
|
||||
ASSERT(rows3 >= rows1);
|
||||
|
||||
if ((rows2 >= rows1) &&
|
||||
(( blockAtIdx(pHelper, blkIdx)->last) ||
|
||||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
|
||||
if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
||||
((!blockAtIdx(pHelper, blkIdx)->last) ||
|
||||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
|
||||
(pHelper->files.nLastF.fd < 0)))) {
|
||||
rowsWritten = rows1;
|
||||
bool isLast = false;
|
||||
SFile *pFile = NULL;
|
||||
|
@ -964,7 +965,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
|
||||
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else { // Load-Merge-Write
|
||||
} else { // Load-Merge-Write
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
|
||||
|
@ -1106,16 +1107,16 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
|
|||
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
|
||||
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
|
||||
if (pTCompBlock->numOfSubBlocks > 1) {
|
||||
ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len);
|
||||
ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM));
|
||||
if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len-sizeof(TSCKSUM));
|
||||
|
||||
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
|
||||
if (tsize > 0) {
|
||||
memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize);
|
||||
memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tsize);
|
||||
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
|
||||
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
|
||||
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2);
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#ifndef _TD_KVSTORE_H_
|
||||
#define _TD_KVSTORE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
typedef int (*iterFunc)(void *, void *cont, int contLen);
|
||||
typedef void (*afterFunc)(void *);
|
||||
|
||||
typedef struct {
|
||||
int64_t size;
|
||||
int64_t tombSize;
|
||||
int64_t nRecords;
|
||||
int64_t nDels;
|
||||
} SStoreInfo;
|
||||
|
||||
typedef struct {
|
||||
char * fname;
|
||||
int fd;
|
||||
char * fsnap;
|
||||
int sfd;
|
||||
char * fnew;
|
||||
int nfd;
|
||||
SHashObj * map;
|
||||
iterFunc iFunc;
|
||||
afterFunc aFunc;
|
||||
void * appH;
|
||||
SStoreInfo info;
|
||||
} SKVStore;
|
||||
|
||||
int tdCreateKVStore(char *fname);
|
||||
int tdDestroyKVStore();
|
||||
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
|
||||
void tdCloseKVStore(SKVStore *pStore);
|
||||
int tdKVStoreStartCommit(SKVStore *pStore);
|
||||
int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
|
||||
int tdKVStoreEndCommit(SKVStore *pStore);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -18,6 +18,7 @@ void generate_key(unsigned char* key);
|
|||
void generate_sub_keys(unsigned char* main_key, key_set* key_sets);
|
||||
void process_message(unsigned char* message_piece, unsigned char* processed_piece, key_set* key_sets, int mode);
|
||||
|
||||
#if 0
|
||||
int64_t taosDesGenKey() {
|
||||
unsigned int iseed = (unsigned int)time(NULL);
|
||||
srand(iseed);
|
||||
|
@ -27,6 +28,7 @@ int64_t taosDesGenKey() {
|
|||
|
||||
return *((int64_t*)key);
|
||||
}
|
||||
#endif
|
||||
|
||||
char* taosDesImp(unsigned char* key, char* src, unsigned int len, int process_mode) {
|
||||
unsigned int number_of_blocks = len / 8;
|
||||
|
|
|
@ -0,0 +1,292 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <libgen.h>
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "hash.h"
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tcoding.h"
|
||||
#include "tkvstore.h"
|
||||
#include "tulog.h"
|
||||
|
||||
#define TD_KVSTORE_HEADER_SIZE 512
|
||||
#define TD_KVSTORE_MAJOR_VERSION 1
|
||||
#define TD_KVSTORE_MAINOR_VERSION 0
|
||||
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
|
||||
#define TD_KVSTORE_NEW_SUFFIX ".new"
|
||||
|
||||
static int tdInitKVStoreHeader(int fd, char *fname);
|
||||
static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo);
|
||||
// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
|
||||
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
|
||||
static char * tdGetKVStoreSnapshotFname(char *fdata);
|
||||
static char * tdGetKVStoreNewFname(char *fdata);
|
||||
static void tdFreeKVStore(SKVStore *pStore);
|
||||
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
|
||||
|
||||
int tdCreateKVStore(char *fname) {
|
||||
char *tname = strdup(fname);
|
||||
if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
|
||||
int fd = open(fname, O_RDWR | O_CREAT, 0755);
|
||||
if (fd < 0) {
|
||||
uError("failed to open file %s since %s", fname, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
int code = tdInitKVStoreHeader(fd, fname);
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
|
||||
if (fsync(fd) < 0) {
|
||||
uError("failed to fsync file %s since %s", fname, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
if (close(fd) < 0) {
|
||||
uError("failed to close file %s since %s", fname, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
||||
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
|
||||
if (pStore == NULL) return NULL;
|
||||
|
||||
pStore->fd = open(pStore->fname, O_RDWR);
|
||||
if (pStore->fd < 0) {
|
||||
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (access(pStore->fsnap, F_OK) == 0) {
|
||||
uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
|
||||
pStore->sfd = open(pStore->fsnap, O_RDONLY);
|
||||
if (pStore->sfd < 0) {
|
||||
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// TODO: rewind the file
|
||||
|
||||
close(pStore->sfd);
|
||||
pStore->sfd = -1;
|
||||
remove(pStore->fsnap);
|
||||
}
|
||||
|
||||
// TODO: Recover from the file
|
||||
|
||||
return pStore;
|
||||
|
||||
_err:
|
||||
if (pStore->fd > 0) {
|
||||
close(pStore->fd);
|
||||
pStore->fd = -1;
|
||||
}
|
||||
if (pStore->sfd > 0) {
|
||||
close(pStore->sfd);
|
||||
pStore->sfd = -1;
|
||||
}
|
||||
tdFreeKVStore(pStore);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int tdKVStoreStartCommit(SKVStore *pStore) {
|
||||
pStore->fd = open(pStore->fname, O_RDWR);
|
||||
if (pStore->fd < 0) {
|
||||
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pStore->sfd = open(pStore->fsnap, O_WRONLY | O_CREAT, 0755);
|
||||
if (pStore->sfd < 0) {
|
||||
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tsendfile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
|
||||
uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (fsync(pStore->sfd) < 0) {
|
||||
uError("failed to fsync file %s since %s", pStore->fsnap, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (close(pStore->sfd) < 0) {
|
||||
uError("failed to close file %s since %s", pStore->fsnap, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
pStore->sfd = -1;
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
if (pStore->sfd > 0) {
|
||||
close(pStore->sfd);
|
||||
pStore->sfd = -1;
|
||||
remove(pStore->fsnap);
|
||||
}
|
||||
if (pStore->fd > 0) {
|
||||
close(pStore->fd);
|
||||
pStore->fd = -1;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int tdKVStoreEndCommit(SKVStore *pStore) {
|
||||
ASSERT(pStore->fd > 0);
|
||||
|
||||
terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info));
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
|
||||
if (fsync(pStore->fd) < 0) {
|
||||
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (close(pStore->fd) < 0) {
|
||||
uError("failed to close file %s since %s", pStore->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
remove(pStore->fsnap);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
|
||||
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
|
||||
|
||||
if (lseek(fd, 0, SEEK_SET) < 0) {
|
||||
uError("failed to lseek file %s since %s", fname, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
tdEncodeStoreInfo(buf, pInfo);
|
||||
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
|
||||
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
|
||||
uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int tdInitKVStoreHeader(int fd, char *fname) {
|
||||
SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0};
|
||||
|
||||
return tdUpdateKVStoreHeader(fd, fname, &info);
|
||||
}
|
||||
|
||||
static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) {
|
||||
buf = taosEncodeVariantI64(buf, pInfo->size);
|
||||
buf = taosEncodeVariantI64(buf, pInfo->tombSize);
|
||||
buf = taosEncodeVariantI64(buf, pInfo->nRecords);
|
||||
buf = taosEncodeVariantI64(buf, pInfo->nDels);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
|
||||
// buf = taosDecodeVariantI64(buf, &(pInfo->size));
|
||||
// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
|
||||
// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
|
||||
// buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
|
||||
|
||||
// return buf;
|
||||
// }
|
||||
|
||||
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
||||
SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore));
|
||||
if (pStore == NULL) goto _err;
|
||||
|
||||
pStore->fname = strdup(fname);
|
||||
if (pStore->map == NULL) goto _err;
|
||||
|
||||
pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
|
||||
if (pStore->fsnap == NULL) goto _err;
|
||||
|
||||
pStore->fnew = tdGetKVStoreNewFname(fname);
|
||||
if (pStore->fnew == NULL) goto _err;
|
||||
|
||||
pStore->fd = -1;
|
||||
pStore->sfd = -1;
|
||||
pStore->nfd = -1;
|
||||
pStore->iFunc = iFunc;
|
||||
pStore->aFunc = aFunc;
|
||||
pStore->appH = appH;
|
||||
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
if (pStore->map == NULL) {
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return pStore;
|
||||
|
||||
_err:
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
tdFreeKVStore(pStore);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tdFreeKVStore(SKVStore *pStore) {
|
||||
if (pStore) {
|
||||
tfree(pStore->fname);
|
||||
tfree(pStore->fsnap);
|
||||
tfree(pStore->fnew);
|
||||
taosHashCleanup(pStore->map);
|
||||
free(pStore);
|
||||
}
|
||||
}
|
||||
|
||||
static char *tdGetKVStoreSnapshotFname(char *fdata) {
|
||||
size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1;
|
||||
char * fname = malloc(size);
|
||||
if (fname == NULL) {
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX);
|
||||
return fname;
|
||||
}
|
||||
|
||||
static char *tdGetKVStoreNewFname(char *fdata) {
|
||||
size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1;
|
||||
char * fname = malloc(size);
|
||||
if (fname == NULL) {
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX);
|
||||
return fname;
|
||||
}
|
|
@ -191,15 +191,14 @@ void taosResetLog() {
|
|||
}
|
||||
|
||||
static bool taosCheckFileIsOpen(char *logFileName) {
|
||||
int32_t exist = access(logFileName, F_OK);
|
||||
if (exist != 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t fd = open(logFileName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
int32_t fd = open(logFileName, O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (fd < 0) {
|
||||
printf("\nfailed to open log file:%s, reason:%s\n", logFileName, strerror(errno));
|
||||
return true;
|
||||
if (errno == ENOENT) {
|
||||
return false;
|
||||
} else {
|
||||
printf("\nfailed to open log file:%s, reason:%s\n", logFileName, strerror(errno));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (taosLockFile(fd)) {
|
||||
|
|
|
@ -87,6 +87,10 @@ void *taosThreadToOpenNewNote(void *param)
|
|||
umask(0);
|
||||
|
||||
int fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (fd < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosLockNote(fd, pNote);
|
||||
lseek(fd, 0, SEEK_SET);
|
||||
|
||||
|
|
|
@ -75,19 +75,29 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
mkdir(tsVnodeDir, 0755);
|
||||
|
||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
if (mkdir(rootDir, 0755) != 0) {
|
||||
vPrint("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), rootDir);
|
||||
if (mkdir(tsVnodeDir, 0755) != 0 && errno != EEXIST) {
|
||||
vError("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), tsVnodeDir);
|
||||
if (errno == EACCES) {
|
||||
return TSDB_CODE_VND_NO_DISK_PERMISSIONS;
|
||||
} else if (errno == ENOSPC) {
|
||||
return TSDB_CODE_VND_NO_DISKSPACE;
|
||||
} else if (errno == ENOENT) {
|
||||
return TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR;
|
||||
} else {
|
||||
return TSDB_CODE_VND_INIT_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
if (mkdir(rootDir, 0755) != 0 && errno != EEXIST) {
|
||||
vError("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), rootDir);
|
||||
if (errno == EACCES) {
|
||||
return TSDB_CODE_VND_NO_DISK_PERMISSIONS;
|
||||
} else if (errno == ENOSPC) {
|
||||
return TSDB_CODE_VND_NO_DISKSPACE;
|
||||
} else if (errno == ENOENT) {
|
||||
return TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR;
|
||||
} else if (errno == EEXIST) {
|
||||
} else {
|
||||
return TSDB_CODE_VND_INIT_FAILED;
|
||||
}
|
||||
|
|
|
@ -730,13 +730,15 @@ class DbState():
|
|||
# when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
|
||||
# by a factor of 500.
|
||||
# TODO: what if it goes beyond 10 years into the future
|
||||
# TODO: fix the error as result of above: "tsdb timestamp is out of range"
|
||||
def setupLastTick(self):
|
||||
t1 = datetime.datetime(2020, 5, 30)
|
||||
t1 = datetime.datetime(2020, 6, 1)
|
||||
t2 = datetime.datetime.now()
|
||||
elSec = t2.timestamp() - t1.timestamp()
|
||||
elSec = int(t2.timestamp() - t1.timestamp()) # maybe a very large number, takes 69 years to exceed Python int range
|
||||
elSec2 = ( elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500 ) ) * 500 # a number representing seconds within 10 years
|
||||
# print("elSec = {}".format(elSec))
|
||||
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
|
||||
t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec * 500) # see explanation above
|
||||
t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above
|
||||
logger.info("Setting up TICKS to start from: {}".format(t4))
|
||||
return t4
|
||||
|
||||
|
@ -963,7 +965,7 @@ class Task():
|
|||
try:
|
||||
self._executeInternal(te, wt) # TODO: no return value?
|
||||
except taos.error.ProgrammingError as err:
|
||||
self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err))
|
||||
self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err))
|
||||
self._err = err
|
||||
except:
|
||||
self.logDebug("[=] Unexpected exception")
|
||||
|
|
|
@ -205,7 +205,7 @@ class Test (Thread):
|
|||
global written
|
||||
|
||||
dnodesDir = tdDnodes.getDnodesRootDir()
|
||||
dataDir = dnodesDir + '/dnode1/*'
|
||||
dataDir = dnodesDir + '/dnode1/data/*'
|
||||
deleteCmd = 'rm -rf %s' % dataDir
|
||||
os.system(deleteCmd)
|
||||
|
||||
|
|
|
@ -208,7 +208,7 @@ class Test (threading.Thread):
|
|||
global written
|
||||
|
||||
dnodesDir = tdDnodes.getDnodesRootDir()
|
||||
dataDir = dnodesDir + '/dnode1/*'
|
||||
dataDir = dnodesDir + '/dnode1/data/*'
|
||||
deleteCmd = 'rm -rf %s' % dataDir
|
||||
os.system(deleteCmd)
|
||||
|
||||
|
|
|
@ -31,21 +31,26 @@ class TDTestCase:
|
|||
tdSql.prepare()
|
||||
|
||||
tdLog.info("===== step1 =====")
|
||||
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
||||
tdSql.execute(
|
||||
"create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
||||
for i in range(tbNum):
|
||||
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
|
||||
for j in range(rowNum):
|
||||
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
|
||||
tdSql.execute(
|
||||
"insert into tb%d values (now - %dm, %d, %d)" %
|
||||
(i, 1440 - j, j, j))
|
||||
time.sleep(0.1)
|
||||
|
||||
tdLog.info("===== step2 =====")
|
||||
tdSql.query("select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.query(
|
||||
"select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.checkData(0, 1, rowNum)
|
||||
tdSql.checkData(0, 2, rowNum)
|
||||
tdSql.checkData(0, 3, rowNum)
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum)
|
||||
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 1)
|
||||
|
||||
|
@ -67,7 +72,8 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("===== step6 =====")
|
||||
time.sleep(0.1)
|
||||
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 1)
|
||||
|
||||
|
@ -81,14 +87,16 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 3, rowNum)
|
||||
|
||||
tdLog.info("===== step8 =====")
|
||||
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.query(
|
||||
"select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.checkData(0, 1, rowNum * tbNum)
|
||||
tdSql.checkData(0, 2, rowNum * tbNum)
|
||||
tdSql.checkData(0, 3, rowNum * tbNum)
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 1)
|
||||
|
||||
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 2)
|
||||
|
||||
|
@ -110,7 +118,8 @@ class TDTestCase:
|
|||
tdSql.error("select * from s1")
|
||||
|
||||
tdLog.info("===== step12 =====")
|
||||
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 2)
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ class TDTestCase:
|
|||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
|
||||
def run(self):
|
||||
tbNum = 10
|
||||
rowNum = 20
|
||||
|
@ -33,11 +32,14 @@ class TDTestCase:
|
|||
tdSql.prepare()
|
||||
|
||||
tdLog.info("===== step1 =====")
|
||||
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
||||
tdSql.execute(
|
||||
"create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
||||
for i in range(tbNum):
|
||||
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
|
||||
for j in range(rowNum):
|
||||
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
|
||||
tdSql.execute(
|
||||
"insert into tb%d values (now - %dm, %d, %d)" %
|
||||
(i, 1440 - j, j, j))
|
||||
time.sleep(0.1)
|
||||
|
||||
tdLog.info("===== step2 =====")
|
||||
|
@ -45,7 +47,8 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 1, rowNum)
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum)
|
||||
tdSql.execute("create table s0 as select count(col1) from tb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s0 as select count(col1) from tb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 1)
|
||||
|
||||
|
@ -63,7 +66,8 @@ class TDTestCase:
|
|||
tdSql.error("select * from s0")
|
||||
|
||||
tdLog.info("===== step6 =====")
|
||||
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 1)
|
||||
|
||||
|
@ -75,13 +79,15 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 3, rowNum)
|
||||
|
||||
tdLog.info("===== step8 =====")
|
||||
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.query(
|
||||
"select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.checkData(0, 1, totalNum)
|
||||
tdSql.checkData(0, 2, totalNum)
|
||||
tdSql.checkData(0, 3, totalNum)
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 1)
|
||||
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 2)
|
||||
|
||||
|
@ -101,7 +107,8 @@ class TDTestCase:
|
|||
tdSql.error("select * from s1")
|
||||
|
||||
tdLog.info("===== step12 =====")
|
||||
tdSql.execute("create table s1 as select count(col1) from stb0 interval(1d)")
|
||||
tdSql.execute(
|
||||
"create table s1 as select count(col1) from stb0 interval(1d)")
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(tbNum + 2)
|
||||
|
||||
|
@ -112,7 +119,6 @@ class TDTestCase:
|
|||
#tdSql.checkData(0, 2, None)
|
||||
#tdSql.checkData(0, 3, None)
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
|
|
@ -319,6 +319,7 @@ class TDDnodes:
|
|||
self.dnodes.append(TDDnode(8))
|
||||
self.dnodes.append(TDDnode(9))
|
||||
self.dnodes.append(TDDnode(10))
|
||||
self.simDeployed = False
|
||||
|
||||
def init(self, path):
|
||||
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
|
||||
|
@ -378,7 +379,10 @@ class TDDnodes:
|
|||
self.sim = TDSimClient()
|
||||
self.sim.init(self.path)
|
||||
self.sim.setTestCluster(self.testCluster)
|
||||
self.sim.deploy()
|
||||
|
||||
if (self.simDeployed == False):
|
||||
self.sim.deploy()
|
||||
self.simDeployed = True
|
||||
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].setTestCluster(self.testCluster)
|
||||
|
|
|
@ -147,7 +147,7 @@ print =============== step3 - db
|
|||
|
||||
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' select * from d1.t1;' 127.0.0.1:6020/rest/sql
|
||||
print 21-> $system_content
|
||||
if $system_content != @{"status":"error","code":1000,"desc":"mnode invalid table id"}@ then
|
||||
if $system_content != @{"status":"error","code":1000,"desc":"mnode invalid table name"}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue