From e161556f5184f5a9f50297811e798701132242b3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 15:35:25 +0800 Subject: [PATCH 1/8] set ts column index for function --- include/libs/executor/storageapi.h | 2 +- include/libs/function/function.h | 8 +++++--- include/libs/stream/streamState.h | 2 +- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/scanoperator.c | 2 +- .../libs/executor/src/streamcountwindowoperator.c | 2 +- .../libs/executor/src/streameventwindowoperator.c | 2 +- .../libs/executor/src/streamtimewindowoperator.c | 14 +++++++------- source/libs/function/src/builtinsimpl.c | 5 +---- source/libs/stream/src/streamState.c | 5 ++++- tests/script/tsim/stream/basic5.sim | 15 ++++++++++----- 11 files changed, 33 insertions(+), 26 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b47a162a1a..10697e64b2 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -328,7 +328,7 @@ typedef struct SStateStore { int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateClear)(SStreamState* pState); - void (*streamStateSetNumber)(SStreamState* pState, int32_t number); + void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0fa84c99c6..a1074d6901 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -29,6 +29,7 @@ struct SResultRowEntryInfo; struct SFunctionNode; typedef struct SScalarParam SScalarParam; +typedef struct SStreamState SStreamState; typedef struct SFuncExecEnv { int32_t calcMemSize; @@ -126,7 +127,7 @@ typedef struct SInputColumnInfoData { typedef struct SSerializeDataHandle { struct SDiskbasedBuf *pBuf; int32_t currentPage; - void *pState; + SStreamState *pState; } SSerializeDataHandle; // incremental state storage @@ -164,7 +165,7 @@ typedef struct STdbState { void *txn; } STdbState; -typedef struct { +struct SStreamState { STdbState *pTdbState; struct SStreamFileState *pFileState; int32_t number; @@ -173,7 +174,8 @@ typedef struct { int64_t streamId; int64_t streamBackendRid; int8_t dump; -} SStreamState; + int32_t tsIndex; +}; typedef struct SFunctionStateStore { int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index c603f9f5ac..f1c9d712e8 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); -void streamStateSetNumber(SStreamState* pState, int32_t number); +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5263546765..628311591a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -897,7 +897,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi); + SStorageAPI* pApi, int32_t tsIndex); void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup); void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a6613e589a..dc5ce60f95 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2903,7 +2903,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) { - pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex); } pInfo->readHandle = *pHandle; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 720734431f..db06775a18 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -659,7 +659,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ef5c2572d9..c948b57534 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -711,7 +711,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 51071e2b4a..86d428cddf 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1522,7 +1522,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -1705,7 +1705,7 @@ static TSKEY sesionTs(void* pKey) { int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi) { + SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -1721,7 +1721,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pSup->pState) = *pState; - pSup->stateStore.streamStateSetNumber(pSup->pState, -1); + pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, @@ -2950,7 +2950,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3175,7 +3175,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream } SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i); + pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex); taosArrayPush(pInfo->pChildren, &pChildOp); } } @@ -3845,7 +3845,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int16_t type = pColNode->node.resType.type; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4082,7 +4082,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b709abc30b..5a792f6139 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3203,10 +3203,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SWinKey key = {0}; if (pCtx->saveHandle.pBuf == NULL) { - SColumnInfoData* pColInfo = pCtx->input.pPTS; - if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) { - pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); - } + SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b53dc9daa6..18bc672c8d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -395,7 +395,10 @@ int32_t streamStateClear(SStreamState* pState) { #endif } -void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) { + pState->number = number; + pState->tsIndex = tsIdex; +} int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) { #ifdef USE_ROCKSDB diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index f507ab7d3b..7b5f587feb 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -372,17 +372,22 @@ print step4============= sql create database test6 vgroups 4; sql use test6; -sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s); +sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s); +sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s); +sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a); +sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9; +sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2); sleep 1000 -sql insert into t1 values(1648791211000,1,2,3,1.0); -sql insert into t1 values(1648791213000,2,3,4,1.1); -sql insert into t2 values(1648791215000,3,4,5,1.1); -sql insert into t2 values(1648791217000,4,5,6,1.1); +sql insert into t1 values(1648791211000,1,2,3,0); +sql insert into t1 values(1648791213000,2,3,4,0); +sql insert into t2 values(1648791215000,3,4,5,0); +sql insert into t2 values(1648791217000,4,5,6,0); $loop_count = 0 From f23a8a37bc3f157b85870066f5f6f2e8ee2299eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 16:43:02 +0800 Subject: [PATCH 2/8] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 1 - source/dnode/snode/inc/sndInt.h | 4 ++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 9 +++++++++ source/libs/stream/src/streamMeta.c | 2 -- source/libs/stream/src/streamStart.c | 3 +-- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 138fad0ddb..c12bb146b4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -516,7 +516,6 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; SMetaHbInfo* pHbInfo; STaskUpdateInfo updateInfo; - SHashObj* pUpdateTaskSet; int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; int64_t rid; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 68f7f756d5..024c3c6bae 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -30,11 +30,11 @@ extern "C" { #endif -typedef struct SSnode { +struct SSnode { char* path; SStreamMeta* pMeta; SMsgCb msgCb; -} SSnode; +}; #if 0 typedef struct { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0f7f74f78b..2fa9f9a9ff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { STaskStartInfo* pStartInfo = &pMeta->startInfo; int32_t vgId = pMeta->vgId; + bool scanWal = false; streamMetaWLock(pMeta); if (pStartInfo->taskStarting == 1) { @@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { pStartInfo->restartCount = 0; tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); } + + scanWal = true; } } streamMetaWUnLock(pMeta); + + if (scanWal && (vgId != SNODE_HANDLE)) { + tqDebug("vgId:%d start scan wal for executing tasks", vgId); + tqScanWalAsync(pMeta->ahandle, true); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aae3594905..8d5e4f3c87 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn) { - int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); - taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0161f382ba..f2a694a554 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { if (pTask->status.taskStatus == TASK_STATUS__HALT) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); - // halt it self for count window stream task until the related - // fill history task completd. + // halt it self for count window stream task until the related fill history task completed. stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); From 8a5532dd8811de75adf9053e995c058d71f7110f Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 10 Apr 2024 16:48:02 +0800 Subject: [PATCH 3/8] add cfg value check --- include/util/tunit.h | 4 +- include/util/tutil.h | 2 + source/util/src/tconfig.c | 33 ++++--- source/util/src/tunit.c | 141 ++++++++++++++++------------- source/util/src/tutil.c | 18 ++++ tests/system-test/0-others/show.py | 43 +++++++++ 6 files changed, 163 insertions(+), 78 deletions(-) diff --git a/include/util/tunit.h b/include/util/tunit.h index de37c85929..207431fa7d 100644 --- a/include/util/tunit.h +++ b/include/util/tunit.h @@ -22,10 +22,10 @@ extern "C" { #endif -int64_t taosStrHumanToInt64(const char* str); +int32_t taosStrHumanToInt64(const char* str, int64_t* out); void taosInt64ToHumanStr(int64_t val, char* outStr); -int32_t taosStrHumanToInt32(const char* str); +int32_t taosStrHumanToInt32(const char* str, int32_t* out); void taosInt32ToHumanStr(int32_t val, char* outStr); #ifdef __cplusplus diff --git a/include/util/tutil.h b/include/util/tutil.h index de2cd205f2..54ce6fc849 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -56,6 +56,8 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); +int32_t parseCfgReal(const char* str, double* out); + static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { T_MD5_CTX context; tMD5Init(&context); diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 8fdc2654c5..caca123777 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -174,7 +174,9 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty } static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - int32_t ival = taosStrHumanToInt32(value); + int32_t ival; + int32_t code = taosStrHumanToInt32(value, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax); @@ -188,7 +190,9 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st } static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - int64_t ival = taosStrHumanToInt64(value); + int64_t ival; + int32_t code = taosStrHumanToInt64(value, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax); @@ -202,15 +206,16 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st } static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - float fval = (float)atof(value); - if (fval < pItem->fmin || fval > pItem->fmax) { + double dval; + int32_t code = parseCfgReal(value, &dval); + if (dval < pItem->fmin || dval > pItem->fmax) { uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), - cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax); + cfgStypeStr(stype), dval, pItem->fmin, pItem->fmax); terrno = TSDB_CODE_OUT_OF_RANGE; return -1; } - pItem->fval = fval; + pItem->fval = (float)dval; pItem->stype = stype; return 0; } @@ -408,7 +413,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } } break; case CFG_DTYPE_INT32: { - int32_t ival = (int32_t)taosStrHumanToInt32(pVal); + int32_t ival; + int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax); @@ -417,7 +424,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } } break; case CFG_DTYPE_INT64: { - int64_t ival = (int64_t)taosStrHumanToInt64(pVal); + int64_t ival; + int32_t code = taosStrHumanToInt64(pVal, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax); @@ -427,9 +436,11 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } break; case CFG_DTYPE_FLOAT: case CFG_DTYPE_DOUBLE: { - float fval = (float)atof(pVal); - if (fval < pItem->fmin || fval > pItem->fmax) { - uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval, + double dval; + int32_t code = parseCfgReal(pVal, &dval); + if (code != TSDB_CODE_SUCCESS) return code; + if (dval < pItem->fmin || dval > pItem->fmax) { + uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval, pItem->fmin, pItem->fmax); terrno = TSDB_CODE_OUT_OF_RANGE; return -1; diff --git a/source/util/src/tunit.c b/source/util/src/tunit.c index 378f23613a..09f59f1e40 100644 --- a/source/util/src/tunit.c +++ b/source/util/src/tunit.c @@ -23,45 +23,74 @@ #define UNIT_ONE_PEBIBYTE (UNIT_ONE_TEBIBYTE * UNIT_SIZE_CONVERT_FACTOR) #define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR) -int64_t taosStrHumanToInt64(const char* str) { - size_t sLen = strlen(str); - if (sLen < 2) return atoll(str); - - int64_t val = 0; - - char* strNoUnit = NULL; - char unit = str[sLen - 1]; - if ((unit == 'P') || (unit == 'p')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE; - } else if ((unit == 'T') || (unit == 't')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE; - } else if ((unit == 'G') || (unit == 'g')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE; - } else if ((unit == 'M') || (unit == 'm')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE; - } else if ((unit == 'K') || (unit == 'k')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE; - } else { - val = atoll(str); +static int32_t parseCfgIntWithUnit(const char* str, double *res) { + double val, temp = INT64_MAX; + char* endPtr; + errno = 0; + val = taosStr2Int64(str, &endPtr, 0); + if (*endPtr == '.' || errno == ERANGE) { + errno = 0; + val = taosStr2Double(str, &endPtr); } + if (endPtr == str || errno == ERANGE || isnan(val)) { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + while (isspace((unsigned char)*endPtr)) endPtr++; + uint64_t factor = 1; + if (*endPtr != '\0') { + switch (*endPtr) { + case 'P': + case 'p': { + temp /= UNIT_ONE_PEBIBYTE; + factor = UNIT_ONE_PEBIBYTE; + } break; + case 'T': + case 't': { + temp /= UNIT_ONE_TEBIBYTE; + factor = UNIT_ONE_TEBIBYTE; + } break; + case 'G': + case 'g': { + temp /= UNIT_ONE_GIBIBYTE; + factor = UNIT_ONE_GIBIBYTE; + } break; + case 'M': + case 'm': { + temp /= UNIT_ONE_MEBIBYTE; + factor = UNIT_ONE_MEBIBYTE; + } break; + case 'K': + case 'k': { + temp /= UNIT_ONE_KIBIBYTE; + factor = UNIT_ONE_KIBIBYTE; + } break; + default: + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + if ((val > 0 && val > temp) || (val < 0 && val < -temp)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + endPtr++; + val *= factor; + } + while (isspace((unsigned char)*endPtr)) endPtr++; + if (*endPtr) { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + val = rint(val); + *res = val; + return TSDB_CODE_SUCCESS; +} - taosMemoryFree(strNoUnit); - return val; +int32_t taosStrHumanToInt64(const char* str, int64_t *out) { + double res; + int32_t code = parseCfgIntWithUnit(str, &res); + if (code == TSDB_CODE_SUCCESS) *out = (int64_t)res; + return code; } #ifdef BUILD_NO_CALL @@ -83,35 +112,17 @@ void taosInt64ToHumanStr(int64_t val, char* outStr) { } #endif -int32_t taosStrHumanToInt32(const char* str) { - size_t sLen = strlen(str); - if (sLen < 2) return atoll(str); - - int32_t val = 0; - - char* strNoUnit = NULL; - char unit = str[sLen - 1]; - if ((unit == 'G') || (unit == 'g')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE; - } else if ((unit == 'M') || (unit == 'm')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE; - } else if ((unit == 'K') || (unit == 'k')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE; - } else { - val = atoll(str); +int32_t taosStrHumanToInt32(const char* str, int32_t* out) { + double res; + int32_t code = parseCfgIntWithUnit(str, &res); + if (code == TSDB_CODE_SUCCESS) { + if (res < INT32_MIN || res > INT32_MAX) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + *out = (int32_t)res; } - - taosMemoryFree(strNoUnit); - return val; + return code; } #ifdef BUILD_NO_CALL diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 6b6878ec83..f201edcb5e 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -496,3 +496,21 @@ size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rs return index; } + +int32_t parseCfgReal(const char* str, double* out) { + double val; + char *endPtr; + errno = 0; + val = taosStr2Double(str, &endPtr); + if (str == endPtr || errno == ERANGE || isnan(val)) { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + while(isspace((unsigned char)*endPtr)) endPtr++; + if (*endPtr != '\0') { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + *out = val; + return TSDB_CODE_SUCCESS; +} diff --git a/tests/system-test/0-others/show.py b/tests/system-test/0-others/show.py index 75d7116e03..bc1239fae8 100644 --- a/tests/system-test/0-others/show.py +++ b/tests/system-test/0-others/show.py @@ -240,6 +240,49 @@ class TDTestCase: self.show_create_sysdb_sql() self.show_create_systb_sql() self.show_column_name() + self.test_show_variables() + + def get_variable(self, name: str, local: bool = True): + if local: + sql = 'show local variables' + else: + sql = f'select `value` from information_schema.ins_dnode_variables where name like "{name}"' + tdSql.query(sql, queryTimes=1) + res = tdSql.queryResult + if local: + for row in res: + if row[0] == name: + return row[1] + else: + if len(res) > 0: + return res[0][0] + raise Exception(f"variable {name} not found") + + def test_show_variables(self): + epsion = 0.0000001 + var = 'minimalTmpDirGB' + expect_val: float = 10.11 + sql = f'ALTER LOCAL "{var}" "{expect_val}"' + tdSql.execute(sql) + val: float = float(self.get_variable(var)) + if val != expect_val: + tdLog.exit(f'failed to set local {var} to {expect_val} actually {val}') + + error_vals = ['a', '10a', '', '1.100r', '1.12 r'] + for error_val in error_vals: + tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"') + + var = 'supportVnodes' + expect_val = 1240 ## 1.211111 * 1024 + sql = f'ALTER DNODE 1 "{var}" "1.211111k"' + tdSql.execute(sql, queryTimes=1) + val = int(self.get_variable(var, False)) + if val != expect_val: + tdLog.exit(f'failed to set dnode {var} to {expect_val} actually {val}') + + error_vals = ['a', '10a', '', '1.100r', '1.12 r', '5k'] + for error_val in error_vals: + tdSql.error(f'ALTER DNODE 1 "{var}" "{error_val}"') def stop(self): tdSql.close() From c6d492d3fd3c2507d2a5d13e87bf6f4ddda48309 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 17:41:59 +0800 Subject: [PATCH 4/8] set ts column index for function --- include/libs/stream/tstreamFileState.h | 9 +++++-- .../executor/src/streamcountwindowoperator.c | 3 +-- .../executor/src/streameventwindowoperator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 24 +++---------------- source/libs/stream/src/streamSessionState.c | 7 ++++++ source/libs/stream/src/streamState.c | 12 +++++----- source/libs/stream/src/tstreamFileState.c | 10 +++++++- 7 files changed, 34 insertions(+), 33 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index a9a198d194..68b9c4baa2 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState; typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); -typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); @@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); typedef int32_t (*_state_file_clear_fn)(SStreamState* pState); +typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2); SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); void* getRowStateBuff(SStreamFileState* pFileState); @@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +//function +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index db06775a18..224808b41a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -657,6 +657,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -675,8 +676,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index c948b57534..eef002ff35 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -709,6 +709,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -716,7 +717,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 86d428cddf..98b04af525 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1730,25 +1730,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); - int32_t pageSize = 4096; - while (pageSize < pSup->resultRowSize * 4) { - pageSize <<= 1u; - } - // at least four pages need to be in buffer - int32_t bufSize = 4096 * 256; - if (bufSize <= pageSize) { - bufSize = pageSize * 4; - } - - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; - qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); - return terrno; - } - - int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { - pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf; + pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } pSup->pSessionAPI = pApi; @@ -2948,6 +2931,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh .deleteMark = getDeleteMark(&pSessionNode->window, 0), }; + pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -2957,7 +2941,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } @@ -3843,14 +3826,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys } int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } - - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 295132a4f5..687b4bcf12 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -202,6 +202,13 @@ _end: return code; } +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + SWinKey* pTmpkey = pKey; + ASSERT(keyLen == sizeof(SWinKey)); + SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; + return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen); +} + int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pKey = pPos->pKey; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 18bc672c8d..1f46384448 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; #else @@ -290,10 +290,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; return code; #else diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f86ab6b8a3..19f403a6a6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -58,6 +58,8 @@ struct SStreamFileState { _state_file_remove_fn stateFileRemoveFn; _state_file_get_fn stateFileGetFn; _state_file_clear_fn stateFileClearFn; + + _state_fun_get_fn stateFunctionGetFn; }; typedef SRowBuffPos SRowBuffInfo; @@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = intervalFileGetFn; pFileState->stateFileClearFn = streamStateClear_rocksdb; pFileState->cfName = taosStrdup("state"); + pFileState->stateFunctionGetFn = getRowBuff; } else { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; @@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = sessionFileGetFn; pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; pFileState->cfName = taosStrdup("sess"); + pFileState->stateFunctionGetFn = getSessionRowBuff; } if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { @@ -736,7 +740,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { return TSDB_CODE_SUCCESS; } -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->flushMark = TMAX(pFileState->flushMark, ts); @@ -754,3 +758,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; } + +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen); +} From e261023ee60eb5d03a71196f56c25517047e0615 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:42:44 +0800 Subject: [PATCH 5/8] fix(stream): add lock, and fix race condition. --- source/dnode/vnode/src/tq/tqUtil.c | 6 ++++++ source/libs/stream/src/streamTask.c | 11 ++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6029575e2c..d8440e996f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b } // extract the required source task for a given stream, identified by streamId + streamMetaRLock(pMeta); + + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); if (pId->streamId != streamId) { @@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b walCloseReader(pReader); } + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 44f70f8b19..88c8c85dec 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) { bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { bool ret = false; - // double check + taosThreadMutexLock(&pTask->lock); if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { - taosThreadMutexLock(&pTask->lock); - if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { - pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; - ret = true; - } - taosThreadMutexUnlock(&pTask->lock); + pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; + ret = true; } + taosThreadMutexUnlock(&pTask->lock); return ret; } From fabb986aa8cff6a5cba7e3685c8269c02eeaf70a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 19:54:33 +0800 Subject: [PATCH 6/8] fix(stream): fix stack overflow, caused by print epset. --- include/common/tmisce.h | 16 +---------- source/common/src/tmisce.c | 31 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndStream.c | 5 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 6 ++-- source/libs/stream/src/streamTask.c | 6 ++-- source/libs/transport/src/transCli.c | 4 +-- 6 files changed, 43 insertions(+), 25 deletions(-) diff --git a/include/common/tmisce.h b/include/common/tmisce.h index afb33c733a..267ca814d4 100644 --- a/include/common/tmisce.h +++ b/include/common/tmisce.h @@ -29,21 +29,7 @@ typedef struct SCorEpSet { #define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse])) -#define EPSET_TO_STR(_eps, tbuf) \ - do { \ - int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \ - for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \ - if (_i == (_eps)->numOfEps - 1) { \ - len += \ - snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ - } else { \ - len += \ - snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ - } \ - } \ - len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \ - } while (0); - +int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len); int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 1606b45eed..8558ccb447 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) { tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn)); } } + void epAssign(SEp* pDst, SEp* pSrc) { if (pSrc == NULL || pDst == NULL) { return; @@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) { tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn)); pDst->port = pSrc->port; } + void epsetSort(SEpSet* pDst) { if (pDst->numOfEps <= 1) { return; @@ -127,6 +129,35 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) { return ep; } +int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) { + int len = snprintf(pBuf, bufLen, "epset:{"); + if (len < 0) { + return -1; + } + + for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) { + int32_t ret = 0; + + if (_i == pEpSet->numOfEps - 1) { + ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); + } else { + ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); + } + + if (ret < 0) { + return -1; + } + + len += ret; + } + + if (len < bufLen) { + /*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse); + } + + return TSDB_CODE_SUCCESS; +} + int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { SJson* pJson = tjsonCreateObject(); if (pJson == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ff05db417e..8f9afb2adc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1747,7 +1747,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); char buf[256] = {0}; - EPSET_TO_STR(&pCurrent->epset, buf); + epsetToStr(&pCurrent->epset, buf, tListLen(buf)); + mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); @@ -1898,7 +1899,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { taosArrayPush(plist, pEntry); char buf[256] = {0}; - EPSET_TO_STR(&pEntry->epset, buf); + epsetToStr(&pEntry->epset, buf, tListLen(buf)); mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf); } taosHashCleanup(pHash); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a124b4052c..d5bc12f9df 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); + epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); taosArrayPush(pVgroupListSnapshot, &entry); @@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { entry.nodeId = SNODE_HANDLE; char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); + epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); taosArrayPush(pVgroupListSnapshot, &entry); sdbRelease(pSdb, pObj); @@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } char buf[256] = {0}; - EPSET_TO_STR(&epset, buf); + epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 88c8c85dec..c34e162326 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp if (pTask->info.nodeId == nodeId) { // execution task should be moved away epsetAssign(&pTask->info.epSet, pEpSet); - EPSET_TO_STR(pEpSet, buf) + epsetToStr(pEpSet, buf, tListLen(buf)); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); } @@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; - EPSET_TO_STR(pEpSet, buf); + epsetToStr(pEpSet, buf, tListLen(buf)); int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { @@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; - EPSET_TO_STR(pEpSet, buf); + epsetToStr(pEpSet, buf, tListLen(buf)); int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 062609baac..79699a755a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { STransConnCtx* pCtx = pMsg->ctx; STraceId* trace = &pMsg->msg.info.traceId; char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); + epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; @@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (hasEpSet) { if (rpcDebugFlag & DEBUG_TRACE) { char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); + epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } From 917487b1103533b3e86e39ce662163f95a89d8b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 19:55:31 +0800 Subject: [PATCH 7/8] refactor: do some internal refactor. --- source/common/src/tmisce.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 8558ccb447..77dd8344b1 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -137,7 +137,6 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) { for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) { int32_t ret = 0; - if (_i == pEpSet->numOfEps - 1) { ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); } else { From b2cefa80393aeaa5fb8c6672d147223e25f6a788 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 11 Apr 2024 09:10:16 +0800 Subject: [PATCH 8/8] release state buff --- source/libs/stream/src/streamState.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1f46384448..ad6f5d48dc 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -295,6 +295,7 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; + streamStateReleaseBuf(pState, pVal, false); return code; #else return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen); @@ -332,7 +333,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) { int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) { int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal); - streamFileStateReleaseBuff(pState->pFileState, pos, false); + streamStateReleaseBuf(pState, pos, false); return code; }