more r/w concurrency
This commit is contained in:
parent
6b454a4dd4
commit
657c2dcaa4
|
@ -793,6 +793,9 @@ typedef struct {
|
||||||
TSDBROW memRow, imemRow, fsRow;
|
TSDBROW memRow, imemRow, fsRow;
|
||||||
|
|
||||||
TsdbNextRowState input[3];
|
TsdbNextRowState input[3];
|
||||||
|
SMemTable *pMemTable;
|
||||||
|
SMemTable *pIMemTable;
|
||||||
|
STsdb *pTsdb;
|
||||||
} CacheNextRowIter;
|
} CacheNextRowIter;
|
||||||
|
|
||||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
|
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
|
||||||
|
@ -800,16 +803,20 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
|
|
||||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||||
|
|
||||||
|
tsdbTakeMemSnapshot(pTsdb, &pIter->pMemTable, &pIter->pIMemTable);
|
||||||
|
|
||||||
STbData *pMem = NULL;
|
STbData *pMem = NULL;
|
||||||
if (pTsdb->mem) {
|
if (pIter->pMemTable) {
|
||||||
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
tsdbGetTbDataFromMemTable(pIter->pMemTable, suid, uid, &pMem);
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData *pIMem = NULL;
|
STbData *pIMem = NULL;
|
||||||
if (pTsdb->imem) {
|
if (pIter->pIMemTable) {
|
||||||
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
tsdbGetTbDataFromMemTable(pIter->pIMemTable, suid, uid, &pIMem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pIter->pTsdb = pTsdb;
|
||||||
|
|
||||||
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||||
|
|
||||||
SDelIdx delIdx;
|
SDelIdx delIdx;
|
||||||
|
@ -878,6 +885,8 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
||||||
taosArrayDestroy(pIter->pSkyline);
|
taosArrayDestroy(pIter->pSkyline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbUntakeMemSnapshot(pIter->pTsdb, pIter->pMemTable, pIter->pIMemTable);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
_err:
|
_err:
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -118,6 +118,8 @@ struct STsdbReader {
|
||||||
char* idStr; // query info handle, for debug purpose
|
char* idStr; // query info handle, for debug purpose
|
||||||
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
||||||
SBlockLoadSuppInfo suppInfo;
|
SBlockLoadSuppInfo suppInfo;
|
||||||
|
SMemTable* pMem;
|
||||||
|
SMemTable* pIMem;
|
||||||
|
|
||||||
SIOCostSummary cost;
|
SIOCostSummary cost;
|
||||||
STSchema* pSchema;
|
STSchema* pSchema;
|
||||||
|
@ -1847,8 +1849,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
|
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
|
||||||
|
|
||||||
STbData* d = NULL;
|
STbData* d = NULL;
|
||||||
if (pReader->pTsdb->mem != NULL) {
|
if (pReader->pMem != NULL) {
|
||||||
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
|
tsdbGetTbDataFromMemTable(pReader->pMem, pReader->suid, pBlockScanInfo->uid, &d);
|
||||||
if (d != NULL) {
|
if (d != NULL) {
|
||||||
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
|
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1868,8 +1870,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* di = NULL;
|
STbData* di = NULL;
|
||||||
if (pReader->pTsdb->imem != NULL) {
|
if (pReader->pIMem != NULL) {
|
||||||
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
|
tsdbGetTbDataFromMemTable(pReader->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
|
||||||
if (di != NULL) {
|
if (di != NULL) {
|
||||||
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
|
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2809,6 +2811,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbTakeMemSnapshot(pReader->pTsdb, &pReader->pMem, &pReader->pIMem);
|
||||||
|
|
||||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
@ -2824,6 +2828,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
|
tsdbUntakeMemSnapshot(pReader->pTsdb, pReader->pMem, pReader->pIMem);
|
||||||
|
|
||||||
taosMemoryFreeClear(pSupInfo->plist);
|
taosMemoryFreeClear(pSupInfo->plist);
|
||||||
taosMemoryFree(pSupInfo->colIds);
|
taosMemoryFree(pSupInfo->colIds);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue