support udf interval
This commit is contained in:
parent
baba270e7a
commit
e2ec3fd317
|
@ -756,8 +756,9 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
|
|||
bytes = pModel->pFields[i].field.bytes;
|
||||
} else if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
type = pUdfInfo->resType;
|
||||
bytes = pUdfInfo->resBytes;
|
||||
int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false, pUdfInfo);
|
||||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
|
||||
} else {
|
||||
if (functionId == TSDB_FUNC_FIRST_DST) {
|
||||
functionId = TSDB_FUNC_FIRST;
|
||||
|
@ -767,7 +768,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
|
|||
functionId = TSDB_FUNC_STDDEV;
|
||||
}
|
||||
|
||||
int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false);
|
||||
int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false, NULL);
|
||||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -1204,12 +1205,14 @@ int32_t finalizeRes(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, SLocalMerger *pLocalM
|
|||
|
||||
if (pCtx->functionId < 0) {
|
||||
int32_t output = 0;
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * pCtx->functionId - 1);
|
||||
assert (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE);
|
||||
|
||||
if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
|
||||
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, &output, &pUdfInfo->init);
|
||||
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init);
|
||||
|
||||
// set the output value exist
|
||||
pCtx->resultInfo->numOfRes = output;
|
||||
|
|
|
@ -341,6 +341,7 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
strcpy(pMsg->path, createInfo->path.z);
|
||||
|
||||
pMsg->funcType = htonl(createInfo->type);
|
||||
pMsg->bufSize = htonl(createInfo->bufSize);
|
||||
|
||||
pMsg->outputType = createInfo->output.type;
|
||||
pMsg->outputLen = htons(createInfo->output.bytes);
|
||||
|
@ -2312,7 +2313,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
int32_t intermediateResSize = 0;
|
||||
|
||||
if (getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize,
|
||||
&intermediateResSize, 0, false) != TSDB_CODE_SUCCESS) {
|
||||
&intermediateResSize, 0, false, NULL) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -2650,7 +2651,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
int16_t type = 0;
|
||||
int32_t inter = 0;
|
||||
|
||||
int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
|
||||
int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL);
|
||||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
|
||||
s.type = (uint8_t)type;
|
||||
|
@ -2674,7 +2675,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
int32_t inter = 0;
|
||||
int16_t resType = 0;
|
||||
int16_t bytes = 0;
|
||||
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0);
|
||||
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0, NULL);
|
||||
|
||||
s.bytes = bytes;
|
||||
s.type = (uint8_t)resType;
|
||||
|
@ -2713,8 +2714,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, pUdfInfo->resType, pUdfInfo->resBytes,
|
||||
getNewResColId(pQueryInfo), pUdfInfo->resBytes, false);
|
||||
int32_t inter = 0;
|
||||
int16_t resType = 0;
|
||||
int16_t bytes = 0;
|
||||
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo);
|
||||
|
||||
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resType, bytes,
|
||||
getNewResColId(pQueryInfo), inter, false);
|
||||
|
||||
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
|
||||
getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1);
|
||||
|
@ -3047,7 +3053,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
|||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
|
||||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
|
||||
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->param[0].i64, &type, &bytes,
|
||||
&interBytes, 0, true) != TSDB_CODE_SUCCESS) {
|
||||
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -3078,6 +3084,10 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
|||
int32_t inter = 0;
|
||||
|
||||
int32_t functionId = pExpr->functionId;
|
||||
if (functionId < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
|
||||
continue;
|
||||
}
|
||||
|
@ -3091,7 +3101,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &pExpr->resType, &pExpr->resBytes,
|
||||
&inter, 0, false);
|
||||
&inter, 0, false, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6071,11 +6081,15 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
if (pExpr->functionId < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((pExpr->functionId != TSDB_FUNC_TAG_DUMMY && pExpr->functionId != TSDB_FUNC_TS_DUMMY) &&
|
||||
!(pExpr->functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) {
|
||||
SSchema* pColSchema = &pSchema[pExpr->colInfo.colIndex];
|
||||
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, (int32_t)pExpr->param[0].i64, &pExpr->resType,
|
||||
&pExpr->resBytes, &pExpr->interBytes, tagLength, isSTable);
|
||||
&pExpr->resBytes, &pExpr->interBytes, tagLength, isSTable, NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1158,6 +1158,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
*(int32_t*) pMsg = htonl(pUdfInfo->funcType);
|
||||
pMsg += sizeof(pUdfInfo->funcType);
|
||||
|
||||
*(int32_t*) pMsg = htonl(pUdfInfo->bufSize);
|
||||
pMsg += sizeof(pUdfInfo->bufSize);
|
||||
|
||||
pQueryMsg->udfContentLen = htonl(pUdfInfo->contLen);
|
||||
memcpy(pMsg, pUdfInfo->content, pUdfInfo->contLen);
|
||||
|
@ -2239,6 +2242,7 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
|
|||
pUdfInfo->resType = pFunc->resType;
|
||||
pUdfInfo->funcType = htonl(pFunc->funcType);
|
||||
pUdfInfo->contLen = htonl(pFunc->len);
|
||||
pUdfInfo->bufSize = htonl(pFunc->bufSize);
|
||||
|
||||
pUdfInfo->content = malloc(pUdfInfo->contLen);
|
||||
memcpy(pUdfInfo->content, pFunc->content, pUdfInfo->contLen);
|
||||
|
|
|
@ -1919,7 +1919,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
int16_t type = 0;
|
||||
int32_t inter = 0;
|
||||
|
||||
getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
|
||||
getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL);
|
||||
|
||||
SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
|
||||
pSupporter->tagSize = s1.bytes;
|
||||
|
|
|
@ -185,6 +185,7 @@ do { \
|
|||
#define TSDB_DB_NAME_LEN 33
|
||||
#define TSDB_FUNC_NAME_LEN 65
|
||||
#define TSDB_FUNC_CODE_LEN (65535 - 512)
|
||||
#define TSDB_FUNC_BUF_SIZE 512
|
||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
|
||||
#define TSDB_COL_NAME_LEN 65
|
||||
|
|
|
@ -180,6 +180,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x0372) //"Invalid func code")
|
||||
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0373) //"Func already exists")
|
||||
#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0374) //"Invalid func")
|
||||
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x0375) //"Invalid func bufSize")
|
||||
|
||||
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available")
|
||||
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists")
|
||||
|
|
|
@ -592,6 +592,7 @@ typedef struct {
|
|||
int32_t funcType;
|
||||
uint8_t outputType;
|
||||
int16_t outputLen;
|
||||
int32_t bufSize;
|
||||
int32_t codeLen;
|
||||
char code[];
|
||||
} SCreateFuncMsg;
|
||||
|
@ -606,6 +607,7 @@ typedef struct {
|
|||
int32_t funcType;
|
||||
int8_t resType;
|
||||
int16_t resBytes;
|
||||
int32_t bufSize;
|
||||
int32_t len;
|
||||
char content[];
|
||||
} SFunctionInfoMsg;
|
||||
|
|
|
@ -102,111 +102,113 @@
|
|||
#define TK_AS 83
|
||||
#define TK_OUTPUTTYPE 84
|
||||
#define TK_AGGREGATE 85
|
||||
#define TK_PPS 86
|
||||
#define TK_TSERIES 87
|
||||
#define TK_DBS 88
|
||||
#define TK_STORAGE 89
|
||||
#define TK_QTIME 90
|
||||
#define TK_CONNS 91
|
||||
#define TK_STATE 92
|
||||
#define TK_KEEP 93
|
||||
#define TK_CACHE 94
|
||||
#define TK_REPLICA 95
|
||||
#define TK_QUORUM 96
|
||||
#define TK_DAYS 97
|
||||
#define TK_MINROWS 98
|
||||
#define TK_MAXROWS 99
|
||||
#define TK_BLOCKS 100
|
||||
#define TK_CTIME 101
|
||||
#define TK_WAL 102
|
||||
#define TK_FSYNC 103
|
||||
#define TK_COMP 104
|
||||
#define TK_PRECISION 105
|
||||
#define TK_UPDATE 106
|
||||
#define TK_CACHELAST 107
|
||||
#define TK_PARTITIONS 108
|
||||
#define TK_LP 109
|
||||
#define TK_RP 110
|
||||
#define TK_UNSIGNED 111
|
||||
#define TK_TAGS 112
|
||||
#define TK_USING 113
|
||||
#define TK_COMMA 114
|
||||
#define TK_NULL 115
|
||||
#define TK_SELECT 116
|
||||
#define TK_UNION 117
|
||||
#define TK_ALL 118
|
||||
#define TK_DISTINCT 119
|
||||
#define TK_FROM 120
|
||||
#define TK_VARIABLE 121
|
||||
#define TK_INTERVAL 122
|
||||
#define TK_SESSION 123
|
||||
#define TK_FILL 124
|
||||
#define TK_SLIDING 125
|
||||
#define TK_ORDER 126
|
||||
#define TK_BY 127
|
||||
#define TK_ASC 128
|
||||
#define TK_DESC 129
|
||||
#define TK_GROUP 130
|
||||
#define TK_HAVING 131
|
||||
#define TK_LIMIT 132
|
||||
#define TK_OFFSET 133
|
||||
#define TK_SLIMIT 134
|
||||
#define TK_SOFFSET 135
|
||||
#define TK_WHERE 136
|
||||
#define TK_NOW 137
|
||||
#define TK_RESET 138
|
||||
#define TK_QUERY 139
|
||||
#define TK_SYNCDB 140
|
||||
#define TK_ADD 141
|
||||
#define TK_COLUMN 142
|
||||
#define TK_TAG 143
|
||||
#define TK_CHANGE 144
|
||||
#define TK_SET 145
|
||||
#define TK_KILL 146
|
||||
#define TK_CONNECTION 147
|
||||
#define TK_STREAM 148
|
||||
#define TK_COLON 149
|
||||
#define TK_ABORT 150
|
||||
#define TK_AFTER 151
|
||||
#define TK_ATTACH 152
|
||||
#define TK_BEFORE 153
|
||||
#define TK_BEGIN 154
|
||||
#define TK_CASCADE 155
|
||||
#define TK_CLUSTER 156
|
||||
#define TK_CONFLICT 157
|
||||
#define TK_COPY 158
|
||||
#define TK_DEFERRED 159
|
||||
#define TK_DELIMITERS 160
|
||||
#define TK_DETACH 161
|
||||
#define TK_EACH 162
|
||||
#define TK_END 163
|
||||
#define TK_EXPLAIN 164
|
||||
#define TK_FAIL 165
|
||||
#define TK_FOR 166
|
||||
#define TK_IGNORE 167
|
||||
#define TK_IMMEDIATE 168
|
||||
#define TK_INITIALLY 169
|
||||
#define TK_INSTEAD 170
|
||||
#define TK_MATCH 171
|
||||
#define TK_KEY 172
|
||||
#define TK_OF 173
|
||||
#define TK_RAISE 174
|
||||
#define TK_REPLACE 175
|
||||
#define TK_RESTRICT 176
|
||||
#define TK_ROW 177
|
||||
#define TK_STATEMENT 178
|
||||
#define TK_TRIGGER 179
|
||||
#define TK_VIEW 180
|
||||
#define TK_SEMI 181
|
||||
#define TK_NONE 182
|
||||
#define TK_PREV 183
|
||||
#define TK_LINEAR 184
|
||||
#define TK_IMPORT 185
|
||||
#define TK_TBNAME 186
|
||||
#define TK_JOIN 187
|
||||
#define TK_INSERT 188
|
||||
#define TK_INTO 189
|
||||
#define TK_VALUES 190
|
||||
#define TK_BUFSIZE 86
|
||||
#define TK_PPS 87
|
||||
#define TK_TSERIES 88
|
||||
#define TK_DBS 89
|
||||
#define TK_STORAGE 90
|
||||
#define TK_QTIME 91
|
||||
#define TK_CONNS 92
|
||||
#define TK_STATE 93
|
||||
#define TK_KEEP 94
|
||||
#define TK_CACHE 95
|
||||
#define TK_REPLICA 96
|
||||
#define TK_QUORUM 97
|
||||
#define TK_DAYS 98
|
||||
#define TK_MINROWS 99
|
||||
#define TK_MAXROWS 100
|
||||
#define TK_BLOCKS 101
|
||||
#define TK_CTIME 102
|
||||
#define TK_WAL 103
|
||||
#define TK_FSYNC 104
|
||||
#define TK_COMP 105
|
||||
#define TK_PRECISION 106
|
||||
#define TK_UPDATE 107
|
||||
#define TK_CACHELAST 108
|
||||
#define TK_PARTITIONS 109
|
||||
#define TK_LP 110
|
||||
#define TK_RP 111
|
||||
#define TK_UNSIGNED 112
|
||||
#define TK_TAGS 113
|
||||
#define TK_USING 114
|
||||
#define TK_COMMA 115
|
||||
#define TK_NULL 116
|
||||
#define TK_SELECT 117
|
||||
#define TK_UNION 118
|
||||
#define TK_ALL 119
|
||||
#define TK_DISTINCT 120
|
||||
#define TK_FROM 121
|
||||
#define TK_VARIABLE 122
|
||||
#define TK_INTERVAL 123
|
||||
#define TK_SESSION 124
|
||||
#define TK_FILL 125
|
||||
#define TK_SLIDING 126
|
||||
#define TK_ORDER 127
|
||||
#define TK_BY 128
|
||||
#define TK_ASC 129
|
||||
#define TK_DESC 130
|
||||
#define TK_GROUP 131
|
||||
#define TK_HAVING 132
|
||||
#define TK_LIMIT 133
|
||||
#define TK_OFFSET 134
|
||||
#define TK_SLIMIT 135
|
||||
#define TK_SOFFSET 136
|
||||
#define TK_WHERE 137
|
||||
#define TK_NOW 138
|
||||
#define TK_RESET 139
|
||||
#define TK_QUERY 140
|
||||
#define TK_SYNCDB 141
|
||||
#define TK_ADD 142
|
||||
#define TK_COLUMN 143
|
||||
#define TK_TAG 144
|
||||
#define TK_CHANGE 145
|
||||
#define TK_SET 146
|
||||
#define TK_KILL 147
|
||||
#define TK_CONNECTION 148
|
||||
#define TK_STREAM 149
|
||||
#define TK_COLON 150
|
||||
#define TK_ABORT 151
|
||||
#define TK_AFTER 152
|
||||
#define TK_ATTACH 153
|
||||
#define TK_BEFORE 154
|
||||
#define TK_BEGIN 155
|
||||
#define TK_CASCADE 156
|
||||
#define TK_CLUSTER 157
|
||||
#define TK_CONFLICT 158
|
||||
#define TK_COPY 159
|
||||
#define TK_DEFERRED 160
|
||||
#define TK_DELIMITERS 161
|
||||
#define TK_DETACH 162
|
||||
#define TK_EACH 163
|
||||
#define TK_END 164
|
||||
#define TK_EXPLAIN 165
|
||||
#define TK_FAIL 166
|
||||
#define TK_FOR 167
|
||||
#define TK_IGNORE 168
|
||||
#define TK_IMMEDIATE 169
|
||||
#define TK_INITIALLY 170
|
||||
#define TK_INSTEAD 171
|
||||
#define TK_MATCH 172
|
||||
#define TK_KEY 173
|
||||
#define TK_OF 174
|
||||
#define TK_RAISE 175
|
||||
#define TK_REPLACE 176
|
||||
#define TK_RESTRICT 177
|
||||
#define TK_ROW 178
|
||||
#define TK_STATEMENT 179
|
||||
#define TK_TRIGGER 180
|
||||
#define TK_VIEW 181
|
||||
#define TK_SEMI 182
|
||||
#define TK_NONE 183
|
||||
#define TK_PREV 184
|
||||
#define TK_LINEAR 185
|
||||
#define TK_IMPORT 186
|
||||
#define TK_TBNAME 187
|
||||
#define TK_JOIN 188
|
||||
#define TK_INSERT 189
|
||||
#define TK_INTO 190
|
||||
#define TK_VALUES 191
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -220,6 +220,7 @@ typedef struct SFuncObj {
|
|||
int32_t contLen;
|
||||
char cont[TSDB_FUNC_CODE_LEN];
|
||||
int32_t funcType;
|
||||
int32_t bufSize;
|
||||
int64_t createdTime;
|
||||
uint8_t resType;
|
||||
int16_t resBytes;
|
||||
|
|
|
@ -31,7 +31,7 @@ void mnodeCancelGetNextFunc(void *pIter);
|
|||
void mnodeIncFuncRef(SFuncObj *pFunc);
|
||||
void mnodeDecFuncRef(SFuncObj *pFunc);
|
||||
|
||||
int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code, char *path, uint8_t outputType, int16_t outputLen, int32_t funcType, SMnodeMsg *pMsg);
|
||||
int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code, char *path, uint8_t outputType, int16_t outputLen, int32_t funcType, int32_t bufSize, SMnodeMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -190,7 +190,7 @@ static int32_t mnodeUpdateFunc(SFuncObj *pFunc, void *pMsg) {
|
|||
return code;
|
||||
}
|
||||
*/
|
||||
int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *codeScript, char *path, uint8_t outputType, int16_t outputLen, int32_t funcType, SMnodeMsg *pMsg) {
|
||||
int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *codeScript, char *path, uint8_t outputType, int16_t outputLen, int32_t funcType, int32_t bufSize, SMnodeMsg *pMsg) {
|
||||
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_GRANT_EXPIRED;
|
||||
}
|
||||
|
@ -217,8 +217,12 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code
|
|||
return TSDB_CODE_MND_INVALID_FUNC_CODE;
|
||||
}
|
||||
|
||||
if (codeLen < 0 || codeLen > TSDB_FUNC_CODE_LEN - 1) {
|
||||
return TSDB_CODE_MND_INVALID_FUNC_LEN;
|
||||
if (codeLen < 0 || codeLen > TSDB_FUNC_CODE_LEN) {
|
||||
return TSDB_CODE_MND_INVALID_FUNC_CODE;
|
||||
}
|
||||
|
||||
if (bufSize < 0 || bufSize > TSDB_FUNC_BUF_SIZE) {
|
||||
return TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
|
||||
}
|
||||
|
||||
SFuncObj *pFunc = mnodeGetFunc(name);
|
||||
|
@ -237,6 +241,7 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code
|
|||
pFunc->resType = outputType;
|
||||
pFunc->resBytes = outputLen;
|
||||
pFunc->funcType = funcType;
|
||||
pFunc->bufSize = bufSize;
|
||||
pFunc->sig = 0;
|
||||
pFunc->type = 1; //lua script, refactor
|
||||
|
||||
|
@ -326,6 +331,12 @@ static int32_t mnodeGetFuncMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
|
|||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "bufsize");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htons(cols);
|
||||
strcpy(pMeta->tableFname, "show funcs");
|
||||
pShow->numOfColumns = cols;
|
||||
|
@ -398,6 +409,10 @@ static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, voi
|
|||
*(int32_t *)pWrite = pFunc->contLen;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pFunc->bufSize;
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
mnodeDecFuncRef(pFunc);
|
||||
}
|
||||
|
@ -412,8 +427,9 @@ static int32_t mnodeProcessCreateFuncMsg(SMnodeMsg *pMsg) {
|
|||
pCreate->codeLen = htonl(pCreate->codeLen);
|
||||
pCreate->outputLen = htons(pCreate->outputLen);
|
||||
pCreate->funcType = htonl(pCreate->funcType);
|
||||
pCreate->bufSize = htonl(pCreate->bufSize);
|
||||
|
||||
return mnodeCreateFunc(pMsg->pUser->pAcct, pCreate->name, pCreate->codeLen, pCreate->code, pCreate->path, pCreate->outputType, pCreate->outputLen, pCreate->funcType, pMsg);
|
||||
return mnodeCreateFunc(pMsg->pUser->pAcct, pCreate->name, pCreate->codeLen, pCreate->code, pCreate->path, pCreate->outputType, pCreate->outputLen, pCreate->funcType, pCreate->bufSize, pMsg);
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessDropFuncMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -457,6 +473,7 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
|
|||
pFuncInfo->funcType = htonl(pFuncObj->funcType);
|
||||
pFuncInfo->resType = pFuncObj->resType;
|
||||
pFuncInfo->resBytes = htons(pFuncObj->resBytes);
|
||||
pFuncInfo->bufSize = htonl(pFuncObj->bufSize);
|
||||
|
||||
pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
|
||||
name =(tstr *)((char *)name + sizeof(*name) + htons(name->len));
|
||||
|
|
|
@ -27,6 +27,7 @@ extern "C" {
|
|||
#include "trpc.h"
|
||||
#include "tvariant.h"
|
||||
#include "tsdb.h"
|
||||
#include "qUdf.h"
|
||||
|
||||
#define TSDB_FUNC_INVALID_ID -1
|
||||
#define TSDB_FUNC_COUNT 0
|
||||
|
@ -223,7 +224,7 @@ typedef struct SAggFunctionInfo {
|
|||
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable);
|
||||
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
|
||||
int32_t isValidFunction(const char* name, int32_t len);
|
||||
|
||||
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
|
||||
|
|
|
@ -438,7 +438,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
|
|||
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
|
||||
SColumnInfo* pTagCols, SUdfInfo* pUdfInfo);
|
||||
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
|
||||
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr);
|
||||
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr, SUdfInfo *pUdfInfo);
|
||||
|
||||
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
|
||||
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
|
||||
|
|
|
@ -179,6 +179,7 @@ typedef struct SCreateFuncInfo {
|
|||
SStrToken name;
|
||||
SStrToken path;
|
||||
int32_t type;
|
||||
int32_t bufSize;
|
||||
TAOS_FIELD output;
|
||||
} SCreateFuncInfo;
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ typedef struct SUdfInfo {
|
|||
int8_t resType; // result type
|
||||
int16_t resBytes; // result byte
|
||||
int32_t contLen; // content length
|
||||
int32_t bufSize; //interbuf size
|
||||
char *name; // function name
|
||||
void *handle; // handle loaded in mem
|
||||
void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
|
||||
|
@ -62,10 +63,10 @@ typedef void (*scriptMergeFunc)(void *pCtx, char* data, int32_t numOfRows, char*
|
|||
typedef void (*scriptDestroyFunc)(void* pCtx);
|
||||
|
||||
// dynamic lib
|
||||
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput,
|
||||
int32_t* numOfOutput, int16_t oType, int16_t oBytes, SUdfInit* buf);
|
||||
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* interBuf,
|
||||
char* tsOutput, int32_t* numOfOutput, int16_t oType, int16_t oBytes, SUdfInit* buf);
|
||||
typedef int32_t (*udfInitFunc)(SUdfInit* data);
|
||||
typedef void (*udfFinalizeFunc)(char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
|
||||
typedef void (*udfFinalizeFunc)(char* dataOutput, char *interBuf, int32_t* numOfOutput, SUdfInit* buf);
|
||||
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
|
||||
typedef void (*udfDestroyFunc)(SUdfInit* buf);
|
||||
|
||||
|
|
|
@ -194,10 +194,13 @@ cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
|
|||
{ setCreateAcctSql(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);}
|
||||
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
|
||||
cmd ::= CREATE TOPIC ifnotexists(Z) ids(X) topic_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
|
||||
cmd ::= CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y, &Z, 1);}
|
||||
cmd ::= CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y, &Z, 2);}
|
||||
cmd ::= CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) bufsize(B). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y, &Z, &B, 1);}
|
||||
cmd ::= CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) bufsize(B). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y, &Z, &B, 2);}
|
||||
cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSql(pInfo, &X, &Y);}
|
||||
|
||||
bufsize(Y) ::= . { Y.n = 0; }
|
||||
bufsize(Y) ::= BUFSIZE INTEGER(X). { Y = X; }
|
||||
|
||||
pps(Y) ::= . { Y.n = 0; }
|
||||
pps(Y) ::= PPS INTEGER(X). { Y = X; }
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "qPercentile.h"
|
||||
#include "qTsbuf.h"
|
||||
#include "queryLog.h"
|
||||
#include "qUdf.h"
|
||||
|
||||
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
|
||||
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
|
||||
|
@ -163,16 +164,13 @@ typedef struct SRateInfo {
|
|||
} SRateInfo;
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable) {
|
||||
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
|
||||
if (!isValidDataType(dataType)) {
|
||||
qError("Illegal data type %d or data type length %d", dataType, dataBytes);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
||||
if (functionId < 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
|
||||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
|
||||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) {
|
||||
|
@ -223,6 +221,20 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
}
|
||||
|
||||
if (isSuperTable) {
|
||||
if (functionId < 0) {
|
||||
if (pUdfInfo->bufSize > 0) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = pUdfInfo->bufSize;
|
||||
*interBytes = *bytes;
|
||||
} else {
|
||||
*type = pUdfInfo->resType;
|
||||
*bytes = pUdfInfo->resBytes;
|
||||
*interBytes = *bytes;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = (int16_t)(dataBytes + DATA_SET_FLAG_SIZE);
|
||||
|
@ -277,7 +289,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (functionId == TSDB_FUNC_SUM) {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(dataType)) {
|
||||
*type = TSDB_DATA_TYPE_BIGINT;
|
||||
|
@ -302,7 +314,20 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
*interBytes = sizeof(STwaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
*type = pUdfInfo->resType;
|
||||
*bytes = pUdfInfo->resBytes;
|
||||
|
||||
if (pUdfInfo->bufSize > 0) {
|
||||
*interBytes = pUdfInfo->bufSize;
|
||||
} else {
|
||||
*interBytes = *bytes;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_AVG) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
|
|
|
@ -810,8 +810,12 @@ static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int
|
|||
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
|
||||
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes);
|
||||
} else {
|
||||
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
|
||||
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList,
|
||||
pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
|
||||
}
|
||||
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
|
@ -3316,12 +3320,14 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
|
|||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||
if (pCtx[j].functionId < 0) {
|
||||
int32_t output = 0;
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
|
||||
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
|
||||
if (pRuntimeEnv->pUdfInfo->isScript) {
|
||||
(*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output);
|
||||
} else {
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, interBuf, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3347,12 +3353,14 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
|
|||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||
if (pCtx[j].functionId < 0) {
|
||||
int32_t output = 0;
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
|
||||
if (pRuntimeEnv->pUdfInfo->isScript) {
|
||||
(*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output);
|
||||
} else {
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, interBuf, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6230,6 +6238,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
pMsg += varDataTLen(name);
|
||||
param->pUdfInfo->funcType = htonl(*(int32_t*)pMsg);
|
||||
pMsg += sizeof(int32_t);
|
||||
|
||||
param->pUdfInfo->bufSize = htonl(*(int32_t*)pMsg);
|
||||
pMsg += sizeof(int32_t);
|
||||
|
||||
param->pUdfInfo->content = malloc(pQueryMsg->udfContentLen);
|
||||
memcpy(param->pUdfInfo->content, pMsg, pQueryMsg->udfContentLen);
|
||||
|
@ -6326,7 +6337,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI
|
|||
} else {
|
||||
SColumnInfo* pCol = &pQueryMsg->colList[j];
|
||||
int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64,
|
||||
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, superTable);
|
||||
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, superTable, NULL);
|
||||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
@ -6551,15 +6562,10 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu
|
|||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (pExprs[i].base.functionId < 0) {
|
||||
pExprs[i].type = pUdfInfo->resType;
|
||||
pExprs[i].bytes = pUdfInfo->resBytes;
|
||||
} else {
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
|
||||
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
|
||||
&pExprs[i].interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
|
@ -6577,7 +6583,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu
|
|||
}
|
||||
|
||||
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
|
||||
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr) {
|
||||
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr, SUdfInfo *pUdfInfo) {
|
||||
*pExprInfo = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -6616,7 +6622,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
|
|||
|
||||
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
|
||||
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
&pExprs[i].interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
|
|
@ -1060,7 +1060,7 @@ void setCreateDbInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pToken, SCreateDb
|
|||
pInfo->pMiscInfo->dbOpt.ignoreExists = pIgExists->n; // sql.y has: ifnotexists(X) ::= IF NOT EXISTS. {X.n = 1;}
|
||||
}
|
||||
|
||||
void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken *pPath, TAOS_FIELD *output, int32_t funcType) {
|
||||
void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken *pPath, TAOS_FIELD *output, SStrToken* bufSize, int32_t funcType) {
|
||||
pInfo->type = type;
|
||||
if (pInfo->pMiscInfo == NULL) {
|
||||
pInfo->pMiscInfo = calloc(1, sizeof(SMiscInfo));
|
||||
|
@ -1070,6 +1070,11 @@ void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToke
|
|||
pInfo->pMiscInfo->funcOpt.path = *pPath;
|
||||
pInfo->pMiscInfo->funcOpt.output = *output;
|
||||
pInfo->pMiscInfo->funcOpt.type = funcType;
|
||||
if (bufSize->n > 0) {
|
||||
pInfo->pMiscInfo->funcOpt.bufSize = strtol(bufSize->z, NULL, 10);
|
||||
} else {
|
||||
pInfo->pMiscInfo->funcOpt.bufSize = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -221,7 +221,8 @@ static SKeyword keywordTable[] = {
|
|||
{"FUNCTION", TK_FUNCTION},
|
||||
{"FUNCTIONS", TK_FUNCTIONS},
|
||||
{"OUTPUTTYPE", TK_OUTPUTTYPE},
|
||||
{"AGGREGATE", TK_AGGREGATE}
|
||||
{"AGGREGATE", TK_AGGREGATE},
|
||||
{"BUFSIZE", TK_BUFSIZE}
|
||||
};
|
||||
|
||||
static const char isIdChar[] = {
|
||||
|
|
|
@ -96,7 +96,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
}
|
||||
|
||||
if (param.pSecExprMsg != NULL) {
|
||||
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExprMsg, param.pExprs)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExprMsg, param.pExprs, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
}
|
||||
|
|
3255
src/query/src/sql.c
3255
src/query/src/sql.c
File diff suppressed because it is too large
Load Diff
|
@ -192,6 +192,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_LEN, "Invalid func length")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_CODE, "Invalid func code")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC, "Invalid func")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists")
|
||||
|
|
|
@ -55,4 +55,7 @@ run general/parser/sliding.sim
|
|||
run general/parser/function.sim
|
||||
run general/parser/stableOp.sim
|
||||
run general/parser/slimit_alter_tags.sim
|
||||
run general/parser/udf.sim
|
||||
run general/parser/udf_dll.sim
|
||||
run general/parser/udf_dll_stable.sim
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ typedef struct SUdfInit{
|
|||
int const_item; /* 0 if result is independent of arguments */
|
||||
} SUdfInit;
|
||||
|
||||
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
|
||||
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
|
||||
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
|
||||
int i;
|
||||
int r = 0;
|
||||
|
|
|
@ -16,12 +16,12 @@ typedef struct SDemo{
|
|||
short otype;
|
||||
}SDemo;
|
||||
|
||||
void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
|
||||
void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
|
||||
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
|
||||
int i;
|
||||
double r = 0;
|
||||
SDemo *p = (SDemo *)buf->ptr;
|
||||
printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
|
||||
SDemo *p = (SDemo *)interBuf;
|
||||
printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf);
|
||||
|
||||
for(i=0;i<numOfRows;++i) {
|
||||
if (itype == 4) {
|
||||
|
@ -40,35 +40,56 @@ void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, c
|
|||
|
||||
*numOfOutput=1;
|
||||
|
||||
printf("demo out, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
|
||||
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
|
||||
}
|
||||
|
||||
|
||||
void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
|
||||
int i;
|
||||
SDemo *p = (SDemo *)data;
|
||||
SDemo res = {0};
|
||||
printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf);
|
||||
|
||||
for(i=0;i<numOfRows;++i) {
|
||||
res.sum += p->sum * p->sum;
|
||||
res.num += p->num;
|
||||
p++;
|
||||
}
|
||||
|
||||
p->sum = res.sum;
|
||||
p->num = res.num;
|
||||
|
||||
*numOfOutput=1;
|
||||
|
||||
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void demo_finalize(char* dataOutput, int* numOfOutput, SUdfInit* buf) {
|
||||
SDemo *p = (SDemo *)buf->ptr;
|
||||
printf("demo_finalize dataoutput:%p, numOfOutput:%p, buf:%p\n", dataOutput, numOfOutput, buf);
|
||||
void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
|
||||
SDemo *p = (SDemo *)interBuf;
|
||||
printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num);
|
||||
if (p->otype == 6) {
|
||||
*(float *)dataOutput = (float)(p->sum / p->num);
|
||||
*(float *)dataOutput = (float)(p->sum / p->num);
|
||||
printf("finalize values:%f\n", *(float *)dataOutput);
|
||||
} else if (p->otype == 7) {
|
||||
*(double *)dataOutput = (double)(p->sum / p->num);
|
||||
printf("finalize values:%f\n", *(double *)dataOutput);
|
||||
}
|
||||
|
||||
*numOfOutput=1;
|
||||
|
||||
printf("demo finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
|
||||
printf("demo finalize, numOfOutput:%d\n", *numOfOutput);
|
||||
}
|
||||
|
||||
|
||||
int demo_init(SUdfInit* buf) {
|
||||
buf->ptr = calloc(1, sizeof(SDemo));
|
||||
printf("demo init\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void demo_destroy(SUdfInit* buf) {
|
||||
free(buf->ptr);
|
||||
printf("demo destroy\n");
|
||||
}
|
||||
|
||||
|
|
|
@ -8,3 +8,4 @@ rm -rf /tmp/sum_double.so /tmp/add_one.so
|
|||
|
||||
gcc -g -O0 -fPIC -shared sh/sum_double.c -o /tmp/sum_double.so
|
||||
gcc -g -O0 -fPIC -shared sh/add_one.c -o /tmp/add_one.so
|
||||
gcc -g -O0 -fPIC -shared sh/demo.c -o /tmp/demo.so
|
||||
|
|
|
@ -13,7 +13,7 @@ typedef struct SUdfInit{
|
|||
#define TSDB_DATA_INT_NULL 0x80000000L
|
||||
|
||||
|
||||
void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
|
||||
void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
|
||||
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
|
||||
int i;
|
||||
int r = 0;
|
||||
|
@ -38,7 +38,7 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
|
|||
|
||||
|
||||
|
||||
void sum_double_finalize(char* dataOutput, int* numOfOutput, SUdfInit* buf) {
|
||||
void sum_double_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
|
||||
int i;
|
||||
int r = 0;
|
||||
printf("sum_double_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
|
||||
|
|
Loading…
Reference in New Issue