support stable udf

This commit is contained in:
dapan1121 2021-04-08 18:48:04 +08:00
parent e06dce4892
commit 5ae38f3206
10 changed files with 1142 additions and 51 deletions

View File

@ -128,7 +128,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
@ -137,7 +137,7 @@ bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo *pQueryInfo,
bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);

View File

@ -121,6 +121,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExpr->resBytes;
pTagCtx[n++] = &pReducer->pCtx[i];
} else if (pExpr->functionId < 0) {
continue;
} else if ((aAggs[pExpr->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx = &pReducer->pCtx[i];
}
@ -1321,7 +1323,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
if (tscIsSecondStageQuery(pCmd, pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
}

View File

@ -3112,6 +3112,10 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId < 0) {
continue;
}
if ((aAggs[functionId].status & TSDB_FUNCSTATE_STABLE) == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return true;
@ -6462,7 +6466,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on super table does not compatible with "group by" syntax
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (tscIsProjectionQuery(pCmd, pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
@ -6641,8 +6645,17 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) {
char tmpBuf[1024] = {0};
int32_t tmpLen = 0;
char *name = NULL;
if (pExpr->functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pSql->cmd.pUdfInfo, -1 * pExpr->functionId - 1);
name = pUdfInfo->name;
} else {
name = aAggs[pExpr->functionId].name;
}
tmpLen =
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].name, pExpr->uid, pExpr->colInfo.colId);
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", name, pExpr->uid, pExpr->colInfo.colId);
if (tmpLen + offset >= totalBufSize - 1) break;
@ -6988,7 +7001,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
if (!tscIsProjectionQuery(pCmd, pQueryInfo) && pQueryInfo->interval.interval == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}

View File

@ -888,7 +888,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
size_t output = tscNumOfFields(pQueryInfo);
if (tscIsSecondStageQuery(pQueryInfo)) {
if (tscIsSecondStageQuery(pCmd, pQueryInfo)) {
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;

View File

@ -633,7 +633,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
int16_t functionId = tscIsProjectionQuery(&pNew->cmd, pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelectClause(pNew, 0);
@ -3486,7 +3486,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) {
tscIsProjectionQuery(&pSql->pSubs[i]->cmd, pQueryInfo1)) || (pRes1->numOfRows == 0)) {
hasData = false;
break;
}

View File

@ -198,12 +198,21 @@ bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, in
return pQueryInfo->order.orderColId >= 0;
}
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return false;
}
continue;
}
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
return false;
@ -232,11 +241,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return true;
}
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
if (tscIsProjectionQuery(pQueryInfo)) {
return false;
}
bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
size_t numOfOutput = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo;
@ -245,6 +250,10 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
}
}
if (tscIsProjectionQuery(pCmd, pQueryInfo)) {
return false;
}
return false;
}

View File

@ -784,9 +784,13 @@ static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
// set the output value exist
pCtx->resultInfo->numOfRes += output;
if (output > 0) {
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pCtx->resultInfo->numOfRes = output;
} else {
pCtx->resultInfo->numOfRes += output;
}
if (pCtx->resultInfo->numOfRes > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}

View File

@ -393,32 +393,7 @@ if $rows != 28 then
return -1
endi
sql select add_one(f1) from tb1 group by f1;
if $rows != 7 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data10 != 3 then
return -1
endi
if $data20 != 4 then
return -1
endi
if $data30 != 5 then
return -1
endi
if $data40 != 6 then
return -1
endi
if $data50 != 7 then
return -1
endi
if $data60 != 8 then
return -1
endi
sql_error select add_one(f1) from tb1 group by f1;
sql select sum_double(f1) from tb1 group by f1;
if $rows != 7 then

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,9 @@ typedef struct SUdfInit{
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
#define TSDB_DATA_INT_NULL 0x80000000L
void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
@ -17,14 +20,17 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
printf("sum_double input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
r=*(int *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
r+=*((int *)data + i);
if (tsOutput) {
*(long long*)tsOutput=1000000;
if (*((int *)data + i) == TSDB_DATA_INT_NULL) {
continue;
}
*numOfOutput=1;
r+=*((int *)data + i);
*(int *)dataOutput=r;
}
*(int *)dataOutput=r;
*numOfOutput=1;
printf("sum_double out, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
}
@ -46,14 +52,18 @@ void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t*
int r = 0;
int sum = 0;
printf("sum_double_merge dataoutput:%p, numOfOutput:%d, buf:%p\n", dataOutput, *numOfOutput, buf);
printf("sum_double_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 0; i < numOfRows; ++i) {
printf("sum_double_merge %d - %d\n", i, *((int*)data + i));
sum +=*((int*)data + i);
}
*(int*)dataOutput+=sum;
*numOfOutput=1;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
printf("sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
}