From d651ba02ae66e4077f36d7a165d17890771f62e3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 3 Apr 2023 19:54:52 +0800 Subject: [PATCH] fix:lose consume data because of exec close if consume while insert data --- include/libs/executor/executor.h | 2 ++ source/dnode/vnode/src/tq/tqPush.c | 2 +- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executor.c | 8 ++++++-- source/libs/executor/src/projectoperator.c | 15 +-------------- 6 files changed, 12 insertions(+), 19 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 6b993fe9c7..33172a4f86 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -198,6 +198,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); +void qStreamSetOpen(qTaskInfo_t tinfo); + void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 1619829115..1bed07e7d9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -253,7 +253,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { return; } - + qStreamSetOpen(pTaskInfo); // here start to scan submit block to extract the subscribed data int32_t totalRows = 0; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index dff162d527..b4e50312fd 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -81,7 +81,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - + qStreamSetOpen(task); tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8de29662c8..98fba39950 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -192,7 +192,7 @@ enum { OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, OP_EXEC_DONE = 0x9, - OP_EXEC_RECV = 0x11, +// OP_EXEC_RECV = 0x11, }; typedef struct SOperatorFpSet { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fdb2a3da06..8058ffd423 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1074,6 +1074,12 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { return 0; } +void qStreamSetOpen(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + pOperator->status = OP_NOT_OPENED; +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1086,8 +1092,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } if (subType == TOPIC_SUB_TYPE__COLUMN) { - pOperator->status = OP_OPENED; - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream != 1) { qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 0f7820f076..86c49e0fc8 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -227,17 +227,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - if (pTaskInfo->streamInfo.submit.msgStr) { - pOperator->status = OP_OPENED; - } if (pOperator->status == OP_EXEC_DONE) { -// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { -// pOperator->status = OP_OPENED; -// qDebug("projection in queue model, set status open and return null"); -// return NULL; -// } - return NULL; } @@ -262,11 +253,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL && pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) { -// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { -// pOperator->status = OP_OPENED; -// return NULL; -// } + if (pBlock == NULL) { qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); setOperatorCompleted(pOperator); break;