fix(query): handle the case of the single block in last files.
This commit is contained in:
parent
d8102fc8d8
commit
4774baa635
|
@ -100,7 +100,6 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||||
uint64_t suid, void** pReader) {
|
uint64_t suid, void** pReader) {
|
||||||
*pReader = NULL;
|
*pReader = NULL;
|
||||||
|
|
||||||
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
|
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -119,6 +118,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
||||||
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
|
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
|
||||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
|
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
|
||||||
p->pTableList = pTableIdList;
|
p->pTableList = pTableIdList;
|
||||||
|
p->numOfTables = numOfTables;
|
||||||
|
|
||||||
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
||||||
if (p->transferBuf == NULL) {
|
if (p->transferBuf == NULL) {
|
||||||
|
@ -205,7 +205,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
|
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
|
||||||
LRUHandle* h = NULL;
|
LRUHandle* h = NULL;
|
||||||
SArray* pRow = NULL;
|
SArray* pRow = NULL;
|
||||||
// size_t numOfTables = taosArrayGetSize(pr->pTableList);
|
|
||||||
bool hasRes = false;
|
bool hasRes = false;
|
||||||
SArray* pLastCols = NULL;
|
SArray* pLastCols = NULL;
|
||||||
|
|
||||||
|
|
|
@ -270,24 +270,27 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void clearBlockScanInfo(STableBlockScanInfo* p) {
|
||||||
|
p->iterInit = false;
|
||||||
|
p->iiter.hasVal = false;
|
||||||
|
|
||||||
|
if (p->iter.iter != NULL) {
|
||||||
|
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->iiter.iter != NULL) {
|
||||||
|
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
|
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||||
|
tMapDataClear(&p->mapData);
|
||||||
|
}
|
||||||
|
|
||||||
static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
||||||
STableBlockScanInfo* p = NULL;
|
STableBlockScanInfo* p = NULL;
|
||||||
|
|
||||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||||
p->iterInit = false;
|
clearBlockScanInfo(p);
|
||||||
p->iiter.hasVal = false;
|
|
||||||
|
|
||||||
if (p->iter.iter != NULL) {
|
|
||||||
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (p->iiter.iter != NULL) {
|
|
||||||
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
|
|
||||||
}
|
|
||||||
|
|
||||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
|
||||||
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
|
||||||
tMapDataClear(&p->mapData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pTableMap);
|
taosHashCleanup(pTableMap);
|
||||||
|
@ -3455,10 +3458,14 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
// TODO refactor: with createDataBlockScanInfo
|
// TODO refactor: with createDataBlockScanInfo
|
||||||
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
|
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
|
||||||
ASSERT(pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
|
|
||||||
|
STableBlockScanInfo* p = NULL;
|
||||||
|
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
|
||||||
|
clearBlockScanInfo(p);
|
||||||
|
}
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
|
|
||||||
STableKeyInfo* pList = (STableKeyInfo*) pTableList;
|
STableKeyInfo* pList = (STableKeyInfo*) pTableList;
|
||||||
|
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
|
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
|
||||||
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
||||||
|
|
|
@ -95,7 +95,7 @@ typedef struct SColMatchInfo {
|
||||||
int32_t matchType; // determinate the source according to col id or slot id
|
int32_t matchType; // determinate the source according to col id or slot id
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct STableListInfo {
|
||||||
bool oneTableForEachGroup;
|
bool oneTableForEachGroup;
|
||||||
int32_t numOfGroups;
|
int32_t numOfGroups;
|
||||||
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||||
|
|
|
@ -3658,13 +3658,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
|
|
||||||
|
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pList = taosArrayGet(pTableListInfo->pTableList, 0);
|
size_t num = getTotalTables(pTableListInfo);
|
||||||
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
|
void* pList = NULL;
|
||||||
|
if (num > 0) {
|
||||||
|
pList = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||||
|
}
|
||||||
|
|
||||||
STsdbReader* pReader = NULL;
|
STsdbReader* pReader = NULL;
|
||||||
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
|
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
|
||||||
|
|
Loading…
Reference in New Issue