Merge branch '3.0' into feature/trans_impl
This commit is contained in:
commit
07d5c9232d
|
@ -5,6 +5,7 @@ AccessModifierOffset: -1
|
|||
AlignAfterOpenBracket: Align
|
||||
AlignConsecutiveAssignments: false
|
||||
AlignConsecutiveDeclarations: true
|
||||
AlignConsecutiveMacros: true
|
||||
AlignEscapedNewlinesLeft: true
|
||||
AlignOperands: true
|
||||
AlignTrailingComments: true
|
||||
|
@ -86,6 +87,5 @@ SpacesInSquareBrackets: false
|
|||
Standard: Auto
|
||||
TabWidth: 8
|
||||
UseTab: Never
|
||||
AlignConsecutiveDeclarations: true
|
||||
...
|
||||
|
||||
|
|
|
@ -1,18 +1,19 @@
|
|||
[submodule "src/connector/go"]
|
||||
path = src/connector/go
|
||||
url = git@github.com:taosdata/driver-go.git
|
||||
[submodule "src/connector/grafanaplugin"]
|
||||
path = src/connector/grafanaplugin
|
||||
url = git@github.com:taosdata/grafanaplugin.git
|
||||
[submodule "src/connector/hivemq-tdengine-extension"]
|
||||
path = src/connector/hivemq-tdengine-extension
|
||||
url = git@github.com:taosdata/hivemq-tdengine-extension.git
|
||||
[submodule "tests/examples/rust"]
|
||||
path = tests/examples/rust
|
||||
url = https://github.com/songtianyi/tdengine-rust-bindings.git
|
||||
[submodule "deps/jemalloc"]
|
||||
path = deps/jemalloc
|
||||
url = https://github.com/jemalloc/jemalloc
|
||||
[submodule "deps/TSZ"]
|
||||
path = deps/TSZ
|
||||
url = https://github.com/taosdata/TSZ.git
|
||||
[submodule "tests"]
|
||||
path = tests
|
||||
url = https://github.com/taosdata/tests
|
||||
branch = 3.0
|
||||
[submodule "examples/rust"]
|
||||
path = examples/rust
|
||||
url = https://github.com/songtianyi/tdengine-rust-bindings.git
|
||||
|
|
|
@ -113,8 +113,9 @@ typedef struct SParsedDataColInfo {
|
|||
int16_t numOfCols;
|
||||
int16_t numOfBound;
|
||||
uint16_t flen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
|
||||
uint16_t extendedVarLen;
|
||||
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
|
||||
int32_t * boundedColumns; // bound column idx according to schema
|
||||
SBoundColumn * cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
|
@ -123,26 +124,14 @@ typedef struct SParsedDataColInfo {
|
|||
|
||||
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
|
||||
|
||||
typedef struct {
|
||||
int32_t dataLen; // len of SDataRow
|
||||
int32_t kvLen; // len of SKVRow
|
||||
} SMemRowInfo;
|
||||
typedef struct {
|
||||
uint8_t memRowType; // default is 0, that is SDataRow
|
||||
uint8_t compareStat; // 0 no need, 1 need compare
|
||||
TDRowTLenT kvRowInitLen;
|
||||
SMemRowInfo *rowInfo;
|
||||
int32_t rowSize;
|
||||
} SMemRowBuilder;
|
||||
|
||||
typedef enum {
|
||||
ROW_COMPARE_NO_NEED = 0,
|
||||
ROW_COMPARE_NEED = 1,
|
||||
} ERowCompareStat;
|
||||
|
||||
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
|
||||
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
|
||||
int32_t allNullLen);
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo);
|
||||
void destroyMemRowBuilder(SMemRowBuilder *pBuilder);
|
||||
|
||||
/**
|
||||
|
@ -175,38 +164,6 @@ static FORCE_INLINE void tscGetMemRowAppendInfo(SSchema *pSchema, uint8_t memRow
|
|||
*colId = pSchema[schemaIdx].colId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Applicable to consume by multi-columns
|
||||
*
|
||||
* @param row
|
||||
* @param value
|
||||
* @param isCopyVarData In some scenario, the varVal is copied to row directly before calling tdAppend***ColVal()
|
||||
* @param colId
|
||||
* @param colType
|
||||
* @param idx index in SSchema
|
||||
* @param pBuilder
|
||||
* @param spd
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t colType, int32_t toffset, SMemRowBuilder *pBuilder,
|
||||
int32_t rowNum) {
|
||||
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
|
||||
SMemRowInfo *pRowInfo = pBuilder->rowInfo + rowNum;
|
||||
tdGetColAppendDeltaLen(value, colType, &pRowInfo->dataLen, &pRowInfo->kvLen);
|
||||
}
|
||||
}
|
||||
|
||||
// Applicable to consume by one row
|
||||
static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
|
||||
uint8_t compareStat) {
|
||||
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
|
||||
if (compareStat == ROW_COMPARE_NEED) {
|
||||
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
|
||||
}
|
||||
}
|
||||
typedef struct STableDataBlocks {
|
||||
SName tableName;
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
|
@ -513,16 +470,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
|
|||
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
|
||||
if (isDataRow(row)) {
|
||||
if (kvLen < (dataLen * KVRatioConvert)) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
} else if (kvLen > dataLen) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
|
||||
memRowSetType(row, memRowType);
|
||||
if (isDataRowT(memRowType)) {
|
||||
|
@ -622,8 +569,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
|
|||
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
|
||||
|
||||
static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str,
|
||||
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId,
|
||||
int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) {
|
||||
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId) {
|
||||
int64_t iv;
|
||||
int32_t ret;
|
||||
char * endptr = NULL;
|
||||
|
@ -635,26 +581,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BOOL: { // bool
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
|
||||
if (strncmp(pToken->z, "true", pToken->n) == 0) {
|
||||
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &TRUE_VALUE, true, colId, pSchema->type, toffset);
|
||||
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
|
||||
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, &FALSE_VALUE, true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
} else if (pToken->type == TK_INTEGER) {
|
||||
iv = strtoll(pToken->z, NULL, 10);
|
||||
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
|
||||
dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
|
||||
} else if (pToken->type == TK_FLOAT) {
|
||||
double dv = strtod(pToken->z, NULL);
|
||||
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
|
||||
dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
|
@ -664,8 +606,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -675,15 +616,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -693,15 +633,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -711,15 +650,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
int16_t tmpVal = (int16_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -729,15 +667,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint16_t tmpVal = (uint16_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -747,15 +684,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
int32_t tmpVal = (int32_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -765,15 +701,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint32_t tmpVal = (uint32_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -782,14 +717,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &iv, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -799,14 +733,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint64_t tmpVal = (uint64_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
|
@ -819,14 +752,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
float tmpVal = (float)dv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
|
@ -837,15 +769,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &dv, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
|
||||
if (pToken->type == TK_NULL) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else { // too long values will return invalid sql, not be truncated automatically
|
||||
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
|
||||
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
|
||||
|
@ -853,14 +784,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
|
||||
char *rowEnd = memRowEnd(row);
|
||||
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
|
||||
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (pToken->type == TK_NULL) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
int32_t output = 0;
|
||||
|
@ -872,7 +802,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
return tscInvalidOperationMsg(msg, buf, pToken->z);
|
||||
}
|
||||
varDataSetLen(rowEnd, output);
|
||||
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -881,17 +811,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
if (primaryKey) {
|
||||
// When building SKVRow primaryKey, we should not skip even with NULL value.
|
||||
int64_t tmpVal = 0;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
}
|
||||
} else {
|
||||
int64_t tmpVal;
|
||||
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
|
||||
}
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
|
@ -41,52 +41,16 @@ enum {
|
|||
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
|
||||
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
|
||||
char *str, char **end);
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
|
||||
int32_t allNullLen) {
|
||||
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
|
||||
if (nRows > 0) {
|
||||
// already init(bind multiple rows by single column)
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) {
|
||||
ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
|
||||
|
||||
// default compareStat is ROW_COMPARE_NO_NEED
|
||||
if (nBoundCols == 0) { // file input
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
float boundRatio = ((float)nBoundCols / (float)nCols);
|
||||
|
||||
if (boundRatio < KVRatioKV) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (boundRatio > KVRatioData) {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pBuilder->compareStat = ROW_COMPARE_NEED;
|
||||
|
||||
if (boundRatio < KVRatioPredict) {
|
||||
uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
|
||||
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen;
|
||||
if (isUtilizeKVRow(kvLen, dataLen)) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
} else {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
}
|
||||
}
|
||||
|
||||
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
|
||||
|
||||
if (nRows > 0) {
|
||||
pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo));
|
||||
if (pBuilder->rowInfo == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nRows; ++i) {
|
||||
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
|
||||
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -457,8 +421,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
|
||||
SSchema * schema = tscGetTableSchema(pTableMeta);
|
||||
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
|
||||
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
|
||||
int32_t kvLen = pBuilder->kvRowInitLen;
|
||||
bool isParseBindParam = false;
|
||||
|
||||
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
|
||||
|
@ -535,8 +497,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
int16_t colId = -1;
|
||||
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
|
||||
|
||||
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset,
|
||||
colId, &dataLen, &kvLen, pBuilder->compareStat);
|
||||
int32_t ret =
|
||||
tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
@ -551,13 +513,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
}
|
||||
|
||||
if (!isParseBindParam) {
|
||||
// 2. check and set convert flag
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
|
||||
checkAndConvertMemRow(row, dataLen, kvLen);
|
||||
}
|
||||
|
||||
// 3. set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) {
|
||||
SDataRow dataRow = memRowDataBody(row);
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
if (spd->cols[i].valStat == VAL_STAT_NONE) {
|
||||
|
@ -567,7 +524,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
}
|
||||
}
|
||||
|
||||
*len = getExtendedRowSize(pDataBlocks);
|
||||
*len = pBuilder->rowSize;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -620,11 +577,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
|
||||
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
||||
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
(code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound,
|
||||
pDataBlock->boundColumnInfo.allNullLen))) {
|
||||
if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) {
|
||||
return code;
|
||||
}
|
||||
pDataBlock->rowBuilder.rowSize = extendedRowSize;
|
||||
while (1) {
|
||||
index = 0;
|
||||
sToken = tStrGetToken(*str, &index, false);
|
||||
|
@ -703,6 +659,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
|
|||
pColInfo->boundedColumns[i] = i;
|
||||
}
|
||||
pColInfo->allNullLen += pColInfo->flen;
|
||||
pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen
|
||||
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
||||
}
|
||||
|
||||
|
@ -1200,6 +1157,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
int32_t nCols = pColInfo->numOfCols;
|
||||
|
||||
pColInfo->numOfBound = 0;
|
||||
pColInfo->boundNullLen = 0;
|
||||
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
|
||||
for (int32_t i = 0; i < nCols; ++i) {
|
||||
pColInfo->cols[i].valStat = VAL_STAT_NONE;
|
||||
|
@ -1249,6 +1207,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
pColInfo->cols[t].valStat = VAL_STAT_HAS;
|
||||
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
|
||||
++pColInfo->numOfBound;
|
||||
switch (pSchema[t].type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
break;
|
||||
default:
|
||||
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||
break;
|
||||
}
|
||||
findColumnIndex = true;
|
||||
if (isOrdered && (lastColIdx > t)) {
|
||||
isOrdered = false;
|
||||
|
@ -1272,6 +1241,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
pColInfo->cols[t].valStat = VAL_STAT_HAS;
|
||||
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
|
||||
++pColInfo->numOfBound;
|
||||
switch (pSchema[t].type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
break;
|
||||
default:
|
||||
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||
break;
|
||||
}
|
||||
findColumnIndex = true;
|
||||
if (isOrdered && (lastColIdx > t)) {
|
||||
isOrdered = false;
|
||||
|
@ -1715,13 +1695,18 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
|
|||
goto _error;
|
||||
}
|
||||
|
||||
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows);
|
||||
int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock);
|
||||
tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows);
|
||||
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
|
||||
if (tokenBuf == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
// insert from .csv means full and ordered columns, thus use SDataRow all the time
|
||||
ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType);
|
||||
pTableDataBlock->rowBuilder.rowSize = extendedRowSize;
|
||||
|
||||
while ((readLen = tgetline(&line, &n, fp)) != -1) {
|
||||
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
|
||||
line[--readLen] = 0;
|
||||
|
|
|
@ -1882,19 +1882,12 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
|
|||
} else {
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
if (isNeedConvertRow(payload)) {
|
||||
convertSMemRow(pDataBlock, payload, pTableDataBlock);
|
||||
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
} else {
|
||||
TDRowTLenT rowTLen = memRowTLen(payload);
|
||||
TDRowLenT rowTLen = memRowTLen(payload);
|
||||
memcpy(pDataBlock, payload, rowTLen);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t len = pBlock->dataLen + pBlock->schemaLen;
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
|
|
|
@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo {
|
|||
|
||||
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
|
||||
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
|||
|
||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
||||
|
||||
void* destroyOutputBuf(SSDataBlock* pBlock);
|
||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
||||
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||
|
|
|
@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
|
|||
return res;
|
||||
}
|
||||
|
||||
void* destroyOutputBuf(SSDataBlock* pBlock) {
|
||||
void* blockDataDestroy(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
|
|||
break;
|
||||
}
|
||||
case OP_TableSeqScan: {
|
||||
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
|
||||
pRuntimeEnv->proot = createTableSeqScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
case OP_DataBlocksOptScan: {
|
||||
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
|
||||
pRuntimeEnv->proot = createTableScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
|
||||
break;
|
||||
}
|
||||
case OP_TableScan: {
|
||||
|
@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
|
||||
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
|
||||
|
@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
|
|||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
|
||||
assert(repeatTime > 0);
|
||||
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
|
@ -5278,7 +5278,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
|
|||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||
|
||||
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
|
||||
pOptr->name = "DataBlocksOptimizedScanOperator";
|
||||
pOptr->name = "TableScanOperator";
|
||||
pOptr->operatorType = OP_DataBlocksOptScan;
|
||||
pOptr->pRuntimeEnv = pRuntimeEnv;
|
||||
pOptr->blockingOptr = false;
|
||||
|
@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
||||
taosArrayDestroy(pInfo->orderColumnList);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
tfree(pInfo->prevRow);
|
||||
}
|
||||
|
||||
|
@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
|
|||
tfree(pInfo->rowCellInfoOffset);
|
||||
|
||||
cleanupResultRowInfo(&pInfo->resultRowInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
tfree(pInfo->p);
|
||||
}
|
||||
|
||||
|
@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*) param;
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
|
||||
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
|
||||
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
||||
}
|
||||
|
||||
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosHashCleanup(pInfo->pSet);
|
||||
tfree(pInfo->buf);
|
||||
taosArrayDestroy(pInfo->pDistinctDataInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
namespace {
|
||||
// simple test
|
||||
void simpleTest() {
|
||||
SDiskbasedResultBuf* pResultBuf = NULL;
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
|
||||
|
||||
int32_t pageId = 0;
|
||||
|
@ -22,40 +22,40 @@ void simpleTest() {
|
|||
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
|
||||
ASSERT_EQ(getResBufSize(pResultBuf), 1024);
|
||||
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
|
||||
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
|
||||
ASSERT_EQ(taosArrayGetSize(list), 1);
|
||||
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
|
||||
|
||||
releaseResBufPage(pResultBuf, pBufPage);
|
||||
releaseBufPage(pResultBuf, pBufPage);
|
||||
|
||||
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
|
||||
tFilePage* t = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t == pBufPage1);
|
||||
|
||||
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t1 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t1 == pBufPage2);
|
||||
|
||||
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t2 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t2 == pBufPage3);
|
||||
|
||||
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t3 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t3 == pBufPage4);
|
||||
|
||||
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t4 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t4 == pBufPage5);
|
||||
|
||||
destroyResultBuf(pResultBuf);
|
||||
}
|
||||
|
||||
void writeDownTest() {
|
||||
SDiskbasedResultBuf* pResultBuf = NULL;
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
|
||||
|
||||
int32_t pageId = 0;
|
||||
|
@ -68,31 +68,31 @@ void writeDownTest() {
|
|||
|
||||
*(int32_t*)(pBufPage->data) = nx;
|
||||
writePageId = pageId;
|
||||
releaseResBufPage(pResultBuf, pBufPage);
|
||||
releaseBufPage(pResultBuf, pBufPage);
|
||||
|
||||
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t1 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t1 == pBufPage1);
|
||||
ASSERT_TRUE(pageId == 1);
|
||||
|
||||
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t2 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t2 == pBufPage2);
|
||||
ASSERT_TRUE(pageId == 2);
|
||||
|
||||
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t3 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t3 == pBufPage3);
|
||||
ASSERT_TRUE(pageId == 3);
|
||||
|
||||
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t4 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t4 == pBufPage4);
|
||||
ASSERT_TRUE(pageId == 4);
|
||||
releaseResBufPage(pResultBuf, t4);
|
||||
releaseBufPage(pResultBuf, t4);
|
||||
|
||||
// flush the written page to disk, and read it out again
|
||||
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
|
||||
tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
|
||||
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
|
||||
|
||||
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
|
||||
|
@ -102,7 +102,7 @@ void writeDownTest() {
|
|||
}
|
||||
|
||||
void recyclePageTest() {
|
||||
SDiskbasedResultBuf* pResultBuf = NULL;
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
|
||||
|
||||
int32_t pageId = 0;
|
||||
|
@ -112,41 +112,41 @@ void recyclePageTest() {
|
|||
|
||||
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
releaseResBufPage(pResultBuf, pBufPage);
|
||||
releaseBufPage(pResultBuf, pBufPage);
|
||||
|
||||
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t1 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t1 == pBufPage1);
|
||||
ASSERT_TRUE(pageId == 1);
|
||||
|
||||
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t2 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t2 == pBufPage2);
|
||||
ASSERT_TRUE(pageId == 2);
|
||||
|
||||
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t3 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t3 == pBufPage3);
|
||||
ASSERT_TRUE(pageId == 3);
|
||||
|
||||
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t4 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t4 == pBufPage4);
|
||||
ASSERT_TRUE(pageId == 4);
|
||||
releaseResBufPage(pResultBuf, t4);
|
||||
releaseBufPage(pResultBuf, t4);
|
||||
|
||||
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
|
||||
tFilePage* t5 = getResBufPage(pResultBuf, pageId);
|
||||
tFilePage* t5 = getBufPage(pResultBuf, pageId);
|
||||
ASSERT_TRUE(t5 == pBufPage5);
|
||||
ASSERT_TRUE(pageId == 5);
|
||||
|
||||
// flush the written page to disk, and read it out again
|
||||
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
|
||||
tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
|
||||
*(int32_t*)(pBufPagex->data) = nx;
|
||||
writePageId = pageId; // update the data
|
||||
releaseResBufPage(pResultBuf, pBufPagex);
|
||||
releaseBufPage(pResultBuf, pBufPagex);
|
||||
|
||||
tFilePage* pBufPagex1 = getResBufPage(pResultBuf, 1);
|
||||
tFilePage* pBufPagex1 = getBufPage(pResultBuf, 1);
|
||||
|
||||
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
|
||||
ASSERT_EQ(taosArrayGetSize(pa), 6);
|
||||
|
|
|
@ -74,6 +74,38 @@ def pre_test(){
|
|||
git pull >/dev/null
|
||||
git fetch origin +refs/pull/${CHANGE_ID}/merge
|
||||
git checkout -qf FETCH_HEAD
|
||||
git submodule update --init --recursive --remote
|
||||
'''
|
||||
script {
|
||||
if (env.CHANGE_TARGET == 'master') {
|
||||
sh '''
|
||||
cd ${WKCT}
|
||||
git checkout master
|
||||
'''
|
||||
}
|
||||
else if(env.CHANGE_TARGET == '2.0'){
|
||||
sh '''
|
||||
cd ${WKCT}
|
||||
git checkout 2.0
|
||||
'''
|
||||
}
|
||||
else if(env.CHANGE_TARGET == '3.0'){
|
||||
sh '''
|
||||
cd ${WKCT}
|
||||
git checkout 3.0
|
||||
'''
|
||||
}
|
||||
else{
|
||||
sh '''
|
||||
cd ${WKCT}
|
||||
git checkout develop
|
||||
'''
|
||||
}
|
||||
}
|
||||
sh'''
|
||||
cd ${WKCT}
|
||||
git pull >/dev/null
|
||||
cd ${WKC}
|
||||
export TZ=Asia/Harbin
|
||||
date
|
||||
rm -rf debug
|
||||
|
@ -81,7 +113,6 @@ def pre_test(){
|
|||
cd debug
|
||||
cmake .. > /dev/null
|
||||
make -j4> /dev/null
|
||||
|
||||
'''
|
||||
return 1
|
||||
}
|
||||
|
@ -92,6 +123,7 @@ pipeline {
|
|||
environment{
|
||||
WK = '/var/lib/jenkins/workspace/TDinternal'
|
||||
WKC= '/var/lib/jenkins/workspace/TDengine'
|
||||
WKCT= '/var/lib/jenkins/workspace/TDengine/tests'
|
||||
}
|
||||
stages {
|
||||
stage('pre_build'){
|
|
@ -13,16 +13,14 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
static void msg_process(tmq_message_t* message) {
|
||||
tmqShowMsg(message);
|
||||
}
|
||||
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
|
||||
|
||||
int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -30,7 +28,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -51,28 +49,27 @@ int32_t init_env() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
||||
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));
|
||||
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tu2 using st1 tags(2)");
|
||||
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
|
||||
const char* sql = "select * from st1";
|
||||
const char* sql = "select * from tu1";
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||
/*if (taos_errno(pRes) != 0) {*/
|
||||
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
/*taos_free_result(pRes);*/
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
@ -91,11 +88,6 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "group.id", "tg2");
|
||||
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
|
||||
return tmq;
|
||||
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "test_stb_topic_1");
|
||||
tmq_subscribe(tmq, topic_list);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
|
@ -104,8 +96,7 @@ tmq_list_t* build_topic_list() {
|
|||
return topic_list;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t *tmq,
|
||||
tmq_list_t *topics) {
|
||||
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
|
@ -113,20 +104,20 @@ void basic_consume_loop(tmq_t *tmq,
|
|||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
int32_t cnt = 0;
|
||||
clock_t startTime = clock();
|
||||
/*int32_t cnt = 0;*/
|
||||
/*clock_t startTime = clock();*/
|
||||
while (running) {
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmqmessage) {
|
||||
cnt++;
|
||||
/*msg_process(tmqmessage);*/
|
||||
/*cnt++;*/
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
} else {
|
||||
break;
|
||||
/*} else {*/
|
||||
/*break;*/
|
||||
}
|
||||
}
|
||||
clock_t endTime = clock();
|
||||
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
|
||||
/*clock_t endTime = clock();*/
|
||||
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
|
@ -135,8 +126,7 @@ void basic_consume_loop(tmq_t *tmq,
|
|||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
void sync_consume_loop(tmq_t *tmq,
|
||||
tmq_list_t *topics) {
|
||||
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
static const int MIN_COMMIT_COUNT = 1000;
|
||||
|
||||
int msg_count = 0;
|
||||
|
@ -153,8 +143,7 @@ void sync_consume_loop(tmq_t *tmq,
|
|||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
|
||||
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
|
||||
tmq_commit(tmq, NULL, 0);
|
||||
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,11 +154,48 @@ void sync_consume_loop(tmq_t *tmq,
|
|||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
int main() {
|
||||
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
int32_t batchCnt = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
clock_t startTime = clock();
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmqmessage) {
|
||||
batchCnt++;
|
||||
skipLogNum += tmqGetSkipLogNum(tmqmessage);
|
||||
/*msg_process(tmqmessage);*/
|
||||
tmq_message_destroy(tmqmessage);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
clock_t endTime = clock();
|
||||
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
|
||||
(double)(endTime - startTime) / CLOCKS_PER_SEC);
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int code;
|
||||
if (argc > 1) {
|
||||
printf("env init\n");
|
||||
code = init_env();
|
||||
}
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*perf_loop(tmq, topic_list);*/
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
/*sync_consume_loop(tmq, topic_list);*/
|
||||
}
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue