enh: reuse tsdb cache reader
This commit is contained in:
parent
9f78c22fb5
commit
423ec923ca
|
@ -193,6 +193,7 @@ void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||||
|
|
||||||
|
int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
|
||||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||||
uint64_t suid, void **pReader, const char *idstr);
|
uint64_t suid, void **pReader, const char *idstr);
|
||||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||||
|
|
|
@ -142,6 +142,18 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOfTables) {
|
||||||
|
SCacheRowsReader* pReader = (SCacheRowsReader*)reader;
|
||||||
|
|
||||||
|
pReader->pTableList = pTableIdList;
|
||||||
|
pReader->numOfTables = numOfTables;
|
||||||
|
pReader->lastTs = INT64_MIN;
|
||||||
|
|
||||||
|
resetLastBlockLoadInfo(pReader->pLoadInfo);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||||
uint64_t suid, void** pReader, const char* idstr) {
|
uint64_t suid, void** pReader, const char* idstr) {
|
||||||
*pReader = NULL;
|
*pReader = NULL;
|
||||||
|
|
|
@ -224,13 +224,17 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
if (NULL == pInfo->pLastrowReader) {
|
||||||
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
|
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
||||||
pTaskInfo->id.str);
|
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
pTaskInfo->id.str);
|
||||||
pInfo->currentGroupIndex += 1;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayClear(pInfo->pUidList);
|
pInfo->currentGroupIndex += 1;
|
||||||
continue;
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tsdbReuseCacherowsReader(pInfo->pLastrowReader, pList, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
@ -263,13 +267,14 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
//pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
} else {
|
} else {
|
||||||
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
//pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue