format more code
This commit is contained in:
parent
98f3f183c6
commit
6ed0634fb7
|
@ -61,17 +61,15 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
|
||||||
|
|
||||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
||||||
taosCorBeginWrite(&pEpSet->version);
|
taosCorBeginWrite(&pEpSet->version);
|
||||||
pEpSet->epSet = *pNewEpSet;
|
pEpSet->epSet = *pNewEpSet;
|
||||||
taosCorEndWrite(&pEpSet->version);
|
taosCorEndWrite(&pEpSet->version);
|
||||||
}
|
}
|
||||||
|
|
||||||
SEpSet getEpSet_s(SCorEpSet* pEpSet) {
|
SEpSet getEpSet_s(SCorEpSet* pEpSet) {
|
||||||
SEpSet ep = {0};
|
SEpSet ep = {0};
|
||||||
taosCorBeginRead(&pEpSet->version);
|
taosCorBeginRead(&pEpSet->version);
|
||||||
ep = pEpSet->epSet;
|
ep = pEpSet->epSet;
|
||||||
taosCorEndRead(&pEpSet->version);
|
taosCorEndRead(&pEpSet->version);
|
||||||
|
|
||||||
return ep;
|
return ep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -67,11 +67,11 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui
|
||||||
// ==== mktime() kernel code =================//
|
// ==== mktime() kernel code =================//
|
||||||
static int64_t m_deltaUtc = 0;
|
static int64_t m_deltaUtc = 0;
|
||||||
void deltaToUtcInitOnce() {
|
void deltaToUtcInitOnce() {
|
||||||
struct tm tm = {0};
|
struct tm tm = {0};
|
||||||
|
|
||||||
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
|
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
|
||||||
m_deltaUtc = (int64_t)taosMktime(&tm);
|
m_deltaUtc = (int64_t)taosMktime(&tm);
|
||||||
// printf("====delta:%lld\n\n", seconds);
|
// printf("====delta:%lld\n\n", seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
|
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
|
||||||
|
@ -81,8 +81,8 @@ static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int
|
||||||
static char* forwardToTimeStringEnd(char* str);
|
static char* forwardToTimeStringEnd(char* str);
|
||||||
static bool checkTzPresent(const char* str, int32_t len);
|
static bool checkTzPresent(const char* str, int32_t len);
|
||||||
|
|
||||||
static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {parseLocaltime,
|
static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {
|
||||||
parseLocaltimeDst};
|
parseLocaltime, parseLocaltimeDst};
|
||||||
|
|
||||||
int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) {
|
int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) {
|
||||||
/* parse datatime string in with tz */
|
/* parse datatime string in with tz */
|
||||||
|
@ -324,7 +324,7 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) {
|
||||||
|
|
||||||
int32_t leapYearMonthDay = 29;
|
int32_t leapYearMonthDay = 29;
|
||||||
int32_t year = pTm->tm_year + 1900;
|
int32_t year = pTm->tm_year + 1900;
|
||||||
bool isLeapYear = ((year % 100) == 0)? ((year % 400) == 0):((year % 4) == 0);
|
bool isLeapYear = ((year % 100) == 0) ? ((year % 400) == 0) : ((year % 4) == 0);
|
||||||
|
|
||||||
if (isLeapYear && (pTm->tm_mon == 1)) {
|
if (isLeapYear && (pTm->tm_mon == 1)) {
|
||||||
if (pTm->tm_mday > leapYearMonthDay) {
|
if (pTm->tm_mday > leapYearMonthDay) {
|
||||||
|
@ -336,14 +336,14 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) {
|
int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) {
|
||||||
*time = 0;
|
*time = 0;
|
||||||
struct tm tm = {0};
|
struct tm tm = {0};
|
||||||
|
|
||||||
char *str;
|
char* str;
|
||||||
if (delim == 'T') {
|
if (delim == 'T') {
|
||||||
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
|
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
|
||||||
} else if (delim == 0) {
|
} else if (delim == 0) {
|
||||||
|
@ -353,7 +353,7 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr
|
||||||
}
|
}
|
||||||
|
|
||||||
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
||||||
//if parse failed, try "%Y-%m-%d" format
|
// if parse failed, try "%Y-%m-%d" format
|
||||||
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
|
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
|
||||||
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -390,7 +390,7 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim
|
||||||
struct tm tm = {0};
|
struct tm tm = {0};
|
||||||
tm.tm_isdst = -1;
|
tm.tm_isdst = -1;
|
||||||
|
|
||||||
char *str;
|
char* str;
|
||||||
if (delim == 'T') {
|
if (delim == 'T') {
|
||||||
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
|
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
|
||||||
} else if (delim == 0) {
|
} else if (delim == 0) {
|
||||||
|
@ -400,7 +400,7 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim
|
||||||
}
|
}
|
||||||
|
|
||||||
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
||||||
//if parse failed, try "%Y-%m-%d" format
|
// if parse failed, try "%Y-%m-%d" format
|
||||||
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
|
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
|
||||||
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -438,14 +438,12 @@ char getPrecisionUnit(int32_t precision) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
||||||
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI ||
|
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
|
||||||
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||||
assert(toPrecision == TSDB_TIME_PRECISION_MILLI ||
|
assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
|
||||||
toPrecision == TSDB_TIME_PRECISION_NANO);
|
toPrecision == TSDB_TIME_PRECISION_NANO);
|
||||||
double tempResult = (double)time;
|
double tempResult = (double)time;
|
||||||
switch(fromPrecision) {
|
switch (fromPrecision) {
|
||||||
case TSDB_TIME_PRECISION_MILLI: {
|
case TSDB_TIME_PRECISION_MILLI: {
|
||||||
switch (toPrecision) {
|
switch (toPrecision) {
|
||||||
case TSDB_TIME_PRECISION_MILLI:
|
case TSDB_TIME_PRECISION_MILLI:
|
||||||
|
@ -459,7 +457,7 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
|
||||||
time *= 1000000;
|
time *= 1000000;
|
||||||
goto end_;
|
goto end_;
|
||||||
}
|
}
|
||||||
} // end from milli
|
} // end from milli
|
||||||
case TSDB_TIME_PRECISION_MICRO: {
|
case TSDB_TIME_PRECISION_MICRO: {
|
||||||
switch (toPrecision) {
|
switch (toPrecision) {
|
||||||
case TSDB_TIME_PRECISION_MILLI:
|
case TSDB_TIME_PRECISION_MILLI:
|
||||||
|
@ -471,7 +469,7 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
|
||||||
time *= 1000;
|
time *= 1000;
|
||||||
goto end_;
|
goto end_;
|
||||||
}
|
}
|
||||||
} //end from micro
|
} // end from micro
|
||||||
case TSDB_TIME_PRECISION_NANO: {
|
case TSDB_TIME_PRECISION_NANO: {
|
||||||
switch (toPrecision) {
|
switch (toPrecision) {
|
||||||
case TSDB_TIME_PRECISION_MILLI:
|
case TSDB_TIME_PRECISION_MILLI:
|
||||||
|
@ -481,20 +479,21 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
|
||||||
case TSDB_TIME_PRECISION_NANO:
|
case TSDB_TIME_PRECISION_NANO:
|
||||||
return time;
|
return time;
|
||||||
}
|
}
|
||||||
} //end from nano
|
} // end from nano
|
||||||
default: {
|
default: {
|
||||||
assert(0);
|
assert(0);
|
||||||
return time; // only to pass windows compilation
|
return time; // only to pass windows compilation
|
||||||
}
|
}
|
||||||
} //end switch fromPrecision
|
} // end switch fromPrecision
|
||||||
end_:
|
end_:
|
||||||
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
|
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
|
||||||
if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL
|
if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL
|
||||||
return time;
|
return time;
|
||||||
}
|
}
|
||||||
|
|
||||||
// !!!!notice:there are precision problems, double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
|
// !!!!notice:there are precision problems, double lose precison if time is too large, for example:
|
||||||
//int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
// 1626006833631000000*1.0 = double = 1626006833631000064
|
||||||
|
// int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
||||||
// assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
// assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||||
// fromPrecision == TSDB_TIME_PRECISION_NANO);
|
// fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||||
// assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
// assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||||
|
@ -503,53 +502,53 @@ end_:
|
||||||
// ((double)time * factors[fromPrecision][toPrecision]);
|
// ((double)time * factors[fromPrecision][toPrecision]);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
// !!!!notice: double lose precison if time is too large, for example: 1626006833631000000*1.0 = double =
|
||||||
// !!!!notice: double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
|
// 1626006833631000064
|
||||||
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) {
|
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) {
|
||||||
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||||
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||||
int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
|
int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
|
||||||
double tmp = time;
|
double tmp = time;
|
||||||
switch (toUnit) {
|
switch (toUnit) {
|
||||||
case 's':{
|
case 's': {
|
||||||
tmp /= (NANOSECOND_PER_SEC/factors[fromPrecision]); // the result of division is an integer
|
tmp /= (NANOSECOND_PER_SEC / factors[fromPrecision]); // the result of division is an integer
|
||||||
time /= (NANOSECOND_PER_SEC/factors[fromPrecision]);
|
time /= (NANOSECOND_PER_SEC / factors[fromPrecision]);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'm':
|
case 'm':
|
||||||
tmp /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]); // the result of division is an integer
|
tmp /= (NANOSECOND_PER_MINUTE / factors[fromPrecision]); // the result of division is an integer
|
||||||
time /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]);
|
time /= (NANOSECOND_PER_MINUTE / factors[fromPrecision]);
|
||||||
break;
|
break;
|
||||||
case 'h':
|
case 'h':
|
||||||
tmp /= (NANOSECOND_PER_HOUR/factors[fromPrecision]); // the result of division is an integer
|
tmp /= (NANOSECOND_PER_HOUR / factors[fromPrecision]); // the result of division is an integer
|
||||||
time /= (NANOSECOND_PER_HOUR/factors[fromPrecision]);
|
time /= (NANOSECOND_PER_HOUR / factors[fromPrecision]);
|
||||||
break;
|
break;
|
||||||
case 'd':
|
case 'd':
|
||||||
tmp /= (NANOSECOND_PER_DAY/factors[fromPrecision]); // the result of division is an integer
|
tmp /= (NANOSECOND_PER_DAY / factors[fromPrecision]); // the result of division is an integer
|
||||||
time /= (NANOSECOND_PER_DAY/factors[fromPrecision]);
|
time /= (NANOSECOND_PER_DAY / factors[fromPrecision]);
|
||||||
break;
|
break;
|
||||||
case 'w':
|
case 'w':
|
||||||
tmp /= (NANOSECOND_PER_WEEK/factors[fromPrecision]); // the result of division is an integer
|
tmp /= (NANOSECOND_PER_WEEK / factors[fromPrecision]); // the result of division is an integer
|
||||||
time /= (NANOSECOND_PER_WEEK/factors[fromPrecision]);
|
time /= (NANOSECOND_PER_WEEK / factors[fromPrecision]);
|
||||||
break;
|
break;
|
||||||
case 'a':
|
case 'a':
|
||||||
tmp /= (NANOSECOND_PER_MSEC/factors[fromPrecision]); // the result of division is an integer
|
tmp /= (NANOSECOND_PER_MSEC / factors[fromPrecision]); // the result of division is an integer
|
||||||
time /= (NANOSECOND_PER_MSEC/factors[fromPrecision]);
|
time /= (NANOSECOND_PER_MSEC / factors[fromPrecision]);
|
||||||
break;
|
break;
|
||||||
case 'u':
|
case 'u':
|
||||||
// the result of (NANOSECOND_PER_USEC/(double)factors[fromPrecision]) maybe a double
|
// the result of (NANOSECOND_PER_USEC/(double)factors[fromPrecision]) maybe a double
|
||||||
switch (fromPrecision) {
|
switch (fromPrecision) {
|
||||||
case TSDB_TIME_PRECISION_MILLI:{
|
case TSDB_TIME_PRECISION_MILLI: {
|
||||||
tmp *= 1000;
|
tmp *= 1000;
|
||||||
time *= 1000;
|
time *= 1000;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_TIME_PRECISION_MICRO:{
|
case TSDB_TIME_PRECISION_MICRO: {
|
||||||
tmp /= 1;
|
tmp /= 1;
|
||||||
time /= 1;
|
time /= 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_TIME_PRECISION_NANO:{
|
case TSDB_TIME_PRECISION_NANO: {
|
||||||
tmp /= 1000;
|
tmp /= 1000;
|
||||||
time /= 1000;
|
time /= 1000;
|
||||||
break;
|
break;
|
||||||
|
@ -569,11 +568,11 @@ int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char
|
||||||
return time;
|
return time;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal) {
|
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal) {
|
||||||
int32_t charLen = varDataLen(inputData);
|
int32_t charLen = varDataLen(inputData);
|
||||||
char *newColData;
|
char* newColData;
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY) {
|
||||||
newColData = taosMemoryCalloc(1, charLen + 1);
|
newColData = taosMemoryCalloc(1, charLen + 1);
|
||||||
memcpy(newColData, varDataVal(inputData), charLen);
|
memcpy(newColData, varDataVal(inputData), charLen);
|
||||||
int32_t ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight);
|
int32_t ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -582,9 +581,9 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec
|
||||||
}
|
}
|
||||||
taosMemoryFree(newColData);
|
taosMemoryFree(newColData);
|
||||||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
newColData = taosMemoryCalloc(1, charLen + TSDB_NCHAR_SIZE);
|
newColData = taosMemoryCalloc(1, charLen + TSDB_NCHAR_SIZE);
|
||||||
int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(inputData), charLen, newColData);
|
int len = taosUcs4ToMbs((TdUcs4*)varDataVal(inputData), charLen, newColData);
|
||||||
if (len < 0){
|
if (len < 0) {
|
||||||
taosMemoryFree(newColData);
|
taosMemoryFree(newColData);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -877,8 +876,8 @@ const char* fmtts(int64_t ts) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) {
|
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) {
|
||||||
char ts[40] = {0};
|
char ts[40] = {0};
|
||||||
struct tm ptm;
|
struct tm ptm;
|
||||||
|
|
||||||
int32_t fractionLen;
|
int32_t fractionLen;
|
||||||
char* format = NULL;
|
char* format = NULL;
|
||||||
|
|
|
@ -209,7 +209,8 @@ static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
|
||||||
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
|
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
|
||||||
assert((int32_t)newSize > pTSBuf->numOfAlloc);
|
assert((int32_t)newSize > pTSBuf->numOfAlloc);
|
||||||
|
|
||||||
STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
|
STSGroupBlockInfoEx* tmp =
|
||||||
|
(STSGroupBlockInfoEx*)taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,9 +15,10 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
#include "tconfig.h"
|
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
|
#include "tconfig.h"
|
||||||
|
|
||||||
|
// clang-format off
|
||||||
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
|
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
|
||||||
#define DM_CFG_DIR "Configuration directory."
|
#define DM_CFG_DIR "Configuration directory."
|
||||||
#define DM_DMP_CFG "Dump configuration."
|
#define DM_DMP_CFG "Dump configuration."
|
||||||
|
@ -28,9 +29,10 @@
|
||||||
#define DM_MACHINE_CODE "Get machine code."
|
#define DM_MACHINE_CODE "Get machine code."
|
||||||
#define DM_VERSION "Print program version."
|
#define DM_VERSION "Print program version."
|
||||||
#define DM_EMAIL "<support@taosdata.com>"
|
#define DM_EMAIL "<support@taosdata.com>"
|
||||||
|
// clang-format on
|
||||||
static struct {
|
static struct {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
bool winServiceMode;
|
bool winServiceMode;
|
||||||
#endif
|
#endif
|
||||||
bool dumpConfig;
|
bool dumpConfig;
|
||||||
bool dumpSdb;
|
bool dumpSdb;
|
||||||
|
@ -101,10 +103,10 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
||||||
global.dumpConfig = true;
|
global.dumpConfig = true;
|
||||||
} else if (strcmp(argv[i], "-V") == 0) {
|
} else if (strcmp(argv[i], "-V") == 0) {
|
||||||
global.printVersion = true;
|
global.printVersion = true;
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
} else if (strcmp(argv[i], "--win_service") == 0) {
|
} else if (strcmp(argv[i], "--win_service") == 0) {
|
||||||
global.winServiceMode = true;
|
global.winServiceMode = true;
|
||||||
#endif
|
#endif
|
||||||
} else if (strcmp(argv[i], "-e") == 0) {
|
} else if (strcmp(argv[i], "-e") == 0) {
|
||||||
global.envCmd[cmdEnvIndex] = argv[++i];
|
global.envCmd[cmdEnvIndex] = argv[++i];
|
||||||
cmdEnvIndex++;
|
cmdEnvIndex++;
|
||||||
|
@ -183,7 +185,7 @@ int main(int argc, char const *argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
int mainWindows(int argc,char** argv);
|
int mainWindows(int argc, char **argv);
|
||||||
if (global.winServiceMode) {
|
if (global.winServiceMode) {
|
||||||
stratWindowsService(mainWindows);
|
stratWindowsService(mainWindows);
|
||||||
} else {
|
} else {
|
||||||
|
@ -191,7 +193,7 @@ int main(int argc, char const *argv[]) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int mainWindows(int argc,char** argv) {
|
int mainWindows(int argc, char **argv) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (global.generateGrant) {
|
if (global.generateGrant) {
|
||||||
|
|
|
@ -101,7 +101,7 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STraceId * trace = &pMsg->info.traceId;
|
STraceId *trace = &pMsg->info.traceId;
|
||||||
dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
|
|
@ -51,7 +51,7 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg);
|
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
|
||||||
qndGetLoad(pMgmt->pQnode, &qload);
|
qndGetLoad(pMgmt->pQnode, &qload);
|
||||||
|
|
||||||
qload.dnodeId = pMgmt->pData->dnodeId;
|
qload.dnodeId = pMgmt->pData->dnodeId;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) {
|
void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) {
|
||||||
|
|
|
@ -52,17 +52,23 @@ static int32_t dmInitMonitor() {
|
||||||
static bool dmCheckDiskSpace() {
|
static bool dmCheckDiskSpace() {
|
||||||
osUpdate();
|
osUpdate();
|
||||||
if (!osDataSpaceAvailable()) {
|
if (!osDataSpaceAvailable()) {
|
||||||
dError("free disk size: %f GB, too little, require %f GB at least at least , quit", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
dError("free disk size: %f GB, too little, require %f GB at least at least , quit",
|
||||||
|
(double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
|
||||||
|
(double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
||||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!osLogSpaceAvailable()) {
|
if (!osLogSpaceAvailable()) {
|
||||||
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
dError("free disk size: %f GB, too little, require %f GB at least at least, quit",
|
||||||
|
(double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
|
||||||
|
(double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
||||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!osTempSpaceAvailable()) {
|
if (!osTempSpaceAvailable()) {
|
||||||
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
dError("free disk size: %f GB, too little, require %f GB at least at least, quit",
|
||||||
|
(double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
|
||||||
|
(double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
||||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -73,7 +79,8 @@ static bool dmCheckDataDirVersion() {
|
||||||
char checkDataDirJsonFileName[PATH_MAX];
|
char checkDataDirJsonFileName[PATH_MAX];
|
||||||
snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
|
snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
|
||||||
if (taosCheckExistFile(checkDataDirJsonFileName)) {
|
if (taosCheckExistFile(checkDataDirJsonFileName)) {
|
||||||
dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!", tsDataDir);
|
dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
|
||||||
|
tsDataDir);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -183,4 +183,3 @@ void dmGetQnodeLoads(SQnodeLoad *pInfo) {
|
||||||
dmReleaseWrapper(pWrapper);
|
dmReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,8 +91,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
||||||
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
||||||
|
|
||||||
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||||
const void * pHead = pMsg;
|
const void *pHead = pMsg;
|
||||||
const void * pBody = pMsg->pCont;
|
const void *pBody = pMsg->pCont;
|
||||||
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
||||||
const int32_t rawBodyLen = pMsg->contLen;
|
const int32_t rawBodyLen = pMsg->contLen;
|
||||||
const int16_t headLen = CEIL8(rawHeadLen);
|
const int16_t headLen = CEIL8(rawHeadLen);
|
||||||
|
@ -261,7 +261,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
proc->wrapper = pWrapper;
|
proc->wrapper = pWrapper;
|
||||||
proc->name = pWrapper->name;
|
proc->name = pWrapper->name;
|
||||||
|
|
||||||
SShm * shm = &proc->shm;
|
SShm *shm = &proc->shm;
|
||||||
int32_t cstart = 0;
|
int32_t cstart = 0;
|
||||||
int32_t csize = CEIL8(shm->size / 2);
|
int32_t csize = CEIL8(shm->size / 2);
|
||||||
int32_t pstart = csize;
|
int32_t pstart = csize;
|
||||||
|
@ -285,13 +285,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *dmConsumChildQueue(void *param) {
|
static void *dmConsumChildQueue(void *param) {
|
||||||
SProc * proc = param;
|
SProc *proc = param;
|
||||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||||
SProcQueue * queue = proc->cqueue;
|
SProcQueue *queue = proc->cqueue;
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg * pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
@ -328,13 +328,13 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *dmConsumParentQueue(void *param) {
|
static void *dmConsumParentQueue(void *param) {
|
||||||
SProc * proc = param;
|
SProc *proc = param;
|
||||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||||
SProcQueue * queue = proc->pqueue;
|
SProcQueue *queue = proc->pqueue;
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg * pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from pqueue", proc->name);
|
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
|
|
@ -44,7 +44,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
|
|
||||||
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||||
|
|
|
@ -17,7 +17,7 @@ class DndTestBnode : public ::testing::Test {
|
||||||
test.Init(TD_TMP_DIR_PATH "dbnodeTest", 9112);
|
test.Init(TD_TMP_DIR_PATH "dbnodeTest", 9112);
|
||||||
taosMsleep(1100);
|
taosMsleep(1100);
|
||||||
}
|
}
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -47,7 +47,7 @@ class Testbase {
|
||||||
int32_t connId;
|
int32_t connId;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
int32_t SendShowReq(int8_t showType, const char *tb, const char* db);
|
int32_t SendShowReq(int8_t showType, const char* tb, const char* db);
|
||||||
int32_t GetShowRows();
|
int32_t GetShowRows();
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
|
|
@ -37,7 +37,7 @@ void Testbase::InitLog(const char* path) {
|
||||||
|
|
||||||
taosGetSystemInfo();
|
taosGetSystemInfo();
|
||||||
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
|
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
|
||||||
if (taosInitLog("taosdlog", 1) != 0) {
|
if (taosInitLog("taosdlog", 1) != 0) {
|
||||||
printf("failed to init log file\n");
|
printf("failed to init log file\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,12 +22,12 @@
|
||||||
|
|
||||||
#include "mndInt.h"
|
#include "mndInt.h"
|
||||||
|
|
||||||
int32_t mndInitGrant(SMnode *pMnode);
|
int32_t mndInitGrant(SMnode * pMnode);
|
||||||
void mndCleanupGrant();
|
void mndCleanupGrant();
|
||||||
void grantParseParameter();
|
void grantParseParameter();
|
||||||
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value);
|
void grantReset(SMnode * pMnode, EGrantType grant, uint64_t value);
|
||||||
void grantAdd(EGrantType grant, uint64_t value);
|
void grantAdd(EGrantType grant, uint64_t value);
|
||||||
void grantRestore(EGrantType grant, uint64_t value);
|
void grantRestore(EGrantType grant, uint64_t value);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,8 +51,8 @@ extern "C" {
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
|
||||||
typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg);
|
typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg);
|
||||||
typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp);
|
int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp);
|
||||||
int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp);
|
int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp);
|
||||||
int32_t mndInitPerfs(SMnode *pMnode);
|
int32_t mndInitPerfs(SMnode *pMnode);
|
||||||
void mndCleanupPerfs(SMnode *pMnode);
|
void mndCleanupPerfs(SMnode *pMnode);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t mndInitQuery(SMnode *pMnode);
|
int32_t mndInitQuery(SMnode *pMnode);
|
||||||
void mndCleanupQuery(SMnode *pMnode);
|
void mndCleanupQuery(SMnode *pMnode);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
||||||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
||||||
void mndFreeStb(SStbObj *pStb);
|
void mndFreeStb(SStbObj *pStb);
|
||||||
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen);
|
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen);
|
||||||
|
|
||||||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndBnode.h"
|
#include "mndBnode.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
|
@ -440,7 +440,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
|
statusReq.memAvail, statusReq.memTotal, statusReq.numOfCores);
|
||||||
} else {
|
} else {
|
||||||
mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
|
mInfo("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
|
||||||
statusReq.dnodeVer, dnodeVer, reboot);
|
statusReq.dnodeVer, dnodeVer, reboot);
|
||||||
}
|
}
|
||||||
|
|
||||||
pDnode->rebootTime = statusReq.rebootTime;
|
pDnode->rebootTime = statusReq.rebootTime;
|
||||||
|
|
|
@ -102,21 +102,21 @@ void dumpDb(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "compression", i642str(pObj->cfg.compression));
|
tjsonAddStringToObject(item, "compression", i642str(pObj->cfg.compression));
|
||||||
tjsonAddStringToObject(item, "replications", i642str(pObj->cfg.replications));
|
tjsonAddStringToObject(item, "replications", i642str(pObj->cfg.replications));
|
||||||
tjsonAddStringToObject(item, "strict", i642str(pObj->cfg.strict));
|
tjsonAddStringToObject(item, "strict", i642str(pObj->cfg.strict));
|
||||||
tjsonAddStringToObject(item, "cacheLast",i642str( pObj->cfg.cacheLast));
|
tjsonAddStringToObject(item, "cacheLast", i642str(pObj->cfg.cacheLast));
|
||||||
tjsonAddStringToObject(item, "hashMethod", i642str(pObj->cfg.hashMethod));
|
tjsonAddStringToObject(item, "hashMethod", i642str(pObj->cfg.hashMethod));
|
||||||
tjsonAddStringToObject(item, "hashPrefix", i642str(pObj->cfg.hashPrefix));
|
tjsonAddStringToObject(item, "hashPrefix", i642str(pObj->cfg.hashPrefix));
|
||||||
tjsonAddStringToObject(item, "hashSuffix", i642str(pObj->cfg.hashSuffix));
|
tjsonAddStringToObject(item, "hashSuffix", i642str(pObj->cfg.hashSuffix));
|
||||||
tjsonAddStringToObject(item, "sstTrigger", i642str(pObj->cfg.sstTrigger));
|
tjsonAddStringToObject(item, "sstTrigger", i642str(pObj->cfg.sstTrigger));
|
||||||
tjsonAddStringToObject(item, "tsdbPageSize",i642str( pObj->cfg.tsdbPageSize));
|
tjsonAddStringToObject(item, "tsdbPageSize", i642str(pObj->cfg.tsdbPageSize));
|
||||||
tjsonAddStringToObject(item, "schemaless", i642str(pObj->cfg.schemaless));
|
tjsonAddStringToObject(item, "schemaless", i642str(pObj->cfg.schemaless));
|
||||||
tjsonAddStringToObject(item, "walLevel",i642str( pObj->cfg.walLevel));
|
tjsonAddStringToObject(item, "walLevel", i642str(pObj->cfg.walLevel));
|
||||||
tjsonAddStringToObject(item, "walFsyncPeriod", i642str(pObj->cfg.walFsyncPeriod));
|
tjsonAddStringToObject(item, "walFsyncPeriod", i642str(pObj->cfg.walFsyncPeriod));
|
||||||
tjsonAddStringToObject(item, "walRetentionPeriod", i642str(pObj->cfg.walRetentionPeriod));
|
tjsonAddStringToObject(item, "walRetentionPeriod", i642str(pObj->cfg.walRetentionPeriod));
|
||||||
tjsonAddStringToObject(item, "walRetentionSize",i642str( pObj->cfg.walRetentionSize));
|
tjsonAddStringToObject(item, "walRetentionSize", i642str(pObj->cfg.walRetentionSize));
|
||||||
tjsonAddStringToObject(item, "walRollPeriod", i642str(pObj->cfg.walRollPeriod));
|
tjsonAddStringToObject(item, "walRollPeriod", i642str(pObj->cfg.walRollPeriod));
|
||||||
tjsonAddStringToObject(item, "walSegmentSize", i642str(pObj->cfg.walSegmentSize));
|
tjsonAddStringToObject(item, "walSegmentSize", i642str(pObj->cfg.walSegmentSize));
|
||||||
|
|
||||||
tjsonAddStringToObject(item, "numOfRetensions",i642str( pObj->cfg.numOfRetensions));
|
tjsonAddStringToObject(item, "numOfRetensions", i642str(pObj->cfg.numOfRetensions));
|
||||||
for (int32_t i = 0; i < pObj->cfg.numOfRetensions; ++i) {
|
for (int32_t i = 0; i < pObj->cfg.numOfRetensions; ++i) {
|
||||||
SJson *rentensions = tjsonAddArrayToObject(item, "rentensions");
|
SJson *rentensions = tjsonAddArrayToObject(item, "rentensions");
|
||||||
SJson *rentension = tjsonCreateObject();
|
SJson *rentension = tjsonCreateObject();
|
||||||
|
@ -126,7 +126,7 @@ void dumpDb(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "freq", i642str(pRetension->freq));
|
tjsonAddStringToObject(item, "freq", i642str(pRetension->freq));
|
||||||
tjsonAddStringToObject(item, "freqUnit", i642str(pRetension->freqUnit));
|
tjsonAddStringToObject(item, "freqUnit", i642str(pRetension->freqUnit));
|
||||||
tjsonAddStringToObject(item, "keep", i642str(pRetension->keep));
|
tjsonAddStringToObject(item, "keep", i642str(pRetension->keep));
|
||||||
tjsonAddStringToObject(item, "keepUnit",i642str( pRetension->keepUnit));
|
tjsonAddStringToObject(item, "keepUnit", i642str(pRetension->keepUnit));
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
|
@ -150,21 +150,21 @@ void dumpStb(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||||
tjsonAddStringToObject(item, "tagVer",i642str( pObj->tagVer));
|
tjsonAddStringToObject(item, "tagVer", i642str(pObj->tagVer));
|
||||||
tjsonAddStringToObject(item, "colVer", i642str(pObj->colVer));
|
tjsonAddStringToObject(item, "colVer", i642str(pObj->colVer));
|
||||||
tjsonAddStringToObject(item, "smaVer", i642str(pObj->smaVer));
|
tjsonAddStringToObject(item, "smaVer", i642str(pObj->smaVer));
|
||||||
tjsonAddStringToObject(item, "nextColId", i642str(pObj->nextColId));
|
tjsonAddStringToObject(item, "nextColId", i642str(pObj->nextColId));
|
||||||
tjsonAddStringToObject(item, "watermark1", i642str(pObj->watermark[0]));
|
tjsonAddStringToObject(item, "watermark1", i642str(pObj->watermark[0]));
|
||||||
tjsonAddStringToObject(item, "watermark2", i642str(pObj->watermark[1]));
|
tjsonAddStringToObject(item, "watermark2", i642str(pObj->watermark[1]));
|
||||||
tjsonAddStringToObject(item, "maxdelay0",i642str( pObj->maxdelay[0]));
|
tjsonAddStringToObject(item, "maxdelay0", i642str(pObj->maxdelay[0]));
|
||||||
tjsonAddStringToObject(item, "maxdelay1",i642str( pObj->maxdelay[1]));
|
tjsonAddStringToObject(item, "maxdelay1", i642str(pObj->maxdelay[1]));
|
||||||
tjsonAddStringToObject(item, "ttl",i642str( pObj->ttl));
|
tjsonAddStringToObject(item, "ttl", i642str(pObj->ttl));
|
||||||
tjsonAddStringToObject(item, "numOfFuncs",i642str( pObj->numOfFuncs));
|
tjsonAddStringToObject(item, "numOfFuncs", i642str(pObj->numOfFuncs));
|
||||||
tjsonAddStringToObject(item, "commentLen", i642str(pObj->commentLen));
|
tjsonAddStringToObject(item, "commentLen", i642str(pObj->commentLen));
|
||||||
tjsonAddStringToObject(item, "ast1Len", i642str(pObj->ast1Len));
|
tjsonAddStringToObject(item, "ast1Len", i642str(pObj->ast1Len));
|
||||||
tjsonAddStringToObject(item, "ast2Len",i642str( pObj->ast2Len));
|
tjsonAddStringToObject(item, "ast2Len", i642str(pObj->ast2Len));
|
||||||
|
|
||||||
tjsonAddStringToObject(item, "numOfColumns",i642str( pObj->numOfColumns));
|
tjsonAddStringToObject(item, "numOfColumns", i642str(pObj->numOfColumns));
|
||||||
SJson *columns = tjsonAddArrayToObject(item, "columns");
|
SJson *columns = tjsonAddArrayToObject(item, "columns");
|
||||||
for (int32_t i = 0; i < pObj->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pObj->numOfColumns; ++i) {
|
||||||
SJson *column = tjsonCreateObject();
|
SJson *column = tjsonCreateObject();
|
||||||
|
@ -188,7 +188,7 @@ void dumpStb(SSdb *pSdb, SJson *json) {
|
||||||
SSchema *pTag = &pObj->pTags[i];
|
SSchema *pTag = &pObj->pTags[i];
|
||||||
tjsonAddStringToObject(tag, "type", i642str(pTag->type));
|
tjsonAddStringToObject(tag, "type", i642str(pTag->type));
|
||||||
tjsonAddStringToObject(tag, "typestr", tDataTypes[pTag->type].name);
|
tjsonAddStringToObject(tag, "typestr", tDataTypes[pTag->type].name);
|
||||||
tjsonAddStringToObject(tag, "flags",i642str( pTag->flags));
|
tjsonAddStringToObject(tag, "flags", i642str(pTag->flags));
|
||||||
tjsonAddStringToObject(tag, "colId", i642str(pTag->colId));
|
tjsonAddStringToObject(tag, "colId", i642str(pTag->colId));
|
||||||
tjsonAddStringToObject(tag, "bytes", i642str(pTag->bytes));
|
tjsonAddStringToObject(tag, "bytes", i642str(pTag->bytes));
|
||||||
tjsonAddStringToObject(tag, "name", pTag->name);
|
tjsonAddStringToObject(tag, "name", pTag->name);
|
||||||
|
@ -219,16 +219,16 @@ void dumpSma(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||||
tjsonAddStringToObject(item, "dstTbUid", i642str(pObj->dstTbUid));
|
tjsonAddStringToObject(item, "dstTbUid", i642str(pObj->dstTbUid));
|
||||||
tjsonAddStringToObject(item, "intervalUnit", i642str(pObj->intervalUnit));
|
tjsonAddStringToObject(item, "intervalUnit", i642str(pObj->intervalUnit));
|
||||||
tjsonAddStringToObject(item, "slidingUnit",i642str( pObj->slidingUnit));
|
tjsonAddStringToObject(item, "slidingUnit", i642str(pObj->slidingUnit));
|
||||||
tjsonAddStringToObject(item, "timezone", i642str(pObj->timezone));
|
tjsonAddStringToObject(item, "timezone", i642str(pObj->timezone));
|
||||||
tjsonAddStringToObject(item, "dstVgId",i642str( pObj->dstVgId));
|
tjsonAddStringToObject(item, "dstVgId", i642str(pObj->dstVgId));
|
||||||
tjsonAddStringToObject(item, "interval", i642str(pObj->interval));
|
tjsonAddStringToObject(item, "interval", i642str(pObj->interval));
|
||||||
tjsonAddStringToObject(item, "offset", i642str(pObj->offset));
|
tjsonAddStringToObject(item, "offset", i642str(pObj->offset));
|
||||||
tjsonAddStringToObject(item, "sliding", i642str(pObj->sliding));
|
tjsonAddStringToObject(item, "sliding", i642str(pObj->sliding));
|
||||||
tjsonAddStringToObject(item, "exprLen",i642str( pObj->exprLen));
|
tjsonAddStringToObject(item, "exprLen", i642str(pObj->exprLen));
|
||||||
tjsonAddStringToObject(item, "tagsFilterLen", i642str(pObj->tagsFilterLen));
|
tjsonAddStringToObject(item, "tagsFilterLen", i642str(pObj->tagsFilterLen));
|
||||||
tjsonAddStringToObject(item, "sqlLen",i642str( pObj->sqlLen));
|
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
|
||||||
tjsonAddStringToObject(item, "astLen",i642str( pObj->astLen));
|
tjsonAddStringToObject(item, "astLen", i642str(pObj->astLen));
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -247,13 +247,13 @@ void dumpVgroup(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "vgId", i642str(pObj->vgId));
|
tjsonAddStringToObject(item, "vgId", i642str(pObj->vgId));
|
||||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||||
tjsonAddStringToObject(item, "version",i642str(pObj->version));
|
tjsonAddStringToObject(item, "version", i642str(pObj->version));
|
||||||
tjsonAddStringToObject(item, "hashBegin", i642str(pObj->hashBegin));
|
tjsonAddStringToObject(item, "hashBegin", i642str(pObj->hashBegin));
|
||||||
tjsonAddStringToObject(item, "hashEnd", i642str(pObj->hashEnd));
|
tjsonAddStringToObject(item, "hashEnd", i642str(pObj->hashEnd));
|
||||||
tjsonAddStringToObject(item, "db", mndGetDbStr(pObj->dbName));
|
tjsonAddStringToObject(item, "db", mndGetDbStr(pObj->dbName));
|
||||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||||
tjsonAddStringToObject(item, "isTsma", i642str(pObj->isTsma));
|
tjsonAddStringToObject(item, "isTsma", i642str(pObj->isTsma));
|
||||||
tjsonAddStringToObject(item, "replica",i642str( pObj->replica));
|
tjsonAddStringToObject(item, "replica", i642str(pObj->replica));
|
||||||
for (int32_t i = 0; i < pObj->replica; ++i) {
|
for (int32_t i = 0; i < pObj->replica; ++i) {
|
||||||
SJson *replicas = tjsonAddArrayToObject(item, "replicas");
|
SJson *replicas = tjsonAddArrayToObject(item, "replicas");
|
||||||
SJson *replica = tjsonCreateObject();
|
SJson *replica = tjsonCreateObject();
|
||||||
|
@ -281,13 +281,13 @@ void dumpTopic(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||||
tjsonAddStringToObject(item, "version",i642str( pObj->version));
|
tjsonAddStringToObject(item, "version", i642str(pObj->version));
|
||||||
tjsonAddStringToObject(item, "subType",i642str( pObj->subType));
|
tjsonAddStringToObject(item, "subType", i642str(pObj->subType));
|
||||||
tjsonAddStringToObject(item, "withMeta", i642str(pObj->withMeta));
|
tjsonAddStringToObject(item, "withMeta", i642str(pObj->withMeta));
|
||||||
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
|
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
|
||||||
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
|
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
|
||||||
tjsonAddStringToObject(item, "astLen",i642str( pObj->astLen));
|
tjsonAddStringToObject(item, "astLen", i642str(pObj->astLen));
|
||||||
tjsonAddStringToObject(item, "sqlLen",i642str( pObj->sqlLen));
|
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
|
||||||
tjsonAddStringToObject(item, "ntbUid", i642str(pObj->ntbUid));
|
tjsonAddStringToObject(item, "ntbUid", i642str(pObj->ntbUid));
|
||||||
tjsonAddStringToObject(item, "ctbStbUid", i642str(pObj->ctbStbUid));
|
tjsonAddStringToObject(item, "ctbStbUid", i642str(pObj->ctbStbUid));
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
|
@ -365,9 +365,9 @@ void dumpStream(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "totalLevel", i642str(pObj->totalLevel));
|
tjsonAddStringToObject(item, "totalLevel", i642str(pObj->totalLevel));
|
||||||
tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId));
|
tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId));
|
||||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||||
tjsonAddStringToObject(item, "status",i642str( pObj->status));
|
tjsonAddStringToObject(item, "status", i642str(pObj->status));
|
||||||
tjsonAddStringToObject(item, "igExpired",i642str( pObj->igExpired));
|
tjsonAddStringToObject(item, "igExpired", i642str(pObj->igExpired));
|
||||||
tjsonAddStringToObject(item, "trigger",i642str( pObj->trigger));
|
tjsonAddStringToObject(item, "trigger", i642str(pObj->trigger));
|
||||||
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam));
|
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam));
|
||||||
tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark));
|
tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark));
|
||||||
tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid));
|
tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid));
|
||||||
|
@ -419,9 +419,9 @@ void dumpUser(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||||
tjsonAddStringToObject(item, "superUser",i642str( pObj->superUser));
|
tjsonAddStringToObject(item, "superUser", i642str(pObj->superUser));
|
||||||
tjsonAddStringToObject(item, "authVersion", i642str(pObj->authVersion));
|
tjsonAddStringToObject(item, "authVersion", i642str(pObj->authVersion));
|
||||||
tjsonAddStringToObject(item, "numOfReadDbs",i642str( taosHashGetSize(pObj->readDbs)));
|
tjsonAddStringToObject(item, "numOfReadDbs", i642str(taosHashGetSize(pObj->readDbs)));
|
||||||
tjsonAddStringToObject(item, "numOfWriteDbs", i642str(taosHashGetSize(pObj->writeDbs)));
|
tjsonAddStringToObject(item, "numOfWriteDbs", i642str(taosHashGetSize(pObj->writeDbs)));
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
}
|
}
|
||||||
|
@ -438,10 +438,10 @@ void dumpDnode(SSdb *pSdb, SJson *json) {
|
||||||
|
|
||||||
SJson *item = tjsonCreateObject();
|
SJson *item = tjsonCreateObject();
|
||||||
tjsonAddItemToArray(items, item);
|
tjsonAddItemToArray(items, item);
|
||||||
tjsonAddStringToObject(item, "id",i642str( pObj->id));
|
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||||
tjsonAddStringToObject(item, "port",i642str( pObj->port));
|
tjsonAddStringToObject(item, "port", i642str(pObj->port));
|
||||||
tjsonAddStringToObject(item, "fqdn", pObj->fqdn);
|
tjsonAddStringToObject(item, "fqdn", pObj->fqdn);
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
}
|
}
|
||||||
|
@ -462,7 +462,7 @@ void dumpSnode(SSdb *pSdb, SJson *json) {
|
||||||
|
|
||||||
SJson *item = tjsonCreateObject();
|
SJson *item = tjsonCreateObject();
|
||||||
tjsonAddItemToArray(items, item);
|
tjsonAddItemToArray(items, item);
|
||||||
tjsonAddStringToObject(item, "id",i642str( pObj->id));
|
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
|
@ -538,15 +538,15 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||||
tjsonAddStringToObject(item, "stage", i642str(pObj->stage));
|
tjsonAddStringToObject(item, "stage", i642str(pObj->stage));
|
||||||
tjsonAddStringToObject(item, "policy", i642str(pObj->policy));
|
tjsonAddStringToObject(item, "policy", i642str(pObj->policy));
|
||||||
tjsonAddStringToObject(item, "conflict",i642str( pObj->conflict));
|
tjsonAddStringToObject(item, "conflict", i642str(pObj->conflict));
|
||||||
tjsonAddStringToObject(item, "exec", i642str(pObj->exec));
|
tjsonAddStringToObject(item, "exec", i642str(pObj->exec));
|
||||||
tjsonAddStringToObject(item, "oper", i642str(pObj->oper));
|
tjsonAddStringToObject(item, "oper", i642str(pObj->oper));
|
||||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||||
tjsonAddStringToObject(item, "dbname", pObj->dbname);
|
tjsonAddStringToObject(item, "dbname", pObj->dbname);
|
||||||
tjsonAddStringToObject(item, "stbname", pObj->stbname);
|
tjsonAddStringToObject(item, "stbname", pObj->stbname);
|
||||||
tjsonAddStringToObject(item, "opername", pObj->opername);
|
tjsonAddStringToObject(item, "opername", pObj->opername);
|
||||||
tjsonAddStringToObject(item, "commitLogNum",i642str( taosArrayGetSize(pObj->commitActions)));
|
tjsonAddStringToObject(item, "commitLogNum", i642str(taosArrayGetSize(pObj->commitActions)));
|
||||||
tjsonAddStringToObject(item, "redoActionNum",i642str(taosArrayGetSize(pObj->redoActions)));
|
tjsonAddStringToObject(item, "redoActionNum", i642str(taosArrayGetSize(pObj->redoActions)));
|
||||||
tjsonAddStringToObject(item, "undoActionNum", i642str(taosArrayGetSize(pObj->undoActions)));
|
tjsonAddStringToObject(item, "undoActionNum", i642str(taosArrayGetSize(pObj->undoActions)));
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
|
@ -45,8 +45,8 @@ int32_t mndPerfsInitMeta(SHashObj *hash) {
|
||||||
meta.sversion = 1;
|
meta.sversion = 1;
|
||||||
meta.tversion = 1;
|
meta.tversion = 1;
|
||||||
|
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
const SSysTableMeta* pSysDbTableMeta = NULL;
|
const SSysTableMeta *pSysDbTableMeta = NULL;
|
||||||
getPerfDbMeta(&pSysDbTableMeta, &size);
|
getPerfDbMeta(&pSysDbTableMeta, &size);
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
@ -150,4 +150,3 @@ void mndCleanupPerfs(SMnode *pMnode) {
|
||||||
taosHashCleanup(pMnode->perfsMeta);
|
taosHashCleanup(pMnode->perfsMeta);
|
||||||
pMnode->perfsMeta = NULL;
|
pMnode->perfsMeta = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndQnode.h"
|
#include "mndQnode.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
|
@ -64,36 +64,36 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
int32_t rspSize = 0;
|
int32_t rspSize = 0;
|
||||||
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
|
SBatchReq *batchReq = (SBatchReq *)pMsg->pCont;
|
||||||
int32_t msgNum = ntohl(batchReq->msgNum);
|
int32_t msgNum = ntohl(batchReq->msgNum);
|
||||||
offset += sizeof(SBatchReq);
|
offset += sizeof(SBatchReq);
|
||||||
SBatchMsg req = {0};
|
SBatchMsg req = {0};
|
||||||
SBatchRsp rsp = {0};
|
SBatchRsp rsp = {0};
|
||||||
SRpcMsg reqMsg = *pMsg;
|
SRpcMsg reqMsg = *pMsg;
|
||||||
SRpcMsg rspMsg = {0};
|
SRpcMsg rspMsg = {0};
|
||||||
void* pRsp = NULL;
|
void *pRsp = NULL;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
|
||||||
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||||
if (NULL == batchRsp) {
|
if (NULL == batchRsp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < msgNum; ++i) {
|
for (int32_t i = 0; i < msgNum; ++i) {
|
||||||
req.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||||
offset += sizeof(req.msgIdx);
|
offset += sizeof(req.msgIdx);
|
||||||
|
|
||||||
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||||
offset += sizeof(req.msgType);
|
offset += sizeof(req.msgType);
|
||||||
|
|
||||||
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||||
offset += sizeof(req.msgLen);
|
offset += sizeof(req.msgLen);
|
||||||
|
|
||||||
req.msg = (char*)pMsg->pCont + offset;
|
req.msg = (char *)pMsg->pCont + offset;
|
||||||
offset += req.msgLen;
|
offset += req.msgLen;
|
||||||
|
|
||||||
reqMsg.msgType = req.msgType;
|
reqMsg.msgType = req.msgType;
|
||||||
|
@ -133,20 +133,20 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
|
*(int32_t *)((char *)pRsp + offset) = htonl(msgNum);
|
||||||
offset += sizeof(msgNum);
|
offset += sizeof(msgNum);
|
||||||
for (int32_t i = 0; i < msgNum; ++i) {
|
for (int32_t i = 0; i < msgNum; ++i) {
|
||||||
SBatchRsp *p = taosArrayGet(batchRsp, i);
|
SBatchRsp *p = taosArrayGet(batchRsp, i);
|
||||||
|
|
||||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
|
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
|
||||||
offset += sizeof(p->reqType);
|
offset += sizeof(p->reqType);
|
||||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgIdx);
|
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
|
||||||
offset += sizeof(p->msgIdx);
|
offset += sizeof(p->msgIdx);
|
||||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
|
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
|
||||||
offset += sizeof(p->msgLen);
|
offset += sizeof(p->msgLen);
|
||||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
|
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
|
||||||
offset += sizeof(p->rspCode);
|
offset += sizeof(p->rspCode);
|
||||||
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
|
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
|
||||||
offset += p->msgLen;
|
offset += p->msgLen;
|
||||||
|
|
||||||
rpcFreeCont(p->msg);
|
rpcFreeCont(p->msg);
|
||||||
|
|
|
@ -604,7 +604,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
|
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
|
||||||
smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
|
smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSnode.h"
|
#include "mndSnode.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
|
@ -1162,7 +1162,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||||
pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
|
pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
|
||||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
continue;
|
continue;
|
||||||
|
@ -1179,8 +1179,8 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
SNode *pNode = NULL;
|
SNode *pNode = NULL;
|
||||||
FOREACH(pNode, pNodeList) {
|
FOREACH(pNode, pNodeList) {
|
||||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
mInfo("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId,
|
mInfo("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId,
|
||||||
pCol->tableId, pTopic->ctbStbUid);
|
pTopic->ctbStbUid);
|
||||||
|
|
||||||
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
|
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
|
||||||
mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||||
|
@ -1256,8 +1256,8 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
mInfo("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name,
|
mInfo("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbFullName,
|
||||||
stbFullName, suid, colId, pSma->sql);
|
suid, colId, pSma->sql);
|
||||||
|
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
|
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
|
||||||
|
|
|
@ -51,9 +51,9 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
|
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
|
||||||
pMgmt->errCode = cbMeta.code;
|
pMgmt->errCode = cbMeta.code;
|
||||||
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
|
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
|
||||||
" role:%s raw:%p",
|
" role:%s raw:%p",
|
||||||
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
|
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
|
||||||
pRaw);
|
pRaw);
|
||||||
|
|
||||||
if (pMgmt->errCode == 0) {
|
if (pMgmt->errCode == 0) {
|
||||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
|
|
|
@ -1555,8 +1555,8 @@ static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewD
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
|
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
|
||||||
return -1;
|
return -1;
|
||||||
} else if (inVgroup) {
|
} else if (inVgroup) {
|
||||||
mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
|
mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
|
||||||
pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
|
pDnode->id, pDnode->memAvail, pDnode->memUsed);
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ class MndTestFunc : public ::testing::Test {
|
||||||
|
|
||||||
Testbase MndTestFunc::test;
|
Testbase MndTestFunc::test;
|
||||||
|
|
||||||
void MndTestFunc::SetCode(SCreateFuncReq *pReq, const char *pCode, int32_t size) {
|
void MndTestFunc::SetCode(SCreateFuncReq* pReq, const char* pCode, int32_t size) {
|
||||||
pReq->pCode = (char*)taosMemoryMalloc(size);
|
pReq->pCode = (char*)taosMemoryMalloc(size);
|
||||||
memcpy(pReq->pCode, pCode, size);
|
memcpy(pReq->pCode, pCode, size);
|
||||||
pReq->codeLen = size;
|
pReq->codeLen = size;
|
||||||
|
@ -41,9 +41,7 @@ void MndTestFunc::SetComment(SCreateFuncReq* pReq, const char* pComment) {
|
||||||
strcpy(pReq->pComment, pComment);
|
strcpy(pReq->pComment, pComment);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MndTestFunc::SetBufSize(SCreateFuncReq* pReq, int32_t size) {
|
void MndTestFunc::SetBufSize(SCreateFuncReq* pReq, int32_t size) { pReq->bufSize = size; }
|
||||||
pReq->bufSize = size;
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(MndTestFunc, 01_Show_Func) {
|
TEST_F(MndTestFunc, 01_Show_Func) {
|
||||||
test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "ins_functions", "");
|
test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "ins_functions", "");
|
||||||
|
@ -505,9 +503,8 @@ TEST_F(MndTestFunc, 05_Actual_code) {
|
||||||
EXPECT_EQ(pFuncInfo->signature, 5);
|
EXPECT_EQ(pFuncInfo->signature, 5);
|
||||||
EXPECT_STREQ("comment1", pFuncInfo->pComment);
|
EXPECT_STREQ("comment1", pFuncInfo->pComment);
|
||||||
for (int32_t i = 0; i < 300; ++i) {
|
for (int32_t i = 0; i < 300; ++i) {
|
||||||
EXPECT_EQ(pFuncInfo->pCode[i], (i) % 20);
|
EXPECT_EQ(pFuncInfo->pCode[i], (i) % 20);
|
||||||
}
|
}
|
||||||
tFreeSRetrieveFuncRsp(&retrieveRsp);
|
tFreeSRetrieveFuncRsp(&retrieveRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -32,7 +32,8 @@ class MndTestStb : public ::testing::Test {
|
||||||
void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen);
|
void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen);
|
||||||
void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
||||||
void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
||||||
void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen, int32_t verInBlock);
|
void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen,
|
||||||
|
int32_t verInBlock);
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase MndTestStb::test;
|
Testbase MndTestStb::test;
|
||||||
|
|
|
@ -306,8 +306,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
||||||
pSdb->commitTerm = pSdb->applyTerm;
|
pSdb->commitTerm = pSdb->applyTerm;
|
||||||
pSdb->commitConfig = pSdb->applyConfig;
|
pSdb->commitConfig = pSdb->applyConfig;
|
||||||
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
|
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
|
||||||
mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
|
mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->commitIndex,
|
||||||
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
|
pSdb->commitTerm, pSdb->commitConfig);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
@ -340,9 +340,9 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
|
||||||
mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
|
mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
|
||||||
" term:%" PRId64 " config:%" PRId64 ", file:%s",
|
" term:%" PRId64 " config:%" PRId64 ", file:%s",
|
||||||
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
|
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
|
||||||
curfile);
|
curfile);
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
|
@ -438,7 +438,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
pSdb->commitTerm = pSdb->applyTerm;
|
pSdb->commitTerm = pSdb->applyTerm;
|
||||||
pSdb->commitConfig = pSdb->applyConfig;
|
pSdb->commitConfig = pSdb->applyConfig;
|
||||||
mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||||
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
|
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
@ -556,9 +556,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
|
||||||
if (term != NULL) *term = commitTerm;
|
if (term != NULL) *term = commitTerm;
|
||||||
if (config != NULL) *config = commitConfig;
|
if (config != NULL) *config = commitConfig;
|
||||||
|
|
||||||
mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
|
mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||||
" file:%s",
|
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
|
||||||
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
|
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "trpc.h"
|
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
#include "qnode.h"
|
#include "qnode.h"
|
||||||
|
|
||||||
|
|
|
@ -14,10 +14,10 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
#include "libs/function/function.h"
|
||||||
#include "qndInt.h"
|
#include "qndInt.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
#include "libs/function/function.h"
|
|
||||||
|
|
||||||
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
||||||
|
@ -41,7 +41,7 @@ void qndClose(SQnode *pQnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
||||||
SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
|
SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
|
||||||
SQWorkerStat stat = {0};
|
SQWorkerStat stat = {0};
|
||||||
|
|
||||||
int32_t code = qWorkerGetStat(&handle, pQnode->pQuery, &stat);
|
int32_t code = qWorkerGetStat(&handle, pQnode->pQuery, &stat);
|
||||||
|
@ -64,7 +64,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
|
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
|
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
|
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
|
||||||
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
|
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ctbEntry.commentLen > 0){
|
if (pME->ctbEntry.commentLen > 0) {
|
||||||
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
|
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
|
||||||
|
@ -43,7 +43,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
|
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ntbEntry.commentLen > 0){
|
if (pME->ntbEntry.commentLen > 0) {
|
||||||
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
|
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
|
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
|
||||||
|
@ -77,9 +77,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
||||||
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
|
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ctbEntry.commentLen > 0){
|
if (pME->ctbEntry.commentLen > 0) {
|
||||||
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0)
|
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0) return -1;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
|
||||||
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
|
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
|
||||||
|
@ -87,7 +86,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
|
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ntbEntry.commentLen > 0){
|
if (pME->ntbEntry.commentLen > 0) {
|
||||||
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
|
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
|
||||||
|
|
|
@ -92,7 +92,8 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p
|
||||||
days = freqDuration;
|
days = freqDuration;
|
||||||
}
|
}
|
||||||
end:
|
end:
|
||||||
smaInfo("vgId:%d, evaluated duration for level %" PRIi8 " is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration);
|
smaInfo("vgId:%d, evaluated duration for level %" PRIi8 " is %d, raw val:%d", TD_VID(pVnode), level + 1, days,
|
||||||
|
duration);
|
||||||
return days;
|
return days;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -328,4 +328,3 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,28 +17,28 @@
|
||||||
|
|
||||||
// SLDataIter =================================================
|
// SLDataIter =================================================
|
||||||
struct SLDataIter {
|
struct SLDataIter {
|
||||||
SRBTreeNode node;
|
SRBTreeNode node;
|
||||||
SSttBlk *pSttBlk;
|
SSttBlk *pSttBlk;
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
int32_t iStt;
|
int32_t iStt;
|
||||||
int8_t backward;
|
int8_t backward;
|
||||||
int32_t iSttBlk;
|
int32_t iSttBlk;
|
||||||
int32_t iRow;
|
int32_t iRow;
|
||||||
SRowInfo rInfo;
|
SRowInfo rInfo;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
SSttBlockLoadInfo* pBlockLoadInfo;
|
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||||
};
|
};
|
||||||
|
|
||||||
SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols) {
|
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
|
||||||
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
|
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
pLoadInfo[i].blockIndex[0] = -1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
pLoadInfo[i].blockIndex[1] = -1;
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
|
@ -62,8 +62,8 @@ SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList,
|
||||||
return pLoadInfo;
|
return pLoadInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
|
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
pLoadInfo[i].blockIndex[0] = -1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
pLoadInfo[i].blockIndex[1] = -1;
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
@ -75,15 +75,15 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void getLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo, int64_t* blocks, double* el) {
|
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
|
||||||
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
*el += pLoadInfo[i].elapsedTime;
|
*el += pLoadInfo[i].elapsedTime;
|
||||||
*blocks += pLoadInfo[i].loadBlocks;
|
*blocks += pLoadInfo[i].loadBlocks;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
|
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
pLoadInfo[i].blockIndex[0] = -1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
pLoadInfo[i].blockIndex[1] = -1;
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
@ -98,11 +98,11 @@ void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
|
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
|
||||||
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
|
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
|
||||||
return &pInfo->blockData[0];
|
return &pInfo->blockData[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
||||||
if (pIter->pSttBlk != NULL) { // current block not loaded yet
|
if (pIter->pSttBlk != NULL) { // current block not loaded yet
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SBlockData* pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||||
|
|
||||||
TABLEID id = {0};
|
TABLEID id = {0};
|
||||||
if (pIter->pSttBlk->suid != 0) {
|
if (pIter->pSttBlk->suid != 0) {
|
||||||
|
@ -126,11 +126,12 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
||||||
tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
|
tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
|
||||||
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
|
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st)/ 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
pInfo->elapsedTime += el;
|
pInfo->elapsedTime += el;
|
||||||
pInfo->loadBlocks += 1;
|
pInfo->loadBlocks += 1;
|
||||||
|
|
||||||
tsdbDebug("read last block, index:%d, last file index:%d, elapsed time:%.2f ms, %s", pIter->iSttBlk, pIter->iStt, el, idStr);
|
tsdbDebug("read last block, index:%d, last file index:%d, elapsed time:%.2f ms, %s", pIter->iSttBlk, pIter->iStt,
|
||||||
|
el, idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -141,7 +142,7 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
||||||
|
|
||||||
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
}
|
}
|
||||||
|
@ -150,16 +151,17 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// find the earliest block that contains the required records
|
// find the earliest block that contains the required records
|
||||||
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk* pBlockList, int32_t num, int32_t backward) {
|
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
|
||||||
|
int32_t backward) {
|
||||||
int32_t i = index;
|
int32_t i = index;
|
||||||
int32_t step = backward? 1:-1;
|
int32_t step = backward ? 1 : -1;
|
||||||
while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
|
while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
|
||||||
i += step;
|
i += step;
|
||||||
}
|
}
|
||||||
return i - step;
|
return i - step;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t binarySearchForStartBlock(SSttBlk*pBlockList, int32_t num, uint64_t uid, int32_t backward) {
|
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
|
||||||
int32_t midPos = -1;
|
int32_t midPos = -1;
|
||||||
if (num <= 0) {
|
if (num <= 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -195,16 +197,17 @@ static int32_t binarySearchForStartBlock(SSttBlk*pBlockList, int32_t num, uint64
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t* uidList, int32_t num, int32_t backward) {
|
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
|
||||||
|
int32_t backward) {
|
||||||
int32_t i = index;
|
int32_t i = index;
|
||||||
int32_t step = backward? 1:-1;
|
int32_t step = backward ? 1 : -1;
|
||||||
while (i >= 0 && i < num && uid == uidList[i]) {
|
while (i >= 0 && i < num && uid == uidList[i]) {
|
||||||
i += step;
|
i += step;
|
||||||
}
|
}
|
||||||
return i - step;
|
return i - step;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint64_t uid, int32_t backward) {
|
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
|
||||||
int32_t firstPos = 0;
|
int32_t firstPos = 0;
|
||||||
int32_t lastPos = num - 1;
|
int32_t lastPos = num - 1;
|
||||||
|
|
||||||
|
@ -236,8 +239,8 @@ static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo,
|
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
const char* idStr) {
|
const char *idStr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
if (*pIter == NULL) {
|
if (*pIter == NULL) {
|
||||||
|
@ -277,7 +280,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
} else if (pStart->suid != suid) {
|
} else if (pStart->suid != suid) {
|
||||||
// no qualified stt block existed
|
// no qualified stt block existed
|
||||||
(*pIter)->iSttBlk = -1;
|
(*pIter)->iSttBlk = -1;
|
||||||
double el = (taosGetTimestampUs() - st)/1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
|
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -302,7 +305,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st)/1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
|
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,9 +322,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tLDataIterClose(SLDataIter *pIter) {
|
void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); }
|
||||||
taosMemoryFree(pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
void tLDataIterNextBlock(SLDataIter *pIter) {
|
void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
|
@ -374,17 +375,18 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void findNextValidRow(SLDataIter *pIter, const char* idStr) {
|
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
|
|
||||||
bool hasVal = false;
|
bool hasVal = false;
|
||||||
int32_t i = pIter->iRow;
|
int32_t i = pIter->iRow;
|
||||||
|
|
||||||
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
|
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
|
||||||
|
|
||||||
// mostly we only need to find the start position for a given table
|
// mostly we only need to find the start position for a given table
|
||||||
if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && pBlockData->aUid != NULL) {
|
if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) &&
|
||||||
i = binarySearchForStartRowIndex((uint64_t*)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
|
pBlockData->aUid != NULL) {
|
||||||
|
i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
|
||||||
if (i == -1) {
|
if (i == -1) {
|
||||||
pIter->iRow = -1;
|
pIter->iRow = -1;
|
||||||
return;
|
return;
|
||||||
|
@ -396,13 +398,15 @@ static void findNextValidRow(SLDataIter *pIter, const char* idStr) {
|
||||||
if (!pIter->backward) {
|
if (!pIter->backward) {
|
||||||
/*if (pBlockData->aUid[i] < pIter->uid) {
|
/*if (pBlockData->aUid[i] < pIter->uid) {
|
||||||
continue;
|
continue;
|
||||||
} else */if (pBlockData->aUid[i] > pIter->uid) {
|
} else */
|
||||||
|
if (pBlockData->aUid[i] > pIter->uid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/*if (pBlockData->aUid[i] > pIter->uid) {
|
/*if (pBlockData->aUid[i] > pIter->uid) {
|
||||||
continue;
|
continue;
|
||||||
} else */if (pBlockData->aUid[i] < pIter->uid) {
|
} else */
|
||||||
|
if (pBlockData->aUid[i] < pIter->uid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -440,7 +444,7 @@ static void findNextValidRow(SLDataIter *pIter, const char* idStr) {
|
||||||
pIter->iRow = (hasVal) ? i : -1;
|
pIter->iRow = (hasVal) ? i : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tLDataIterNextRow(SLDataIter *pIter, const char* idStr) {
|
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
|
|
||||||
|
@ -449,7 +453,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char* idStr) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t iBlockL = pIter->iSttBlk;
|
int32_t iBlockL = pIter->iSttBlk;
|
||||||
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
|
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
|
||||||
pIter->iRow += step;
|
pIter->iRow += step;
|
||||||
|
|
||||||
|
@ -527,8 +531,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
pMTree->destroyLoadInfo = destroyLoadInfo;
|
pMTree->destroyLoadInfo = destroyLoadInfo;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
struct SLDataIter* pIter = NULL;
|
struct SLDataIter *pIter = NULL;
|
||||||
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i], pMTree->idStr);
|
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
|
||||||
|
&pMTree->pLoadInfo[i], pMTree->idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -799,4 +799,3 @@ bool vnodeIsLeader(SVnode *pVnode) {
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
#include <tglobal.h>
|
#include <tglobal.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
#include <vnodeInt.h>
|
|
||||||
#include <tmsg.h>
|
#include <tmsg.h>
|
||||||
|
#include <vnodeInt.h>
|
||||||
|
|
||||||
#pragma GCC diagnostic push
|
#pragma GCC diagnostic push
|
||||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
@ -424,7 +424,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_VARCHAR, TSDB_DATA_TYPE_NCHAR};
|
TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_VARCHAR, TSDB_DATA_TYPE_NCHAR};
|
||||||
// last 2 columns for group by tags
|
// last 2 columns for group by tags
|
||||||
// int32_t tSmaTypeArray[tSmaNumOfCols] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL};
|
// int32_t tSmaTypeArray[tSmaNumOfCols] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL};
|
||||||
const char *tSmaGroupbyTags[tSmaGroupSize * tSmaNumOfTags] = {"BeiJing", "HaiDian", "BeiJing", "ChaoYang",
|
const char *tSmaGroupbyTags[tSmaGroupSize * tSmaNumOfTags] = {"BeiJing", "HaiDian", "BeiJing", "ChaoYang",
|
||||||
"ShangHai", "PuDong", "ShangHai", "MinHang"};
|
"ShangHai", "PuDong", "ShangHai", "MinHang"};
|
||||||
TSKEY tSmaSKeyMs = (int64_t)1648535332 * 1000;
|
TSKEY tSmaSKeyMs = (int64_t)1648535332 * 1000;
|
||||||
int64_t tSmaIntervalMs = tSma.interval * 60 * 1000;
|
int64_t tSmaIntervalMs = tSma.interval * 60 * 1000;
|
||||||
|
@ -441,7 +441,6 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
pDataBlock->pDataBlock = taosArrayInit(tSmaNumOfCols, sizeof(SColumnInfoData *));
|
pDataBlock->pDataBlock = taosArrayInit(tSmaNumOfCols, sizeof(SColumnInfoData *));
|
||||||
EXPECT_NE(pDataBlock->pDataBlock, nullptr);
|
EXPECT_NE(pDataBlock->pDataBlock, nullptr);
|
||||||
for (int32_t c = 0; c < tSmaNumOfCols; ++c) {
|
for (int32_t c = 0; c < tSmaNumOfCols; ++c) {
|
||||||
|
|
||||||
SColumnInfoData *pColInfoData = (SColumnInfoData *)taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
SColumnInfoData *pColInfoData = (SColumnInfoData *)taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
EXPECT_NE(pColInfoData, nullptr);
|
EXPECT_NE(pColInfoData, nullptr);
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,10 @@ FORMAT_DIR_LIST=(
|
||||||
"include"
|
"include"
|
||||||
"source/os"
|
"source/os"
|
||||||
"source/util"
|
"source/util"
|
||||||
|
"source/common"
|
||||||
|
# "source/libs"
|
||||||
|
# "source/client"
|
||||||
|
"source/dnode"
|
||||||
)
|
)
|
||||||
|
|
||||||
for d in ${FORMAT_DIR_LIST[@]}; do
|
for d in ${FORMAT_DIR_LIST[@]}; do
|
||||||
|
|
Loading…
Reference in New Issue