Merge pull request #23835 from taosdata/szhou/tms-duration
feat: tsdb output in duration order
This commit is contained in:
commit
a2ee41233e
|
@ -152,6 +152,18 @@ typedef struct {
|
|||
|
||||
// clang-format off
|
||||
/*-------------------------------------------------new api format---------------------------------------------------*/
|
||||
typedef enum {
|
||||
TSD_READER_NOTIFY_DURATION_START
|
||||
} ETsdReaderNotifyType;
|
||||
|
||||
typedef union {
|
||||
struct {
|
||||
int32_t filesetId;
|
||||
} duration;
|
||||
} STsdReaderNotifyInfo;
|
||||
|
||||
typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param);
|
||||
|
||||
typedef struct TsdReader {
|
||||
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
|
||||
|
@ -170,6 +182,9 @@ typedef struct TsdReader {
|
|||
int32_t (*tsdReaderGetDataBlockDistInfo)();
|
||||
int64_t (*tsdReaderGetNumOfInMemRows)();
|
||||
void (*tsdReaderNotifyClosing)();
|
||||
|
||||
void (*tsdSetFilesetDelimited)(void* pReader);
|
||||
void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
|
||||
} TsdReader;
|
||||
|
||||
typedef struct SStoreCacheReader {
|
||||
|
|
|
@ -118,6 +118,7 @@ typedef struct SScanLogicNode {
|
|||
bool igLastNull;
|
||||
bool groupOrderScan;
|
||||
bool onlyMetaCtbIdx; // for tag scan with no tbname
|
||||
bool filesetDelimited; // returned blocks delimited by fileset
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
@ -432,6 +433,7 @@ typedef struct STableScanPhysiNode {
|
|||
int8_t igExpired;
|
||||
bool assignBlockUid;
|
||||
int8_t igCheckUpdate;
|
||||
bool filesetDelimited;
|
||||
} STableScanPhysiNode;
|
||||
|
||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||
|
|
|
@ -170,7 +170,9 @@ void *tsdbGetIdx2(SMeta *pMeta);
|
|||
void *tsdbGetIvtIdx2(SMeta *pMeta);
|
||||
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
|
||||
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||
//======================================================================================================================
|
||||
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
|
||||
void tsdbSetFilesetDelimited(STsdbReader* pReader);
|
||||
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
|
||||
|
||||
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
|
|
|
@ -60,6 +60,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
|
|||
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader);
|
||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||
static void resetTableListIndex(SReaderStatus* pStatus);
|
||||
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey);
|
||||
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
|
||||
|
||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||
|
@ -244,6 +246,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
|||
|
||||
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey,
|
||||
pReader->info.window.ekey, pReader->idStr);
|
||||
|
||||
*hasNext = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -422,6 +425,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
goto _end;
|
||||
}
|
||||
|
||||
pReader->bFilesetDelimited = false;
|
||||
|
||||
tsdbInitReaderLock(pReader);
|
||||
tsem_init(&pReader->resumeAfterSuspend, 0, 0);
|
||||
|
||||
|
@ -2556,6 +2561,41 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
|
|||
}
|
||||
}
|
||||
|
||||
static void prepareDurationForNextFileSet(STsdbReader* pReader) {
|
||||
if (pReader->status.bProcMemFirstFileset) {
|
||||
pReader->status.prevFilesetStartKey = INT64_MIN;
|
||||
pReader->status.prevFilesetEndKey = INT64_MAX;
|
||||
pReader->status.bProcMemFirstFileset = false;
|
||||
}
|
||||
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STimeWindow winFid = {0};
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey ||
|
||||
(winFid.skey-1) < pReader->status.memTableMinKey);
|
||||
} else {
|
||||
pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) ||
|
||||
pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey);
|
||||
}
|
||||
|
||||
if (pReader->status.bProcMemPreFileset) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
}
|
||||
|
||||
if (!pReader->status.bProcMemPreFileset) {
|
||||
if (pReader->notifyFn) {
|
||||
STsdReaderNotifyInfo info = {0};
|
||||
info.duration.filesetId = fid;
|
||||
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
||||
}
|
||||
}
|
||||
|
||||
pReader->status.prevFilesetStartKey = winFid.skey;
|
||||
pReader->status.prevFilesetEndKey = winFid.ekey;
|
||||
}
|
||||
|
||||
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
pBlockNum->numOfBlocks = 0;
|
||||
|
@ -2597,6 +2637,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
}
|
||||
|
||||
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
|
||||
if (pReader->bFilesetDelimited) {
|
||||
prepareDurationForNextFileSet(pReader);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -2978,7 +3021,7 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
STableUidList* pUidList = &pStatus->uidList;
|
||||
|
||||
|
@ -2997,13 +3040,12 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
|||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pBlockScanInfo = pStatus->pTableIter;
|
||||
continue;
|
||||
}
|
||||
|
||||
initMemDataIterator(*pBlockScanInfo, pReader);
|
||||
initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost);
|
||||
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -3940,6 +3982,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
|||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
if (pReader->bFilesetDelimited) {
|
||||
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
|
||||
pReader->status.bProcMemFirstFileset = true;
|
||||
}
|
||||
|
||||
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
|
||||
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order);
|
||||
|
||||
|
@ -4247,6 +4294,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
|
||||
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
|
||||
pReader->pReadSnap = NULL;
|
||||
if (pReader->bFilesetDelimited) {
|
||||
pReader->status.memTableMinKey = INT64_MAX;
|
||||
pReader->status.memTableMaxKey = INT64_MIN;
|
||||
}
|
||||
pReader->flag = READER_STATUS_SUSPEND;
|
||||
|
||||
#if SUSPEND_RESUME_TEST
|
||||
|
@ -4358,6 +4409,72 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
|||
return pBlock->info.rows > 0;
|
||||
}
|
||||
|
||||
static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
if (pStatus->bProcMemPreFileset) {
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STimeWindow win = {0};
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
|
||||
return code;
|
||||
} else {
|
||||
pStatus->bProcMemPreFileset = false;
|
||||
if (pReader->notifyFn) {
|
||||
STsdReaderNotifyInfo info = {0};
|
||||
info.duration.filesetId = fid;
|
||||
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
||||
}
|
||||
resetTableListIndex(pStatus);
|
||||
}
|
||||
}
|
||||
|
||||
code = buildBlockFromFiles(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pBlock->info.rows <= 0) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
}
|
||||
} else { // no data in files, let's try the buffer
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
code = buildBlockFromFiles(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pBlock->info.rows <= 0) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
}
|
||||
} else { // no data in files, let's try the buffer
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -4375,19 +4492,10 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
|||
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
|
||||
return tsdbReadRowsCountOnly(pReader);
|
||||
}
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
code = buildBlockFromFiles(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pBlock->info.rows <= 0) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
code = buildBlockFromBufferSequentially(pReader);
|
||||
}
|
||||
} else { // no data in files, let's try the buffer
|
||||
code = buildBlockFromBufferSequentially(pReader);
|
||||
if (!pReader->bFilesetDelimited) {
|
||||
code = doTsdbNextDataBlockFilesFirst(pReader);
|
||||
} else {
|
||||
code = doTsdbNextDataBlockFilesetDelimited(pReader);
|
||||
}
|
||||
|
||||
*hasNext = pBlock->info.rows > 0;
|
||||
|
@ -4875,6 +4983,54 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
|
|||
return code;
|
||||
}
|
||||
|
||||
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t rows = 0;
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
int32_t iter = 0;
|
||||
int64_t maxKey = INT64_MIN;
|
||||
int64_t minKey = INT64_MAX;
|
||||
|
||||
void* pHashIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
|
||||
while (pHashIter!= NULL) {
|
||||
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pHashIter;
|
||||
|
||||
STbData* d = NULL;
|
||||
if (pReader->pReadSnap->pMem != NULL) {
|
||||
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->info.suid, pBlockScanInfo->uid);
|
||||
if (d != NULL) {
|
||||
if (d->maxKey > maxKey) {
|
||||
maxKey = d->maxKey;
|
||||
}
|
||||
if (d->minKey < minKey) {
|
||||
minKey = d->minKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
STbData* di = NULL;
|
||||
if (pReader->pReadSnap->pIMem != NULL) {
|
||||
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->info.suid, pBlockScanInfo->uid);
|
||||
if (di != NULL) {
|
||||
if (di->maxKey > maxKey) {
|
||||
maxKey = di->maxKey;
|
||||
}
|
||||
if (di->minKey < minKey) {
|
||||
minKey = di->minKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// current table is exhausted, let's try the next table
|
||||
pHashIter = tSimpleHashIterate(pStatus->pTableMap, pHashIter, &iter);
|
||||
}
|
||||
|
||||
*pMaxKey = maxKey;
|
||||
*pMinKey = minKey;
|
||||
}
|
||||
|
||||
int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t rows = 0;
|
||||
|
@ -5062,3 +5218,12 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
|||
|
||||
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
||||
}
|
||||
|
||||
void tsdbSetFilesetDelimited(STsdbReader* pReader) {
|
||||
pReader->bFilesetDelimited = true;
|
||||
}
|
||||
|
||||
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
|
||||
pReader->notifyFn = notifyFn;
|
||||
pReader->notifyParam = param;
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbUtil2.h"
|
||||
#include "storageapi.h"
|
||||
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
|
||||
|
@ -200,6 +201,12 @@ typedef struct SReaderStatus {
|
|||
SArray* pLDataIterArray;
|
||||
SRowMerger merger;
|
||||
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
|
||||
bool bProcMemPreFileset;
|
||||
int64_t memTableMaxKey;
|
||||
int64_t memTableMinKey;
|
||||
int64_t prevFilesetStartKey;
|
||||
int64_t prevFilesetEndKey;
|
||||
bool bProcMemFirstFileset;
|
||||
} SReaderStatus;
|
||||
|
||||
struct STsdbReader {
|
||||
|
@ -223,6 +230,9 @@ struct STsdbReader {
|
|||
SBlockInfoBuf blockInfoBuf;
|
||||
EContentData step;
|
||||
STsdbReader* innerReader[2];
|
||||
bool bFilesetDelimited; // duration by duration output
|
||||
TsdReaderNotifyCbFn notifyFn;
|
||||
void* notifyParam;
|
||||
};
|
||||
|
||||
typedef struct SBrinRecordIter {
|
||||
|
|
|
@ -60,6 +60,9 @@ void initTsdbReaderAPI(TsdReader* pReader) {
|
|||
|
||||
pReader->tsdSetQueryTableList = tsdbSetTableList2;
|
||||
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2;
|
||||
|
||||
pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited;
|
||||
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;
|
||||
}
|
||||
|
||||
void initMetadataAPI(SStoreMeta* pMeta) {
|
||||
|
|
|
@ -270,6 +270,7 @@ typedef struct STableScanInfo {
|
|||
bool hasGroupByTag;
|
||||
bool countOnly;
|
||||
// TsdReader readerAPI;
|
||||
bool filesetDelimited;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
@ -297,6 +298,9 @@ typedef struct STableMergeScanInfo {
|
|||
SHashObj* mSkipTables;
|
||||
int64_t mergeLimit;
|
||||
SSortExecInfo sortExecInfo;
|
||||
bool bNewFileset;
|
||||
bool bOnlyRetrieveBlock;
|
||||
bool filesetDelimited;
|
||||
} STableMergeScanInfo;
|
||||
|
||||
typedef struct STagScanFilterContext {
|
||||
|
|
|
@ -893,7 +893,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (pInfo->filesetDelimited) {
|
||||
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
|
||||
}
|
||||
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
|
||||
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
|
||||
}
|
||||
|
@ -1085,6 +1087,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->countOnly = true;
|
||||
}
|
||||
|
||||
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
||||
|
||||
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
|
||||
optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL);
|
||||
|
@ -3223,6 +3227,7 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock
|
|||
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
|
||||
} else {
|
||||
*(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
|
||||
nRows = *(int64_t*)pNum;
|
||||
}
|
||||
|
||||
if (nRows >= pInfo->mergeLimit) {
|
||||
|
@ -3251,23 +3256,28 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
|
||||
STsdbReader* reader = pInfo->base.dataReader;
|
||||
while (true) {
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
||||
if (code != 0) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
if (!pInfo->bOnlyRetrieveBlock) {
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
||||
if (code != 0) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (!hasNext) {
|
||||
break;
|
||||
}
|
||||
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||
pInfo->bNewFileset = false;
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
break;
|
||||
if (pInfo->bNewFileset) {
|
||||
pInfo->bOnlyRetrieveBlock = true;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// process this data block based on the probabilities
|
||||
bool processThisBlock = processBlockWithProbability(&pInfo->sample);
|
||||
if (!processThisBlock) {
|
||||
|
@ -3276,7 +3286,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
|
||||
uint32_t status = 0;
|
||||
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||
if (pInfo->bOnlyRetrieveBlock) {
|
||||
pInfo->bOnlyRetrieveBlock = false;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -3299,7 +3311,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -3335,6 +3347,60 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
||||
STableMergeScanInfo* pTmsInfo = param;
|
||||
pTmsInfo->bNewFileset = true;
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
||||
|
||||
pInfo->bNewFileset = false;
|
||||
|
||||
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
|
||||
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
|
||||
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
|
||||
|
||||
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
|
||||
param->pOperator = pOperator;
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
if (numOfTable == 1) {
|
||||
tsortSetSingleTableMerge(pInfo->pSortHandle);
|
||||
} else {
|
||||
code = tsortOpen(pInfo->pSortHandle);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
|
||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
|
||||
pInfo->sortExecInfo.loops += sortExecInfo.loops;
|
||||
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
||||
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
||||
|
||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||
pInfo->pSortHandle = NULL;
|
||||
}
|
||||
|
||||
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -3358,43 +3424,16 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
|
||||
tSimpleHashClear(pInfo->mTableNumRows);
|
||||
|
||||
size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
|
||||
// if (pInfo->mergeLimit != -1) {
|
||||
// pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
|
||||
// NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
|
||||
// } else
|
||||
{
|
||||
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
|
||||
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
|
||||
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
|
||||
}
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
|
||||
|
||||
// one table has one data block
|
||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||
|
||||
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
|
||||
param->pOperator = pOperator;
|
||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (numOfTable == 1) {
|
||||
tsortSetSingleTableMerge(pInfo->pSortHandle);
|
||||
} else {
|
||||
code = tsortOpen(pInfo->pSortHandle);
|
||||
if (pInfo->filesetDelimited) {
|
||||
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
|
||||
}
|
||||
pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo);
|
||||
|
||||
int32_t code = startDurationForGroupTableMergeScan(pOperator);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
|
@ -3408,21 +3447,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
|
||||
pInfo->sortExecInfo.loops += sortExecInfo.loops;
|
||||
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
||||
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
||||
stopDurationForGroupTableMergeScan(pOperator);
|
||||
|
||||
if (pInfo->base.dataReader != NULL) {
|
||||
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
}
|
||||
|
||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||
pInfo->pSortHandle = NULL;
|
||||
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
taosHashCleanup(pInfo->mSkipTables);
|
||||
pInfo->mSkipTables = NULL;
|
||||
|
@ -3505,17 +3536,22 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
|||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
return pBlock;
|
||||
} else {
|
||||
// Data of this group are all dumped, let's try the next group
|
||||
stopGroupTableMergeScan(pOperator);
|
||||
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
if (pInfo->bNewFileset) {
|
||||
stopDurationForGroupTableMergeScan(pOperator);
|
||||
startDurationForGroupTableMergeScan(pOperator);
|
||||
} else {
|
||||
// Data of this group are all dumped, let's try the next group
|
||||
stopGroupTableMergeScan(pOperator);
|
||||
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3641,6 +3677,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
||||
|
||||
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
||||
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
|
|
|
@ -423,6 +423,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(igLastNull);
|
||||
COPY_SCALAR_FIELD(groupOrderScan);
|
||||
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
|
||||
COPY_SCALAR_FIELD(filesetDelimited);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -650,6 +651,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
|
|||
COPY_SCALAR_FIELD(triggerType);
|
||||
COPY_SCALAR_FIELD(watermark);
|
||||
COPY_SCALAR_FIELD(igExpired);
|
||||
COPY_SCALAR_FIELD(filesetDelimited);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -677,6 +677,7 @@ static const char* jkScanLogicPlanDataRequired = "DataRequired";
|
|||
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
||||
|
||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||
|
@ -721,6 +722,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -768,7 +772,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1830,6 +1836,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags";
|
|||
static const char* jkTableScanPhysiPlanSubtable = "Subtable";
|
||||
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
||||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1898,6 +1905,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanFilesetDelimited, pNode->filesetDelimited);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1969,6 +1979,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2167,7 +2167,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueBool(pEncoder, pNode->filesetDelimited);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2246,6 +2248,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueBool(pDecoder, &pNode->filesetDelimited);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1400,6 +1400,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
|
|||
pScan->node.outputTsOrder = order;
|
||||
if (TSDB_SUPER_TABLE == pScan->tableType) {
|
||||
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
|
||||
pScan->filesetDelimited = true;
|
||||
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL;
|
||||
pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
|
||||
}
|
||||
|
|
|
@ -622,6 +622,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
pTableScan->igExpired = pScanLogicNode->igExpired;
|
||||
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
|
||||
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
|
||||
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
||||
|
||||
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -692,6 +692,7 @@ static void stbSplSetTableMergeScan(SLogicNode* pNode) {
|
|||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
|
||||
pScan->filesetDelimited = true;
|
||||
if (NULL != pScan->pGroupTags) {
|
||||
pScan->groupSort = true;
|
||||
}
|
||||
|
@ -1243,6 +1244,7 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu
|
|||
SNodeList* pMergeKeys = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
|
||||
pMergeScan->filesetDelimited = true;
|
||||
pMergeScan->node.pChildren = pChildren;
|
||||
splSetParent((SLogicNode*)pMergeScan);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
|
||||
|
|
|
@ -164,6 +164,7 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel
|
|||
pScan->scanType = SCAN_TYPE_TABLE;
|
||||
} else if (TSDB_SUPER_TABLE == pScan->tableType) {
|
||||
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
|
||||
pScan->filesetDelimited = true;
|
||||
}
|
||||
|
||||
if (TSDB_NORMAL_TABLE != pScan->tableType && TSDB_CHILD_TABLE != pScan->tableType) {
|
||||
|
|
Loading…
Reference in New Issue