fix bug
This commit is contained in:
parent
7765c48c5d
commit
cf8be044de
|
@ -33,6 +33,8 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX})
|
#define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX})
|
||||||
|
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow) {INT64_MAX, INT64_MIN})
|
||||||
|
|
||||||
#define TSKEY_INITIAL_VAL INT64_MIN
|
#define TSKEY_INITIAL_VAL INT64_MIN
|
||||||
|
|
||||||
// Bytes for each type.
|
// Bytes for each type.
|
||||||
|
|
|
@ -265,6 +265,10 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
|
||||||
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
|
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
|
||||||
SMemRef *pRef);
|
SMemRef *pRef);
|
||||||
|
|
||||||
|
|
||||||
|
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get the queried table object list
|
* get the queried table object list
|
||||||
* @param pHandle
|
* @param pHandle
|
||||||
|
|
|
@ -33,6 +33,8 @@
|
||||||
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
|
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
|
||||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||||
|
|
||||||
|
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
|
||||||
|
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
|
||||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||||
|
@ -1989,7 +1991,7 @@ static bool isCachedLastQuery(SQueryAttr *pQueryAttr) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_INITIALIZER)) {
|
if (pQueryAttr->order.order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,12 +94,6 @@ typedef struct SIOCostSummary {
|
||||||
int64_t checkForNextTime;
|
int64_t checkForNextTime;
|
||||||
} SIOCostSummary;
|
} SIOCostSummary;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SCacheLastColInfo {
|
|
||||||
int16_t i;
|
|
||||||
int16_t j;
|
|
||||||
} SCacheLastColInfo;
|
|
||||||
|
|
||||||
typedef struct STsdbQueryHandle {
|
typedef struct STsdbQueryHandle {
|
||||||
STsdbRepo* pTsdb;
|
STsdbRepo* pTsdb;
|
||||||
SQueryFilePos cur; // current position
|
SQueryFilePos cur; // current position
|
||||||
|
@ -124,8 +118,6 @@ typedef struct STsdbQueryHandle {
|
||||||
SFSIter fileIter;
|
SFSIter fileIter;
|
||||||
SReadH rhelper;
|
SReadH rhelper;
|
||||||
STableBlockInfo* pDataBlockInfo;
|
STableBlockInfo* pDataBlockInfo;
|
||||||
|
|
||||||
SCacheLastColInfo lastCols;
|
|
||||||
SDataCols *pDataCols; // in order to hold current file data block
|
SDataCols *pDataCols; // in order to hold current file data block
|
||||||
int32_t allocSize; // allocated data block size
|
int32_t allocSize; // allocated data block size
|
||||||
SMemRef *pMemRef;
|
SMemRef *pMemRef;
|
||||||
|
@ -146,6 +138,7 @@ typedef struct STableGroupSupporter {
|
||||||
|
|
||||||
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
|
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
|
||||||
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
|
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
|
||||||
|
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
|
||||||
static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey);
|
static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey);
|
||||||
|
|
||||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||||
|
@ -554,7 +547,6 @@ TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STab
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
|
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
||||||
|
|
||||||
return pQueryHandle;
|
return pQueryHandle;
|
||||||
|
@ -2490,17 +2482,29 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, STable* pTable) {
|
|
||||||
|
static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
|
||||||
|
// the last row is cached in buffer, return it directly.
|
||||||
|
// here note that the pQueryHandle->window must be the TS_INITIALIZER
|
||||||
|
int32_t tgNumOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
||||||
|
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
assert(numOfTables > 0 && tgNumOfCols > 0);
|
||||||
|
|
||||||
|
while (pQueryHandle->activeIndex < numOfTables) {
|
||||||
|
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
|
||||||
|
STable* pTable = pCheckInfo->pTableObj;
|
||||||
char* pData = NULL;
|
char* pData = NULL;
|
||||||
|
|
||||||
STSchema* pSchema = tsdbGetTableSchema(pTable);
|
int32_t numOfCols = pTable->lastColNum;
|
||||||
int32_t numOfCols = schemaNCols(pSchema);
|
|
||||||
int32_t tgNumOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
|
||||||
|
|
||||||
assert(numOfCols == pTable->restoreColumnNum);
|
if (pTable->lastCols == NULL || pTable->lastColNum <= 0) {
|
||||||
assert(pTable->lastCols != NULL);
|
tsdbWarn("no last cached for table, uid:%" PRIu64 ",tid:%d", pTable->tableId.uid, pTable->tableId.tid);
|
||||||
|
pQueryHandle->activeIndex++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t i = pQueryHandle->lastCols.i, j = pQueryHandle->lastCols.j;
|
int32_t i = 0, j = 0;
|
||||||
while(i < tgNumOfCols && j < numOfCols) {
|
while(i < tgNumOfCols && j < numOfCols) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
if (pTable->lastCols[j].colId < pColInfo->info.colId) {
|
if (pTable->lastCols[j].colId < pColInfo->info.colId) {
|
||||||
|
@ -2512,9 +2516,9 @@ static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capa
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
|
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
|
||||||
} else {
|
} else {
|
||||||
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
|
pData = (char*)pColInfo->pData + (pQueryHandle->outputCapacity + numOfRows - 1) * pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTable->lastCols[j].bytes > 0) {
|
if (pTable->lastCols[j].bytes > 0) {
|
||||||
|
@ -2565,6 +2569,17 @@ static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capa
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
|
||||||
|
} else {
|
||||||
|
pData = (char*)pColInfo->pData + (pQueryHandle->outputCapacity + numOfRows - 1) * pColInfo->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n == 0) {
|
||||||
|
*(TSKEY *)pData = pTable->lastCols[j].ts;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
setVardataNull(pData, pColInfo->info.type);
|
setVardataNull(pData, pColInfo->info.type);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2572,49 +2587,17 @@ static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
++i;
|
numOfRows++;
|
||||||
++j;
|
assert(numOfRows < pQueryHandle->outputCapacity);
|
||||||
|
|
||||||
if (i >= tgNumOfCols || j >= numOfCols) {
|
|
||||||
pQueryHandle->lastCols.i = 0;
|
|
||||||
pQueryHandle->lastCols.j = 0;
|
|
||||||
pQueryHandle->activeIndex++;
|
|
||||||
} else {
|
|
||||||
pQueryHandle->lastCols.i = i;
|
|
||||||
pQueryHandle->lastCols.j = j;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i++;
|
i++;
|
||||||
j++;
|
j++;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->lastCols.i = 0;
|
|
||||||
pQueryHandle->lastCols.j = 0;
|
|
||||||
pQueryHandle->activeIndex++;
|
pQueryHandle->activeIndex++;
|
||||||
|
|
||||||
return 0;
|
if (numOfRows > 0) {
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
|
|
||||||
// the last row is cached in buffer, return it directly.
|
|
||||||
// here note that the pQueryHandle->window must be the TS_INITIALIZER
|
|
||||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
|
||||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
|
||||||
assert(numOfTables > 0 && numOfCols > 0);
|
|
||||||
|
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
|
||||||
|
|
||||||
TSKEY key = TSKEY_INITIAL_VAL;
|
|
||||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
|
||||||
|
|
||||||
while (pQueryHandle->activeIndex < numOfTables) {
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
|
|
||||||
|
|
||||||
if (copyColsFromCacheMem(pQueryHandle, pQueryHandle->outputCapacity, 0, numOfCols, pCheckInfo->pTableObj, NULL)) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2850,7 +2833,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
|
||||||
TSDB_RLOCK_TABLE(pTable);
|
TSDB_RLOCK_TABLE(pTable);
|
||||||
*lastKey = pTable->lastKey;
|
*lastKey = pTable->lastKey;
|
||||||
|
|
||||||
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow == 1) {
|
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) {
|
||||||
*pRes = tdDataRowDup(pTable->lastRow);
|
*pRes = tdDataRowDup(pTable->lastRow);
|
||||||
if (*pRes == NULL) {
|
if (*pRes == NULL) {
|
||||||
TSDB_RUNLOCK_TABLE(pTable);
|
TSDB_RUNLOCK_TABLE(pTable);
|
||||||
|
@ -2899,7 +2882,6 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle, STableGroupInfo *grou
|
||||||
assert(pQueryHandle != NULL && groupList != NULL);
|
assert(pQueryHandle != NULL && groupList != NULL);
|
||||||
|
|
||||||
SDataRow pRow = NULL;
|
SDataRow pRow = NULL;
|
||||||
TSKEY key = TSKEY_INITIAL_VAL;
|
|
||||||
|
|
||||||
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
|
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
|
||||||
assert(group != NULL);
|
assert(group != NULL);
|
||||||
|
|
Loading…
Reference in New Issue