Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1
This commit is contained in:
commit
2bb03ed330
|
@ -269,6 +269,8 @@ typedef struct SSchema {
|
|||
#define SSCHMEA_BYTES(s) ((s)->bytes)
|
||||
#define SSCHMEA_NAME(s) ((s)->name)
|
||||
|
||||
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
|
@ -658,6 +660,7 @@ typedef struct {
|
|||
int8_t outputType;
|
||||
int32_t outputLen;
|
||||
int32_t bufSize;
|
||||
int32_t codeLen;
|
||||
int64_t signature;
|
||||
char* pComment;
|
||||
char* pCode;
|
||||
|
|
|
@ -218,7 +218,7 @@ static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return
|
|||
|
||||
#define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow?
|
||||
|
||||
void tdMergeBitmap(uint8_t *srcBitmap, int32_t srcLen, uint8_t *dstBitmap);
|
||||
void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap);
|
||||
static FORCE_INLINE void tdRowCopy(void *dst, STSRow *row) { memcpy(dst, row, TD_ROW_LEN(row)); }
|
||||
static FORCE_INLINE int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType);
|
||||
static FORCE_INLINE int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType);
|
||||
|
@ -307,8 +307,8 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx,
|
|||
// use literal value directly and not use formula to simplify the codes
|
||||
switch (nOffset) {
|
||||
case 0:
|
||||
*pDestByte = ((*pDestByte) & 0x3F) | (valType << 6);
|
||||
// set the value and clear other partitions for offset 0
|
||||
*pDestByte = (valType << 6);
|
||||
// *pDestByte |= (valType << 6);
|
||||
break;
|
||||
case 1:
|
||||
|
@ -417,8 +417,8 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, T
|
|||
// use literal value directly and not use formula to simplify the codes
|
||||
switch (nOffset) {
|
||||
case 0:
|
||||
*pDestByte = ((*pDestByte) & 0x7F) | (valType << 7);
|
||||
// set the value and clear other partitions for offset 0
|
||||
*pDestByte = (valType << 7);
|
||||
// *pDestByte |= (valType << 7);
|
||||
break;
|
||||
case 1:
|
||||
|
@ -1107,11 +1107,11 @@ static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col
|
|||
if (TD_IS_TP_ROW(pIter->pRow)) {
|
||||
STColumn *pCol = NULL;
|
||||
STSchema *pSchema = pIter->pSchema;
|
||||
while (pIter->colIdx <= pSchema->numOfCols) {
|
||||
while (pIter->colIdx < pSchema->numOfCols) {
|
||||
pCol = &pSchema->columns[pIter->colIdx]; // 1st column of schema is primary TS key
|
||||
if (colId == pCol->colId) {
|
||||
break;
|
||||
} else if (colId < pCol->colId) {
|
||||
} else if (pCol->colId < colId) {
|
||||
++pIter->colIdx;
|
||||
continue;
|
||||
} else {
|
||||
|
@ -1237,6 +1237,101 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows,
|
|||
return result;
|
||||
}
|
||||
|
||||
static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
|
||||
if (tdValTypeIsNull(pVal->valType)) {
|
||||
printf("NULL ");
|
||||
return;
|
||||
} else if (tdValTypeIsNone(pVal->valType)) {
|
||||
printf("NONE ");
|
||||
return;
|
||||
}
|
||||
switch (colType) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
printf("%s ", (*(int8_t *)pVal->val) == 0 ? "false" : "true");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
printf("%" PRIi8 " ", *(int8_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
printf("%" PRIi16 " ", *(int16_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
printf("%" PRIi32 " ", *(int32_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
printf("%" PRIi64 " ", *(int64_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
printf("%f ", *(float *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
printf("%lf ", *(double *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
printf("VARCHAR ");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
printf("%" PRIi64 " ", *(int64_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
printf("NCHAR ");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
printf("%" PRIu8 " ", *(uint8_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
printf("%" PRIu16 " ", *(uint16_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
printf("%" PRIu32 " ", *(uint32_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
printf("%" PRIu64 " ", *(uint64_t *)pVal->val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
printf("JSON ");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
printf("VARBIN ");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
printf("DECIMAL ");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
printf("BLOB ");
|
||||
break;
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
printf("MedBLOB ");
|
||||
break;
|
||||
// case TSDB_DATA_TYPE_BINARY:
|
||||
// printf("BINARY ");
|
||||
// break;
|
||||
case TSDB_DATA_TYPE_MAX:
|
||||
printf("UNDEF ");
|
||||
break;
|
||||
default:
|
||||
printf("UNDEF ");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema) {
|
||||
STSRowIter iter = {0};
|
||||
tdSTSRowIterInit(&iter, pSchema);
|
||||
tdSTSRowIterReset(&iter, row);
|
||||
printf(">>>");
|
||||
for (int i = 0; i < pSchema->numOfCols; ++i) {
|
||||
STColumn *stCol = pSchema->columns + i;
|
||||
SCellVal sVal = {.valType = 255, .val = NULL};
|
||||
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
ASSERT(sVal.valType == 0 || sVal.valType == 1 || sVal.valType == 2);
|
||||
tdSCellValPrint(&sVal, stCol->type);
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
#ifdef TROW_ORIGIN_HZ
|
||||
typedef struct {
|
||||
uint32_t nRows;
|
||||
|
|
|
@ -148,94 +148,95 @@
|
|||
#define TK_VARIABLES 130
|
||||
#define TK_BNODES 131
|
||||
#define TK_SNODES 132
|
||||
#define TK_LIKE 133
|
||||
#define TK_INDEX 134
|
||||
#define TK_FULLTEXT 135
|
||||
#define TK_FUNCTION 136
|
||||
#define TK_INTERVAL 137
|
||||
#define TK_TOPIC 138
|
||||
#define TK_AS 139
|
||||
#define TK_DESC 140
|
||||
#define TK_DESCRIBE 141
|
||||
#define TK_RESET 142
|
||||
#define TK_QUERY 143
|
||||
#define TK_EXPLAIN 144
|
||||
#define TK_ANALYZE 145
|
||||
#define TK_VERBOSE 146
|
||||
#define TK_NK_BOOL 147
|
||||
#define TK_RATIO 148
|
||||
#define TK_COMPACT 149
|
||||
#define TK_VNODES 150
|
||||
#define TK_IN 151
|
||||
#define TK_OUTPUTTYPE 152
|
||||
#define TK_AGGREGATE 153
|
||||
#define TK_BUFSIZE 154
|
||||
#define TK_STREAM 155
|
||||
#define TK_INTO 156
|
||||
#define TK_TRIGGER 157
|
||||
#define TK_AT_ONCE 158
|
||||
#define TK_WINDOW_CLOSE 159
|
||||
#define TK_WATERMARK 160
|
||||
#define TK_KILL 161
|
||||
#define TK_CONNECTION 162
|
||||
#define TK_MERGE 163
|
||||
#define TK_VGROUP 164
|
||||
#define TK_REDISTRIBUTE 165
|
||||
#define TK_SPLIT 166
|
||||
#define TK_SYNCDB 167
|
||||
#define TK_NULL 168
|
||||
#define TK_NK_QUESTION 169
|
||||
#define TK_NK_ARROW 170
|
||||
#define TK_ROWTS 171
|
||||
#define TK_TBNAME 172
|
||||
#define TK_QSTARTTS 173
|
||||
#define TK_QENDTS 174
|
||||
#define TK_WSTARTTS 175
|
||||
#define TK_WENDTS 176
|
||||
#define TK_WDURATION 177
|
||||
#define TK_CAST 178
|
||||
#define TK_NOW 179
|
||||
#define TK_TODAY 180
|
||||
#define TK_TIMEZONE 181
|
||||
#define TK_COUNT 182
|
||||
#define TK_FIRST 183
|
||||
#define TK_LAST 184
|
||||
#define TK_LAST_ROW 185
|
||||
#define TK_BETWEEN 186
|
||||
#define TK_IS 187
|
||||
#define TK_NK_LT 188
|
||||
#define TK_NK_GT 189
|
||||
#define TK_NK_LE 190
|
||||
#define TK_NK_GE 191
|
||||
#define TK_NK_NE 192
|
||||
#define TK_MATCH 193
|
||||
#define TK_NMATCH 194
|
||||
#define TK_CONTAINS 195
|
||||
#define TK_JOIN 196
|
||||
#define TK_INNER 197
|
||||
#define TK_SELECT 198
|
||||
#define TK_DISTINCT 199
|
||||
#define TK_WHERE 200
|
||||
#define TK_PARTITION 201
|
||||
#define TK_BY 202
|
||||
#define TK_SESSION 203
|
||||
#define TK_STATE_WINDOW 204
|
||||
#define TK_SLIDING 205
|
||||
#define TK_FILL 206
|
||||
#define TK_VALUE 207
|
||||
#define TK_NONE 208
|
||||
#define TK_PREV 209
|
||||
#define TK_LINEAR 210
|
||||
#define TK_NEXT 211
|
||||
#define TK_GROUP 212
|
||||
#define TK_HAVING 213
|
||||
#define TK_ORDER 214
|
||||
#define TK_SLIMIT 215
|
||||
#define TK_SOFFSET 216
|
||||
#define TK_LIMIT 217
|
||||
#define TK_OFFSET 218
|
||||
#define TK_ASC 219
|
||||
#define TK_NULLS 220
|
||||
#define TK_CLUSTER 133
|
||||
#define TK_LIKE 134
|
||||
#define TK_INDEX 135
|
||||
#define TK_FULLTEXT 136
|
||||
#define TK_FUNCTION 137
|
||||
#define TK_INTERVAL 138
|
||||
#define TK_TOPIC 139
|
||||
#define TK_AS 140
|
||||
#define TK_DESC 141
|
||||
#define TK_DESCRIBE 142
|
||||
#define TK_RESET 143
|
||||
#define TK_QUERY 144
|
||||
#define TK_EXPLAIN 145
|
||||
#define TK_ANALYZE 146
|
||||
#define TK_VERBOSE 147
|
||||
#define TK_NK_BOOL 148
|
||||
#define TK_RATIO 149
|
||||
#define TK_COMPACT 150
|
||||
#define TK_VNODES 151
|
||||
#define TK_IN 152
|
||||
#define TK_OUTPUTTYPE 153
|
||||
#define TK_AGGREGATE 154
|
||||
#define TK_BUFSIZE 155
|
||||
#define TK_STREAM 156
|
||||
#define TK_INTO 157
|
||||
#define TK_TRIGGER 158
|
||||
#define TK_AT_ONCE 159
|
||||
#define TK_WINDOW_CLOSE 160
|
||||
#define TK_WATERMARK 161
|
||||
#define TK_KILL 162
|
||||
#define TK_CONNECTION 163
|
||||
#define TK_MERGE 164
|
||||
#define TK_VGROUP 165
|
||||
#define TK_REDISTRIBUTE 166
|
||||
#define TK_SPLIT 167
|
||||
#define TK_SYNCDB 168
|
||||
#define TK_NULL 169
|
||||
#define TK_NK_QUESTION 170
|
||||
#define TK_NK_ARROW 171
|
||||
#define TK_ROWTS 172
|
||||
#define TK_TBNAME 173
|
||||
#define TK_QSTARTTS 174
|
||||
#define TK_QENDTS 175
|
||||
#define TK_WSTARTTS 176
|
||||
#define TK_WENDTS 177
|
||||
#define TK_WDURATION 178
|
||||
#define TK_CAST 179
|
||||
#define TK_NOW 180
|
||||
#define TK_TODAY 181
|
||||
#define TK_TIMEZONE 182
|
||||
#define TK_COUNT 183
|
||||
#define TK_FIRST 184
|
||||
#define TK_LAST 185
|
||||
#define TK_LAST_ROW 186
|
||||
#define TK_BETWEEN 187
|
||||
#define TK_IS 188
|
||||
#define TK_NK_LT 189
|
||||
#define TK_NK_GT 190
|
||||
#define TK_NK_LE 191
|
||||
#define TK_NK_GE 192
|
||||
#define TK_NK_NE 193
|
||||
#define TK_MATCH 194
|
||||
#define TK_NMATCH 195
|
||||
#define TK_CONTAINS 196
|
||||
#define TK_JOIN 197
|
||||
#define TK_INNER 198
|
||||
#define TK_SELECT 199
|
||||
#define TK_DISTINCT 200
|
||||
#define TK_WHERE 201
|
||||
#define TK_PARTITION 202
|
||||
#define TK_BY 203
|
||||
#define TK_SESSION 204
|
||||
#define TK_STATE_WINDOW 205
|
||||
#define TK_SLIDING 206
|
||||
#define TK_FILL 207
|
||||
#define TK_VALUE 208
|
||||
#define TK_NONE 209
|
||||
#define TK_PREV 210
|
||||
#define TK_LINEAR 211
|
||||
#define TK_NEXT 212
|
||||
#define TK_GROUP 213
|
||||
#define TK_HAVING 214
|
||||
#define TK_ORDER 215
|
||||
#define TK_SLIMIT 216
|
||||
#define TK_SOFFSET 217
|
||||
#define TK_LIMIT 218
|
||||
#define TK_OFFSET 219
|
||||
#define TK_ASC 220
|
||||
#define TK_NULLS 221
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -110,7 +110,10 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_QENDTS,
|
||||
FUNCTION_TYPE_WSTARTTS,
|
||||
FUNCTION_TYPE_WENDTS,
|
||||
FUNCTION_TYPE_WDURATION
|
||||
FUNCTION_TYPE_WDURATION,
|
||||
|
||||
// user defined funcion
|
||||
FUNCTION_TYPE_UDF = 10000
|
||||
} EFunctionType;
|
||||
|
||||
struct SqlFunctionCtx;
|
||||
|
@ -138,6 +141,7 @@ bool fmIsWindowClauseFunc(int32_t funcId);
|
|||
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
||||
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
||||
bool fmIsMultiResFunc(int32_t funcId);
|
||||
bool fmIsUserDefinedFunc(int32_t funcId);
|
||||
|
||||
typedef enum EFuncDataRequired {
|
||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||
|
|
|
@ -295,6 +295,16 @@ typedef struct SDropStreamStmt {
|
|||
bool ignoreNotExists;
|
||||
} SDropStreamStmt;
|
||||
|
||||
typedef struct SCreateFunctionStmt {
|
||||
ENodeType type;
|
||||
bool ignoreExists;
|
||||
char funcName[TSDB_FUNC_NAME_LEN];
|
||||
bool isAgg;
|
||||
char libraryPath[PATH_MAX];
|
||||
SDataType outputDt;
|
||||
int32_t bufSize;
|
||||
} SCreateFunctionStmt;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -134,6 +134,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_SHOW_QNODES_STMT,
|
||||
QUERY_NODE_SHOW_SNODES_STMT,
|
||||
QUERY_NODE_SHOW_BNODES_STMT,
|
||||
QUERY_NODE_SHOW_CLUSTER_STMT,
|
||||
QUERY_NODE_SHOW_DATABASES_STMT,
|
||||
QUERY_NODE_SHOW_FUNCTIONS_STMT,
|
||||
QUERY_NODE_SHOW_INDEXES_STMT,
|
||||
|
|
|
@ -239,6 +239,8 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_FUNC_BUF_SIZE 512
|
||||
#define TSDB_FUNC_TYPE_SCALAR 1
|
||||
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
||||
#define TSDB_FUNC_SCRIPT_BIN_LIB 0
|
||||
#define TSDB_FUNC_SCRIPT_LUA 1
|
||||
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
||||
|
||||
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
|
||||
|
|
|
@ -1561,6 +1561,7 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
|
|||
if (tEncodeI8(&encoder, pReq->outputType) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->outputLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->bufSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->codeLen) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->signature) < 0) return -1;
|
||||
|
||||
int32_t codeSize = 0;
|
||||
|
@ -1600,6 +1601,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
|
|||
if (tDecodeI8(&decoder, &pReq->outputType) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->outputLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->bufSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->codeLen) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1;
|
||||
|
||||
int32_t codeSize = 0;
|
||||
|
@ -3759,3 +3761,27 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
|
|||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STSchema *tdGetSTSChemaFromSSChema(SSchema **pSchema, int32_t nCols) {
|
||||
STSchemaBuilder schemaBuilder = {0};
|
||||
if (tdInitTSchemaBuilder(&schemaBuilder, 0) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nCols; i++) {
|
||||
SSchema *schema = *pSchema + i;
|
||||
if (tdAddColToSchema(&schemaBuilder, schema->type, schema->flags, schema->colId, schema->bytes) < 0) {
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||
if (pNSchema == NULL) {
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return pNSchema;
|
||||
}
|
||||
|
|
|
@ -24,9 +24,8 @@ const uint8_t tdVTypeByte[2][3] = {{
|
|||
},
|
||||
{
|
||||
// 1 bit
|
||||
TD_VTYPE_NORM_BYTE_I,
|
||||
TD_VTYPE_NULL_BYTE_I,
|
||||
TD_VTYPE_NULL_BYTE_I, // padding
|
||||
TD_VTYPE_NORM_BYTE_I, TD_VTYPE_NULL_BYTE_I,
|
||||
TD_VTYPE_NULL_BYTE_I, // padding
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -224,17 +223,40 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) {
|
|||
* @brief Merge bitmap from 2 bits to 1 bits, and the memory buffer should be guaranteed by the invoker.
|
||||
*
|
||||
* @param srcBitmap
|
||||
* @param srcLen
|
||||
* @param nBits
|
||||
* @param dstBitmap
|
||||
*/
|
||||
void tdMergeBitmap(uint8_t *srcBitmap, int32_t srcLen, uint8_t *dstBitmap) {
|
||||
void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) {
|
||||
int32_t i = 0, j = 0;
|
||||
int32_t nBytes = TD_BITMAP_BYTES(nBits);
|
||||
int32_t nStrictBytes = nBits / 4;
|
||||
int32_t nPartialBits = nBits - nStrictBytes * 4;
|
||||
|
||||
if (srcLen > 0) {
|
||||
switch (nPartialBits) {
|
||||
case 0:
|
||||
// NOTHING TODO
|
||||
break;
|
||||
case 1: {
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes);
|
||||
*(uint8_t *)lastByte &= 0xC0;
|
||||
} break;
|
||||
case 2: {
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes);
|
||||
*(uint8_t *)lastByte &= 0xF0;
|
||||
} break;
|
||||
case 3: {
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes);
|
||||
*(uint8_t *)lastByte &= 0xFC;
|
||||
} break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (nBytes > 0) {
|
||||
dstBitmap[j] = (tdGetMergedBitmapByte(srcBitmap[i]) << 4);
|
||||
}
|
||||
|
||||
while ((++i) < srcLen) {
|
||||
while ((++i) < nBytes) {
|
||||
if ((i & 1) == 0) {
|
||||
dstBitmap[j] = (tdGetMergedBitmapByte(srcBitmap[i]) << 4);
|
||||
} else {
|
||||
|
@ -381,17 +403,18 @@ STSRow *tdRowDup(STSRow *row) {
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pCol
|
||||
* @param valType
|
||||
* @param val
|
||||
* @param numOfRows
|
||||
* @param maxPoints
|
||||
* @brief
|
||||
*
|
||||
* @param pCol
|
||||
* @param valType
|
||||
* @param val
|
||||
* @param numOfRows
|
||||
* @param maxPoints
|
||||
* @param bitmapMode default is 0(2 bits), otherwise 1(1 bit)
|
||||
* @return int
|
||||
* @return int
|
||||
*/
|
||||
int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints, int8_t bitmapMode) {
|
||||
int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints,
|
||||
int8_t bitmapMode) {
|
||||
TASSERT(pCol != NULL);
|
||||
|
||||
// Assume that the columns not specified during insert/upsert mean None.
|
||||
|
@ -426,7 +449,7 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
|
|||
pCol->len += pCol->bytes;
|
||||
}
|
||||
#ifdef TD_SUPPORT_BITMAP
|
||||
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
|
||||
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
@ -537,7 +560,8 @@ int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCol
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull, TDRowVerT maxVer) {
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull,
|
||||
TDRowVerT maxVer) {
|
||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
||||
ASSERT(target->numOfCols == source->numOfCols);
|
||||
int offset = 0;
|
||||
|
@ -558,7 +582,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
|
|||
if (tdGetColDataOfRow(&sVal, source->cols + j, i + (*pOffset), source->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints, target->bitmapMode);
|
||||
tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
}
|
||||
}
|
||||
++target->numOfRows;
|
||||
|
@ -605,7 +630,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
|||
if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1, src1->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, target->bitmapMode);
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -621,12 +647,14 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
|||
TASSERT(0);
|
||||
}
|
||||
if (src2->cols[i].len > 0 && !tdValTypeIsNull(sVal.valType)) {
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, target->bitmapMode);
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
} else if (!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
|
||||
if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1, src1->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, target->bitmapMode);
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
} else if (target->cols[i].len > 0) {
|
||||
dataColSetNullAt(&target->cols[i], target->numOfRows, true, target->bitmapMode);
|
||||
}
|
||||
|
|
|
@ -77,7 +77,9 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
|
|||
SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
||||
if (pFunc->commentSize > 0) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
||||
}
|
||||
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
|
||||
|
@ -125,13 +127,18 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER)
|
||||
|
||||
pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
|
||||
pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
|
||||
if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
|
||||
goto _OVER;
|
||||
if (pFunc->commentSize > 0) {
|
||||
pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
|
||||
if (pFunc->pComment == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
||||
}
|
||||
|
||||
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
||||
pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
|
||||
if (pFunc->pCode == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
|
||||
SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
|
||||
|
||||
|
@ -192,16 +199,20 @@ static int32_t mndCreateFunc(SMnode *pMnode, SNodeMsg *pReq, SCreateFuncReq *pCr
|
|||
func.outputLen = pCreate->outputLen;
|
||||
func.bufSize = pCreate->bufSize;
|
||||
func.signature = pCreate->signature;
|
||||
func.commentSize = strlen(pCreate->pComment) + 1;
|
||||
func.codeSize = strlen(pCreate->pCode) + 1;
|
||||
func.pComment = taosMemoryMalloc(func.commentSize);
|
||||
if (NULL != pCreate->pComment) {
|
||||
func.commentSize = strlen(pCreate->pComment) + 1;
|
||||
func.pComment = taosMemoryMalloc(func.commentSize);
|
||||
}
|
||||
func.codeSize = pCreate->codeLen;
|
||||
func.pCode = taosMemoryMalloc(func.codeSize);
|
||||
if (func.pCode == NULL || func.pCode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
memcpy(func.pComment, pCreate->pComment, func.commentSize);
|
||||
if (func.commentSize > 0) {
|
||||
memcpy(func.pComment, pCreate->pComment, func.commentSize);
|
||||
}
|
||||
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
|
||||
|
@ -293,16 +304,6 @@ static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (createReq.pComment == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (createReq.pComment[0] == 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (createReq.pCode == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
|
||||
goto _OVER;
|
||||
|
@ -440,14 +441,20 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
|
|||
funcInfo.signature = pFunc->signature;
|
||||
funcInfo.commentSize = pFunc->commentSize;
|
||||
funcInfo.codeSize = pFunc->codeSize;
|
||||
funcInfo.pCode = taosMemoryCalloc(1, sizeof(funcInfo.codeSize));
|
||||
funcInfo.pComment = taosMemoryCalloc(1, sizeof(funcInfo.commentSize));
|
||||
if (funcInfo.pCode == NULL || funcInfo.pComment == NULL) {
|
||||
funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
|
||||
if (funcInfo.pCode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto RETRIEVE_FUNC_OVER;
|
||||
}
|
||||
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
|
||||
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
|
||||
if (funcInfo.commentSize > 0) {
|
||||
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
|
||||
if (funcInfo.pComment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto RETRIEVE_FUNC_OVER;
|
||||
}
|
||||
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
|
||||
}
|
||||
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
|
||||
mndReleaseFunc(pMnode, pFunc);
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ void MndTestFunc::SetCode(SCreateFuncReq* pReq, const char* pCode) {
|
|||
int32_t len = strlen(pCode);
|
||||
pReq->pCode = (char*)taosMemoryCalloc(1, len + 1);
|
||||
strcpy(pReq->pCode, pCode);
|
||||
pReq->codeLen = len;
|
||||
}
|
||||
|
||||
void MndTestFunc::SetComment(SCreateFuncReq* pReq, const char* pComment) {
|
||||
|
@ -60,21 +61,6 @@ TEST_F(MndTestFunc, 02_Create_Func) {
|
|||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_NAME);
|
||||
}
|
||||
|
||||
{
|
||||
SCreateFuncReq createReq = {0};
|
||||
strcpy(createReq.name, "f1");
|
||||
SetCode(&createReq, "code1");
|
||||
|
||||
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSCreateFuncReq(pReq, contLen, &createReq);
|
||||
tFreeSCreateFuncReq(&createReq);
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_COMMENT);
|
||||
}
|
||||
|
||||
{
|
||||
SCreateFuncReq createReq = {0};
|
||||
strcpy(createReq.name, "f1");
|
||||
|
@ -90,22 +76,6 @@ TEST_F(MndTestFunc, 02_Create_Func) {
|
|||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_CODE);
|
||||
}
|
||||
|
||||
{
|
||||
SCreateFuncReq createReq = {0};
|
||||
strcpy(createReq.name, "f1");
|
||||
SetCode(&createReq, "code1");
|
||||
SetComment(&createReq, "");
|
||||
|
||||
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSCreateFuncReq(pReq, contLen, &createReq);
|
||||
tFreeSCreateFuncReq(&createReq);
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_COMMENT);
|
||||
}
|
||||
|
||||
{
|
||||
SCreateFuncReq createReq = {0};
|
||||
strcpy(createReq.name, "f1");
|
||||
|
@ -329,7 +299,6 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
|
|||
EXPECT_EQ(pFuncInfo->bufSize, 6);
|
||||
EXPECT_EQ(pFuncInfo->signature, 18);
|
||||
EXPECT_EQ(pFuncInfo->commentSize, strlen("comment2") + 1);
|
||||
EXPECT_EQ(pFuncInfo->codeSize, strlen("code2") + 1);
|
||||
|
||||
EXPECT_STREQ("comment2", pFuncInfo->pComment);
|
||||
EXPECT_STREQ("code2", pFuncInfo->pCode);
|
||||
|
@ -368,7 +337,6 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
|
|||
EXPECT_EQ(pFuncInfo->bufSize, 6);
|
||||
EXPECT_EQ(pFuncInfo->signature, 18);
|
||||
EXPECT_EQ(pFuncInfo->commentSize, strlen("comment2") + 1);
|
||||
EXPECT_EQ(pFuncInfo->codeSize, strlen("code2") + 1);
|
||||
EXPECT_STREQ("comment2", pFuncInfo->pComment);
|
||||
EXPECT_STREQ("code2", pFuncInfo->pCode);
|
||||
}
|
||||
|
|
|
@ -1350,7 +1350,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
|
|||
if (tBitmaps > 0) {
|
||||
bptr = POINTER_SHIFT(pBlockData, lsize + flen);
|
||||
if (isSuper && !tdDataColsIsBitmapI(pDataCols)) {
|
||||
tdMergeBitmap((uint8_t *)pDataCol->pBitmap, nBitmaps, (uint8_t *)pDataCol->pBitmap);
|
||||
tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap);
|
||||
}
|
||||
tBitmapsLen =
|
||||
tsCompressTinyint((char *)pDataCol->pBitmap, tBitmaps, tBitmaps, bptr, tBitmaps + COMP_OVERFLOW_BYTES,
|
||||
|
|
|
@ -305,7 +305,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
|
||||
if (pDataCol->bitmap) {
|
||||
ASSERT(pDataCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
tdMergeBitmap(pDataCol->pBitmap, TD_BITMAP_BYTES(pReadh->pDCols[0]->numOfRows), pDataCol->pBitmap);
|
||||
tdMergeBitmap(pDataCol->pBitmap, pReadh->pDCols[0]->numOfRows, pDataCol->pBitmap);
|
||||
tdDataColsSetBitmapI(pReadh->pDCols[0]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
function
|
||||
PRIVATE os util common nodes scalar
|
||||
PRIVATE os util common nodes scalar catalog qcom transport
|
||||
PUBLIC uv_a
|
||||
)
|
||||
|
||||
|
|
|
@ -20,26 +20,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "functionMgt.h"
|
||||
|
||||
#define FUNCTION_NAME_MAX_LENGTH 32
|
||||
|
||||
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
|
||||
|
||||
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
|
||||
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
|
||||
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
|
||||
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
|
||||
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
|
||||
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
|
||||
#define FUNC_MGT_TIMEORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(6)
|
||||
#define FUNC_MGT_PSEUDO_COLUMN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(7)
|
||||
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
|
||||
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
||||
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
||||
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
#include "functionMgtInt.h"
|
||||
|
||||
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
|
||||
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||
|
|
|
@ -21,9 +21,29 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "functionMgt.h"
|
||||
// #include "builtins.h"
|
||||
|
||||
#define FUNCTION_NAME_MAX_LENGTH 32
|
||||
|
||||
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
|
||||
|
||||
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
|
||||
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
|
||||
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
|
||||
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
|
||||
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
|
||||
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
|
||||
#define FUNC_MGT_TIMEORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(6)
|
||||
#define FUNC_MGT_PSEUDO_COLUMN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(7)
|
||||
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
|
||||
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
||||
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
||||
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
#define FUNC_UDF_ID_START_OFFSET_VAL 5000
|
||||
|
||||
extern const int funcMgtUdfNum;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ static int32_t invaildFuncParaNumErrMsg(char* pErrBuf, int32_t len, const char*
|
|||
}
|
||||
|
||||
static int32_t invaildFuncParaTypeErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid datatypes : %s", pFuncName);
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid parameter data type : %s", pFuncName);
|
||||
}
|
||||
|
||||
static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) {
|
||||
|
@ -327,6 +327,10 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
para2Type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
if ((para2Type == TSDB_DATA_TYPE_TIMESTAMP && IS_VAR_DATA_TYPE(para1Type)) ||
|
||||
(para2Type == TSDB_DATA_TYPE_BINARY && para1Type == TSDB_DATA_TYPE_NCHAR)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
int32_t para2Bytes = pFunc->node.resType.bytes;
|
||||
if (para2Bytes <= 0) { //non-positive value or overflow
|
||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
|
|
@ -20,16 +20,22 @@
|
|||
#include "taoserror.h"
|
||||
#include "thash.h"
|
||||
#include "builtins.h"
|
||||
#include "catalog.h"
|
||||
|
||||
typedef struct SFuncMgtService {
|
||||
SHashObj* pFuncNameHashTable;
|
||||
SArray* pUdfTable; // SUdfInfo
|
||||
} SFuncMgtService;
|
||||
|
||||
typedef struct SUdfInfo {
|
||||
SDataType outputDt;
|
||||
} SUdfInfo;
|
||||
|
||||
static SFuncMgtService gFunMgtService;
|
||||
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
||||
static int32_t initFunctionCode = 0;
|
||||
|
||||
static void doInitFunctionHashTable() {
|
||||
static void doInitFunctionTable() {
|
||||
gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (NULL == gFunMgtService.pFuncNameHashTable) {
|
||||
initFunctionCode = TSDB_CODE_FAILED;
|
||||
|
@ -42,6 +48,8 @@ static void doInitFunctionHashTable() {
|
|||
return;
|
||||
}
|
||||
}
|
||||
|
||||
gFunMgtService.pUdfTable = NULL;
|
||||
}
|
||||
|
||||
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
||||
|
@ -51,25 +59,56 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
|||
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
|
||||
}
|
||||
|
||||
static int32_t getUdfId(const char* pFuncName) {
|
||||
// todo: udf by call catalog
|
||||
if (1) {
|
||||
return -1;
|
||||
}
|
||||
if (NULL == gFunMgtService.pUdfTable) {
|
||||
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
|
||||
}
|
||||
SUdfInfo info = {0}; //todo
|
||||
taosArrayPush(gFunMgtService.pUdfTable, &info);
|
||||
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
|
||||
}
|
||||
|
||||
static int32_t getFuncId(const char* pFuncName) {
|
||||
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
|
||||
if (NULL == pVal) {
|
||||
return getUdfId(pFuncName);
|
||||
}
|
||||
return *(int32_t*)pVal;
|
||||
}
|
||||
|
||||
static int32_t getUdfResultType(SFunctionNode* pFunc) {
|
||||
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, pFunc->funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
|
||||
pFunc->node.resType = pUdf->outputDt;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t fmFuncMgtInit() {
|
||||
taosThreadOnce(&functionHashTableInit, doInitFunctionHashTable);
|
||||
taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
|
||||
return initFunctionCode;
|
||||
}
|
||||
|
||||
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
|
||||
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
|
||||
if (NULL == pVal) {
|
||||
*pFuncId = getFuncId(pFuncName);
|
||||
if (*pFuncId < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
*pFuncId = *(int32_t*)pVal;
|
||||
if (*pFuncId < 0 || *pFuncId >= funcMgtBuiltinsNum) {
|
||||
return TSDB_CODE_FAILED;
|
||||
if (fmIsUserDefinedFunc(*pFuncId)) {
|
||||
*pFuncType = FUNCTION_TYPE_UDF;
|
||||
} else {
|
||||
*pFuncType = funcMgtBuiltins[*pFuncId].type;
|
||||
}
|
||||
*pFuncType = funcMgtBuiltins[*pFuncId].type;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (fmIsUserDefinedFunc(pFunc->funcId)) {
|
||||
return getUdfResultType(pFunc);
|
||||
}
|
||||
|
||||
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -77,7 +116,7 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
}
|
||||
|
||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||
if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
|
||||
|
@ -87,7 +126,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
|
|||
}
|
||||
|
||||
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
|
||||
|
@ -98,7 +137,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
|||
}
|
||||
|
||||
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
|
||||
|
@ -142,6 +181,10 @@ bool fmIsMultiResFunc(int32_t funcId) {
|
|||
return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
|
||||
}
|
||||
|
||||
bool fmIsUserDefinedFunc(int32_t funcId) {
|
||||
return funcId > FUNC_UDF_ID_START_OFFSET_VAL;
|
||||
}
|
||||
|
||||
void fmFuncMgtDestroy() {
|
||||
void* m = gFunMgtService.pFuncNameHashTable;
|
||||
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
|
||||
|
|
|
@ -149,7 +149,9 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
|
||||
return makeNode(type, sizeof(SNode));
|
||||
case QUERY_NODE_COMPACT_STMT:
|
||||
break;
|
||||
case QUERY_NODE_CREATE_FUNCTION_STMT:
|
||||
return makeNode(type, sizeof(SCreateFunctionStmt));
|
||||
case QUERY_NODE_DROP_FUNCTION_STMT:
|
||||
break;
|
||||
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||
|
@ -167,6 +169,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_QNODES_STMT:
|
||||
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||
case QUERY_NODE_SHOW_BNODES_STMT:
|
||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_INDEXES_STMT:
|
||||
|
|
|
@ -167,7 +167,7 @@ SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions,
|
|||
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
||||
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups);
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName);
|
||||
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery);
|
||||
|
|
|
@ -346,6 +346,7 @@ cmd ::= SHOW TOPICS.
|
|||
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT, NULL, NULL); }
|
||||
|
||||
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
|
||||
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
|
||||
|
@ -413,8 +414,8 @@ explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C).
|
|||
cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); }
|
||||
|
||||
/************************************************ create/drop function ************************************************/
|
||||
cmd ::= CREATE agg_func_opt(A) FUNCTION function_name(B)
|
||||
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, A, &B, &C, D, E); }
|
||||
cmd ::= CREATE agg_func_opt(A) FUNCTION not_exists_opt(F) function_name(B)
|
||||
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, F, A, &B, &C, D, E); }
|
||||
cmd ::= DROP FUNCTION function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, &A); }
|
||||
|
||||
%type agg_func_opt { bool }
|
||||
|
|
|
@ -1181,10 +1181,21 @@ SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups) {
|
|||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt,
|
||||
bool ignoreExists, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) {
|
||||
if (pLibPath->n <= 2) {
|
||||
pCxt->valid = false;
|
||||
return NULL;
|
||||
}
|
||||
SCreateFunctionStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
pStmt->ignoreExists = ignoreExists;
|
||||
strncpy(pStmt->funcName, pFuncName->z, pFuncName->n);
|
||||
pStmt->isAgg = aggFunc;
|
||||
strncpy(pStmt->libraryPath, pLibPath->z + 1, pLibPath->n - 2);
|
||||
pStmt->outputDt = dataType;
|
||||
pStmt->bufSize = bufSize;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName) {
|
||||
|
|
|
@ -52,6 +52,7 @@ static SKeyword keywordTable[] = {
|
|||
{"CACHE", TK_CACHE},
|
||||
{"CACHELAST", TK_CACHELAST},
|
||||
{"CAST", TK_CAST},
|
||||
{"CLUSTER", TK_CLUSTER},
|
||||
{"COLUMN", TK_COLUMN},
|
||||
{"COMMENT", TK_COMMENT},
|
||||
{"COMP", TK_COMP},
|
||||
|
@ -248,7 +249,6 @@ static SKeyword keywordTable[] = {
|
|||
// {"BEFORE", TK_BEFORE},
|
||||
// {"BEGIN", TK_BEGIN},
|
||||
// {"CASCADE", TK_CASCADE},
|
||||
// {"CLUSTER", TK_CLUSTER},
|
||||
// {"CONFLICT", TK_CONFLICT},
|
||||
// {"COPY", TK_COPY},
|
||||
// {"DEFERRED", TK_DEFERRED},
|
||||
|
|
|
@ -1349,6 +1349,22 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||
int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateQuery(pCxt, pSetOperator->pRight);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateSetOperatorImpl(pCxt, pSetOperator);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int64_t getUnitPerMinute(uint8_t precision) {
|
||||
switch (precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI:
|
||||
|
@ -2590,12 +2606,64 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t readFromFile(char* pName, int32_t *len, char **buf) {
|
||||
int64_t filesize = 0;
|
||||
if (taosStatFile(pName, &filesize, NULL) < 0) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
*len = filesize;
|
||||
|
||||
if (*len <= 0) {
|
||||
return TSDB_CODE_TSC_FILE_EMPTY;
|
||||
}
|
||||
|
||||
*buf = taosMemoryCalloc(1, *len);
|
||||
if (*buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
TdFilePtr tfile = taosOpenFile(pName, O_RDONLY | O_BINARY);
|
||||
if (NULL == tfile) {
|
||||
taosMemoryFreeClear(*buf);
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
int64_t s = taosReadFile(tfile, *buf, *len);
|
||||
if (s != *len) {
|
||||
taosCloseFile(&tfile);
|
||||
taosMemoryFreeClear(*buf);
|
||||
return TSDB_CODE_TSC_APP_ERROR;
|
||||
}
|
||||
taosCloseFile(&tfile);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionStmt* pStmt) {
|
||||
SCreateFuncReq req = {0};
|
||||
strcpy(req.name, pStmt->funcName);
|
||||
req.igExists = pStmt->ignoreExists;
|
||||
req.funcType = pStmt->isAgg ? TSDB_FUNC_TYPE_AGGREGATE : TSDB_FUNC_TYPE_SCALAR;
|
||||
req.scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
|
||||
req.outputType = pStmt->outputDt.type;
|
||||
req.outputLen = pStmt->outputDt.bytes;
|
||||
req.bufSize = pStmt->bufSize;
|
||||
int32_t code = readFromFile(pStmt->libraryPath, &req.codeLen, &req.pCode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_FUNC, (FSerializeFunc)tSerializeSCreateFuncReq, &req);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
code = translateSelect(pCxt, (SSelectStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
code = translateSetOperator(pCxt, (SSetOperator*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_DATABASE_STMT:
|
||||
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
|
||||
break;
|
||||
|
@ -2688,6 +2756,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_DROP_STREAM_STMT:
|
||||
code = translateDropStream(pCxt, (SDropStreamStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_FUNCTION_STMT:
|
||||
code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -2802,6 +2873,7 @@ static const char* getSysDbName(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_BNODES_STMT:
|
||||
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||
return TSDB_INFORMATION_SCHEMA_DB;
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
|
@ -2844,6 +2916,8 @@ static const char* getSysTableName(ENodeType type) {
|
|||
return TSDB_INS_TABLE_SNODES;
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
return TSDB_INS_TABLE_LICENCES;
|
||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||
return TSDB_INS_TABLE_CLUSTER;
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
return TSDB_PERFS_TABLE_CONNECTIONS;
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
|
@ -3362,6 +3436,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||
code = rewriteShow(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -768,6 +768,22 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||
SLogicNode* pRoot = NULL;
|
||||
int32_t code = createQueryLogicNode(pCxt, pSetOperator->pLeft, &pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createQueryLogicNode(pCxt, pSetOperator->pRight, &pRoot);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = pRoot;
|
||||
} else {
|
||||
nodesDestroyNode(pRoot);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getMsgType(ENodeType sqlType) {
|
||||
return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType) ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT;
|
||||
}
|
||||
|
@ -791,6 +807,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
|
|||
return createVnodeModifLogicNode(pCxt, (SVnodeModifOpStmt*)pStmt, pLogicNode);
|
||||
case QUERY_NODE_EXPLAIN_STMT:
|
||||
return createQueryLogicNode(pCxt, ((SExplainStmt*)pStmt)->pQuery, pLogicNode);
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -587,10 +587,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
|
|||
}
|
||||
|
||||
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
if (inputNum != 2 && inputNum!= 3) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t subPos = 0;
|
||||
GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
|
||||
if (subPos == 0) { //subPos needs to be positive or negative values;
|
||||
|
|
Loading…
Reference in New Issue