Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0

This commit is contained in:
54liuyao 2024-03-22 08:37:42 +08:00
commit 680f96156f
9 changed files with 279 additions and 173 deletions

View File

@ -535,8 +535,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
if (asc) {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int64_t*) skey;
pDataBlock->info.pks[1].val = *(int64_t*) ekey;
pDataBlock->info.pks[0].val = *(int32_t*) skey;
pDataBlock->info.pks[1].val = *(int32_t*) ekey;
} else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
pDataBlock->info.pks[0].nData = varDataLen(skey);

View File

@ -125,8 +125,8 @@ int32_t tsdbRowCompare(const void *p1, const void *p2);
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2);
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
void tsdbColRowGetKey(SBlockData *pBlock, int32_t irow, STsdbRowKey *key);
int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey *pSrc);
void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
// STSDBRowIter
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);

View File

@ -419,7 +419,6 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code;
}
#if 0
// load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
TStatisBlkArray *pStatisBlkArray = NULL;
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
@ -434,7 +433,6 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
return code;
}
#endif
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
@ -817,7 +815,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
tMergeTreeAddIter(pMTree, pIter);
// let's record the time window for current table of uid in the stt files
if (pSttDataInfo != NULL) {
if (pSttDataInfo != NULL && numOfRows > 0) {
taosArrayPush(pSttDataInfo->pTimeWindowList, &w);
pSttDataInfo->numOfRows += numOfRows;
}

View File

@ -25,6 +25,15 @@
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey))
#define tRowGetKeyEx(_pRow, _pKey) \
do { \
if ((_pRow)->type == TSDBROW_ROW_FMT) { \
tRowGetKey((_pRow)->pTSRow, (_pKey)); \
} else { \
tColRowGetKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \
} \
} while (0)
typedef struct {
bool overlapWithNeighborBlock;
bool hasDupTs;
@ -41,8 +50,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
STsdbRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pCurKey, SArray* pDelList,
SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pScanInfo);
@ -93,26 +102,84 @@ static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val);
}
static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) {
static int32_t pkComp1(STsdbReader* pReader, SRowKey* p1, TSDBROW* p2) {
if (pReader->pkComparFn == NULL) {
ASSERT(p1->key.ts != TSDBROW_TS(p2));
ASSERT(p1->ts != TSDBROW_TS(p2));
return 0;
}
STsdbRowKey k2 = {0};
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val);
SRowKey k2 = {0};
tRowGetKeyEx(p2, &k2);
return pReader->pkComparFn(&p1->pks[0].val, &k2.pks[0].val);
}
static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) {
return pReader->pkComparFn(&p1->key.pks[0].val, &p2->key.pks[0].val);
static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) {
pKey->ts = pBlock->aTSKEY[irow];
if (slotId == -1) {
pKey->numOfPKs = 0;
return;
}
pKey->numOfPKs = 1;
SColData* pColData = &pBlock->aColData[slotId];
SColVal cv;
tColDataGetValue(pColData, irow, &cv);
if (IS_NUMERIC_TYPE(cv.value.type)) {
pKey->pks[0].val = cv.value.val;
} else {
pKey->pks[0].nData = cv.value.nData;
memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData);
}
}
// for test purpose, todo remove it
static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) {
int32_t n = 0;
n += tGetI8(p + n, &index->type);
n += tGetU32v(p + n, &index->offset);
return n;
}
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
pKey->ts = pKey->ts;
pKey->numOfPKs = pRow->numOfPKs;
if (pKey->numOfPKs == 0) {
return;
}
SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
uint8_t *data = pRow->data;
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
data += tGetPrimaryKeyIndex(data, &indices[i]);
}
// primary keys
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
pKey->pks[i].type = indices[i].type;
if (IS_VAR_DATA_TYPE(indices[i].type)) {
tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData);
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
pKey->pks[i].pData += pKey->pks[i].nData;
} else {
pKey->pks[i].val = *(int64_t*) data + indices[i].offset;
}
}
}
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
pSupInfo->pk.pk = 0;
pSupInfo->numOfPks = 0;
pSupInfo->pk.slotId = -1;
pSupInfo->pkSrcSlot = -1;
pSupInfo->pkDstSlot = -1;
pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols;
@ -136,6 +203,8 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
if (pCols[i].pk) {
pSupInfo->pk = pCols[i];
pSupInfo->pkSrcSlot = i;
pSupInfo->pkDstSlot = pSlotIdList[i];
pSupInfo->numOfPks += 1;
}
}
@ -207,6 +276,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
pLReader->window = pReader->info.window;
pLReader->verRange = pReader->info.verRange;
pLReader->numOfPks = pReader->suppInfo.numOfPks;
pLReader->pkComparFn = pReader->pkComparFn;
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
@ -431,13 +501,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->info.suid = pCond->suid;
pReader->info.order = pCond->order;
pReader->info.verRange = getQueryVerRange(pVnode, pCond, level);
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->info.verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
@ -621,9 +693,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
// todo: here we should find the first timestamp that is greater than the lastProcKey
if (asc) {
w.skey = pScanInfo->lastProcKey.key.ts + step;
w.skey = pScanInfo->lastProcKey.ts + step;
} else {
w.ekey = pScanInfo->lastProcKey.key.ts + step;
w.ekey = pScanInfo->lastProcKey.ts + step;
}
if (isEmptyQueryTimeWindow(&w)) {
@ -699,6 +771,24 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int
// pDumpInfo->lastKey.key.ts = maxKey + step;
}
static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks,
bool asc) {
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = numOfPks;
if (pKey->numOfPKs <= 0) {
return;
}
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
} else {
uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen;
memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
}
}
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
@ -954,7 +1044,7 @@ static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInf
record->count = pBlockInfo->count;
}
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLastProcKey) {
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastProcKey) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
@ -1103,7 +1193,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLas
pResBlock->info.rows = dumpedRows;
pDumpInfo->rowIndex += step * dumpedRows;
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey);
tColRowGetKeyDeepCopy(pBlockData, pDumpInfo->rowIndex - step, pSupInfo->pkSrcSlot, pLastProcKey);
// check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
@ -1426,11 +1516,11 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
STsdbRowKey rowKey, nextRowKey;
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
SRowKey rowKey, nextRowKey;
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed
if (rowKey.ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, &rowKey, &nextRowKey) != 0)) { // merge is not needed
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) {
return code;
@ -1462,26 +1552,13 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
if (pSttBlockReader->numOfPks < 0) {// todo handle the deep copy problem
tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
pSttBlockReader->numOfPks = pSttBlockReader->currentKey.key.numOfPKs;
if (pSttBlockReader->numOfPks > 0) {
pSttBlockReader->pkComparFn = getComparFunc(pSttBlockReader->currentKey.key.pks[0].type, 0);
}
if (pSttBlockReader->numOfPks == 0) {
pSttBlockReader->currentKey.ts = key;
// todo handle error
pScanInfo->sttKeyInfo.nextProcKey = key;
} else { // todo handle the deep copy problem
tColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
pScanInfo->sttKeyInfo.nextProcKey = key;
} else {
if (pSttBlockReader->numOfPks == 0) {
pSttBlockReader->currentKey.key.ts = key;
pSttBlockReader->currentKey.version = ver;
// todo handle error
pScanInfo->sttKeyInfo.nextProcKey = key;
} else {
// todo handle the deep copy problem
tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
pScanInfo->sttKeyInfo.nextProcKey = key;
}
}
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
@ -1500,22 +1577,22 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
static int32_t pkCompEx(__compar_fn_t comparFn, STsdbRowKey* p1, STsdbRowKey* p2) {
if (p1->key.ts < p2->key.ts) {
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p1->ts < p2->ts) {
return -1;
} else if (p1->key.ts > p2->key.ts) {
} else if (p1->ts > p2->ts) {
return 1;
}
if (p1->key.numOfPKs == 0) {
if (p1->numOfPKs == 0) {
return 0;
} else {
return comparFn(&p1->key.pks[0].val, &p2->key.pks[0].val);
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
}
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
STableBlockScanInfo* pScanInfo, STsdbRowKey* pSttKey, STsdbReader* pReader,
STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader,
bool* copied) {
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
@ -1525,10 +1602,10 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pSttBlockReader);
if (hasVal) {
STsdbRowKey nextKey;
SRowKey nextKey;
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowGetKey(pNextRow, &nextKey);
tRowGetKeyEx(pNextRow, &nextKey);
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
@ -1586,20 +1663,20 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
}
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SSttBlockReader* pSttBlockReader) {
SIterInfo* pIter, SSttBlockReader* pSttBlockReader) {
SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
__compar_fn_t compFn = pReader->pkComparFn;
STsdbRowKey* pSttKey = NULL;
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
}
STsdbRowKey k;
tsdbRowGetKey(pRow, &k);
SRowKey k;
tRowGetKeyEx(pRow, &k);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -1609,9 +1686,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
STsdbRowKey* pfKey = &(STsdbRowKey){0};
SRowKey* pfKey = &(SRowKey){0};
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
} else {
pfKey = NULL;
}
@ -1627,8 +1704,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
// int64_t minKey = 0;
STsdbRowKey minKey;
SRowKey minKey;
if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = k; // chosen the minimum value
@ -1757,12 +1833,12 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
} else {
// row in both stt file blocks and data file blocks
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < pSttKey->key.ts) { // asc
if (key < pSttKey->ts) { // asc
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key > pSttKey->key.ts) {
} else if (key > pSttKey->ts) {
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
}
@ -1776,9 +1852,9 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
}
}
} else { // desc
if (key > pSttKey->key.ts) {
if (key > pSttKey->ts) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < pSttKey->key.ts) {
} else if (key < pSttKey->ts) {
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
}
@ -1854,21 +1930,21 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
STsdbRowKey* pSttKey = NULL;
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
}
STsdbRowKey* pfKey = &(STsdbRowKey){0};
SRowKey* pfKey = &(SRowKey){0};
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
} else {
pfKey = NULL;
}
STsdbRowKey k, ik;
tsdbRowGetKey(pRow, &k);
tsdbRowGetKey(piRow, &ik);
SRowKey k, ik;
tRowGetKeyEx(pRow, &k);
tRowGetKeyEx(piRow, &ik);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -1895,7 +1971,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
STsdbRowKey minKey;
SRowKey minKey;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
minKey = k; // let's find the minimum
@ -2070,13 +2146,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
if (ASCENDING_TRAVERSE(pReader->info.order)) {
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
.key = {
.ts = pBlockScanInfo->lastProcKey.key.ts + 1,
.ts = pBlockScanInfo->lastProcKey.ts + 1,
.numOfPKs = 0, // TODO: change here if multi-key is supported
}};
} else {
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
.key = {
.ts = pBlockScanInfo->lastProcKey.key.ts - 1,
.ts = pBlockScanInfo->lastProcKey.ts - 1,
.numOfPKs = 0, // TODO: change here if multi-key is supported
}};
}
@ -2121,14 +2197,14 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
return false;
}
if ((asc && (ts < pBlockScanInfo->lastProcKey.key.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.key.ts))) {
if ((asc && (ts < pBlockScanInfo->lastProcKey.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.ts))) {
return false;
}
if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf
STsdbRowKey nextRowKey;
tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey);
if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf
SRowKey nextRowKey;
tColRowGetKey(pBlockData, rowIndex, &nextRowKey);
if (pkComp2(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
return false;
}
}
@ -2202,7 +2278,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (0 /*conf.rspRows*/) {
if (conf.rspRows) {
pScanInfo->cleanSttBlocks =
isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order);
@ -2226,7 +2302,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true;
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
pScanInfo->sttBlockReturned = false;
@ -2278,12 +2354,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
}
if (copied) {
if (pReader->suppInfo.numOfPks == 0) {
pBlockScanInfo->lastProcKey.key.ts = key;
} else { // todo use deep copy instead of shallow copy
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
}
// if (pReader->suppInfo.numOfPks == 0) {
// pBlockScanInfo->lastProcKey.ts = key;
// } else { // todo use deep copy instead of shallow copy
// int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
// tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
// }
return TSDB_CODE_SUCCESS;
} else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
@ -2312,11 +2388,9 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
STsdbReader* pReader) {
bool copied = false;
SRow* pTSRow = NULL;
STsdbRowKey* pSttKey = NULL;//getCurrentKeyInSttBlock(pSttBlockReader);
SRowKey sttKey = {0};
STsdbRowKey newSttKey;
tsdbRowKeyAssign(&newSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
pSttKey = &newSttKey;
tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader));
SRowMerger* pMerger = &pReader->status.merger;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
@ -2325,13 +2399,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, pSttKey, pReader, &copied);
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, &sttKey, pReader, &copied);
if (code) {
return code;
}
if (copied) {
tsdbRowKeyAssign(&pBlockScanInfo->lastProcKey, pSttKey);
// tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -2341,7 +2415,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
@ -2379,12 +2453,12 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
// imem + file + stt block
if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, pSttBlockReader);
}
// mem + file + stt block
if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, pSttBlockReader);
}
// files data blocks + stt block
@ -2439,7 +2513,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
pResBlock->info.version = pReader->info.verRange.maxVer;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order));
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order));
setComposedBlockFlag(pReader, true);
// todo update the pk range for current return data block
@ -2789,7 +2863,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
ASSERT(0);
pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
pScanInfo->sttBlockReturned = true;
pSttBlockReader->mergeTree.pIter = NULL;
@ -2825,16 +2899,10 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
}
}
// update the last key for the corresponding table
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
// update the last key for the corresponding table
SRowKey* pKey = &pScanInfo->lastProcKey.key;
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = pReader->suppInfo.numOfPks;
// todo opt allocation, and handle varchar primary key
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
updateLastKeyInfo(&pScanInfo->lastProcKey, pBlockInfo, pInfo, pReader->suppInfo.numOfPks, asc);
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
@ -3041,10 +3109,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
// data in stt now overlaps with current active file data block, need to composed with file data block.
STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
if ((pSttKey->key.ts >= pBlockInfo->firstKey && asc) || (pSttKey->key.ts <= pBlockInfo->lastKey && (!asc))) {
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
if ((pSttKey->ts >= pBlockInfo->firstKey && asc) || (pSttKey->ts <= pBlockInfo->lastKey && (!asc))) {
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
pSttKey->key.ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
pSttKey->ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
break;
}
}
@ -3539,7 +3607,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
}
}
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) {
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) {
SRowMerger* pMerger = &pReader->status.merger;
while (1) {
@ -3555,7 +3623,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, S
}
// ts is not identical, quit
if (TSDBROW_TS(pRow) != pCurKey->key.ts) {
if (TSDBROW_TS(pRow) != pCurKey->ts) {
break;
}
@ -3659,10 +3727,10 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbRowKey* pRowKey,
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
STsdbRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader);
SRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader);
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1);
if (ret == 0) {
@ -3685,8 +3753,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow;
STsdbRowKey curKey = {0};
tsdbRowGetKey(&current, &curKey);
SRowKey curKey = {0};
tRowGetKeyEx(&current, &curKey);
{ // if the timestamp of the next valid row has a different ts, return current row directly
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
@ -3762,9 +3830,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
SRow** pTSRow) {
SRowMerger* pMerger = &pReader->status.merger;
STsdbRowKey k, ik;
tsdbRowGetKey(pRow, &k);
tsdbRowGetKey(piRow, &ik);
SRowKey k, ik;
tRowGetKeyEx(pRow, &k);
tRowGetKeyEx(piRow, &ik);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -3931,11 +3999,6 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
pBlock->info.dataLoad = 1;
pBlock->info.rows += 1;
// todo no version
TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT};
tsdbRowGetKey(&row, &pScanInfo->lastProcKey);
// pScanInfo->lastProcKey = pTSRow->ts;
return TSDB_CODE_SUCCESS;
}
@ -4001,7 +4064,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
if (row.type == TSDBROW_ROW_FMT) {
int64_t ts = row.pTSRow->ts;
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) {
@ -4012,13 +4074,14 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return code;
}
tsdbRowGetKey(&row, &pBlockScanInfo->lastProcKey);
tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey);
} else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) {
return code;
}
tsdbColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey);
tColRowGetKeyDeepCopy(row.pBlockData, row.iRow, pReader->suppInfo.pkSrcSlot, &pBlockScanInfo->lastProcKey);
}
// no data in buffer, return immediately

View File

@ -165,18 +165,23 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastProcKey.key.ts = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastProcKey.ts = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->sttKeyInfo.nextProcKey = skey;
} else {
int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastProcKey.key.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastProcKey.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->sttKeyInfo.nextProcKey = ekey;
}
pScanInfo->lastProcKey.numOfPKs = pTsdbReader->suppInfo.numOfPks;
if (pTsdbReader->suppInfo.numOfPks > 0 && IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) {
pScanInfo->lastProcKey.pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes);
}
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastProcKey.key.ts, pTsdbReader->idStr);
pScanInfo->lastProcKey.ts, pTsdbReader->idStr);
}
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
@ -209,7 +214,7 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastProcKey.key.ts = ts;
pInfo->lastProcKey.ts = ts;
ASSERT(0);
pInfo->sttKeyInfo.nextProcKey = ts + step;

View File

@ -87,8 +87,7 @@ typedef struct SSttKeyInfo {
// 4. not overlap with data file blocks
typedef struct STableBlockScanInfo {
uint64_t uid;
// TSKEY lastProcKey; // todo: refactor: add primary key
STsdbRowKey lastProcKey;
SRowKey lastProcKey;
SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
@ -161,6 +160,8 @@ typedef struct SBlockLoadSuppInfo {
int32_t numOfCols;
int32_t numOfPks;
SColumnInfo pk;
int32_t pkSrcSlot;
int32_t pkDstSlot;
bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo;
@ -171,7 +172,7 @@ typedef struct SSttBlockReader {
int32_t order;
uint64_t uid;
SMergeTree mergeTree;
STsdbRowKey currentKey;
SRowKey currentKey;
int32_t numOfPks;
__compar_fn_t pkComparFn;
} SSttBlockReader;

View File

@ -606,27 +606,13 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
tRowGetKey(row->pTSRow, &key->key);
} else {
key->version = row->pBlockData->aVersion[row->iRow];
key->key.ts = row->pBlockData->aTSKEY[row->iRow];
key->key.numOfPKs = 0;
for (int32_t i = 0; i < row->pBlockData->nColData; i++) {
SColData *pColData = &row->pBlockData->aColData[i];
if (pColData->cflag & COL_IS_KEY) {
SColVal cv;
tColDataGetValue(pColData, row->iRow, &cv);
ASSERT(COL_VAL_IS_VALUE(&cv));
key->key.pks[key->key.numOfPKs] = cv.value;
key->key.numOfPKs++;
} else {
break;
}
}
tColRowGetKey(row->pBlockData, row->iRow, &key->key);
}
}
void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) {
key->version = pBlock->aVersion[irow];
key->key.ts = pBlock->aTSKEY[irow];
key->key.numOfPKs = 0;
void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) {
key->ts = pBlock->aTSKEY[irow];
key->numOfPKs = 0;
for (int32_t i = 0; i < pBlock->nColData; i++) {
SColData *pColData = &pBlock->aColData[i];
@ -634,37 +620,27 @@ void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) {
SColVal cv;
tColDataGetValue(pColData, irow, &cv);
ASSERT(COL_VAL_IS_VALUE(&cv));
key->key.pks[key->key.numOfPKs] = cv.value;
key->key.numOfPKs++;
key->pks[key->numOfPKs] = cv.value;
key->numOfPKs++;
} else {
break;
}
}
}
int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey* pSrc) {
pDst->version = pSrc->version;
if (pSrc->key.numOfPKs == 0) {
pDst->key.ts = pSrc->key.ts;
pDst->key.numOfPKs = 0;
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
if (pSrc->numOfPKs == 0) {
pDst->ts = pSrc->ts;
pDst->numOfPKs = 0;
} else {
pDst->key = pSrc->key;
*pDst = *pSrc;
for (int32_t i = 0; i < pDst->key.numOfPKs; ++i) {
SValue *pVal = &pDst->key.pks[i];
for (int32_t i = 0; i < pDst->numOfPKs; ++i) {
SValue *pVal = &pDst->pks[i];
if (IS_NUMERIC_TYPE(pVal->type)) {
continue;
}
uint8_t *p = taosMemoryMalloc(pVal->nData);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
memcpy(p, pVal->pData, pVal->nData);
pVal->pData = p;
memcpy(pVal->pData, pVal->pData, pVal->nData);
}
}

View File

@ -1197,6 +1197,21 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
{ // todo :refactor:
for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId);
pInfo->pResBlock->info.pks[0].type = pInfoData->info.type;
pInfo->pResBlock->info.pks[1].type = pInfoData->info.type;
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pInfo->pResBlock->info.pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
pInfo->pResBlock->info.pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
}
}
}
}
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -1322,8 +1322,56 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
static bool sortPriKeyOptHasUnsupportedPkFunc(SLogicNode* pLogicNode, EOrder sortOrder) {
if (sortOrder == ORDER_ASC) {
return false;
}
SNodeList* pFuncList = NULL;
switch (nodeType(pLogicNode)) {
case QUERY_NODE_LOGIC_PLAN_AGG:
pFuncList = ((SAggLogicNode*)pLogicNode)->pAggFuncs;
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
pFuncList = ((SWindowLogicNode*)pLogicNode)->pFuncs;
break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
pFuncList = ((SPartitionLogicNode*)pLogicNode)->pAggFuncs;
break;
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
pFuncList = ((SIndefRowsFuncLogicNode*)pLogicNode)->pFuncs;
break;
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
pFuncList = ((SInterpFuncLogicNode*)pLogicNode)->pFuncs;
break;
default:
break;
}
SNode* pNode = 0;
FOREACH(pNode, pFuncList) {
if (nodeType(pNode) != QUERY_NODE_FUNCTION) {
continue;
}
SFunctionNode* pFuncNode = (SFunctionNode*)pLogicNode;
if (pFuncNode->hasPk &&
(pFuncNode->funcType == FUNCTION_TYPE_DIFF ||
pFuncNode->funcType == FUNCTION_TYPE_DERIVATIVE ||
pFuncNode->funcType == FUNCTION_TYPE_IRATE ||
pFuncNode->funcType == FUNCTION_TYPE_TWA)) {
return true;
}
}
return false;
}
static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, EOrder sortOrder,
bool* pNotOptimize, SNodeList** pSequencingNodes) {
if (sortPriKeyOptHasUnsupportedPkFunc(pNode, sortOrder)) {
*pNotOptimize = true;
return TSDB_CODE_SUCCESS;
}
if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
*pNotOptimize = false;
return TSDB_CODE_SUCCESS;