[td-98] fix bugs in query when data in both cache and files
This commit is contained in:
parent
4b5dab104f
commit
fa5e2698fe
|
@ -2399,8 +2399,8 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
|
||||||
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
|
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
|
||||||
if (pTableMetaInfo->pTableMeta != NULL) {
|
if (pTableMetaInfo->pTableMeta != NULL) {
|
||||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||||
tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns,
|
tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
|
||||||
tinfo.numOfTags);
|
tinfo.numOfTags, pTableMetaInfo->pTableMeta);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2239,24 +2239,6 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
|
||||||
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
|
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
|
|
||||||
if (pQInfo == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
||||||
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
|
|
||||||
|
|
||||||
if (pQInfo->pTableDataInfo != NULL) {
|
|
||||||
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
|
|
||||||
for (int32_t j = 0; j < 0; ++j) {
|
|
||||||
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tfree(pQInfo->pTableDataInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
|
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
|
||||||
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
|
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
|
||||||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
|
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
|
||||||
|
@ -2264,14 +2246,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
|
||||||
pQuery->window.ekey, pQuery->order.order);
|
pQuery->window.ekey, pQuery->order.order);
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
// pQInfo->over = 1;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->status = 0;
|
pQuery->status = 0;
|
||||||
|
|
||||||
pQuery->rec = (SResultRec){0};
|
|
||||||
pQuery->rec = (SResultRec){0};
|
pQuery->rec = (SResultRec){0};
|
||||||
|
|
||||||
changeExecuteScanOrder(pQuery, true);
|
changeExecuteScanOrder(pQuery, true);
|
||||||
|
@ -5925,7 +5903,7 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
setQueryKilled(pQInfo);
|
setQueryKilled(pQInfo);
|
||||||
|
|
||||||
dTrace("QInfo:%p start to free SQInfo", pQInfo);
|
dTrace("QInfo:%p start to free QInfo", pQInfo);
|
||||||
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
|
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
|
||||||
tfree(pQuery->sdata[col]);
|
tfree(pQuery->sdata[col]);
|
||||||
}
|
}
|
||||||
|
@ -5939,7 +5917,16 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
sem_destroy(&(pQInfo->dataReady));
|
sem_destroy(&(pQInfo->dataReady));
|
||||||
vnodeQueryFreeQInfoEx(pQInfo);
|
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
|
||||||
|
|
||||||
|
if (pQInfo->pTableDataInfo != NULL) {
|
||||||
|
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
|
||||||
|
for (int32_t j = 0; j < 0; ++j) {
|
||||||
|
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pQInfo->pTableDataInfo);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
|
||||||
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
|
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
|
||||||
|
@ -6121,6 +6108,7 @@ _query_over:
|
||||||
}
|
}
|
||||||
|
|
||||||
void qDestroyQueryInfo(SQInfo* pQInfo) {
|
void qDestroyQueryInfo(SQInfo* pQInfo) {
|
||||||
|
dTrace("QInfo:%p query completed", pQInfo);
|
||||||
freeQInfo(pQInfo);
|
freeQInfo(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ref = T_REF_INC(ptNode);
|
int32_t ref = T_REF_INC(ptNode);
|
||||||
pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref)
|
pTrace("%p acquired by data in cache, refcnt:%d", ptNode, ref)
|
||||||
|
|
||||||
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
||||||
assert(ref >= 2);
|
assert(ref >= 2);
|
||||||
|
@ -516,7 +516,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
|
|
||||||
*data = NULL;
|
*data = NULL;
|
||||||
int16_t ref = T_REF_DEC(pNode);
|
int16_t ref = T_REF_DEC(pNode);
|
||||||
pTrace("%p is released, refcnt:%d", pNode, ref);
|
pTrace("%p data released, refcnt:%d", pNode, ref);
|
||||||
|
|
||||||
if (_remove) {
|
if (_remove) {
|
||||||
__cache_wr_lock(pCacheObj);
|
__cache_wr_lock(pCacheObj);
|
||||||
|
|
|
@ -367,14 +367,16 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
|
||||||
SSubmitMsgIter msgIter;
|
SSubmitMsgIter msgIter;
|
||||||
|
|
||||||
tsdbInitSubmitMsgIter(pMsg, &msgIter);
|
tsdbInitSubmitMsgIter(pMsg, &msgIter);
|
||||||
SSubmitBlk *pBlock;
|
SSubmitBlk *pBlock = NULL;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
|
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
|
||||||
if (tsdbInsertDataToTable(repo, pBlock) < 0) {
|
if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) {
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -735,7 +737,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
||||||
|
|
||||||
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
||||||
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
||||||
if (pTable == NULL) return -1;
|
if (pTable == NULL) {
|
||||||
|
return TSDB_CODE_INVALID_TABLE_ID;
|
||||||
|
}
|
||||||
|
|
||||||
SSubmitBlkIter blkIter;
|
SSubmitBlkIter blkIter;
|
||||||
SDataRow row;
|
SDataRow row;
|
||||||
|
@ -747,7 +751,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||||
|
|
|
@ -438,10 +438,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} else { // check data in cache
|
} else { // check data in cache
|
||||||
|
pQueryHandle->cur.fid = -1;
|
||||||
return hasMoreDataInCacheForSingleModel(pQueryHandle);
|
return hasMoreDataInCacheForSingleModel(pQueryHandle);
|
||||||
}
|
}
|
||||||
} else {
|
} else { // next block in the same file
|
||||||
// next block in the same file
|
|
||||||
cur->slot += step;
|
cur->slot += step;
|
||||||
|
|
||||||
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
||||||
|
@ -526,9 +526,11 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
|
if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
|
||||||
endPos = blockInfo.size - 1;
|
endPos = blockInfo.size - 1;
|
||||||
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
|
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
|
||||||
|
pCheckInfo->lastKey = blockInfo.window.ekey + 1;
|
||||||
} else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
|
} else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
|
||||||
endPos = 0;
|
endPos = 0;
|
||||||
pQueryHandle->realNumOfRows = cur->pos + 1;
|
pQueryHandle->realNumOfRows = cur->pos + 1;
|
||||||
|
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
|
||||||
} else {
|
} else {
|
||||||
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order);
|
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order);
|
||||||
|
|
||||||
|
@ -539,6 +541,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
|
||||||
} else {
|
} else {
|
||||||
pQueryHandle->realNumOfRows = endPos - cur->pos;
|
pQueryHandle->realNumOfRows = endPos - cur->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
|
||||||
} else {
|
} else {
|
||||||
if (endPos > cur->pos) {
|
if (endPos > cur->pos) {
|
||||||
pQueryHandle->realNumOfRows = 0;
|
pQueryHandle->realNumOfRows = 0;
|
||||||
|
@ -546,6 +550,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
|
||||||
} else {
|
} else {
|
||||||
pQueryHandle->realNumOfRows = cur->pos - endPos;
|
pQueryHandle->realNumOfRows = cur->pos - endPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -888,7 +894,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
|
||||||
|
|
||||||
rows = pHandle->realNumOfRows;
|
rows = pHandle->realNumOfRows;
|
||||||
skey = *(TSKEY*) pColInfoEx->pData;
|
skey = *(TSKEY*) pColInfoEx->pData;
|
||||||
ekey = *(TSKEY*) pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1);
|
ekey = *(TSKEY*) ((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pTable->mem != NULL) {
|
if (pTable->mem != NULL) {
|
||||||
|
@ -926,6 +932,10 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
|
||||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
||||||
|
|
||||||
if (pHandle->cur.fid < 0) {
|
if (pHandle->cur.fid < 0) {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else {
|
} else {
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||||
|
@ -945,6 +955,7 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
|
||||||
} else {
|
} else {
|
||||||
doLoadDataFromFileBlock(pHandle);
|
doLoadDataFromFileBlock(pHandle);
|
||||||
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
|
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
|
||||||
|
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1297,7 +1308,6 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo
|
||||||
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
|
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
|
||||||
|
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||||
|
|
Loading…
Reference in New Issue