From 65fc81c45b83ed31678cb1b15b868dfce834daff Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 22 Jul 2022 02:45:11 +0000 Subject: [PATCH] fix: more concurrency read/write --- source/libs/executor/src/executorMain.c | 11 +++++++++++ source/libs/executor/src/scanoperator.c | 7 ++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e0020a496e..06c710f4c4 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -315,6 +315,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pInfo = pOperator->info; if (pOffset->type == TMQ_OFFSET__LOG) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; #if 0 if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && pInfo->tqReader->pWalReader->curVersion != pOffset->version) { @@ -358,6 +361,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { // TODO after dropping table, table may be not found ASSERT(found); + if (pTableScanInfo == NULL) { + if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, + pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || + pTableScanInfo->dataReader == NULL) { + ASSERT(0); + } + } + tsdbSetTableId(pTableScanInfo->dataReader, uid); int64_t oldSkey = pTableScanInfo->cond.twindows.skey; pTableScanInfo->cond.twindows.skey = ts + 1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a691091fe5..5e96bb2ee2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -982,7 +982,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 if (!pResult) { blockDataCleanup(pSDB); *pRowIndex = 0; - return NULL; + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTableScanInfo->dataReader); + pTableScanInfo->dataReader = NULL; } if (pResult->info.groupId == pInfo->groupId) { @@ -1003,6 +1005,9 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_ } if (!pResult) { pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTableScanInfo->dataReader); + pTableScanInfo->dataReader = NULL; return NULL; }