refactor: do some internal refactor.
This commit is contained in:
parent
efdd0c8a2a
commit
1e89b86827
|
@ -203,6 +203,19 @@ typedef struct SBlockID {
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
} SBlockID;
|
} SBlockID;
|
||||||
|
|
||||||
|
typedef struct SPkInfo {
|
||||||
|
int8_t type;
|
||||||
|
int32_t bytes;
|
||||||
|
union {
|
||||||
|
int64_t val;
|
||||||
|
uint8_t* pData;
|
||||||
|
} skey;
|
||||||
|
union {
|
||||||
|
int64_t val;
|
||||||
|
uint8_t* pData;
|
||||||
|
} ekey;
|
||||||
|
} SPkInfo;
|
||||||
|
|
||||||
typedef struct SDataBlockInfo {
|
typedef struct SDataBlockInfo {
|
||||||
STimeWindow window;
|
STimeWindow window;
|
||||||
int32_t rowSize;
|
int32_t rowSize;
|
||||||
|
|
|
@ -490,10 +490,6 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (pDataBlock->info.rows > 0) {
|
|
||||||
// ASSERT(pDataBlock->info.dataLoad == 1);
|
|
||||||
// }
|
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
if (numOfCols <= 0) {
|
if (numOfCols <= 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -525,35 +521,36 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDataBlockInfo* pInfo = &pDataBlock->info;
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
|
||||||
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
|
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* skey = colDataGetData(pColInfoData, 0);
|
void* skey = colDataGetData(pColInfoData, 0);
|
||||||
void* ekey = colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
|
void* ekey = colDataGetData(pColInfoData, (pInfo->rows - 1));
|
||||||
|
|
||||||
if (asc) {
|
if (asc) {
|
||||||
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, skey);
|
GET_TYPED_DATA(pInfo->pks[0].val, int64_t, pColInfoData->info.type, skey);
|
||||||
GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, ekey);
|
GET_TYPED_DATA(pInfo->pks[1].val, int64_t, pColInfoData->info.type, ekey);
|
||||||
} else { // todo refactor
|
} else { // todo refactor
|
||||||
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
|
memcpy(pInfo->pks[0].pData, varDataVal(skey), varDataLen(skey));
|
||||||
pDataBlock->info.pks[0].nData = varDataLen(skey);
|
pInfo->pks[0].nData = varDataLen(skey);
|
||||||
|
|
||||||
memcpy(pDataBlock->info.pks[1].pData, varDataVal(ekey), varDataLen(ekey));
|
memcpy(pInfo->pks[1].pData, varDataVal(ekey), varDataLen(ekey));
|
||||||
pDataBlock->info.pks[1].nData = varDataLen(ekey);
|
pInfo->pks[1].nData = varDataLen(ekey);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, ekey);
|
GET_TYPED_DATA(pInfo->pks[0].val, int64_t, pColInfoData->info.type, ekey);
|
||||||
GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, skey);
|
GET_TYPED_DATA(pInfo->pks[1].val, int64_t, pColInfoData->info.type, skey);
|
||||||
} else { // todo refactor
|
} else { // todo refactor
|
||||||
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
memcpy(pInfo->pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
||||||
pDataBlock->info.pks[0].nData = varDataLen(ekey);
|
pInfo->pks[0].nData = varDataLen(ekey);
|
||||||
|
|
||||||
memcpy(pDataBlock->info.pks[1].pData, varDataVal(skey), varDataLen(skey));
|
memcpy(pInfo->pks[1].pData, varDataVal(skey), varDataLen(skey));
|
||||||
pDataBlock->info.pks[1].nData = varDataLen(skey);
|
pInfo->pks[1].nData = varDataLen(skey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -839,7 +839,6 @@ struct SLDataIter {
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
SSttBlockLoadInfo *pBlockLoadInfo;
|
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||||
SRowKey* pStartRowKey; // current row key
|
SRowKey* pStartRowKey; // current row key
|
||||||
__compar_fn_t comparFn;
|
|
||||||
bool ignoreEarlierTs;
|
bool ignoreEarlierTs;
|
||||||
struct SSttFileReader *pReader;
|
struct SSttFileReader *pReader;
|
||||||
};
|
};
|
||||||
|
@ -865,7 +864,6 @@ typedef struct SMergeTreeConf {
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
SRowKey *pCurRowKey;
|
SRowKey *pCurRowKey;
|
||||||
_load_tomb_fn loadTombFn;
|
_load_tomb_fn loadTombFn;
|
||||||
__compar_fn_t comparFn;
|
|
||||||
void *pReader;
|
void *pReader;
|
||||||
void *idstr;
|
void *idstr;
|
||||||
bool rspRows; // response the rows in stt-file, if possible
|
bool rspRows; // response the rows in stt-file, if possible
|
||||||
|
|
|
@ -2490,7 +2490,6 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb
|
||||||
.loadTombFn = loadSttTomb,
|
.loadTombFn = loadSttTomb,
|
||||||
.pReader = pr,
|
.pReader = pr,
|
||||||
.idstr = pr->idstr,
|
.idstr = pr->idstr,
|
||||||
.comparFn = pr->pkComparFn,
|
|
||||||
.pCurRowKey = &pr->rowKey,
|
.pCurRowKey = &pr->rowKey,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -228,7 +228,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
||||||
|
|
||||||
p->rowKey.numOfPKs = numOfPks;
|
p->rowKey.numOfPKs = numOfPks;
|
||||||
if (numOfPks > 0) {
|
if (numOfPks > 0) {
|
||||||
p->pkComparFn = getComparFunc(pPkCol->type, 0);
|
|
||||||
p->rowKey.pks[0].type = pPkCol->type;
|
p->rowKey.pks[0].type = pPkCol->type;
|
||||||
if (IS_VAR_DATA_TYPE(pPkCol->type)) {
|
if (IS_VAR_DATA_TYPE(pPkCol->type)) {
|
||||||
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
|
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
|
||||||
|
|
|
@ -549,7 +549,6 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
||||||
pIter->verRange.maxVer = pConf->verRange.maxVer;
|
pIter->verRange.maxVer = pConf->verRange.maxVer;
|
||||||
pIter->timeWindow.skey = pConf->timewindow.skey;
|
pIter->timeWindow.skey = pConf->timewindow.skey;
|
||||||
pIter->timeWindow.ekey = pConf->timewindow.ekey;
|
pIter->timeWindow.ekey = pConf->timewindow.ekey;
|
||||||
pIter->comparFn = pConf->comparFn;
|
|
||||||
|
|
||||||
pIter->pStartRowKey = pConf->pCurRowKey;
|
pIter->pStartRowKey = pConf->pCurRowKey;
|
||||||
pIter->pReader = pSttFileReader;
|
pIter->pReader = pSttFileReader;
|
||||||
|
@ -702,7 +701,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) {
|
if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) {
|
||||||
SRowKey key;
|
SRowKey key;
|
||||||
tColRowGetKey(pData, i, &key);
|
tColRowGetKey(pData, i, &key);
|
||||||
int32_t ret = pkCompEx(pIter->comparFn, &key, pIter->pStartRowKey);
|
int32_t ret = pkCompEx(&key, pIter->pStartRowKey);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -719,7 +718,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) {
|
if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) {
|
||||||
SRowKey key;
|
SRowKey key;
|
||||||
tColRowGetKey(pData, i, &key);
|
tColRowGetKey(pData, i, &key);
|
||||||
int32_t ret = pkCompEx(pIter->comparFn, &key, pIter->pStartRowKey);
|
int32_t ret = pkCompEx(&key, pIter->pStartRowKey);
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi
|
||||||
|
|
||||||
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
|
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
|
||||||
|
|
||||||
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
|
int32_t pkCompEx(SRowKey* p1, SRowKey* p2) {
|
||||||
if (p2 == NULL) {
|
if (p2 == NULL) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -103,6 +103,8 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(p1->numOfPKs == p2->numOfPKs);
|
||||||
|
|
||||||
if (p1->numOfPKs == 0) {
|
if (p1->numOfPKs == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -291,7 +293,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
|
||||||
pSttReader->window = pReader->info.window;
|
pSttReader->window = pReader->info.window;
|
||||||
pSttReader->verRange = pReader->info.verRange;
|
pSttReader->verRange = pReader->info.verRange;
|
||||||
pSttReader->numOfPks = pReader->suppInfo.numOfPks;
|
pSttReader->numOfPks = pReader->suppInfo.numOfPks;
|
||||||
pSttReader->pkComparFn = pReader->pkComparFn;
|
|
||||||
pSttReader->uid = 0;
|
pSttReader->uid = 0;
|
||||||
|
|
||||||
tMergeTreeClose(&pSttReader->mergeTree);
|
tMergeTreeClose(&pSttReader->mergeTree);
|
||||||
|
@ -564,10 +565,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
||||||
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
||||||
|
|
||||||
if (pSup->numOfPks > 0) {
|
|
||||||
pReader->pkComparFn = getComparFunc(pSup->pk.type, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup);
|
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -751,11 +748,11 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
|
||||||
}
|
}
|
||||||
|
|
||||||
if (asc) {
|
if (asc) {
|
||||||
if (pkCompEx(pReader->pkComparFn, &pRecord->lastKey.key, &pScanInfo->lastProcKey) <= 0) {
|
if (pkCompEx(&pRecord->lastKey.key, &pScanInfo->lastProcKey) <= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pkCompEx(pReader->pkComparFn, &pRecord->firstKey.key, &pScanInfo->lastProcKey) >= 0) {
|
if (pkCompEx(&pRecord->firstKey.key, &pScanInfo->lastProcKey) >= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1561,7 +1558,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
||||||
SRowKey nextRowKey;
|
SRowKey nextRowKey;
|
||||||
tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
|
tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
|
||||||
|
|
||||||
if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { // merge is not needed
|
if (pkCompEx(pKey, &nextRowKey) != 0) { // merge is not needed
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1638,7 +1635,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
||||||
doUnpinSttBlock(pSttBlockReader);
|
doUnpinSttBlock(pSttBlockReader);
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if (pkCompEx(pReader->pkComparFn, pSttKey, pNext) != 0) {
|
if (pkCompEx(pSttKey, pNext) != 0) {
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
*copied = (code == TSDB_CODE_SUCCESS);
|
*copied = (code == TSDB_CODE_SUCCESS);
|
||||||
return code;
|
return code;
|
||||||
|
@ -1691,7 +1688,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
__compar_fn_t compFn = pReader->pkComparFn;
|
|
||||||
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
SRowKey* pSttKey = NULL;
|
SRowKey* pSttKey = NULL;
|
||||||
|
@ -1730,19 +1726,19 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
SRowKey minKey = k;
|
SRowKey minKey = k;
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) {
|
if (pfKey != NULL && pkCompEx(pfKey, &minKey) < 0) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttKey != NULL && pkCompEx(compFn, pSttKey, &minKey) < 0) {
|
if (pSttKey != NULL && pkCompEx(pSttKey, &minKey) < 0) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) {
|
if (pfKey != NULL && pkCompEx(pfKey, &minKey) > 0) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttKey != NULL && pkCompEx(compFn, pSttKey, &minKey) > 0) {
|
if (pSttKey != NULL && pkCompEx(pSttKey, &minKey) > 0) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1752,7 +1748,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
|
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
|
||||||
|
|
||||||
// file block ---> stt block -----> mem
|
// file block ---> stt block -----> mem
|
||||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
if (pkCompEx(&minKey, pfKey) == 0) {
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1761,7 +1757,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
if (pkCompEx(&minKey, pSttKey) == 0) {
|
||||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1770,7 +1766,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
if (pkCompEx(&minKey, &k) == 0) {
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1828,7 +1824,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
|
||||||
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pKey, pSttKey);
|
int32_t ret = pkCompEx(pKey, pSttKey);
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
if (ret < 0) { // asc
|
if (ret < 0) { // asc
|
||||||
|
@ -1886,7 +1882,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||||
__compar_fn_t compFn = pReader->pkComparFn;
|
|
||||||
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||||
|
@ -1935,27 +1930,27 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
SRowKey minKey = k;
|
SRowKey minKey = k;
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) {
|
if (pkCompEx(&ik, &minKey) < 0) { // minKey > ik.key.ts) {
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pfKey != NULL) && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
|
if ((pfKey != NULL) && (pkCompEx(pfKey, &minKey) < 0)) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pSttKey != NULL) && (pkCompEx(compFn, pSttKey, &minKey) < 0)) {
|
if ((pSttKey != NULL) && (pkCompEx(pSttKey, &minKey) < 0)) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pkCompEx(compFn, &ik, &minKey) > 0) {
|
if (pkCompEx(&ik, &minKey) > 0) {
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pfKey != NULL) && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
|
if ((pfKey != NULL) && (pkCompEx(pfKey, &minKey) > 0)) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pSttKey != NULL) && (pkCompEx(compFn, pSttKey, &minKey) > 0)) {
|
if ((pSttKey != NULL) && (pkCompEx(pSttKey, &minKey) > 0)) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1963,7 +1958,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
|
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
|
||||||
|
|
||||||
// file block -----> stt block -----> imem -----> mem
|
// file block -----> stt block -----> imem -----> mem
|
||||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
if (pkCompEx(&minKey, pfKey) == 0) {
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1973,7 +1968,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
if (pkCompEx(&minKey, pSttKey) == 0) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1983,7 +1978,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &ik) == 0) {
|
if (pkCompEx(&minKey, &ik) == 0) {
|
||||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1995,7 +1990,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
if (pkCompEx(&minKey, &k) == 0) {
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2060,7 +2055,7 @@ static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanIn
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowGetKeyEx(pRow, &rowKey);
|
tRowGetKeyEx(pRow, &rowKey);
|
||||||
int32_t ret = pkCompEx(pReader->pkComparFn, pKey, &rowKey);
|
int32_t ret = pkCompEx(pKey, &rowKey);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2144,7 +2139,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
|
||||||
if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf
|
if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf
|
||||||
SRowKey nextRowKey; // lazy eval
|
SRowKey nextRowKey; // lazy eval
|
||||||
tColRowGetKey(pBlockData, rowIndex, &nextRowKey);
|
tColRowGetKey(pBlockData, rowIndex, &nextRowKey);
|
||||||
if (pkCompEx(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
|
if (pkCompEx(&pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2207,7 +2202,6 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
.numOfCols = pReader->suppInfo.numOfCols,
|
.numOfCols = pReader->suppInfo.numOfCols,
|
||||||
.loadTombFn = loadSttTombDataForAll,
|
.loadTombFn = loadSttTombDataForAll,
|
||||||
.pCurRowKey = &pScanInfo->sttKeyInfo.nextProcKey,
|
.pCurRowKey = &pScanInfo->sttKeyInfo.nextProcKey,
|
||||||
.comparFn = pReader->pkComparFn,
|
|
||||||
.pReader = pReader,
|
.pReader = pReader,
|
||||||
.idstr = pReader->idStr,
|
.idstr = pReader->idStr,
|
||||||
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
|
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
|
||||||
|
@ -2230,11 +2224,11 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
// calculate the time window for data in stt files
|
// calculate the time window for data in stt files
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
|
||||||
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
|
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
|
||||||
if (pkCompEx(pReader->pkComparFn, &pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
|
if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
|
||||||
tRowKeyAssign(&pScanInfo->sttRange.skey, &pKeyRange->skey);
|
tRowKeyAssign(&pScanInfo->sttRange.skey, &pKeyRange->skey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(pReader->pkComparFn, &pScanInfo->sttRange.ekey, &pKeyRange->ekey) < 0) {
|
if (pkCompEx(&pScanInfo->sttRange.ekey, &pKeyRange->ekey) < 0) {
|
||||||
tRowKeyAssign(&pScanInfo->sttRange.ekey, &pKeyRange->ekey);
|
tRowKeyAssign(&pScanInfo->sttRange.ekey, &pKeyRange->ekey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2457,8 +2451,6 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
|
||||||
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order));
|
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order));
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
// todo update the pk range for current return data block
|
|
||||||
|
|
||||||
pReader->cost.composedBlocks += 1;
|
pReader->cost.composedBlocks += 1;
|
||||||
pReader->cost.buildComposedBlockTime += el;
|
pReader->cost.buildComposedBlockTime += el;
|
||||||
}
|
}
|
||||||
|
@ -3597,7 +3589,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra
|
||||||
|
|
||||||
SRowKey nextKey = {0};
|
SRowKey nextKey = {0};
|
||||||
tRowGetKeyEx(pRow, &nextKey);
|
tRowGetKeyEx(pRow, &nextKey);
|
||||||
if (pkCompEx(pReader->pkComparFn, pCurKey, &nextKey) != 0) {
|
if (pkCompEx(pCurKey, &nextKey) != 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3616,11 +3608,11 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, SRowKey* pKey, SRowMerger* pMerger,
|
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, SRowKey* pKey, SRowMerger* pMerger,
|
||||||
SVersionRange* pVerRange, int32_t step, __compar_fn_t comparFn) {
|
SVersionRange* pVerRange, int32_t step) {
|
||||||
while (rowIndex < pBlockData->nRow && rowIndex >= 0) {
|
while (rowIndex < pBlockData->nRow && rowIndex >= 0) {
|
||||||
SRowKey cur;
|
SRowKey cur;
|
||||||
tColRowGetKey(pBlockData, rowIndex, &cur);
|
tColRowGetKey(pBlockData, rowIndex, &cur);
|
||||||
if (pkCompEx(comparFn, &cur, pKey) != 0) {
|
if (pkCompEx(&cur, pKey) != 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3656,7 +3648,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
*state = CHECK_FILEBLOCK_QUIT;
|
*state = CHECK_FILEBLOCK_QUIT;
|
||||||
|
|
||||||
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
|
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
|
||||||
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pVerRange, step, pReader->pkComparFn);
|
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pVerRange, step);
|
||||||
if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
|
if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
|
||||||
*state = CHECK_FILEBLOCK_CONT;
|
*state = CHECK_FILEBLOCK_CONT;
|
||||||
}
|
}
|
||||||
|
@ -3675,7 +3667,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
pDumpInfo->rowIndex += step;
|
pDumpInfo->rowIndex += step;
|
||||||
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
|
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
|
||||||
pDumpInfo->rowIndex =
|
pDumpInfo->rowIndex =
|
||||||
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pRange, step, pReader->pkComparFn);
|
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pRange, step);
|
||||||
}
|
}
|
||||||
|
|
||||||
// all rows are consumed, let's try next file block
|
// all rows are consumed, let's try next file block
|
||||||
|
@ -3706,7 +3698,7 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
|
||||||
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) {
|
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) {
|
||||||
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
|
||||||
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, pNextKey);
|
int32_t ret = pkCompEx(pRowKey, pNextKey);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
|
@ -3750,7 +3742,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
|
||||||
|
|
||||||
SRowKey nextRowKey = {0};
|
SRowKey nextRowKey = {0};
|
||||||
tRowGetKeyEx(pNextRow, &nextRowKey);
|
tRowGetKeyEx(pNextRow, &nextRowKey);
|
||||||
if (pKey->numOfPKs > 0 && pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) {
|
if (pKey->numOfPKs > 0 && pkCompEx(pKey, &nextRowKey) != 0) {
|
||||||
*pResRow = current;
|
*pResRow = current;
|
||||||
*freeTSRow = false;
|
*freeTSRow = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3907,7 +3899,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
tRowGetKeyEx(piRow, &irowKey);
|
tRowGetKeyEx(piRow, &irowKey);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t ret = pkCompEx(pReader->pkComparFn, &rowKey, &irowKey);
|
int32_t ret = pkCompEx(&rowKey, &irowKey);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
if ((ret > 0 && asc) || (ret < 0 && (!asc))) { // ik.ts < k.ts
|
if ((ret > 0 && asc) || (ret < 0 && (!asc))) { // ik.ts < k.ts
|
||||||
code = doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow);
|
code = doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow);
|
||||||
|
|
|
@ -186,7 +186,6 @@ typedef struct SSttBlockReader {
|
||||||
SMergeTree mergeTree;
|
SMergeTree mergeTree;
|
||||||
SRowKey currentKey;
|
SRowKey currentKey;
|
||||||
int32_t numOfPks;
|
int32_t numOfPks;
|
||||||
__compar_fn_t pkComparFn;
|
|
||||||
} SSttBlockReader;
|
} SSttBlockReader;
|
||||||
|
|
||||||
typedef struct SFilesetIter {
|
typedef struct SFilesetIter {
|
||||||
|
@ -290,7 +289,6 @@ struct STsdbReader {
|
||||||
bool bFilesetDelimited; // duration by duration output
|
bool bFilesetDelimited; // duration by duration output
|
||||||
TsdReaderNotifyCbFn notifyFn;
|
TsdReaderNotifyCbFn notifyFn;
|
||||||
void* notifyParam;
|
void* notifyParam;
|
||||||
__compar_fn_t pkComparFn;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SBrinRecordIter {
|
typedef struct SBrinRecordIter {
|
||||||
|
@ -343,7 +341,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
||||||
const char* pstr);
|
const char* pstr);
|
||||||
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
|
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
|
||||||
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
||||||
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
|
int32_t pkCompEx(SRowKey* p1, SRowKey* p2);
|
||||||
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
|
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
|
||||||
void clearRowKey(SRowKey* pKey);
|
void clearRowKey(SRowKey* pKey);
|
||||||
|
|
||||||
|
@ -384,7 +382,6 @@ typedef struct SCacheRowsReader {
|
||||||
char* idstr;
|
char* idstr;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
SArray* pFuncTypeList;
|
SArray* pFuncTypeList;
|
||||||
__compar_fn_t pkComparFn;
|
|
||||||
SRowKey rowKey;
|
SRowKey rowKey;
|
||||||
SColumnInfo pkColumn;
|
SColumnInfo pkColumn;
|
||||||
} SCacheRowsReader;
|
} SCacheRowsReader;
|
||||||
|
|
Loading…
Reference in New Issue