Merge branch 'feature/TD-3685' into feature/TD-3188
This commit is contained in:
commit
e66cd0fdda
|
@ -128,18 +128,18 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
|
|||
*/
|
||||
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
|
||||
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
|
||||
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
|
||||
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
|
||||
|
||||
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
|
||||
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, int32_t tableIndex);
|
||||
bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
|
||||
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
|
||||
|
||||
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscIsTwoStageSTableQuery(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscQueryTags(SQueryInfo* pQueryInfo);
|
||||
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
|
||||
|
|
|
@ -121,6 +121,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
|
|||
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
tagLen += pExpr->resBytes;
|
||||
pTagCtx[n++] = &pReducer->pCtx[i];
|
||||
} else if (pExpr->functionId < 0) {
|
||||
continue;
|
||||
} else if ((aAggs[pExpr->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
|
||||
pCtx = &pReducer->pCtx[i];
|
||||
}
|
||||
|
@ -300,7 +302,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
|
|||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
|
||||
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
|
||||
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0);
|
||||
|
||||
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
|
||||
if (pReducer->pLoserTree == NULL || pRes->code != 0) {
|
||||
|
@ -377,7 +379,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
|
|||
pReducer->pDesc->pColumnModel->capacity = 1;
|
||||
|
||||
// restore the limitation value at the last stage
|
||||
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
|
||||
pQueryInfo->limit.offset = pQueryInfo->prjOffset;
|
||||
}
|
||||
|
@ -561,7 +563,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
|
|||
}
|
||||
|
||||
// primary timestamp column is involved in final result
|
||||
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
numOfGroupByCols++;
|
||||
}
|
||||
|
||||
|
@ -751,6 +753,10 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
|
|||
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
|
||||
type = pModel->pFields[i].field.type;
|
||||
bytes = pModel->pFields[i].field.bytes;
|
||||
} else if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
type = pUdfInfo->resType;
|
||||
bytes = pUdfInfo->resBytes;
|
||||
} else {
|
||||
if (functionId == TSDB_FUNC_FIRST_DST) {
|
||||
functionId = TSDB_FUNC_FIRST;
|
||||
|
@ -1082,6 +1088,10 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n
|
|||
}
|
||||
|
||||
pCtx->currentStage = MERGE_STAGE;
|
||||
if (functionId < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (needInit) {
|
||||
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
|
||||
}
|
||||
|
@ -1093,7 +1103,24 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n
|
|||
continue;
|
||||
}
|
||||
|
||||
aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]);
|
||||
if (functionId < 0) {
|
||||
int32_t output = 0;
|
||||
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
assert (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE);
|
||||
|
||||
if (pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE]) {
|
||||
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pLocalMerge->pCtx[j].pInput, pLocalMerge->pCtx[j].size, pLocalMerge->pCtx[j].pOutput, &output, &pUdfInfo->init);
|
||||
|
||||
// set the output value exist
|
||||
pLocalMerge->pCtx[j].resultInfo->numOfRes = output;
|
||||
if (output > 0) {
|
||||
pLocalMerge->pCtx[j].resultInfo->hasResult = DATA_SET_FLAG;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1168,12 +1195,30 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
|
|||
free(buf);
|
||||
}
|
||||
|
||||
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
|
||||
int32_t finalizeRes(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
|
||||
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
|
||||
for (int32_t k = 0; k < size; ++k) {
|
||||
SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
|
||||
aAggs[pCtx->functionId].xFinalize(pCtx);
|
||||
|
||||
if (pCtx->functionId < 0) {
|
||||
int32_t output = 0;
|
||||
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * pCtx->functionId - 1);
|
||||
assert (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE);
|
||||
|
||||
if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
|
||||
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, &output, &pUdfInfo->init);
|
||||
|
||||
// set the output value exist
|
||||
pCtx->resultInfo->numOfRes = output;
|
||||
if (output > 0) {
|
||||
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
aAggs[pCtx->functionId].xFinalize(pCtx);
|
||||
}
|
||||
}
|
||||
|
||||
pLocalMerge->hasPrevRow = false;
|
||||
|
@ -1192,13 +1237,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
|
|||
* results generated by simple aggregation function, we merge them all into one points
|
||||
* *Exception*: column projection query, required no merge procedure
|
||||
*/
|
||||
bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
|
||||
bool needToMerge(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
|
||||
int32_t ret = 0; // merge all result by default
|
||||
|
||||
int16_t functionId = pLocalMerge->pCtx[0].functionId;
|
||||
|
||||
// todo opt performance
|
||||
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query
|
||||
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query
|
||||
ret = 1; // disable merge procedure
|
||||
} else {
|
||||
tOrderDescriptor *pDesc = pLocalMerge->pDesc;
|
||||
|
@ -1278,7 +1323,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
|
|||
|
||||
tColModelCompact(pModel, pResBuf, pModel->capacity);
|
||||
|
||||
if (tscIsSecondStageQuery(pQueryInfo)) {
|
||||
if (tscIsSecondStageQuery(pCmd, pQueryInfo)) {
|
||||
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
|
||||
}
|
||||
|
||||
|
@ -1522,7 +1567,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
if (pLocalMerge->hasPrevRow) {
|
||||
if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) {
|
||||
if (needToMerge(pCmd, pQueryInfo, pLocalMerge, tmpBuffer)) {
|
||||
// belong to the group of the previous row, continue process it
|
||||
doExecuteFinalMerge(pCmd, pLocalMerge, false);
|
||||
|
||||
|
@ -1533,7 +1578,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
* current row does not belong to the group of previous row.
|
||||
* so the processing of previous group is completed.
|
||||
*/
|
||||
int32_t numOfRes = finalizeRes(pQueryInfo, pLocalMerge);
|
||||
int32_t numOfRes = finalizeRes(pCmd, pQueryInfo, pLocalMerge);
|
||||
bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer);
|
||||
|
||||
tFilePage *pResBuf = pLocalMerge->pResultBuf;
|
||||
|
@ -1605,7 +1650,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
if (pLocalMerge->hasPrevRow) {
|
||||
finalizeRes(pQueryInfo, pLocalMerge);
|
||||
finalizeRes(pCmd, pQueryInfo, pLocalMerge);
|
||||
}
|
||||
|
||||
if (pLocalMerge->pResultBuf->num) {
|
||||
|
|
|
@ -3112,6 +3112,10 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
|
|||
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
|
||||
if (functionId < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((aAggs[functionId].status & TSDB_FUNCSTATE_STABLE) == 0) {
|
||||
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
return true;
|
||||
|
@ -5193,7 +5197,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode*
|
|||
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
|
||||
// orderby ts query on super table
|
||||
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
addPrimaryTsColIntoResult(pQueryInfo);
|
||||
}
|
||||
}
|
||||
|
@ -5817,13 +5821,13 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
|
|||
// todo refactor
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query
|
||||
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
// for projection query on super table, all queries are subqueries
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) {
|
||||
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
|
||||
}
|
||||
|
@ -5859,7 +5863,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
|
|||
pQueryInfo->prjOffset = pQueryInfo->limit.offset;
|
||||
pQueryInfo->vgroupLimit = -1;
|
||||
|
||||
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
/*
|
||||
* the offset value should be removed during retrieve data from virtual node, since the
|
||||
* global order are done in client side, so the offset is applied at the client side
|
||||
|
@ -6462,7 +6466,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
// projection query on super table does not compatible with "group by" syntax
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscIsProjectionQuery(pCmd, pQueryInfo)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
|
@ -6641,8 +6645,17 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) {
|
|||
|
||||
char tmpBuf[1024] = {0};
|
||||
int32_t tmpLen = 0;
|
||||
char *name = NULL;
|
||||
|
||||
if (pExpr->functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pSql->cmd.pUdfInfo, -1 * pExpr->functionId - 1);
|
||||
name = pUdfInfo->name;
|
||||
} else {
|
||||
name = aAggs[pExpr->functionId].name;
|
||||
}
|
||||
|
||||
tmpLen =
|
||||
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].name, pExpr->uid, pExpr->colInfo.colId);
|
||||
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", name, pExpr->uid, pExpr->colInfo.colId);
|
||||
|
||||
if (tmpLen + offset >= totalBufSize - 1) break;
|
||||
|
||||
|
@ -6988,7 +7001,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
|
||||
if (!tscIsProjectionQuery(pCmd, pQueryInfo) && pQueryInfo->interval.interval == 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
||||
}
|
||||
|
||||
|
|
|
@ -888,7 +888,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
size_t output = tscNumOfFields(pQueryInfo);
|
||||
|
||||
if (tscIsSecondStageQuery(pQueryInfo)) {
|
||||
if (tscIsSecondStageQuery(pCmd, pQueryInfo)) {
|
||||
pQueryMsg->secondStageOutput = htonl((int32_t) output);
|
||||
|
||||
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
|
||||
|
@ -2485,7 +2485,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
|
|||
tscSetResRawPtr(pRes, pQueryInfo);
|
||||
} else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
|
||||
tscSetResRawPtr(pRes, pQueryInfo);
|
||||
} else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
|
||||
} else if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
|
||||
tscSetResRawPtr(pRes, pQueryInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -560,7 +560,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
|
|||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -673,7 +673,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
|
|||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
if (!tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -724,7 +724,7 @@ void taos_stop_query(TAOS_RES *res) {
|
|||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
if (tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) {
|
||||
assert(pSql->rpcRid <= 0);
|
||||
tscKillSTableQuery(pSql);
|
||||
} else {
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "tscSubquery.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsclient.h"
|
||||
#include "qUdf.h"
|
||||
#include "qUtil.h"
|
||||
|
||||
typedef struct SInsertSupporter {
|
||||
|
@ -632,7 +633,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
|
||||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
|
||||
|
||||
int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
|
||||
int16_t functionId = tscIsProjectionQuery(&pNew->cmd, pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
|
||||
|
||||
tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
@ -653,7 +654,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
assert(pTableMetaInfo->pVgroupTables != NULL);
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscNonOrderedProjectionQueryOnSTable(&pNew->cmd, pQueryInfo, 0)) {
|
||||
SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
|
||||
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
|
||||
pTableMetaInfo->pVgroupTables = p;
|
||||
|
@ -1442,7 +1443,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
|
|||
}
|
||||
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && numOfRows == 0) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
assert(pQueryInfo->numOfTables == 1);
|
||||
|
||||
|
@ -1479,7 +1480,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
|
|||
}
|
||||
|
||||
// update the records for each subquery in parent sql object.
|
||||
bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);
|
||||
bool stableQuery = tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0);
|
||||
for (int32_t i = 0; i < pState->numOfSub; ++i) {
|
||||
if (pParentSql->pSubs[i] == NULL) {
|
||||
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
|
||||
|
@ -1576,7 +1577,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0);
|
||||
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
|
||||
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, p, 0);
|
||||
if (orderedPrjQuery) {
|
||||
break;
|
||||
}
|
||||
|
@ -1601,7 +1602,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
|
|||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
|
||||
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
|
||||
if (tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
|
||||
pSub->res.completed) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
assert(pQueryInfo->numOfTables == 1);
|
||||
|
@ -1804,7 +1805,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|||
}
|
||||
|
||||
// In case of consequence query from other vnode, do not wait for other query response here.
|
||||
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
|
||||
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0))) {
|
||||
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
|
||||
return;
|
||||
}
|
||||
|
@ -1816,7 +1817,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|||
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
|
||||
* data instead of returning to its invoker
|
||||
*/
|
||||
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0)) {
|
||||
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
|
||||
pSql->cmd.command = TSDB_SQL_FETCH;
|
||||
|
||||
|
@ -2791,6 +2792,22 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
|
||||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
SQLFunctionCtx *pCtx = &pParentSql->res.pLocalMerger->pCtx[j];
|
||||
|
||||
int32_t functionId = pCtx->functionId;
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pParentSql->cmd.pUdfInfo, -1 * functionId - 1);
|
||||
int32_t code = initUdfInfo(pUdfInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pParentSql->res.code = code;
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set the command flag must be after the semaphore been correctly set.
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
|
||||
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -2864,7 +2881,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
|
||||
pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
|
||||
|
||||
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
|
||||
pParentSql, pSql, tsMaxNumOfOrderedResults, num);
|
||||
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
|
||||
|
@ -3430,7 +3448,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
|
|||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
|
||||
bool allSubqueryExhausted = true;
|
||||
|
||||
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
|
@ -3468,7 +3486,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
|
|||
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
|
||||
|
||||
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
|
||||
tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) {
|
||||
tscIsProjectionQuery(&pSql->pSubs[i]->cmd, pQueryInfo1)) || (pRes1->numOfRows == 0)) {
|
||||
hasData = false;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -113,7 +113,7 @@ bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
if (pQueryInfo == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
}
|
||||
|
||||
// for ordered projection query, iterate all qualified vnodes sequentially
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -139,7 +139,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
|
||||
|
||||
/*
|
||||
|
@ -156,6 +156,15 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId != TSDB_FUNC_PRJ &&
|
||||
functionId != TSDB_FUNC_TAGPRJ &&
|
||||
functionId != TSDB_FUNC_TAG &&
|
||||
|
@ -171,8 +180,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
}
|
||||
|
||||
// not order by timestamp projection query on super table
|
||||
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
|
||||
bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -180,8 +189,8 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
|
|||
return pQueryInfo->order.orderColId < 0;
|
||||
}
|
||||
|
||||
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
|
||||
bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||
if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -189,12 +198,21 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde
|
|||
return pQueryInfo->order.orderColId >= 0;
|
||||
}
|
||||
|
||||
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
|
||||
bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
|
||||
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
|
||||
return false;
|
||||
|
@ -223,11 +241,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
|
||||
if (tscIsProjectionQuery(pQueryInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
size_t numOfOutput = tscNumOfFields(pQueryInfo);
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo;
|
||||
|
@ -236,6 +250,10 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
if (tscIsProjectionQuery(pCmd, pQueryInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -659,6 +677,30 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
void freeUdfInfo(SUdfInfo* pUdfInfo) {
|
||||
if (pUdfInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY]) {
|
||||
(*(udfDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(&pUdfInfo->init);
|
||||
}
|
||||
|
||||
tfree(pUdfInfo->name);
|
||||
|
||||
if (pUdfInfo->path) {
|
||||
unlink(pUdfInfo->path);
|
||||
}
|
||||
|
||||
tfree(pUdfInfo->path);
|
||||
|
||||
tfree(pUdfInfo->content);
|
||||
|
||||
taosCloseDll(pUdfInfo->handle);
|
||||
}
|
||||
|
||||
|
||||
void* tscDestroyUdfArrayList(SArray* pUdfList) {
|
||||
if (pUdfList == NULL) {
|
||||
return NULL;
|
||||
|
@ -667,10 +709,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) {
|
|||
size_t size = taosArrayGetSize(pUdfList);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SUdfInfo* udf = taosArrayGet(pUdfList, i);
|
||||
if (udf) {
|
||||
tfree(udf->content);
|
||||
tfree(udf->name);
|
||||
}
|
||||
freeUdfInfo(udf);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pUdfList);
|
||||
|
@ -2440,7 +2479,7 @@ void tscDoQuery(SSqlObj* pSql) {
|
|||
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
|
||||
tscProcessSql(pSql);
|
||||
} else { // secondary stage join query.
|
||||
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
|
||||
if (tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) { // super table query
|
||||
tscLockByThread(&pSql->squeryLock);
|
||||
tscHandleMasterSTableQuery(pSql);
|
||||
tscUnlockByThread(&pSql->squeryLock);
|
||||
|
@ -2454,7 +2493,7 @@ void tscDoQuery(SSqlObj* pSql) {
|
|||
} else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) {
|
||||
tscHandleFirstRoundStableQuery(pSql); // todo lock?
|
||||
return;
|
||||
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
|
||||
} else if (tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) { // super table query
|
||||
tscLockByThread(&pSql->squeryLock);
|
||||
tscHandleMasterSTableQuery(pSql);
|
||||
tscUnlockByThread(&pSql->squeryLock);
|
||||
|
@ -2619,7 +2658,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
|
|||
numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
}
|
||||
|
||||
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
return tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) &&
|
||||
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
|
||||
}
|
||||
|
||||
|
@ -2637,7 +2676,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
* no result returned from the current virtual node anymore, try the next vnode if exists
|
||||
* if case of: multi-vnode super table projection query
|
||||
*/
|
||||
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
|
||||
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef TDENGINE_QUDF_H
|
||||
#define TDENGINE_QUDF_H
|
||||
|
||||
enum { TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_FUNC_INIT, TSDB_UDF_FUNC_FINALIZE, TSDB_UDF_FUNC_DESTROY, TSDB_UDF_FUNC_MAX_NUM };
|
||||
enum { TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_FUNC_INIT, TSDB_UDF_FUNC_FINALIZE, TSDB_UDF_FUNC_MERGE, TSDB_UDF_FUNC_DESTROY, TSDB_UDF_FUNC_MAX_NUM };
|
||||
|
||||
|
||||
|
||||
|
@ -43,16 +43,15 @@ typedef struct SUdfInfo {
|
|||
void *handle; // handle loaded in mem
|
||||
void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
|
||||
SUdfInit init;
|
||||
union { // file path or [in memory] binary content
|
||||
char *content;
|
||||
char *path;
|
||||
};
|
||||
char *content;
|
||||
char *path;
|
||||
} SUdfInfo;
|
||||
|
||||
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput,
|
||||
int32_t* numOfOutput, int16_t oType, int16_t oByte, SUdfInit* buf);
|
||||
int32_t* numOfOutput, int16_t oType, int16_t oBytes, SUdfInit* buf);
|
||||
typedef int32_t (*udfInitFunc)(SUdfInit* data);
|
||||
typedef void (*udfFinalizeFunc)(char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
|
||||
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
|
||||
typedef void (*udfDestroyFunc)(SUdfInit* buf);
|
||||
|
||||
|
||||
|
|
|
@ -90,4 +90,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
|||
|
||||
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
|
||||
|
||||
int32_t initUdfInfo(SUdfInfo* pUdfInfo);
|
||||
|
||||
#endif // TDENGINE_QUERYUTIL_H
|
||||
|
|
|
@ -784,9 +784,13 @@ static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int
|
|||
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
|
||||
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
|
||||
|
||||
// set the output value exist
|
||||
pCtx->resultInfo->numOfRes += output;
|
||||
if (output > 0) {
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
pCtx->resultInfo->numOfRes = output;
|
||||
} else {
|
||||
pCtx->resultInfo->numOfRes += output;
|
||||
}
|
||||
|
||||
if (pCtx->resultInfo->numOfRes > 0) {
|
||||
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
|
||||
}
|
||||
|
||||
|
@ -6008,6 +6012,8 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) {
|
|||
}
|
||||
|
||||
tfree(pUdfInfo->path);
|
||||
|
||||
tfree(pUdfInfo->content);
|
||||
|
||||
taosCloseDll(pUdfInfo->handle);
|
||||
|
||||
|
@ -6027,6 +6033,9 @@ static char* getUdfFuncName(char* name, int type) {
|
|||
case TSDB_UDF_FUNC_FINALIZE:
|
||||
sprintf(funcname, "%s_finalize", name);
|
||||
break;
|
||||
case TSDB_UDF_FUNC_MERGE:
|
||||
sprintf(funcname, "%s_merge", name);
|
||||
break;
|
||||
case TSDB_UDF_FUNC_DESTROY:
|
||||
sprintf(funcname, "%s_destroy", name);
|
||||
break;
|
||||
|
@ -6038,7 +6047,7 @@ static char* getUdfFuncName(char* name, int type) {
|
|||
return funcname;
|
||||
}
|
||||
|
||||
static int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
|
||||
int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
|
||||
if (pUdfInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -6088,6 +6097,7 @@ static int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
|
|||
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE));
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_MERGE));
|
||||
}
|
||||
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_DESTROY));
|
||||
|
|
|
@ -393,32 +393,7 @@ if $rows != 28 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select add_one(f1) from tb1 group by f1;
|
||||
if $rows != 7 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data40 != 6 then
|
||||
return -1
|
||||
endi
|
||||
if $data50 != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data60 != 8 then
|
||||
return -1
|
||||
endi
|
||||
sql_error select add_one(f1) from tb1 group by f1;
|
||||
|
||||
sql select sum_double(f1) from tb1 group by f1;
|
||||
if $rows != 7 then
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -10,12 +10,12 @@ typedef struct SUdfInit{
|
|||
int const_item; /* 0 if result is independent of arguments */
|
||||
} SUdfInit;
|
||||
|
||||
void add_one(char* data, char type, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
|
||||
int* numOfOutput, SUdfInit* buf) {
|
||||
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
|
||||
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
|
||||
int i;
|
||||
int r = 0;
|
||||
printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, type, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
|
||||
if (type == 4) {
|
||||
printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
|
||||
if (itype == 4) {
|
||||
for(i=0;i<numOfRows;++i) {
|
||||
printf("input %d - %d", i, *((int *)data + i));
|
||||
*((int *)dataOutput+i)=*((int *)data + i) + 1;
|
||||
|
|
|
@ -10,6 +10,9 @@ typedef struct SUdfInit{
|
|||
int const_item; /* 0 if result is independent of arguments */
|
||||
} SUdfInit;
|
||||
|
||||
#define TSDB_DATA_INT_NULL 0x80000000L
|
||||
|
||||
|
||||
void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
|
||||
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
|
||||
int i;
|
||||
|
@ -17,14 +20,17 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
|
|||
printf("sum_double input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
|
||||
if (itype == 4) {
|
||||
r=*(int *)dataOutput;
|
||||
*numOfOutput=0;
|
||||
|
||||
for(i=0;i<numOfRows;++i) {
|
||||
r+=*((int *)data + i);
|
||||
if (tsOutput) {
|
||||
*(long long*)tsOutput=1000000;
|
||||
if (*((int *)data + i) == TSDB_DATA_INT_NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
*numOfOutput=1;
|
||||
r+=*((int *)data + i);
|
||||
*(int *)dataOutput=r;
|
||||
}
|
||||
*(int *)dataOutput=r;
|
||||
*numOfOutput=1;
|
||||
|
||||
printf("sum_double out, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
|
||||
}
|
||||
|
@ -35,27 +41,31 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
|
|||
void sum_double_finalize(char* dataOutput, int* numOfOutput, SUdfInit* buf) {
|
||||
int i;
|
||||
int r = 0;
|
||||
printf("sum_double_finalize dataoutput:%p, numOfOutput:%p, buf:%p\n", dataOutput, numOfOutput, buf);
|
||||
printf("sum_double_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
|
||||
*numOfOutput=1;
|
||||
*(int*)(buf->ptr)=*(int*)dataOutput*2;
|
||||
*(int*)dataOutput=*(int*)(buf->ptr);
|
||||
printf("sum_double finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
|
||||
}
|
||||
|
||||
void sum_double_merge(char* dataOutput, int* numOfOutput, SUdfInit* buf) {
|
||||
void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
|
||||
int r = 0;
|
||||
int sum = 0;
|
||||
|
||||
printf("sum_double_merge dataoutput:%p, numOfOutput:%p, buf:%p\n", dataOutput, numOfOutput, buf);
|
||||
for (int i = 0; i < *numOfOutput; ++i) {
|
||||
printf("sum_double_merge %d - %d\n", i, *((int*)dataOutput + i));
|
||||
sum +=*((int*)dataOutput + i);
|
||||
printf("sum_double_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
|
||||
for (int i = 0; i < numOfRows; ++i) {
|
||||
printf("sum_double_merge %d - %d\n", i, *((int*)data + i));
|
||||
sum +=*((int*)data + i);
|
||||
}
|
||||
|
||||
*(int*)dataOutput=sum;
|
||||
*numOfOutput=1;
|
||||
*(int*)dataOutput+=sum;
|
||||
if (numOfRows > 0) {
|
||||
*numOfOutput=1;
|
||||
} else {
|
||||
*numOfOutput=0;
|
||||
}
|
||||
|
||||
printf("sum_double sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
|
||||
printf("sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue