Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0
This commit is contained in:
commit
7a0ec25912
|
@ -199,6 +199,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
|
||||||
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
|
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
|
||||||
const SDataBlockInfo* pBlockInfo);
|
const SDataBlockInfo* pBlockInfo);
|
||||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
|
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
|
||||||
|
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc);
|
||||||
|
|
||||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
||||||
|
|
||||||
|
|
|
@ -1037,6 +1037,7 @@ typedef struct {
|
||||||
uint8_t scale;
|
uint8_t scale;
|
||||||
int32_t bytes;
|
int32_t bytes;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
uint8_t pk;
|
||||||
} SColumnInfo;
|
} SColumnInfo;
|
||||||
|
|
||||||
typedef struct STimeWindow {
|
typedef struct STimeWindow {
|
||||||
|
|
|
@ -490,9 +490,9 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataBlock->info.rows > 0) {
|
// if (pDataBlock->info.rows > 0) {
|
||||||
// ASSERT(pDataBlock->info.dataLoad == 1);
|
// ASSERT(pDataBlock->info.dataLoad == 1);
|
||||||
}
|
// }
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
if (numOfCols <= 0) {
|
if (numOfCols <= 0) {
|
||||||
|
@ -515,6 +515,51 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
|
||||||
|
if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
|
if (numOfCols <= 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
|
||||||
|
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* skey = colDataGetData(pColInfoData, 0);
|
||||||
|
void* ekey = colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
|
||||||
|
|
||||||
|
if (asc) {
|
||||||
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
|
pDataBlock->info.pks[0].val = *(int64_t*) skey;
|
||||||
|
pDataBlock->info.pks[1].val = *(int64_t*) ekey;
|
||||||
|
} else { // todo refactor
|
||||||
|
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
|
||||||
|
pDataBlock->info.pks[0].nData = varDataLen(skey);
|
||||||
|
|
||||||
|
memcpy(pDataBlock->info.pks[1].pData, varDataVal(ekey), varDataLen(ekey));
|
||||||
|
pDataBlock->info.pks[1].nData = varDataLen(ekey);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
|
pDataBlock->info.pks[0].val = *(int64_t*) ekey;
|
||||||
|
pDataBlock->info.pks[1].val = *(int64_t*) skey;
|
||||||
|
} else { // todo refactor
|
||||||
|
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
||||||
|
pDataBlock->info.pks[0].nData = varDataLen(ekey);
|
||||||
|
|
||||||
|
memcpy(pDataBlock->info.pks[1].pData, varDataVal(skey), varDataLen(skey));
|
||||||
|
pDataBlock->info.pks[1].nData = varDataLen(skey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
|
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
|
||||||
int32_t capacity = pDest->info.capacity;
|
int32_t capacity = pDest->info.capacity;
|
||||||
|
|
||||||
|
|
|
@ -110,6 +110,10 @@ static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) {
|
||||||
|
|
||||||
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
|
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
|
||||||
int32_t numOfCols) {
|
int32_t numOfCols) {
|
||||||
|
pSupInfo->pk.pk = 0;
|
||||||
|
pSupInfo->numOfPks = 0;
|
||||||
|
pSupInfo->pk.slotId = -1;
|
||||||
|
|
||||||
pSupInfo->smaValid = true;
|
pSupInfo->smaValid = true;
|
||||||
pSupInfo->numOfCols = numOfCols;
|
pSupInfo->numOfCols = numOfCols;
|
||||||
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
|
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
|
||||||
|
@ -129,6 +133,11 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
|
||||||
} else {
|
} else {
|
||||||
pSupInfo->buildBuf[i] = NULL;
|
pSupInfo->buildBuf[i] = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pCols[i].pk) {
|
||||||
|
pSupInfo->pk = pCols[i];
|
||||||
|
pSupInfo->numOfPks += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -197,7 +206,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
|
||||||
pLReader->order = pReader->info.order;
|
pLReader->order = pReader->info.order;
|
||||||
pLReader->window = pReader->info.window;
|
pLReader->window = pReader->info.window;
|
||||||
pLReader->verRange = pReader->info.verRange;
|
pLReader->verRange = pReader->info.verRange;
|
||||||
pLReader->numOfPks = pReader->numOfPks;
|
pLReader->numOfPks = pReader->suppInfo.numOfPks;
|
||||||
|
|
||||||
pLReader->uid = 0;
|
pLReader->uid = 0;
|
||||||
tMergeTreeClose(&pLReader->mergeTree);
|
tMergeTreeClose(&pLReader->mergeTree);
|
||||||
|
@ -430,8 +439,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
||||||
pReader->type = pCond->type;
|
pReader->type = pCond->type;
|
||||||
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||||
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
||||||
pReader->numOfPks = -1;
|
|
||||||
pReader->pkChecked = false;
|
|
||||||
|
|
||||||
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
|
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -449,6 +456,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
||||||
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
||||||
|
|
||||||
|
if (pSup->numOfPks > 0) {
|
||||||
|
pReader->pkComparFn = getComparFunc(pSup->pk.type, 0);
|
||||||
|
}
|
||||||
|
|
||||||
code = tBlockDataCreate(&pReader->status.fileBlockData);
|
code = tBlockDataCreate(&pReader->status.fileBlockData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
@ -1519,12 +1530,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
||||||
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
tsdbRowGetKey(pNextRow, &nextKey);
|
tsdbRowGetKey(pNextRow, &nextKey);
|
||||||
|
|
||||||
if (!pReader->pkChecked) {
|
|
||||||
pReader->pkComparFn = getComparFunc(pSttKey->key.pks[0].type, 0);
|
|
||||||
pReader->pkChecked = true;
|
|
||||||
pReader->numOfPks = pSttKey->key.numOfPKs;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
|
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -1778,7 +1783,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
}
|
}
|
||||||
|
|
||||||
// key == tsLast. ts is equal and the primary key exists
|
// key == tsLast. ts is equal and the primary key exists
|
||||||
if (pReader->numOfPks > 0) {
|
if (pReader->suppInfo.numOfPks > 0) {
|
||||||
int32_t res = pkComp1(pReader, pSttKey, &fRow);
|
int32_t res = pkComp1(pReader, pSttKey, &fRow);
|
||||||
if (res < 0) {
|
if (res < 0) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
|
@ -2273,7 +2278,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
if (copied) {
|
if (copied) {
|
||||||
if (pReader->numOfPks == 0) {
|
if (pReader->suppInfo.numOfPks == 0) {
|
||||||
pBlockScanInfo->lastProcKey.key.ts = key;
|
pBlockScanInfo->lastProcKey.key.ts = key;
|
||||||
} else { // todo use deep copy instead of shallow copy
|
} else { // todo use deep copy instead of shallow copy
|
||||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
|
||||||
|
@ -2434,6 +2439,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
|
||||||
pResBlock->info.version = pReader->info.verRange.maxVer;
|
pResBlock->info.version = pReader->info.verRange.maxVer;
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
|
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
|
||||||
|
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order));
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
// todo update the pk range for current return data block
|
// todo update the pk range for current return data block
|
||||||
|
@ -2806,19 +2812,29 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
|
||||||
pInfo->version = pReader->info.verRange.maxVer;
|
pInfo->version = pReader->info.verRange.maxVer;
|
||||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
||||||
|
|
||||||
|
if (pReader->suppInfo.pk.pk) {
|
||||||
|
if (IS_NUMERIC_TYPE(pReader->suppInfo.pk.type)) {
|
||||||
|
pInfo->pks[0].val = pBlockInfo->firstPk.val;
|
||||||
|
pInfo->pks[1].val = pBlockInfo->lastPk.val;
|
||||||
|
} else {
|
||||||
|
memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen);
|
||||||
|
memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen);
|
||||||
|
|
||||||
|
pInfo->pks[0].nData = pBlockInfo->firstPKLen;
|
||||||
|
pInfo->pks[1].nData = pBlockInfo->lastPKLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setComposedBlockFlag(pReader, false);
|
setComposedBlockFlag(pReader, false);
|
||||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
||||||
|
|
||||||
// update the last key for the corresponding table
|
// update the last key for the corresponding table
|
||||||
SRowKey* pKey = &pScanInfo->lastProcKey.key;
|
SRowKey* pKey = &pScanInfo->lastProcKey.key;
|
||||||
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
|
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||||
pKey->numOfPKs = pReader->numOfPks;
|
pKey->numOfPKs = pReader->suppInfo.numOfPks;
|
||||||
|
|
||||||
// todo opt allocation, and handle varchar primary key
|
// todo opt allocation, and handle varchar primary key
|
||||||
pKey->pks[0].val = asc ? pBlockInfo->lastPrimaryKey.val : pBlockInfo->firstPrimaryKey.val;
|
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
|
||||||
|
|
||||||
pInfo->pks[0].val = pBlockInfo->firstPrimaryKey.val;
|
|
||||||
pInfo->pks[1].val = pBlockInfo->lastPrimaryKey.val;
|
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64
|
||||||
" clean file block retrieved from file, global index:%d, "
|
" clean file block retrieved from file, global index:%d, "
|
||||||
|
@ -3478,14 +3494,6 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
|
||||||
int32_t order = pReader->info.order;
|
int32_t order = pReader->info.order;
|
||||||
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
|
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
|
||||||
|
|
||||||
if (!pReader->pkChecked) {
|
|
||||||
STsdbRowKey k;
|
|
||||||
tsdbRowGetKey(pRow, &k);
|
|
||||||
|
|
||||||
pReader->pkComparFn = getComparFunc(k.key.pks[0].type, 0);
|
|
||||||
pReader->pkChecked = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
|
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
|
||||||
pIter->hasVal = false;
|
pIter->hasVal = false;
|
||||||
|
|
|
@ -372,27 +372,21 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor
|
||||||
pBlockInfo->count = record->count;
|
pBlockInfo->count = record->count;
|
||||||
|
|
||||||
SRowKey* pFirstKey = &record->firstKey.key;
|
SRowKey* pFirstKey = &record->firstKey.key;
|
||||||
if (!pReader->pkChecked) {
|
|
||||||
pReader->pkChecked = true;
|
|
||||||
pReader->numOfPks = pFirstKey->numOfPKs;
|
|
||||||
pReader->pkComparFn = getComparFunc(pFirstKey->pks[0].type, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pFirstKey->numOfPKs > 0) {
|
if (pFirstKey->numOfPKs > 0) {
|
||||||
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
|
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
|
||||||
pBlockInfo->firstPrimaryKey.val = pFirstKey->pks[0].val;
|
pBlockInfo->firstPk.val = pFirstKey->pks[0].val;
|
||||||
pBlockInfo->lastPrimaryKey.val = record->lastKey.key.pks[0].val;
|
pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val;
|
||||||
|
|
||||||
pBlockInfo->firstPKLen = 0;
|
pBlockInfo->firstPKLen = 0;
|
||||||
pBlockInfo->lastPKLen = 0;
|
pBlockInfo->lastPKLen = 0;
|
||||||
} else { // todo handle memory alloc error, opt memory alloc perf
|
} else { // todo handle memory alloc error, opt memory alloc perf
|
||||||
pBlockInfo->firstPKLen = pFirstKey->pks[0].nData;
|
pBlockInfo->firstPKLen = pFirstKey->pks[0].nData;
|
||||||
pBlockInfo->firstPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen);
|
pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen);
|
||||||
memcpy(pBlockInfo->firstPrimaryKey.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen);
|
memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen);
|
||||||
|
|
||||||
pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData;
|
pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData;
|
||||||
pBlockInfo->lastPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen);
|
pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen);
|
||||||
memcpy(pBlockInfo->lastPrimaryKey.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen);
|
memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -404,8 +398,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
||||||
pBlockIter->numOfBlocks = numOfBlocks;
|
pBlockIter->numOfBlocks = numOfBlocks;
|
||||||
taosArrayClear(pBlockIter->blockList);
|
taosArrayClear(pBlockIter->blockList);
|
||||||
|
|
||||||
pBlockIter->pTableMap = pReader->status.pTableMap;
|
|
||||||
|
|
||||||
// access data blocks according to the offset of each block in asc/desc order.
|
// access data blocks according to the offset of each block in asc/desc order.
|
||||||
int32_t numOfTables = taosArrayGetSize(pTableList);
|
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||||
|
|
||||||
|
|
|
@ -157,8 +157,10 @@ typedef struct SBlockLoadSuppInfo {
|
||||||
SColumnDataAgg tsColAgg;
|
SColumnDataAgg tsColAgg;
|
||||||
int16_t* colId;
|
int16_t* colId;
|
||||||
int16_t* slotId;
|
int16_t* slotId;
|
||||||
int32_t numOfCols;
|
|
||||||
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
||||||
|
int32_t numOfCols;
|
||||||
|
int32_t numOfPks;
|
||||||
|
SColumnInfo pk;
|
||||||
bool smaValid; // the sma on all queried columns are activated
|
bool smaValid; // the sma on all queried columns are activated
|
||||||
} SBlockLoadSuppInfo;
|
} SBlockLoadSuppInfo;
|
||||||
|
|
||||||
|
@ -189,13 +191,13 @@ typedef struct SFileDataBlockInfo {
|
||||||
union {
|
union {
|
||||||
int64_t val;
|
int64_t val;
|
||||||
uint8_t* pData;
|
uint8_t* pData;
|
||||||
} firstPrimaryKey;
|
} firstPk;
|
||||||
|
|
||||||
int64_t lastKey;
|
int64_t lastKey;
|
||||||
union {
|
union {
|
||||||
int64_t val;
|
int64_t val;
|
||||||
uint8_t* pData;
|
uint8_t* pData;
|
||||||
} lastPrimaryKey;
|
} lastPk;
|
||||||
|
|
||||||
int32_t firstPKLen;
|
int32_t firstPKLen;
|
||||||
int32_t lastPKLen;
|
int32_t lastPKLen;
|
||||||
|
@ -217,7 +219,6 @@ typedef struct SDataBlockIter {
|
||||||
SArray* blockList; // SArray<SFileDataBlockInfo>
|
SArray* blockList; // SArray<SFileDataBlockInfo>
|
||||||
int32_t order;
|
int32_t order;
|
||||||
SDataBlk block; // current SDataBlk data
|
SDataBlk block; // current SDataBlk data
|
||||||
SSHashObj* pTableMap;
|
|
||||||
} SDataBlockIter;
|
} SDataBlockIter;
|
||||||
|
|
||||||
typedef struct SFileBlockDumpInfo {
|
typedef struct SFileBlockDumpInfo {
|
||||||
|
@ -280,8 +281,6 @@ struct STsdbReader {
|
||||||
TsdReaderNotifyCbFn notifyFn;
|
TsdReaderNotifyCbFn notifyFn;
|
||||||
void* notifyParam;
|
void* notifyParam;
|
||||||
__compar_fn_t pkComparFn;
|
__compar_fn_t pkComparFn;
|
||||||
int32_t numOfPks;
|
|
||||||
bool pkChecked;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SBrinRecordIter {
|
typedef struct SBrinRecordIter {
|
||||||
|
|
|
@ -370,7 +370,8 @@ static EDealRes getColumn(SNode** pNode, void* pContext) {
|
||||||
pSColumnNode->slotId = pData->index++;
|
pSColumnNode->slotId = pData->index++;
|
||||||
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
|
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
|
||||||
.type = pSColumnNode->node.resType.type,
|
.type = pSColumnNode->node.resType.type,
|
||||||
.bytes = pSColumnNode->node.resType.bytes};
|
.bytes = pSColumnNode->node.resType.bytes,
|
||||||
|
.pk = pSColumnNode->isPk};
|
||||||
#if TAG_FILTER_DEBUG
|
#if TAG_FILTER_DEBUG
|
||||||
qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
|
qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
|
||||||
#endif
|
#endif
|
||||||
|
@ -1763,6 +1764,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
||||||
pCond->colList[j].type = pColNode->node.resType.type;
|
pCond->colList[j].type = pColNode->node.resType.type;
|
||||||
pCond->colList[j].bytes = pColNode->node.resType.bytes;
|
pCond->colList[j].bytes = pColNode->node.resType.bytes;
|
||||||
pCond->colList[j].colId = pColNode->colId;
|
pCond->colList[j].colId = pColNode->colId;
|
||||||
|
pCond->colList[j].pk = pColNode->isPk;
|
||||||
|
|
||||||
pCond->pSlotList[j] = pNode->slotId;
|
pCond->pSlotList[j] = pNode->slotId;
|
||||||
j += 1;
|
j += 1;
|
||||||
|
|
|
@ -2276,6 +2276,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
|
||||||
pCond->colList->colId = 1;
|
pCond->colList->colId = 1;
|
||||||
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
pCond->colList->bytes = sizeof(TSKEY);
|
pCond->colList->bytes = sizeof(TSKEY);
|
||||||
|
pCond->colList->pk = 0;
|
||||||
|
|
||||||
pCond->pSlotList[0] = 0;
|
pCond->pSlotList[0] = 0;
|
||||||
|
|
||||||
|
|
|
@ -277,7 +277,12 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
||||||
return SCAN_TYPE_TABLE;
|
return SCAN_TYPE_TABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) {
|
|
||||||
|
static bool hasPkInTable(const STableMeta* pTableMeta) {
|
||||||
|
return pTableMeta->tableInfo.numOfColumns>=2 && pTableMeta->schema[1].flags & COL_IS_KEY;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema, const STableMeta* pMeta) {
|
||||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
if (NULL == pCol) {
|
if (NULL == pCol) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -287,11 +292,13 @@ static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) {
|
||||||
pCol->tableId = tableId;
|
pCol->tableId = tableId;
|
||||||
pCol->colId = pSchema->colId;
|
pCol->colId = pSchema->colId;
|
||||||
pCol->colType = COLUMN_TYPE_COLUMN;
|
pCol->colType = COLUMN_TYPE_COLUMN;
|
||||||
|
pCol->isPk = pSchema->flags & COL_IS_KEY;
|
||||||
|
pCol->tableHasPk = hasPkInTable(pMeta);
|
||||||
strcpy(pCol->colName, pSchema->name);
|
strcpy(pCol->colName, pSchema->name);
|
||||||
return (SNode*)pCol;
|
return (SNode*)pCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
|
static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols, const STableMeta* pMeta) {
|
||||||
bool found = false;
|
bool found = false;
|
||||||
SNode* pCol = NULL;
|
SNode* pCol = NULL;
|
||||||
FOREACH(pCol, *pCols) {
|
FOREACH(pCol, *pCols) {
|
||||||
|
@ -302,12 +309,12 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeL
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found) {
|
if (!found) {
|
||||||
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
|
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema, pMeta));
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addPkCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
|
static int32_t addPkCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols, const STableMeta* pMeta) {
|
||||||
bool found = false;
|
bool found = false;
|
||||||
SNode* pCol = NULL;
|
SNode* pCol = NULL;
|
||||||
FOREACH(pCol, *pCols) {
|
FOREACH(pCol, *pCols) {
|
||||||
|
@ -318,30 +325,26 @@ static int32_t addPkCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pC
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found) {
|
if (!found) {
|
||||||
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
|
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema, pMeta));
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasPkInTable(const STableMeta* pTableMeta) {
|
static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols, const STableMeta* pMeta) {
|
||||||
return pTableMeta->tableInfo.numOfColumns>=2 && pTableMeta->schema[1].flags & COL_IS_KEY;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
|
|
||||||
if (LIST_LENGTH(*pCols) > 0) {
|
if (LIST_LENGTH(*pCols) > 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
|
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema, pMeta));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) {
|
static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) {
|
||||||
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
|
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
|
||||||
return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols);
|
return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols, pMeta);
|
||||||
}
|
}
|
||||||
if (hasPkInTable(pMeta)) {
|
if (hasPkInTable(pMeta)) {
|
||||||
addPkCol(pMeta->uid, pMeta->schema + 1, pCols);
|
addPkCol(pMeta->uid, pMeta->schema + 1, pCols, pMeta);
|
||||||
}
|
}
|
||||||
return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols);
|
return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols, pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs,
|
static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs,
|
||||||
|
|
Loading…
Reference in New Issue