From 74bea443009ad69f9b34c6c8a5e214d2ae88bbd2 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 20 Jun 2023 17:30:59 +0800 Subject: [PATCH] stream op transform --- source/dnode/vnode/src/tq/tq.c | 12 ++++++++++-- source/libs/executor/inc/executorInt.h | 2 ++ source/libs/executor/src/executorInt.c | 14 ++++++++++++++ source/libs/executor/src/filloperator.c | 1 + source/libs/executor/src/groupoperator.c | 1 + source/libs/executor/src/scanoperator.c | 2 +- 6 files changed, 29 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 90790132f1..092a10b1cf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -779,8 +779,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SStreamTask* pSateTask = pTask; + SStreamTask task = {0}; if (pTask->info.fillHistory) { - pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); + task.id = pTask->streamTaskId; + SStreamMeta meta = {0}; + task.pMeta = pTask->pMeta; + pSateTask = &task; } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { @@ -798,8 +802,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { SStreamTask* pSateTask = pTask; + SStreamTask task = {0}; if (pTask->info.fillHistory) { - pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); + task.id = pTask->streamTaskId; + SStreamMeta meta = {0}; + task.pMeta = pTask->pMeta; + pSateTask = &task; } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 07507db836..d2c1812205 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -682,6 +682,8 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr void doClearBufferedBlocks(SStreamScanInfo* pInfo); uint64_t calcGroupId(char* pData, int32_t len); +void streamOpReleaseState(struct SOperatorInfo* pOperator); +void streamOpReloadState(struct SOperatorInfo* pOperator); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 1b23f3ea57..edda6ee03f 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1086,3 +1086,17 @@ void qStreamCloseTsdbReader(void* task) { } } } + +void streamOpReleaseState(SOperatorInfo* pOperator) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } +} + +void streamOpReloadState(SOperatorInfo* pOperator) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index f9e8a32520..bde59013ee 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1560,6 +1560,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 4f29ada7ef..0a98d86af5 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1326,6 +1326,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 817b2a8d5f..0e68b8d0a6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2531,12 +2531,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState); return pOperator;