From 4c30b53a7c40327828f7a90d229a1359da9e35c4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 16:03:31 +0800 Subject: [PATCH 01/18] feat: make grant revoke work --- include/common/tmsg.h | 10 +- source/dnode/mnode/impl/inc/mndAuth.h | 2 +- source/dnode/mnode/impl/src/mndAuth.c | 17 +--- source/dnode/mnode/impl/src/mndUser.c | 135 ++++++++++++++++---------- 4 files changed, 93 insertions(+), 71 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 72418148b0..8e6e73d649 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -131,12 +131,10 @@ typedef enum _mgmt_table { #define TSDB_ALTER_USER_SUPERUSER 0x2 #define TSDB_ALTER_USER_ADD_READ_DB 0x3 #define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 -#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 -#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 -#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 -#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 -#define TSDB_ALTER_USER_ADD_ALL_DB 0x9 -#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA +#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5 +#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6 +#define TSDB_ALTER_USER_ADD_ALL_DB 0x7 +#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8 #define TSDB_ALTER_USER_PRIVILEGES 0x2 diff --git a/source/dnode/mnode/impl/inc/mndAuth.h b/source/dnode/mnode/impl/inc/mndAuth.h index 890879912f..de59a11cd7 100644 --- a/source/dnode/mnode/impl/inc/mndAuth.h +++ b/source/dnode/mnode/impl/inc/mndAuth.h @@ -26,7 +26,7 @@ int32_t mndInitAuth(SMnode *pMnode); void mndCleanupAuth(SMnode *pMnode); int32_t mndCheckCreateUserAuth(SUserObj *pOperUser); -int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter); +int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndCheckDropUserAuth(SUserObj *pOperUser); int32_t mndCheckNodeAuth(SUserObj *pOperUser); diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index 8e5ec40c47..1d89241dd5 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -79,14 +79,12 @@ int32_t mndCheckCreateUserAuth(SUserObj *pOperUser) { return -1; } -int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter) { +int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter) { if (pAlter->alterType == TSDB_ALTER_USER_PASSWD) { if (pOperUser->superUser || strcmp(pUser->user, pOperUser->user) == 0) { return 0; } - } - - if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) { + } else if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) { if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) { terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -95,21 +93,12 @@ int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, if (pOperUser->superUser) { return 0; } - } - - if (pAlter->alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { + } else { if (pOperUser->superUser) { return 0; } } - if (pAlter->alterType == TSDB_ALTER_USER_ADD_READ_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_READ_DB || - pAlter->alterType == TSDB_ALTER_USER_ADD_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { - if (pOperUser->superUser || strcmp(pUser->user, pDb->createUser) == 0) { - return 0; - } - } - terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d0af17ff5c..1706820bdc 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -394,6 +394,8 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) { static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; int32_t code = -1; SUserObj *pUser = NULL; SUserObj *pOperUser = NULL; @@ -429,7 +431,13 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { goto _OVER; } + if (mndCheckAlterUserAuth(pOperUser, pUser, &alterReq) != 0) { + goto _OVER; + } + memcpy(&newUser, pUser, sizeof(SUserObj)); + newUser.authVersion++; + newUser.updateTime = taosGetTimestampMs(); taosRLockLatch(&pUser->lock); newUser.readDbs = mndDupDbHash(pUser->readDbs); @@ -440,63 +448,90 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { goto _OVER; } - int32_t len = strlen(alterReq.dbname) + 1; - SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); - mndReleaseDb(pMnode, pDb); - if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN); - } else if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { - newUser.superUser = alterReq.superUser; - } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) { - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) { - if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { - taosHashClear(newUser.readDbs); - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) { - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { - if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) { - taosHashClear(newUser.writeDbs); - newUser.authVersion++; - } else { - terrno = TSDB_CODE_MND_INVALID_ALTER_OPER; - goto _OVER; } - newUser.updateTime = taosGetTimestampMs(); + if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { + newUser.superUser = alterReq.superUser; + } - if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) { - goto _OVER; + if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + } else { + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + int32_t len = strlen(pDb->name) + 1; + taosHashPut(newUser.readDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN); + sdbRelease(pSdb, pDb); + } + } + } + + if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + } else { + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + int32_t len = strlen(pDb->name) + 1; + taosHashPut(newUser.writeDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN); + sdbRelease(pSdb, pDb); + } + } + } + + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + taosHashRemove(newUser.readDbs, alterReq.dbname, len); + } else { + taosHashClear(newUser.readDbs); + } + } + + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + taosHashRemove(newUser.writeDbs, alterReq.dbname, len); + } else { + taosHashClear(newUser.writeDbs); + } } code = mndAlterUser(pMnode, pUser, &newUser, pReq); From 2186d655e2dc194dfaf7bf8bf1c011770d95329b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 16:03:50 +0800 Subject: [PATCH 02/18] test: add test case for grant revoke user --- source/dnode/mnode/impl/test/user/user.cpp | 6 +- tests/script/tsim/user/privilege1.sim | 71 ++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 tests/script/tsim/user/privilege1.sim diff --git a/source/dnode/mnode/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp index ee961e9a27..1e03d8ff4a 100644 --- a/source/dnode/mnode/impl/test/user/user.cpp +++ b/source/dnode/mnode/impl/test/user/user.cpp @@ -238,9 +238,10 @@ TEST_F(MndTestUser, 03_Alter_User) { { SAlterUserReq alterReq = {0}; - alterReq.alterType = TSDB_ALTER_USER_CLEAR_WRITE_DB; + alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; strcpy(alterReq.user, "u3"); strcpy(alterReq.pass, "1"); + strcpy(alterReq.dbname, "*"); int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq); void* pReq = rpcMallocCont(contLen); @@ -253,9 +254,10 @@ TEST_F(MndTestUser, 03_Alter_User) { { SAlterUserReq alterReq = {0}; - alterReq.alterType = TSDB_ALTER_USER_CLEAR_READ_DB; + alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; strcpy(alterReq.user, "u3"); strcpy(alterReq.pass, "1"); + strcpy(alterReq.dbname, "*"); int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq); void* pReq = rpcMallocCont(contLen); diff --git a/tests/script/tsim/user/privilege1.sim b/tests/script/tsim/user/privilege1.sim new file mode 100644 index 0000000000..a7c5d9d13d --- /dev/null +++ b/tests/script/tsim/user/privilege1.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== show users +sql create database d1 vgroups 1; +sql create database d2 vgroups 1; +sql create database d3 vgroups 1; +sql show databases +if $rows != 5 then + return -1 +endi + +print =============== create users +sql create user user1 PASS 'user1' +sql create user user2 PASS 'user2' +sql show users +if $rows != 3 then + return -1 +endi + +print =============== test read +sql_error GRANT read ON d1.* to a; +sql_error GRANT read ON d0.* to user1; + +sql GRANT read ON d1.* to user1; +sql GRANT read ON d2.* to user1; +sql GRANT read ON *.* to user1; + +sql REVOKE read ON d1.* from user1; +sql REVOKE read ON d2.* from user1; +sql REVOKE read ON *.* from user1; + +print =============== test write +sql_error GRANT write ON d1.* to a; +sql_error GRANT write ON d0.* to user1; + +sql GRANT write ON d1.* to user1; +sql GRANT write ON d2.* to user1; +sql GRANT write ON *.* to user1; + +sql REVOKE write ON d1.* from user1; +sql REVOKE write ON d2.* from user1; +sql REVOKE write ON *.* from user1; + +print =============== test all +sql_error GRANT all ON d1.* to a; +sql_error GRANT all ON d0.* to user1; + +sql GRANT all ON d1.* to user1; +sql GRANT all ON d2.* to user1; +sql GRANT all ON *.* to user1; + +sql REVOKE all ON d1.* from user1; +sql REVOKE all ON d2.* from user1; +sql REVOKE all ON *.* from user1; + +print =============== test read write +sql_error GRANT read,write ON d1.* to a; +sql_error GRANT read,write ON d0.* to user1; + +sql GRANT read,write ON d1.* to user1; +sql GRANT read,write ON d2.* to user1; +sql GRANT read,write ON *.* to user1; + +sql REVOKE read,write ON d1.* from user1; +sql REVOKE read,write ON d2.* from user1; +sql REVOKE read,write ON *.* from user1; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From a030a9f32fdc1e9dc04f7ea00b5a1922cdea5ab5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 May 2022 16:06:15 +0800 Subject: [PATCH 03/18] enh(query):support selectivity function and normal column data query. --- source/libs/executor/src/executorimpl.c | 41 +- source/libs/function/inc/builtinsimpl.h | 6 +- source/libs/function/src/builtins.c | 20 +- source/libs/function/src/builtinsimpl.c | 514 ++++++++++++++++++------ 4 files changed, 421 insertions(+), 160 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1327ddb48e..bd9bbec666 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -803,7 +804,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; // this can be set during create the struct - pCtx[k].fpSet.process(&pCtx[k]); + if (pCtx[k].fpSet.process != NULL) + pCtx[k].fpSet.process(&pCtx[k]); } } } @@ -1074,35 +1076,36 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* // set the output buffer for the selectivity + tag query static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { int32_t num = 0; - int16_t tagLen = 0; SqlFunctionCtx* p = NULL; - SqlFunctionCtx** pTagCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES); - if (pTagCtx == NULL) { + SqlFunctionCtx** pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES); + if (pValCtx == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } for (int32_t i = 0; i < numOfOutput; ++i) { - int32_t functionId = pCtx[i].functionId; - - if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { - tagLen += pCtx[i].resDataInfo.bytes; - pTagCtx[num++] = &pCtx[i]; - } else if (1 /*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/) { - p = &pCtx[i]; - } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { - // tag function may be the group by tag column - // ts may be the required primary timestamp column - continue; + if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + pValCtx[num++] = &pCtx[i]; } else { - // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ + p = &pCtx[i]; } +// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { +// tagLen += pCtx[i].resDataInfo.bytes; +// pTagCtx[num++] = &pCtx[i]; +// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { +// // tag function may be the group by tag column +// // ts may be the required primary timestamp column +// continue; +// } else { +// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ +// } } + if (p != NULL) { - p->subsidiaries.pCtx = pTagCtx; + p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; } else { - taosMemoryFreeClear(pTagCtx); + taosMemoryFreeClear(pValCtx); } return TSDB_CODE_SUCCESS; @@ -2219,6 +2222,8 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); if (pCtx[j].fpSet.process) { pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor } else { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 1f2ad0797d..97832d007e 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -37,11 +37,11 @@ bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t sumFunction(SqlFunctionCtx *pCtx); int32_t sumInvertFunction(SqlFunctionCtx *pCtx); -bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t minFunction(SqlFunctionCtx* pCtx); int32_t maxFunction(SqlFunctionCtx *pCtx); +int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); @@ -82,6 +82,8 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5aa1b63c79..a7b2607778 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -509,9 +509,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateInOutNum, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = minFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = minFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = minmaxFunctionFinalize }, { .name = "max", @@ -520,9 +520,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateInOutNum, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = minmaxFunctionFinalize }, { .name = "stddev", @@ -562,7 +562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateApercentile, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, .finalizeFunc = functionFinalize }, @@ -581,8 +581,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_BOTTOM, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBottom, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, .processFunc = maxFunction, .finalizeFunc = functionFinalize }, @@ -603,7 +603,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC, .translateFunc = translateLastRow, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, .finalizeFunc = functionFinalize }, @@ -1032,8 +1032,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_SELECT_VALUE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .translateFunc = translateSelectValue, - .getEnvFunc = NULL, - .initFunc = NULL, + .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. + .initFunc = functionSetup, .sprocessFunc = NULL, .finalizeFunc = NULL } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9c1601b61a..3d77b7b218 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -37,13 +37,15 @@ typedef struct SAvgRes { int64_t count; } SAvgRes; +typedef struct STuplePos { + int32_t pageId; + int32_t offset; +} STuplePos; + typedef struct STopBotResItem { - SVariant v; - uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data - struct { - int32_t pageId; - int32_t offset; - } tuplePos; // tuple data of this chosen row + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + STuplePos tuplePos; // tuple data of this chosen row } STopBotResItem; typedef struct STopBotRes { @@ -616,101 +618,25 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin return FUNC_DATA_REQUIRED_STATIS_LOAD; } -bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; - } +typedef struct SMinmaxResInfo { + bool assign; // assign the first value or not + int64_t v; + STuplePos tuplePos; +} SMinmaxResInfo; - char* buf = GET_ROWCELL_INTERBUF(pResultInfo); - switch (pCtx->resDataInfo.type) { - case TSDB_DATA_TYPE_INT: - *((int32_t*)buf) = INT32_MIN; - break; - case TSDB_DATA_TYPE_UINT: - *((uint32_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_FLOAT: - *((float*)buf) = -FLT_MAX; - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_VAL(((double*)buf), -DBL_MAX); - break; - case TSDB_DATA_TYPE_BIGINT: - *((int64_t*)buf) = INT64_MIN; - break; - case TSDB_DATA_TYPE_UBIGINT: - *((uint64_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_SMALLINT: - *((int16_t*)buf) = INT16_MIN; - break; - case TSDB_DATA_TYPE_USMALLINT: - *((uint16_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_TINYINT: - *((int8_t*)buf) = INT8_MIN; - break; - case TSDB_DATA_TYPE_UTINYINT: - *((uint8_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_BOOL: - *((int8_t*)buf) = 0; - break; - default: - assert(0); - } - return true; -} - -bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { +bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; // not initialized since it has been initialized } - char* buf = GET_ROWCELL_INTERBUF(pResultInfo); - switch (pCtx->resDataInfo.type) { - case TSDB_DATA_TYPE_TINYINT: - *((int8_t*)buf) = INT8_MAX; - break; - case TSDB_DATA_TYPE_UTINYINT: - *(uint8_t*)buf = UINT8_MAX; - break; - case TSDB_DATA_TYPE_SMALLINT: - *((int16_t*)buf) = INT16_MAX; - break; - case TSDB_DATA_TYPE_USMALLINT: - *((uint16_t*)buf) = UINT16_MAX; - break; - case TSDB_DATA_TYPE_INT: - *((int32_t*)buf) = INT32_MAX; - break; - case TSDB_DATA_TYPE_UINT: - *((uint32_t*)buf) = UINT32_MAX; - break; - case TSDB_DATA_TYPE_BIGINT: - *((int64_t*)buf) = INT64_MAX; - break; - case TSDB_DATA_TYPE_UBIGINT: - *((uint64_t*)buf) = UINT64_MAX; - break; - case TSDB_DATA_TYPE_FLOAT: - *((float*)buf) = FLT_MAX; - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_VAL(((double*)buf), DBL_MAX); - break; - case TSDB_DATA_TYPE_BOOL: - *((int8_t*)buf) = 1; - break; - default: - assert(0); - } - + SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo); + buf->assign = false; + buf->tuplePos.pageId = -1; return true; } bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(int64_t); + pEnv->calcMemSize = sizeof(SMinmaxResInfo); return true; } @@ -758,6 +684,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } \ } while (0) +static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); + int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int32_t numOfElems = 0; @@ -768,7 +697,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int32_t type = pCol->info.type; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SMinmaxResInfo *pBuf = GET_ROWCELL_INTERBUF(pResInfo); // data in current data block are qualified to the query if (pInput->colDataAggIsSet) { @@ -795,11 +724,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (IS_SIGNED_NUMERIC_TYPE(type)) { int64_t prev = 0; - GET_TYPED_DATA(prev, int64_t, type, buf); + GET_TYPED_DATA(prev, int64_t, type, pBuf->v); int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(int64_t*)buf = val; + pBuf->v = val; for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor @@ -809,14 +738,16 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { __ctx->fpSet.process(__ctx); } + + saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { uint64_t prev = 0; - GET_TYPED_DATA(prev, uint64_t, type, buf); + GET_TYPED_DATA(prev, uint64_t, type, pBuf->v); uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(uint64_t*)buf = val; + pBuf->v = val; for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor @@ -829,12 +760,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_DOUBLE) { double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key); + UPDATE_DATA(pCtx, *(double*)&pBuf->v, val, numOfElems, isMinFunc, key); } else if (type == TSDB_DATA_TYPE_FLOAT) { double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key); + UPDATE_DATA(pCtx, *(float*)&pBuf->v, val, numOfElems, isMinFunc, key); } + pBuf->assign = true; return numOfElems; } @@ -843,47 +775,318 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) { - LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems); - } else if (type == TSDB_DATA_TYPE_SMALLINT) { - LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems); - } else if (type == TSDB_DATA_TYPE_INT) { - int32_t* pData = (int32_t*)pCol->pData; - int32_t* val = (int32_t*)buf; + int8_t* pData = (int8_t*)pCol->pData; + int8_t* val = (int8_t*)&pBuf->v; for (int32_t i = start; i < start + numOfRows; ++i) { if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } - if ((*val < pData[i]) ^ isMinFunc) { + if (!pBuf->assign) { *val = pData[i]; - TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0; - DO_UPDATE_SUBSID_RES(pCtx, ts); + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } } numOfElems += 1; } + } else if (type == TSDB_DATA_TYPE_SMALLINT) { + int16_t* pData = (int16_t*)pCol->pData; + int16_t* val = (int16_t*)&pBuf->v; -#if defined(_DEBUG_VIEW) - qDebug("max value updated:%d", *retVal); -#endif + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } + } else if (type == TSDB_DATA_TYPE_INT) { + int32_t* pData = (int32_t*)pCol->pData; + int32_t* val = (int32_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_BIGINT) { - LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems); + int64_t* pData = (int64_t*)pCol->pData; + int64_t* val = (int64_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { if (type == TSDB_DATA_TYPE_UTINYINT) { - LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems); + uint8_t* pData = (uint8_t*)pCol->pData; + uint8_t* val = (uint8_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_USMALLINT) { - LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems); + uint16_t* pData = (uint16_t*)pCol->pData; + uint16_t* val = (uint16_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_UINT) { - LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems); + uint32_t* pData = (uint32_t*)pCol->pData; + uint32_t* val = (uint32_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_UBIGINT) { - LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems); + uint64_t* pData = (uint64_t*)pCol->pData; + uint64_t* val = (uint64_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } } else if (type == TSDB_DATA_TYPE_DOUBLE) { - LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems); + double* pData = (double*)pCol->pData; + double* val = (double*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_FLOAT) { - LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems); + float* pData = (float*)pCol->pData; + float* val = (float*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } return numOfElems; @@ -901,6 +1104,53 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + + SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + // todo assign the tag value + int32_t currentRow = pBlock->info.rows; + colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); + + int32_t pageId = pRes->tuplePos.pageId; + int32_t offset = pRes->tuplePos.offset; + if (pRes->tuplePos.pageId != -1) { + SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); + + bool* nullList = (bool*)((char*)pPage + offset); + char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool)); + + // todo set the offset value to optimize the performance. + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + + int32_t ps = 0; + for (int32_t k = 0; k < srcSlotId; ++k) { + SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k); + ps += pSrcCol->info.bytes; + } + + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + if (nullList[srcSlotId]) { + colDataAppendNULL(pDstCol, currentRow); + } else { + colDataAppend(pDstCol, currentRow, (pStart + ps), false); + } + } + } + + return pEntryInfo->numOfRes; +} + bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStddevRes); return true; @@ -1244,6 +1494,14 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } +bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = pNode->node.resType.bytes; + return true; +} + + + static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { if (pTsColInfo == NULL) { return 0; @@ -1624,9 +1882,6 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo); -static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem); -static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem); - int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -1701,7 +1956,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData pItem->uid = uid; // save the data of this tuple - saveTupleData(pCtx, rowIndex, pSrcBlock, pItem); + saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; @@ -1716,15 +1971,14 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData pItem->uid = uid; // save the data of this tuple by over writing the old data - copyTupleData(pCtx, rowIndex, pSrcBlock, pItem); - + copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, topBotResComparFn, NULL, false); } } } -void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) { +void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { SFilePage* pPage = NULL; int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool); @@ -1740,7 +1994,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS } } - pItem->tuplePos.pageId = pCtx->curBufPage; + pPos->pageId = pCtx->curBufPage; // keep the current row data, extract method int32_t offset = 0; @@ -1764,17 +2018,17 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS offset += pCol->info.bytes; } - pItem->tuplePos.offset = pPage->num; + pPos->offset = pPage->num; pPage->num += completeRowSize; setBufPageDirty(pPage, true); releaseBufPage(pCtx->pBuf, pPage); } -void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) { - SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId); +void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { + SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); - bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset); + bool* nullList = (bool*)((char*)pPage + pPos->offset); char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool)); int32_t offset = 0; From 9cf1cb788f2420f5d212efb5b8cfcecf5b52fe20 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 9 May 2022 17:42:59 +0800 Subject: [PATCH 04/18] feature(udf):move start/stop udfd out of dnode management preparation --- source/dnode/mgmt/implement/src/dmHandle.c | 40 ++++++++++------------ source/dnode/mgmt/interface/inc/dmDef.h | 2 ++ 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 32205b337c..9597307567 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -217,20 +217,20 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { dmStopStatusThread(pWrapper->pDnode); } -static int32_t dmSpawnUdfd(SDnode *pDnode); +static int32_t dmSpawnUdfd(SUdfdData *pData); void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); - SDnode *pDnode = process->data; - if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pDnode->udfdData.stopCalled)) { + SUdfdData *pData = process->data; + if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) { dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); } else { dInfo("udfd process restart"); - dmSpawnUdfd(pDnode); + dmSpawnUdfd(pData); } } -static int32_t dmSpawnUdfd(SDnode *pDnode) { +static int32_t dmSpawnUdfd(SUdfdData* pData) { dInfo("dnode start spawning udfd"); uv_process_options_t options = {0}; @@ -252,7 +252,6 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.exit_cb = dmUdfdExit; - SUdfdData *pData = &pDnode->udfdData; uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); uv_stdio_container_t child_stdio[3]; @@ -268,7 +267,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { char dnodeIdEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0}; - snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); + snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); float numCpuCores = 4; taosGetCpuCores(&numCpuCores); snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); @@ -276,7 +275,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.env = envUdfd; int err = uv_spawn(&pData->loop, &pData->process, &options); - pData->process.data = (void*)pDnode; + pData->process.data = (void*)pData; if (err != 0) { dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); @@ -291,18 +290,16 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { } static void dmUdfdStopAsyncCb(uv_async_t *async) { - SDnode *pDnode = async->data; - SUdfdData *pData = &pDnode->udfdData; + SUdfdData *pData = async->data; uv_stop(&pData->loop); } static void dmWatchUdfd(void *args) { - SDnode *pDnode = args; - SUdfdData *pData = &pDnode->udfdData; + SUdfdData *pData = args; uv_loop_init(&pData->loop); uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb); - pData->stopAsync.data = pDnode; - int32_t err = dmSpawnUdfd(pDnode); + pData->stopAsync.data = pData; + int32_t err = dmSpawnUdfd(pData); atomic_store_32(&pData->spawnErr, err); uv_barrier_wait(&pData->barrier); uv_run(&pData->loop, UV_RUN_DEFAULT); @@ -314,18 +311,19 @@ static void dmWatchUdfd(void *args) { return; } -static int32_t dmStartUdfd(SDnode *pDnode) { +static int32_t dmStartUdfd(SDnode *pDnode, int32_t startDnodeId) { char dnodeId[8] = {0}; snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId); uv_os_setenv("DNODE_ID", dnodeId); SUdfdData *pData = &pDnode->udfdData; + pData->dnodeId = startDnodeId; if (pData->startCalled) { dInfo("dnode-mgmt start udfd already called"); return 0; } pData->startCalled = true; uv_barrier_init(&pData->barrier, 2); - uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); + uv_thread_create(&pData->thread, dmWatchUdfd, pData); uv_barrier_wait(&pData->barrier); int32_t err = atomic_load_32(&pData->spawnErr); if (err != 0) { @@ -340,10 +338,10 @@ static int32_t dmStartUdfd(SDnode *pDnode) { return err; } -static int32_t dmStopUdfd(SDnode *pDnode) { +static int32_t dmStopUdfd(SUdfdData *udfdData) { dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", - pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); - SUdfdData *pData = &pDnode->udfdData; + udfdData->needCleanUp, udfdData->spawnErr); + SUdfdData *pData = udfdData; if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { return 0; } @@ -387,7 +385,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "dnode-transport", "initialized"); - if (dmStartUdfd(pDnode) != 0) { + if (dmStartUdfd(pDnode, pDnode->data.dnodeId) != 0) { dError("failed to start udfd"); } @@ -398,7 +396,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to clean up"); SDnode *pDnode = pWrapper->pDnode; - dmStopUdfd(pDnode); + dmStopUdfd(&pDnode->udfdData); dmStopWorker(pDnode); taosWLockLatch(&pDnode->data.latch); diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index 2e8ad982d8..445e1d42f5 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -156,6 +156,8 @@ typedef struct SUdfdData { uv_pipe_t ctrlPipe; uv_async_t stopAsync; int32_t stopCalled; + + int32_t dnodeId; } SUdfdData; typedef struct SDnode { From ea18b8a7dcb2a0b229db64f804751360d459239d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 May 2022 17:48:23 +0800 Subject: [PATCH 05/18] feature(query): bottom function is available. --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 11 +- source/libs/function/src/builtinsimpl.c | 135 ++++++++++++------------ 3 files changed, 76 insertions(+), 71 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 97832d007e..ce2f0b0651 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); +int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a7b2607778..2be2682bc9 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - // todo + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; } @@ -569,7 +570,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .translateFunc = translateTop, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, @@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .translateFunc = translateBottom, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .processFunc = bottomFunction, + .finalizeFunc = topBotFinalize }, { .name = "spread", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3d77b7b218..adacea8efd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1104,22 +1104,32 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex); + int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); + int32_t type = pCtx->input.pData[0]->info.type; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); // todo assign the tag value int32_t currentRow = pBlock->info.rows; - colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); - int32_t pageId = pRes->tuplePos.pageId; - int32_t offset = pRes->tuplePos.offset; - if (pRes->tuplePos.pageId != -1) { + if (type) + colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); + setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); + + return pEntryInfo->numOfRes; +} + +void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex) { + int32_t pageId = pTuplePos->pageId; + int32_t offset = pTuplePos->offset; + if (pTuplePos->pageId != -1) { SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); bool* nullList = (bool*)((char*)pPage + offset); @@ -1141,14 +1151,12 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); if (nullList[srcSlotId]) { - colDataAppendNULL(pDstCol, currentRow); + colDataAppendNULL(pDstCol, rowIndex); } else { - colDataAppend(pDstCol, currentRow, (pStart + ps), false); + colDataAppend(pDstCol, rowIndex, (pStart + ps), false); } } } - - return pEntryInfo->numOfRes; } bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -1880,32 +1888,49 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { } static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo); + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - // if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) { - // buildTopBotStruct(pRes, pCtx); - // } - SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; int32_t type = pInput->pData[0]->info.type; int32_t start = pInput->startRowIndex; - int32_t numOfRows = pInput->numOfRows; - - for (int32_t i = start; i < numOfRows + start; ++i) { + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } - numOfElems++; + numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t bottomFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElems = 0; + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElems++; + char* data = colDataGetData(pCol, i); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); } return TSDB_CODE_SUCCESS; @@ -1939,7 +1964,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par } void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo) { + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); int32_t maxSize = pCtx->param[1].param.i; @@ -1961,10 +1986,17 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, - false); + !isTopQuery); } else { // replace the minimum value in the result - if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || - (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { + if ((isTopQuery && ( + (IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || + (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || + (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d))) + || (!isTopQuery && ( + (IS_SIGNED_NUMERIC_TYPE(type) && val.i < pItems[0].v.i) || + (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u < pItems[0].v.u) || + (IS_FLOAT_TYPE(type) && val.d < pItems[0].v.d)) + )) { // replace the old data and the coresponding tuple data STopBotResItem* pItem = &pItems[0]; pItem->v = val; @@ -1973,7 +2005,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple by over writing the old data copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, - topBotResComparFn, NULL, false); + topBotResComparFn, NULL, !isTopQuery); } } } @@ -2005,6 +2037,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS bool isNull = colDataIsNull_s(pCol, rowIndex); if (isNull) { nullList[i] = true; + offset += pCol->info.bytes; continue; } @@ -2057,54 +2090,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); pEntryInfo->complete = true; - int32_t type = pCtx->input.pData[0]->info.type; - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + int32_t type = pCtx->input.pData[0]->info.type; + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); // todo assign the tag value and the corresponding row data int32_t currentRow = pBlock->info.rows; - switch (type) { - case TSDB_DATA_TYPE_INT: { - for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { - STopBotResItem* pItem = &pRes->pItems[i]; - colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i); - - int32_t pageId = pItem->tuplePos.pageId; - int32_t offset = pItem->tuplePos.offset; - if (pItem->tuplePos.pageId != -1) { - SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); - - bool* nullList = (bool*)((char*)pPage + offset); - char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool)); - - // todo set the offset value to optimize the performance. - for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { - SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - - SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; - int32_t srcSlotId = pFuncParam->pCol->slotId; - int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId; - - int32_t ps = 0; - for (int32_t k = 0; k < srcSlotId; ++k) { - SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k); - ps += pSrcCol->info.bytes; - } - - SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); - if (nullList[srcSlotId]) { - colDataAppendNULL(pDstCol, currentRow); - } else { - colDataAppend(pDstCol, currentRow, (pStart + ps), false); - } - } - } - - currentRow += 1; - } - - break; + for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { + STopBotResItem* pItem = &pRes->pItems[i]; + if (type == TSDB_DATA_TYPE_FLOAT) { + float v = pItem->v.d; + colDataAppend(pCol, currentRow, (const char*)&v, false); + } else { + colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } + + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + currentRow += 1; } return pEntryInfo->numOfRes; From e277a3b1c948e19c35f692473485a3558121a548 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Mon, 9 May 2022 17:56:08 +0800 Subject: [PATCH 06/18] fix/ZhiqiangWang/fix-15189-mv-pipe-file-to-temp --- source/libs/transport/src/transSrv.c | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7f8ad150f0..323ef43e25 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -853,12 +853,13 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitEnv); transSrvInst++; - char pipeName[64]; assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS - snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId()); + char pipeName[64]; + snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); #else - snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId()); + char pipeName[PATH_MAX] = {0}; + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); #endif assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); @@ -871,20 +872,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pTransInst = shandle; srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - - // #ifdef WINDOWS - // uv_file fds[2]; - // if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) { - // #else - // uv_os_sock_t fds[2]; - // if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - // #endif - // goto End; - // } - // uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - // uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - // thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read if (false == addHandleToWorkloop(thrd,pipeName)) { From 55d11618d01a367172ff5e29a19976808fdbb313 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 May 2022 19:13:50 +0800 Subject: [PATCH 07/18] fix(query): fix float value min/max query error. --- source/libs/function/src/builtinsimpl.c | 114 +++++++++++++++--------- tests/script/tsim/insert/basic0.sim | 3 +- 2 files changed, 76 insertions(+), 41 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index adacea8efd..1cb47a0bf1 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -703,7 +703,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pInput->colDataAggIsSet) { numOfElems = pInput->numOfRows - pAgg->numOfNull; ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0); - if (numOfElems == 0) { return numOfElems; } @@ -722,48 +721,79 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // the index is the original position, not the relative position TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL; - if (IS_SIGNED_NUMERIC_TYPE(type)) { - int64_t prev = 0; - GET_TYPED_DATA(prev, int64_t, type, pBuf->v); - - int64_t val = GET_INT64_VAL(tval); - if ((prev < val) ^ isMinFunc) { - pBuf->v = val; - for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { - SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; - if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor - __ctx->tag.i = key; - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - } - - __ctx->fpSet.process(__ctx); - } - + if (!pBuf->assign) { + pBuf->v = *(int64_t*)tval; + if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } - } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { - uint64_t prev = 0; - GET_TYPED_DATA(prev, uint64_t, type, pBuf->v); + } else { + if (IS_SIGNED_NUMERIC_TYPE(type)) { + int64_t prev = 0; + GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); - uint64_t val = GET_UINT64_VAL(tval); - if ((prev < val) ^ isMinFunc) { - pBuf->v = val; - for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { - SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; - if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor - __ctx->tag.i = key; - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + int64_t val = GET_INT64_VAL(tval); + if ((prev < val) ^ isMinFunc) { + pBuf->v = val; + // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { + // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; + // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor + // __ctx->tag.i = key; + // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + // } + // + // __ctx->fpSet.process(__ctx); + // } + + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } + } - __ctx->fpSet.process(__ctx); + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + uint64_t prev = 0; + GET_TYPED_DATA(prev, uint64_t, type, &pBuf->v); + + uint64_t val = GET_UINT64_VAL(tval); + if ((prev < val) ^ isMinFunc) { + pBuf->v = val; + // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { + // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; + // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor + // __ctx->tag.i = key; + // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + // } + // + // __ctx->fpSet.process(__ctx); + // } + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else if (type == TSDB_DATA_TYPE_DOUBLE) { + double prev = 0; + GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); + + double val = GET_DOUBLE_VAL(tval); + if ((prev < val) ^ isMinFunc) { + pBuf->v = val; + + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else if (type == TSDB_DATA_TYPE_FLOAT) { + double prev = 0; + GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); + + double val = GET_DOUBLE_VAL(tval); + if ((prev < val) ^ isMinFunc) { + pBuf->v = val; + } + + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } - } else if (type == TSDB_DATA_TYPE_DOUBLE) { - double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *(double*)&pBuf->v, val, numOfElems, isMinFunc, key); - } else if (type == TSDB_DATA_TYPE_FLOAT) { - double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *(float*)&pBuf->v, val, numOfElems, isMinFunc, key); } pBuf->assign = true; @@ -1058,7 +1088,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_FLOAT) { float* pData = (float*)pCol->pData; - float* val = (float*)&pBuf->v; + double* val = (double*)&pBuf->v; for (int32_t i = start; i < start + numOfRows; ++i) { if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { @@ -1119,10 +1149,14 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { // todo assign the tag value int32_t currentRow = pBlock->info.rows; - if (type) - colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); - setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); + if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = *(double*) &pRes->v; + colDataAppend(pCol, currentRow, (const char*)&v, false); + } else { + colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); + } + setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); return pEntryInfo->numOfRes; } diff --git a/tests/script/tsim/insert/basic0.sim b/tests/script/tsim/insert/basic0.sim index 94bd0f1ecf..d8dde20e4e 100644 --- a/tests/script/tsim/insert/basic0.sim +++ b/tests/script/tsim/insert/basic0.sim @@ -140,7 +140,8 @@ endi if $data00 != -13 then return -1 endi -if $data01 != -2.30000 then +if $data01 != -2.30000 then + print expect -2.30000, actual: $data01 return -1 endi if $data02 != -3.300000000 then From 09f07dbbcfbf206da70e91829582a8f155bbc2b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 May 2022 19:16:22 +0800 Subject: [PATCH 08/18] enh(query): add error check for scalar function calculation. --- source/libs/executor/src/executorimpl.c | 7 ++++++- source/libs/executor/src/groupoperator.c | 7 ++++++- source/libs/executor/src/sortoperator.c | 5 ++++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bd9bbec666..2166de9fb2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3979,6 +3979,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -4042,9 +4043,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, + pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, pTaskInfo->code); + } + int32_t status = handleLimitOffset(pOperator, pBlock); if (status == PROJECT_RETRIEVE_CONTINUE) { continue; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 183cb9dbe6..2ba3e257b2 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -262,6 +262,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { return NULL; } + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGroupbyOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->binfo.pRes; @@ -289,7 +291,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->pScalarExprInfo != NULL) { - projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); + pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, pTaskInfo->code); + } } // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a654876513..990dc0f200 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -114,7 +114,10 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; if (pOperator->pExpr != NULL) { - projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); + int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pOperator->pTaskInfo->env, code); + } } } From 438a76c8b2dc4fb6af315e6d8dab260f2d62da48 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 May 2022 19:51:52 +0800 Subject: [PATCH 09/18] refactor:add time cost info for schemaless --- source/client/src/clientSml.c | 54 +++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 2a0e85092b..a7ce1a30a8 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -97,6 +97,21 @@ typedef struct { char *buf; } SSmlMsgBuf; +typedef struct { + int32_t code; + int32_t lineNum; + + int32_t numOfSTables; + int32_t numOfCTables; + int32_t numOfCreateSTables; + + int64_t parseTime; + int64_t schemaTime; + int64_t insertBindTime; + int64_t insertRpcTime; + int64_t endTime; +} SSmlCostInfo; + typedef struct { uint64_t id; @@ -114,6 +129,7 @@ typedef struct { SRequestObj *pRequest; SQuery *pQuery; + SSmlCostInfo cost; int32_t affectedRows; SSmlMsgBuf msgBuf; SHashObj *dumplicateKey; // for dumplicate key @@ -444,6 +460,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName); return code; } + info->cost.numOfCreateSTables++; }else if (code == TSDB_CODE_SUCCESS) { } else { uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); @@ -1547,7 +1564,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { tinfo->sTableName = elements.measure; tinfo->sTableNameLen = elements.measureLen; smlBuildChildTableName(tinfo); - uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName); + //uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName); SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); if(tableMeta){ // update meta @@ -1699,12 +1716,33 @@ static int32_t smlInsertData(SSmlHandle* info) { } smlBuildOutput(info->exec, info->pVgHash); + info->cost.insertRpcTime = taosGetTimestampUs(); + launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); info->affectedRows = taos_affected_rows(info->pRequest); return info->pRequest->code; } +int32_t numOfSTables; +int32_t numOfCTables; +int32_t numOfCreateSTables; + +int64_t parseTime; +int64_t schemaTime; +int64_t insertBindTime; +int64_t insertRpcTime; +int64_t endTime; + +static void printStatisticInfo(SSmlHandle *info){ + uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \ + parse cost:%lld,schema cost:%lld,bind cost:%lld,rpc cost:%lld,total cost:%lld", info->id, info->cost.code, + info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, + info->cost.schemaTime-info->cost.parseTime, info->cost.insertBindTime-info->cost.schemaTime, + info->cost.insertRpcTime-info->cost.insertBindTime, info->cost.endTime-info->cost.insertRpcTime, + info->cost.endTime-info->cost.parseTime); +} + static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) { int32_t code = TSDB_CODE_SUCCESS; @@ -1714,6 +1752,7 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) { goto cleanup; } + info->cost.parseTime = taosGetTimestampUs(); for (int32_t i = 0; i < numLines; ++i) { code = smlParseLine(info, lines[i]); if (code != TSDB_CODE_SUCCESS) { @@ -1721,24 +1760,29 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) { goto cleanup; } } - uDebug("SML:0x%"PRIx64" smlInsertLines parse success. tables %d", info->id, taosHashGetSize(info->childTables)); - uDebug("SML:0x%"PRIx64" smlInsertLines parse success. super tables %d", info->id, taosHashGetSize(info->superTables)); + info->cost.lineNum = numLines; + info->cost.numOfSTables = taosHashGetSize(info->superTables); + info->cost.numOfCTables = taosHashGetSize(info->childTables); + + info->cost.schemaTime = taosGetTimestampUs(); code = smlModifyDBSchemas(info); if (code != 0) { uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code)); goto cleanup; } + info->cost.insertBindTime = taosGetTimestampUs(); code = smlInsertData(info); if (code != 0) { uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code)); goto cleanup; } - - uDebug("SML:0x%"PRIx64" smlInsertLines finish inserting %d lines.", info->id, numLines); + info->cost.endTime = taosGetTimestampUs(); cleanup: + info->cost.code = code; + printStatisticInfo(info); return code; } From 589048e96bb8f9643f3c846d4859a2d3bc1985c9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 9 May 2022 20:14:31 +0800 Subject: [PATCH 10/18] fix(sync): fix memory leak, RespMgr --- source/libs/sync/src/syncMain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9a8dcc57d9..911d8384f0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -522,6 +522,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { ret = raftStoreClose(pSyncNode->pRaftStore); assert(ret == 0); + syncRespMgrDestroy(pSyncNode->pSyncRespMgr); voteGrantedDestroy(pSyncNode->pVotesGranted); votesRespondDestory(pSyncNode->pVotesRespond); syncIndexMgrDestroy(pSyncNode->pNextIndex); @@ -1138,6 +1139,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { syncNodeReplicate(ths); } + syncEntryDestory(pEntry); return ret; } From d9f76c7b72527c09706744109c7b9ae6614a439e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 May 2022 20:24:17 +0800 Subject: [PATCH 11/18] refactor:dd a function of generate table name --- include/common/tname.h | 13 ++++++++++ source/client/src/clientSml.c | 48 +++++------------------------------ source/common/src/tname.c | 41 ++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 41 deletions(-) diff --git a/include/common/tname.h b/include/common/tname.h index ffa4f8f253..ae2dc32335 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -17,6 +17,7 @@ #define _TD_COMMON_NAME_H_ #include "tdef.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { @@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId); bool tNameDBNameEqual(SName* left, SName* right); +typedef struct { + // input + SArray *tags; // element is SSmlKV + const char *sTableName; // super table name + uint8_t sTableNameLen; // the length of super table name + + // output + char *childTableName; // must have size of TSDB_TABLE_NAME_LEN; + uint64_t uid; // child table uid, may be useful +} RandTableName; + +void buildChildTableName(RandTableName *rName); #ifdef __cplusplus } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index a7ce1a30a8..927fec35dc 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -15,6 +15,7 @@ #include "tcommon.h" #include "catalog.h" #include "clientInt.h" +#include "tname.h" //================================================================================================= #define SPACE ' ' @@ -163,19 +164,6 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const return TSDB_CODE_SML_INVALID_DATA; } -static int smlCompareKv(const void* p1, const void* p2) { - SSmlKv* kv1 = *(SSmlKv**)p1; - SSmlKv* kv2 = *(SSmlKv**)p2; - int32_t kvLen1 = kv1->keyLen; - int32_t kvLen2 = kv2->keyLen; - int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2)); - if (res != 0) { - return res; - } else { - return kvLen1-kvLen2; - } -} - static void smlBuildChildTableName(SSmlTableInfo *tags) { int32_t size = taosArrayGetSize(tags->tags); ASSERT(size > 0); @@ -943,20 +931,6 @@ static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { return false; } -static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlHandle* info) { - char *val = NULL; - val = taosHashGet(pHash, key, strlen(key)); - if (val) { - uError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key); - return true; - } - - uint8_t dummy_val = 0; - taosHashPut(pHash, key, strlen(key), &dummy_val, sizeof(uint8_t)); - - return false; -} - static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){ if(!sql) return TSDB_CODE_SML_INVALID_DATA; while (*sql != '\0') { // jump the space at the begining @@ -1563,8 +1537,10 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { tinfo->sTableName = elements.measure; tinfo->sTableNameLen = elements.measureLen; - smlBuildChildTableName(tinfo); - //uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName); + RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=tinfo->sTableNameLen, + .childTableName=tinfo->childTableName}; + buildChildTableName(&rName); + tinfo->uid = rName.uid; SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); if(tableMeta){ // update meta @@ -1724,17 +1700,7 @@ static int32_t smlInsertData(SSmlHandle* info) { return info->pRequest->code; } -int32_t numOfSTables; -int32_t numOfCTables; -int32_t numOfCreateSTables; - -int64_t parseTime; -int64_t schemaTime; -int64_t insertBindTime; -int64_t insertRpcTime; -int64_t endTime; - -static void printStatisticInfo(SSmlHandle *info){ +static void smlPrintStatisticInfo(SSmlHandle *info){ uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \ parse cost:%lld,schema cost:%lld,bind cost:%lld,rpc cost:%lld,total cost:%lld", info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, @@ -1782,7 +1748,7 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) { cleanup: info->cost.code = code; - printStatisticInfo(info); + smlPrintStatisticInfo(info); return code; } diff --git a/source/common/src/tname.c b/source/common/src/tname.c index f4755f5b5e..62ba4bfb79 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -15,6 +15,8 @@ #define _DEFAULT_SOURCE #include "tname.h" +#include "tcommon.h" +#include "tstrbuild.h" #define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) @@ -294,4 +296,43 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return 0; } +static int compareKv(const void* p1, const void* p2) { + SSmlKv* kv1 = *(SSmlKv**)p1; + SSmlKv* kv2 = *(SSmlKv**)p2; + int32_t kvLen1 = kv1->keyLen; + int32_t kvLen2 = kv2->keyLen; + int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2)); + if (res != 0) { + return res; + } else { + return kvLen1-kvLen2; + } +} +/* + * use stable name and tags to grearate child table name + */ +void buildChildTableName(RandTableName *rName) { + int32_t size = taosArrayGetSize(rName->tags); + ASSERT(size > 0); + taosArraySort(rName->tags, compareKv); + + SStringBuilder sb = {0}; + taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen); + for (int j = 0; j < size; ++j) { + SSmlKv *tagKv = taosArrayGetP(rName->tags, j); + taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); + taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen); + } + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + tMD5Final(&context); + uint64_t digest1 = *(uint64_t*)(context.digest); + uint64_t digest2 = *(uint64_t*)(context.digest + 8); + snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); + taosStringBuilderDestroy(&sb); + rName->uid = digest1; +} From af60b5743daecd0942e803dd926ee598e8dc004f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 May 2022 20:34:44 +0800 Subject: [PATCH 12/18] refactor:fix error in schemaless --- source/client/src/clientSml.c | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 927fec35dc..b251827bd1 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -164,32 +164,6 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const return TSDB_CODE_SML_INVALID_DATA; } -static void smlBuildChildTableName(SSmlTableInfo *tags) { - int32_t size = taosArrayGetSize(tags->tags); - ASSERT(size > 0); - taosArraySort(tags->tags, smlCompareKv); - - SStringBuilder sb = {0}; - taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen); - for (int j = 0; j < size; ++j) { - SSmlKv *tagKv = taosArrayGetP(tags->tags, j); - taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); - taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen); - } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); - T_MD5_CTX context; - tMD5Init(&context); - tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); - tMD5Final(&context); - uint64_t digest1 = *(uint64_t*)(context.digest); - //uint64_t digest2 = *(uint64_t*)(context.digest + 8); - //snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); - snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1); - taosStringBuilderDestroy(&sb); - tags->uid = digest1; -} - static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) { // char fieldName[TSDB_COL_NAME_LEN] = {0}; @@ -1702,7 +1676,7 @@ static int32_t smlInsertData(SSmlHandle* info) { static void smlPrintStatisticInfo(SSmlHandle *info){ uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \ - parse cost:%lld,schema cost:%lld,bind cost:%lld,rpc cost:%lld,total cost:%lld", info->id, info->cost.code, + parse cost:%"PRIx64",schema cost:%"PRIx64",bind cost:%"PRIx64",rpc cost:%"PRIx64",total cost:%"PRIx64"", info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.schemaTime-info->cost.parseTime, info->cost.insertBindTime-info->cost.schemaTime, info->cost.insertRpcTime-info->cost.insertBindTime, info->cost.endTime-info->cost.insertRpcTime, @@ -1800,7 +1774,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr } smlDestroyInfo(info); -end: return (TAOS_RES*)request; } From a2e074be3442bacc69e0aa8ec411612715daa91b Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 9 May 2022 20:39:02 +0800 Subject: [PATCH 13/18] feature(udf):move start/stop udfd out of dnode management preparation --- source/dnode/mgmt/implement/src/dmHandle.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 9597307567..cd01f185ce 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -311,11 +311,11 @@ static void dmWatchUdfd(void *args) { return; } -static int32_t dmStartUdfd(SDnode *pDnode, int32_t startDnodeId) { +static int32_t dmStartUdfd(SUdfdData *pUdfdData, int32_t startDnodeId) { char dnodeId[8] = {0}; - snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId); + snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId); uv_os_setenv("DNODE_ID", dnodeId); - SUdfdData *pData = &pDnode->udfdData; + SUdfdData *pData = pUdfdData; pData->dnodeId = startDnodeId; if (pData->startCalled) { dInfo("dnode-mgmt start udfd already called"); @@ -385,7 +385,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "dnode-transport", "initialized"); - if (dmStartUdfd(pDnode, pDnode->data.dnodeId) != 0) { + if (dmStartUdfd(&pDnode->udfdData, pDnode->data.dnodeId) != 0) { dError("failed to start udfd"); } From f3ed6ba9057c371fb255c255dd54a3bcdf325c25 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 May 2022 20:41:00 +0800 Subject: [PATCH 14/18] refactor:fix error in schemaless --- source/client/src/clientSml.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index b251827bd1..6f6d32ee39 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1676,7 +1676,7 @@ static int32_t smlInsertData(SSmlHandle* info) { static void smlPrintStatisticInfo(SSmlHandle *info){ uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \ - parse cost:%"PRIx64",schema cost:%"PRIx64",bind cost:%"PRIx64",rpc cost:%"PRIx64",total cost:%"PRIx64"", info->id, info->cost.code, + parse cost:%"PRId64",schema cost:%"PRId64",bind cost:%"PRId64",rpc cost:%"PRId64",total cost:%"PRId64"", info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.schemaTime-info->cost.parseTime, info->cost.insertBindTime-info->cost.schemaTime, info->cost.insertRpcTime-info->cost.insertBindTime, info->cost.endTime-info->cost.insertRpcTime, From 771a1e319429b46edd5864dc25cbfe6918cadcd3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 May 2022 20:46:00 +0800 Subject: [PATCH 15/18] refactor:fix error in schemaless --- source/client/src/clientSml.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6f6d32ee39..15d1500860 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1571,7 +1571,7 @@ static void smlDestroyInfo(SSmlHandle* info){ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){ int32_t code = TSDB_CODE_SUCCESS; - SSmlHandle* info = taosMemoryMalloc(sizeof(SSmlHandle)); + SSmlHandle* info = taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { return NULL; } From 623a71d34cf2e8b417cb6b6389a445c09ffda7c8 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 9 May 2022 21:39:41 +0800 Subject: [PATCH 16/18] feature(udf):move start/stop udfd out of dnode management preparation --- include/libs/function/function.h | 11 ++ source/dnode/mgmt/implement/src/dmHandle.c | 143 +------------------ source/libs/function/src/tudf.c | 157 ++++++++++++++++++++- 3 files changed, 171 insertions(+), 140 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 347303a051..88ac1532c2 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -336,6 +336,17 @@ int32_t udfcOpen(); */ int32_t udfcClose(); +/** + * start udfd that serves udf function invocation under dnode startDnodeId + * @param startDnodeId + * @return + */ +int32_t udfStartUdfd(int32_t startDnodeId); +/** + * stop udfd + * @return + */ +int32_t udfStopUdfd(); #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index cd01f185ce..129d41061e 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -217,143 +217,6 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { dmStopStatusThread(pWrapper->pDnode); } -static int32_t dmSpawnUdfd(SUdfdData *pData); - -void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { - dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); - SUdfdData *pData = process->data; - if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) { - dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); - } else { - dInfo("udfd process restart"); - dmSpawnUdfd(pData); - } -} - -static int32_t dmSpawnUdfd(SUdfdData* pData) { - dInfo("dnode start spawning udfd"); - uv_process_options_t options = {0}; - - char path[PATH_MAX] = {0}; - if (tsProcPath == NULL) { - path[0] = '.'; - } else { - strncpy(path, tsProcPath, strlen(tsProcPath)); - taosDirName(path); - } -#ifdef WINDOWS - strcat(path, "udfd.exe"); -#else - strcat(path, "/udfd"); -#endif - char* argsUdfd[] = {path, "-c", configDir, NULL}; - options.args = argsUdfd; - options.file = path; - - options.exit_cb = dmUdfdExit; - - uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); - - uv_stdio_container_t child_stdio[3]; - child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe; - child_stdio[1].flags = UV_IGNORE; - child_stdio[2].flags = UV_INHERIT_FD; - child_stdio[2].data.fd = 2; - options.stdio_count = 3; - options.stdio = child_stdio; - - options.flags = UV_PROCESS_DETACHED; - - char dnodeIdEnvItem[32] = {0}; - char thrdPoolSizeEnvItem[32] = {0}; - snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); - float numCpuCores = 4; - taosGetCpuCores(&numCpuCores); - snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); - char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; - options.env = envUdfd; - - int err = uv_spawn(&pData->loop, &pData->process, &options); - pData->process.data = (void*)pData; - - if (err != 0) { - dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); - } - return err; -} - -static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { - if (!uv_is_closing(handle)) { - uv_close(handle, NULL); - } -} - -static void dmUdfdStopAsyncCb(uv_async_t *async) { - SUdfdData *pData = async->data; - uv_stop(&pData->loop); -} - -static void dmWatchUdfd(void *args) { - SUdfdData *pData = args; - uv_loop_init(&pData->loop); - uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb); - pData->stopAsync.data = pData; - int32_t err = dmSpawnUdfd(pData); - atomic_store_32(&pData->spawnErr, err); - uv_barrier_wait(&pData->barrier); - uv_run(&pData->loop, UV_RUN_DEFAULT); - uv_loop_close(&pData->loop); - - uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL); - uv_run(&pData->loop, UV_RUN_DEFAULT); - uv_loop_close(&pData->loop); - return; -} - -static int32_t dmStartUdfd(SUdfdData *pUdfdData, int32_t startDnodeId) { - char dnodeId[8] = {0}; - snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId); - uv_os_setenv("DNODE_ID", dnodeId); - SUdfdData *pData = pUdfdData; - pData->dnodeId = startDnodeId; - if (pData->startCalled) { - dInfo("dnode-mgmt start udfd already called"); - return 0; - } - pData->startCalled = true; - uv_barrier_init(&pData->barrier, 2); - uv_thread_create(&pData->thread, dmWatchUdfd, pData); - uv_barrier_wait(&pData->barrier); - int32_t err = atomic_load_32(&pData->spawnErr); - if (err != 0) { - uv_barrier_destroy(&pData->barrier); - uv_async_send(&pData->stopAsync); - uv_thread_join(&pData->thread); - pData->needCleanUp = false; - dInfo("dnode-mgmt udfd cleaned up after spawn err"); - } else { - pData->needCleanUp = true; - } - return err; -} - -static int32_t dmStopUdfd(SUdfdData *udfdData) { - dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", - udfdData->needCleanUp, udfdData->spawnErr); - SUdfdData *pData = udfdData; - if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { - return 0; - } - atomic_store_32(&pData->stopCalled, 1); - pData->needCleanUp = false; - uv_barrier_destroy(&pData->barrier); - uv_async_send(&pData->stopAsync); - uv_thread_join(&pData->thread); - dInfo("dnode-mgmt udfd cleaned up"); - return 0; -} - static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to init"); SDnode *pDnode = pWrapper->pDnode; @@ -385,7 +248,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "dnode-transport", "initialized"); - if (dmStartUdfd(&pDnode->udfdData, pDnode->data.dnodeId) != 0) { + if (udfStartUdfd(pDnode->data.dnodeId) != 0) { dError("failed to start udfd"); } @@ -396,7 +259,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to clean up"); SDnode *pDnode = pWrapper->pDnode; - dmStopUdfd(&pDnode->udfdData); + + udfStopUdfd(); + dmStopWorker(pDnode); taosWLockLatch(&pDnode->data.latch); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 75b6aeaae9..11502b4c47 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -23,10 +23,165 @@ #include "builtinsimpl.h" #include "functionMgt.h" -//TODO: network error processing. //TODO: add unit test //TODO: include all global variable under context struct +typedef struct SUdfdData { + bool startCalled; + bool needCleanUp; + uv_loop_t loop; + uv_thread_t thread; + uv_barrier_t barrier; + uv_process_t process; + int spawnErr; + uv_pipe_t ctrlPipe; + uv_async_t stopAsync; + int32_t stopCalled; + + int32_t dnodeId; +} SUdfdData; + +SUdfdData udfdGlobal = {0}; + +static int32_t udfSpawnUdfd(SUdfdData *pData); + +void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { + fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); + SUdfdData *pData = process->data; + if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) { + fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); + } else { + fnInfo("udfd process restart"); + udfSpawnUdfd(pData); + } +} + +static int32_t udfSpawnUdfd(SUdfdData* pData) { + fnInfo("dnode start spawning udfd"); + uv_process_options_t options = {0}; + + char path[PATH_MAX] = {0}; + if (tsProcPath == NULL) { + path[0] = '.'; + } else { + strncpy(path, tsProcPath, strlen(tsProcPath)); + taosDirName(path); + } +#ifdef WINDOWS + strcat(path, "udfd.exe"); +#else + strcat(path, "/udfd"); +#endif + char* argsUdfd[] = {path, "-c", configDir, NULL}; + options.args = argsUdfd; + options.file = path; + + options.exit_cb = udfUdfdExit; + + uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); + + uv_stdio_container_t child_stdio[3]; + child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe; + child_stdio[1].flags = UV_IGNORE; + child_stdio[2].flags = UV_INHERIT_FD; + child_stdio[2].data.fd = 2; + options.stdio_count = 3; + options.stdio = child_stdio; + + options.flags = UV_PROCESS_DETACHED; + + char dnodeIdEnvItem[32] = {0}; + char thrdPoolSizeEnvItem[32] = {0}; + snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); + float numCpuCores = 4; + taosGetCpuCores(&numCpuCores); + snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); + char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; + options.env = envUdfd; + + int err = uv_spawn(&pData->loop, &pData->process, &options); + pData->process.data = (void*)pData; + + if (err != 0) { + fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); + } + return err; +} + +static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} + +static void udfUdfdStopAsyncCb(uv_async_t *async) { + SUdfdData *pData = async->data; + uv_stop(&pData->loop); +} + +static void udfWatchUdfd(void *args) { + SUdfdData *pData = args; + uv_loop_init(&pData->loop); + uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb); + pData->stopAsync.data = pData; + int32_t err = udfSpawnUdfd(pData); + atomic_store_32(&pData->spawnErr, err); + uv_barrier_wait(&pData->barrier); + uv_run(&pData->loop, UV_RUN_DEFAULT); + uv_loop_close(&pData->loop); + + uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL); + uv_run(&pData->loop, UV_RUN_DEFAULT); + uv_loop_close(&pData->loop); + return; +} + +int32_t udfStartUdfd(int32_t startDnodeId) { + SUdfdData *pData = &udfdGlobal; + if (pData->startCalled) { + fnInfo("dnode-mgmt start udfd already called"); + return 0; + } + pData->startCalled = true; + char dnodeId[8] = {0}; + snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId); + uv_os_setenv("DNODE_ID", dnodeId); + pData->dnodeId = startDnodeId; + + uv_barrier_init(&pData->barrier, 2); + uv_thread_create(&pData->thread, udfWatchUdfd, pData); + uv_barrier_wait(&pData->barrier); + int32_t err = atomic_load_32(&pData->spawnErr); + if (err != 0) { + uv_barrier_destroy(&pData->barrier); + uv_async_send(&pData->stopAsync); + uv_thread_join(&pData->thread); + pData->needCleanUp = false; + fnInfo("dnode-mgmt udfd cleaned up after spawn err"); + } else { + pData->needCleanUp = true; + } + return err; +} + +int32_t udfStopUdfd() { + SUdfdData *pData = &udfdGlobal; + fnInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", + pData->needCleanUp, pData->spawnErr); + if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { + return 0; + } + atomic_store_32(&pData->stopCalled, 1); + pData->needCleanUp = false; + uv_barrier_destroy(&pData->barrier); + uv_async_send(&pData->stopAsync); + uv_thread_join(&pData->thread); + fnInfo("dnode-mgmt udfd cleaned up"); + return 0; +} + +//============================================================================================== /* Copyright (c) 2013, Ben Noordhuis * The QUEUE is copied from queue.h under libuv * */ From 5916525548ea3095b0e913c7a3806182c804f275 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 9 May 2022 21:50:16 +0800 Subject: [PATCH 17/18] remove null value processing from udf.sim test case --- tests/script/tsim/query/udf.sim | 58 ++++++++++++++++----------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index cabb88ea09..b3cbda5090 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -64,35 +64,35 @@ if $data00 != 1.414213562 then return -1 endi -sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2); -sql select udf1(f1, f2) from t2; -print $rows , $data00 , $data10 , $data20 , $data30 -if $rows != 4 then - return -1 -endi -if $data00 != 88 then - return -1 -endi -if $data10 != 88 then - return -1 -endi - -if $data20 != NULL then - return -1 -endi - -if $data30 != NULL then - return -1 -endi - -sql select udf2(f1, f2) from t2; -print $rows, $data00 -if $rows != 1 then - return -1 -endi -if $data00 != 2.645751311 then - return -1 -endi +#sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2); +#sql select udf1(f1, f2) from t2; +#print $rows , $data00 , $data10 , $data20 , $data30 +#if $rows != 4 then +# return -1 +#endi +#if $data00 != 88 then +# return -1 +#endi +#if $data10 != 88 then +# return -1 +#endi +# +#if $data20 != NULL then +# return -1 +#endi +# +#if $data30 != NULL then +# return -1 +#endi +# +#sql select udf2(f1, f2) from t2; +#print $rows, $data00 +#if $rows != 1 then +# return -1 +#endi +#if $data00 != 2.645751311 then +# return -1 +#endi sql drop function udf1; sql show functions; if $rows != 1 then From c797da79a29da30f93c3c60c717d6c094070c454 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 10 May 2022 00:08:23 +0800 Subject: [PATCH 18/18] fix: update taosBenchmark (#12278) [TD-15368] --- tools/taos-tools | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/taos-tools b/tools/taos-tools index 2f3dfddd4d..0ae9f872c2 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c +Subproject commit 0ae9f872c26d5da8cb61aa9eb00b5c7aeba10ec4