fix: more concurrency read/write
This commit is contained in:
parent
613ca9c67d
commit
65fc81c45b
|
@ -315,6 +315,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
|
tsdbReaderClose(pTSInfo->dataReader);
|
||||||
|
pTSInfo->dataReader = NULL;
|
||||||
#if 0
|
#if 0
|
||||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
||||||
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
||||||
|
@ -358,6 +361,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
// TODO after dropping table, table may be not found
|
// TODO after dropping table, table may be not found
|
||||||
ASSERT(found);
|
ASSERT(found);
|
||||||
|
|
||||||
|
if (pTableScanInfo == NULL) {
|
||||||
|
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
|
||||||
|
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
|
||||||
|
pTableScanInfo->dataReader == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||||
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
||||||
pTableScanInfo->cond.twindows.skey = ts + 1;
|
pTableScanInfo->cond.twindows.skey = ts + 1;
|
||||||
|
|
|
@ -982,7 +982,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
blockDataCleanup(pSDB);
|
blockDataCleanup(pSDB);
|
||||||
*pRowIndex = 0;
|
*pRowIndex = 0;
|
||||||
return NULL;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
|
tsdbReaderClose(pTableScanInfo->dataReader);
|
||||||
|
pTableScanInfo->dataReader = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResult->info.groupId == pInfo->groupId) {
|
if (pResult->info.groupId == pInfo->groupId) {
|
||||||
|
@ -1003,6 +1005,9 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
|
||||||
}
|
}
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
|
tsdbReaderClose(pTableScanInfo->dataReader);
|
||||||
|
pTableScanInfo->dataReader = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue