optimize scan tsdb

This commit is contained in:
54liuyao 2024-05-15 09:09:20 +08:00
parent 56a1e7e01f
commit ab3ba3f2e4
11 changed files with 146 additions and 104 deletions

View File

@ -95,6 +95,15 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
#define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL) #define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL)
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE) #define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
#define tRowGetKey(_pRow, _pKey) \
do { \
(_pKey)->ts = (_pRow)->ts; \
(_pKey)->numOfPKs = 0; \
if ((_pRow)->numOfPKs > 0) { \
tRowGetPrimaryKey((_pRow), (_pKey)); \
} \
} while (0)
// SValueColumn ================================ // SValueColumn ================================
typedef struct { typedef struct {
int8_t cmprAlg; // filled by caller int8_t cmprAlg; // filled by caller
@ -125,8 +134,8 @@ void tRowDestroy(SRow *pRow);
int32_t tRowSort(SArray *aRowP); int32_t tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);
void tRowGetKey(SRow *pRow, SRowKey *key); void tRowGetPrimaryKey(SRow *pRow, SRowKey *key);
int32_t tRowKeyCompare(const void *p1, const void *p2); int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2);
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
// SRowIter ================================ // SRowIter ================================

View File

@ -1183,8 +1183,7 @@ int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, in
} }
} }
void tRowGetKey(SRow *row, SRowKey *key) { void tRowGetPrimaryKey(SRow *row, SRowKey *key) {
key->ts = row->ts;
key->numOfPKs = row->numOfPKs; key->numOfPKs = row->numOfPKs;
if (key->numOfPKs == 0) { if (key->numOfPKs == 0) {
@ -1283,10 +1282,7 @@ int32_t tValueCompare(const SValue *tv1, const SValue *tv2) {
// NOTE: // NOTE:
// set key->numOfPKs to 0 as the smallest key with ts // set key->numOfPKs to 0 as the smallest key with ts
// set key->numOfPKs to (TD_MAX_PK_COLS + 1) as the largest key with ts // set key->numOfPKs to (TD_MAX_PK_COLS + 1) as the largest key with ts
int32_t tRowKeyCompare(const void *p1, const void *p2) { FORCE_INLINE int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2) {
SRowKey *key1 = (SRowKey *)p1;
SRowKey *key2 = (SRowKey *)p2;
if (key1->ts < key2->ts) { if (key1->ts < key2->ts) {
return -1; return -1;
} else if (key1->ts > key2->ts) { } else if (key1->ts > key2->ts) {

View File

@ -120,12 +120,32 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
#define tsdbRowFromBlockData(BLOCKDATA, IROW) \ #define tsdbRowFromBlockData(BLOCKDATA, IROW) \
((TSDBROW){.type = TSDBROW_COL_FMT, .pBlockData = (BLOCKDATA), .iRow = (IROW)}) ((TSDBROW){.type = TSDBROW_COL_FMT, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
#define TSDBROW_INIT_KEY(_ROW, _KEY) \
{ \
if ((_ROW)->type == TSDBROW_ROW_FMT) { \
_KEY.version = (_ROW)->version; \
_KEY.ts = (_ROW)->pTSRow->ts; \
} else { \
_KEY.version = (_ROW)->pBlockData->aVersion[(_ROW)->iRow]; \
_KEY.ts = (_ROW)->pBlockData->aTSKEY[(_ROW)->iRow]; \
} \
}
#define tColRowGetKey(_pBlock, _irow, _key) \
{ \
(_key)->ts = (_pBlock)->aTSKEY[(_irow)]; \
(_key)->numOfPKs = 0; \
if ((_pBlock)->nColData > 0) { \
tColRowGetPrimaryKey((_pBlock), (_irow), (_key)); \
} \
}
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tsdbRowCompare(const void *p1, const void *p2); int32_t tsdbRowCompare(const void *p1, const void *p2);
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2); int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2);
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2); int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key); void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
// STSDBRowIter // STSDBRowIter
@ -946,8 +966,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
// #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) // #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
// #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) // #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
typedef struct { typedef struct {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;

View File

@ -16,6 +16,7 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDataFileRW.h" #include "tsdbDataFileRW.h"
#include "tsdbIter.h"
#include "tsdbReadUtil.h" #include "tsdbReadUtil.h"
#include "vnd.h" #include "vnd.h"

View File

@ -66,6 +66,29 @@ int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
SRowInfo *tsdbIterMergerGetData(SIterMerger *merger); SRowInfo *tsdbIterMergerGetData(SIterMerger *merger);
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger); STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger);
FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL;
if (pIter->pRow) {
return pIter->pRow;
}
if (pIter->backward) {
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return NULL;
}
} else {
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return NULL;
}
}
pIter->pRow = &pIter->row;
pIter->row = pIter->pNode->row;
return pIter->pRow;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -806,26 +806,3 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
_exit: _exit:
return aTbDataP; return aTbDataP;
} }
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL;
if (pIter->pRow) {
return pIter->pRow;
}
if (pIter->backward) {
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return NULL;
}
} else {
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return NULL;
}
}
pIter->pRow = &pIter->row;
pIter->row = pIter->pNode->row;
return pIter->pRow;
}

View File

@ -828,7 +828,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTre
SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node)); SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node)); SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));
SRowKey rkey1, rkey2; SRowKey rkey1 = {0}, rkey2 = {0};
tRowGetKeyEx(&pIter1->rInfo.row, &rkey1); tRowGetKeyEx(&pIter1->rInfo.row, &rkey1);
tRowGetKeyEx(&pIter2->rInfo.row, &rkey2); tRowGetKeyEx(&pIter2->rInfo.row, &rkey2);

View File

@ -24,6 +24,16 @@
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey)) #define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey))
#define tColRowGetKeyDeepCopy(_pBlock, _irow, _slotId, _pKey) \
do { \
(_pKey)->ts = (_pBlock)->aTSKEY[(_irow)]; \
(_pKey)->numOfPKs = 0; \
if ((_slotId) != -1) { \
tColRowGetPriamyKeyDeepCopy(_pBlock, _irow, _slotId, _pKey); \
} \
} while(0)
#define outOfTimeWindow(_ts, _window) (((_ts) > (_window)->ekey) || ((_ts) < (_window)->skey))
typedef struct { typedef struct {
bool overlapWithNeighborBlock; bool overlapWithNeighborBlock;
@ -75,11 +85,9 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader); static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
int32_t pkCompEx(SRowKey* p1, SRowKey* p2) { FORCE_INLINE int32_t pkCompEx(SRowKey* p1, SRowKey* p2) {
if (p2 == NULL) { if (p2 == NULL) {
return 1; return 1;
} }
@ -101,13 +109,7 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2) {
} }
} }
static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) { static void tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) {
pKey->ts = pBlock->aTSKEY[irow];
if (slotId == -1) {
pKey->numOfPKs = 0;
return;
}
SColData* pColData = &pBlock->aColData[slotId]; SColData* pColData = &pBlock->aColData[slotId];
SColVal cv; SColVal cv;
tColDataGetValue(pColData, irow, &cv); tColDataGetValue(pColData, irow, &cv);
@ -131,13 +133,7 @@ static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) {
return n; return n;
} }
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { static void tRowGetPrimaryKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
pKey->ts = pRow->ts;
pKey->numOfPKs = pRow->numOfPKs;
if (pKey->numOfPKs == 0) {
return;
}
SPrimaryKeyIndex indices[TD_MAX_PK_COLS]; SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
ASSERT(pKey->numOfPKs <= TD_MAX_PK_COLS); ASSERT(pKey->numOfPKs <= TD_MAX_PK_COLS);
@ -1735,7 +1731,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
} }
SRowKey k; SRowKey k = {0};
tRowGetKeyEx(pRow, &k); tRowGetKeyEx(pRow, &k);
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
@ -1939,7 +1935,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
pfKey = NULL; pfKey = NULL;
} }
SRowKey k, ik; SRowKey k = {0}, ik = {0};
tRowGetKeyEx(pRow, &k); tRowGetKeyEx(pRow, &k);
tRowGetKeyEx(piRow, &ik); tRowGetKeyEx(piRow, &ik);
@ -2086,7 +2082,7 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
} }
static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
SRowKey rowKey; SRowKey rowKey = {0};
while (1) { while (1) {
TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader);
@ -3559,7 +3555,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
return false; return false;
} }
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) { FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
if (!pIter->hasVal) { if (!pIter->hasVal) {
return NULL; return NULL;
} }
@ -3567,7 +3563,8 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
int32_t order = pReader->info.order; int32_t order = pReader->info.order;
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key;
TSDBROW_INIT_KEY(pRow, key);
if (outOfTimeWindow(key.ts, &pReader->info.window)) { if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false; pIter->hasVal = false;
return NULL; return NULL;
@ -3593,7 +3590,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
pRow = tsdbTbDataIterGet(pIter->iter); pRow = tsdbTbDataIterGet(pIter->iter);
key = TSDBROW_KEY(pRow); TSDBROW_INIT_KEY(pRow, key);
if (outOfTimeWindow(key.ts, &pReader->info.window)) { if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false; pIter->hasVal = false;
return NULL; return NULL;
@ -3632,11 +3629,13 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra
break; break;
} }
if (pCurKey->numOfPKs > 0) {
SRowKey nextKey = {0}; SRowKey nextKey = {0};
tRowGetKeyEx(pRow, &nextKey); tRowGetKeyEx(pRow, &nextKey);
if (pkCompEx(pCurKey, &nextKey) != 0) { if (pkCompEx(pCurKey, &nextKey) != 0) {
break; break;
} }
}
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
@ -3785,15 +3784,17 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pKey->numOfPKs > 0) {
SRowKey nextRowKey = {0}; SRowKey nextRowKey = {0};
tRowGetKeyEx(pNextRow, &nextRowKey); tRowGetKeyEx(pNextRow, &nextRowKey);
if (pKey->numOfPKs > 0 && pkCompEx(pKey, &nextRowKey) != 0) { if (pkCompEx(pKey, &nextRowKey) != 0) {
*pResRow = current; *pResRow = current;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
} }
}
terrno = 0; terrno = 0;
int32_t code = 0; int32_t code = 0;
@ -3801,7 +3802,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
// start to merge duplicated rows // start to merge duplicated rows
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid); pTSchema = doGetSchemaForTSRow(current.pTSRow->sver, pReader, uid);
if (pTSchema == NULL) { if (pTSchema == NULL) {
return terrno; return terrno;
} }
@ -3814,7 +3815,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
STSchema* pTSchema1 = NULL; STSchema* pTSchema1 = NULL;
if (pNextRow->type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory if (pNextRow->type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid); pTSchema1 = doGetSchemaForTSRow(pNextRow->pTSRow->sver, pReader, uid);
if (pTSchema1 == NULL) { if (pTSchema1 == NULL) {
return terrno; return terrno;
} }
@ -3926,24 +3927,35 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
// todo refactor // todo refactor
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
if (piter->hasVal) { if (piter->hasVal) {
TSDBKEY k = TSDBROW_KEY(pRow); tRowGetKeyEx(pRow, &rowKey);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { if ((rowKey.ts >= endKey && asc) || (rowKey.ts <= endKey && !asc)) {
pRow = NULL; pRow = NULL;
} }
} }
if (piiter->hasVal) { if (piiter->hasVal) {
TSDBKEY k = TSDBROW_KEY(piRow); tRowGetKeyEx(piRow, &irowKey);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { if ((irowKey.ts >= endKey && asc) || (irowKey.ts <= endKey && !asc)) {
piRow = NULL; piRow = NULL;
} }
} }
if (piter->hasVal && piiter->hasVal && pRow != NULL && piRow != NULL) { if (pRow != NULL && piRow != NULL) {
tRowGetKeyEx(pRow, &rowKey);
tRowGetKeyEx(piRow, &irowKey);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (rowKey.numOfPKs == 0) {
if ((rowKey.ts > irowKey.ts && asc) || (rowKey.ts < irowKey.ts && (!asc))) { // ik.ts < k.ts
code = doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow);
} else if ((rowKey.ts < irowKey.ts && asc) || (rowKey.ts > irowKey.ts && (!asc))) {
code = doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow);
} else { // ik.ts == k.ts
*freeTSRow = true;
pResRow->type = TSDBROW_ROW_FMT;
code = doMergeMemIMemRows(pRow, &rowKey, piRow, &irowKey, pBlockScanInfo, pReader, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else {
int32_t ret = pkCompEx(&rowKey, &irowKey); int32_t ret = pkCompEx(&rowKey, &irowKey);
if (ret != 0) { if (ret != 0) {
if ((ret > 0 && asc) || (ret < 0 && (!asc))) { // ik.ts < k.ts if ((ret > 0 && asc) || (ret < 0 && (!asc))) { // ik.ts < k.ts
@ -3959,17 +3971,16 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
return code; return code;
} }
} }
}
return code; return code;
} }
if (piter->hasVal && pRow != NULL) { if (piter->hasVal && pRow != NULL) {
tRowGetKeyEx(pRow, &rowKey);
return doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow); return doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow);
} }
if (piiter->hasVal && piRow != NULL) { if (piiter->hasVal && piRow != NULL) {
tRowGetKeyEx(piRow, &irowKey);
return doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow); return doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow);
} }
@ -4095,7 +4106,11 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
if (row.type == TSDBROW_ROW_FMT) { if (row.type == TSDBROW_ROW_FMT) {
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey); pBlockScanInfo->lastProcKey.ts = row.pTSRow->ts;
pBlockScanInfo->lastProcKey.numOfPKs = row.pTSRow->numOfPKs;
if (row.pTSRow->numOfPKs > 0) {
tRowGetPrimaryKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey);
}
} }
if (freeTSRow) { if (freeTSRow) {

View File

@ -1056,7 +1056,7 @@ static int32_t sortUidComparFn(const void* p1, const void* p2) {
const SSttKeyRange* px1 = p1; const SSttKeyRange* px1 = p1;
const SSttKeyRange* px2 = p2; const SSttKeyRange* px2 = p2;
int32_t ret = tRowKeyCompare(&px1, px2); int32_t ret = tRowKeyCompare(&px1->skey, &px2->skey);
return ret; return ret;
} }

View File

@ -39,13 +39,19 @@ extern "C" {
} while (0); } while (0);
#define tRowGetKeyEx(_pRow, _pKey) \ #define tRowGetKeyEx(_pRow, _pKey) \
do { \ { \
if ((_pRow)->type == TSDBROW_ROW_FMT) { \ if ((_pRow)->type == TSDBROW_ROW_FMT) { \
tRowGetKey((_pRow)->pTSRow, (_pKey)); \ (_pKey)->ts = (_pRow)->pTSRow->ts; \
} else { \ if ((_pRow)->pTSRow->numOfPKs > 0) { \
tColRowGetKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \ tRowGetPrimaryKey((_pRow)->pTSRow, (_pKey)); \
} \ } \
} while (0) } else { \
(_pKey)->ts = (_pRow)->pBlockData->aTSKEY[(_pRow)->iRow]; \
if ((_pRow)->pBlockData->nColData > 0) { \
tColRowGetPrimaryKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \
} \
} \
}
typedef enum { typedef enum {
READER_STATUS_SUSPEND = 0x1, READER_STATUS_SUSPEND = 0x1,

View File

@ -628,10 +628,7 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
} }
} }
void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key) { void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) {
key->ts = pBlock->aTSKEY[irow];
key->numOfPKs = 0;
for (int32_t i = 0; i < pBlock->nColData; i++) { for (int32_t i = 0; i < pBlock->nColData; i++) {
SColData *pColData = &pBlock->aColData[i]; SColData *pColData = &pBlock->aColData[i];
if (pColData->cflag & COL_IS_KEY) { if (pColData->cflag & COL_IS_KEY) {