Merge pull request #24070 from taosdata/szhou/fix-tms-duration
fix: the block return from buildBlockFromFiles should be of new fileset
Commit 178b94a398
@@ -158,8 +158,8 @@ Automatically creating table and the table name is specified through the `tbname

 ```sql
 INSERT INTO meters(tbname, location, groupId, ts, current, phase)
-values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
-values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
-values('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
+values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 0.32)
+('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 0.33)
+('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 0.33)
 ```

@@ -158,7 +158,7 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/c
 Automatically create the table, with the table name specified through the tbname column
 ```sql
 INSERT INTO meters(tbname, location, groupId, ts, current, phase)
-values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
-values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
-values('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
+values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 0.32)
+('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 0.33)
+('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 0.33)
 ```

@@ -153,7 +153,8 @@ typedef struct {
 // clang-format off
 /*-------------------------------------------------new api format---------------------------------------------------*/
 typedef enum {
-  TSD_READER_NOTIFY_DURATION_START
+  TSD_READER_NOTIFY_DURATION_START,
+  TSD_READER_NOTIFY_NEXT_DURATION_BLOCK,
 } ETsdReaderNotifyType;

 typedef union {

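The notification surface between the tsdb reader and its consumer grows from one event to two: TSD_READER_NOTIFY_DURATION_START when a new fileset/duration begins, and TSD_READER_NOTIFY_NEXT_DURATION_BLOCK when a block that already belongs to the next duration is handed back while the previous one is still being finished. A rough, self-contained model of how a consumer might latch these events follows; the types and names are stand-ins, and the real wiring is tableMergeScanTsdbNotifyCb further down in this diff.

```c
#include <stdbool.h>
#include <stdio.h>

/* Stand-ins for the two notification events added by this commit. */
typedef enum {
  NOTIFY_DURATION_START,      /* a new fileset/duration is starting          */
  NOTIFY_NEXT_DURATION_BLOCK, /* a block of the next duration arrived early  */
} ENotifyType;

typedef struct {
  bool newFilesetEvent;        /* models bNewFilesetEvent        */
  bool nextDurationBlockEvent; /* models bNextDurationBlockEvent */
} SScanFlags;

/* Minimal dispatcher with the same shape as the callback in this PR. */
static void notifyCb(ENotifyType type, SScanFlags* flags) {
  if (type == NOTIFY_DURATION_START) {
    flags->newFilesetEvent = true;
  } else if (type == NOTIFY_NEXT_DURATION_BLOCK) {
    flags->nextDurationBlockEvent = true;
  }
}

int main(void) {
  SScanFlags flags = {0};
  notifyCb(NOTIFY_NEXT_DURATION_BLOCK, &flags);
  printf("new fileset: %d, next-duration block: %d\n", flags.newFilesetEvent, flags.nextDurationBlockEvent);
  return 0;
}
```
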
@@ -76,6 +76,8 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST

 static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }

+static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
+
 static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
                                    int32_t numOfCols) {
   pSupInfo->smaValid = true;

@@ -2565,7 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
   }

   if (pReader->status.bProcMemPreFileset) {
-    resetTableListIndex(&pReader->status);
+    tsdbDebug("will start pre-fileset %d buffer processing. %s", fid, pReader->idStr);
+    pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList;
+    resetPreFilesetMemTableListIndex(&pReader->status);
   }

   if (!pReader->status.bProcMemPreFileset) {

@@ -2573,6 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
     STsdReaderNotifyInfo info = {0};
     info.duration.filesetId = fid;
     pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
+    tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr);
   }
 }

@@ -2643,6 +2648,14 @@ static void resetTableListIndex(SReaderStatus* pStatus) {
   pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
 }

+static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) {
+  STableUidList* pList = &pStatus->procMemUidList;
+
+  pList->currentIndex = 0;
+  uint64_t uid = pList->tableUidList[0];
+  pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
+}
+
 static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
   pOrderedCheckInfo->currentIndex += 1;
   if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {

@@ -2655,6 +2668,19 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
   return (pStatus->pTableIter != NULL);
 }

+static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) {
+  STableUidList* pUidList = &pStatus->procMemUidList;
+  pUidList->currentIndex += 1;
+  if (pUidList->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
+    pStatus->pProcMemTableIter = NULL;
+    return false;
+  }
+
+  uint64_t uid = pUidList->tableUidList[pUidList->currentIndex];
+  pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
+  return (pStatus->pProcMemTableIter != NULL);
+}
+
 static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
   SReaderStatus*   pStatus = &pReader->status;
   SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;

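These two helpers give the pre-fileset buffer pass its own cursor (procMemUidList / pProcMemTableIter), so draining buffered rows does not disturb the main table iterator. Below is a minimal stand-alone model of the reset/advance pattern, using a plain array in place of the uid list and hash lookup; the names are illustrative, not the TDengine API.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-in: a uid list with its own cursor, separate from the main scan cursor. */
typedef struct {
  uint64_t* uids;
  int32_t   num;
  int32_t   currentIndex;
} SUidCursor;

/* Analogue of resetPreFilesetMemTableListIndex(): rewind to the first table. */
static void resetCursor(SUidCursor* c) { c->currentIndex = 0; }

/* Analogue of moveToNextTableForPreFileSetMem(): advance, report whether a table remains. */
static bool moveToNext(SUidCursor* c) {
  c->currentIndex += 1;
  if (c->currentIndex >= c->num) {
    return false; /* all tables in the pre-fileset buffer pass are exhausted */
  }
  return true;
}

int main(void) {
  uint64_t   uids[] = {1001, 1002, 1003};
  SUidCursor c = {uids, 3, 0};

  resetCursor(&c);
  do {
    printf("process in-memory rows of table uid %llu\n", (unsigned long long)c.uids[c.currentIndex]);
  } while (moveToNext(&c));
  return 0;
}
```
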
@@ -2889,6 +2915,47 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
   return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
 }

+static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
+  SReaderStatus* pStatus = &pReader->status;
+
+  tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr);
+
+  while (1) {
+    if (pReader->code != TSDB_CODE_SUCCESS) {
+      tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
+      return pReader->code;
+    }
+
+    STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter;
+    if (pReader->pIgnoreTables &&
+        taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
+      bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus);
+      if (!hasNexTable) {
+        return TSDB_CODE_SUCCESS;
+      }
+      continue;
+    }
+
+    initMemDataIterator(*pBlockScanInfo, pReader);
+    initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost);
+
+    int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
+    if (code != TSDB_CODE_SUCCESS) {
+      return code;
+    }
+
+    if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
+      return TSDB_CODE_SUCCESS;
+    }
+
+    // current table is exhausted, let's try next table
+    bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus);
+    if (!hasNexTable) {
+      return TSDB_CODE_SUCCESS;
+    }
+  }
+}
+
 static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) {
   SReaderStatus* pStatus = &pReader->status;
   STableUidList* pUidList = &pStatus->uidList;

@@ -4244,6 +4311,33 @@ _err:
   return code;
 }

+static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
+  int32_t        code = TSDB_CODE_SUCCESS;
+  SReaderStatus* pStatus = &pReader->status;
+
+  SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
+
+  int32_t     fid = pReader->status.pCurrentFileset->fid;
+  STimeWindow win = {0};
+  tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
+
+  int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
+  code = buildBlockFromBufferSeqForPreFileset(pReader, endKey);
+  if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
+    return code;
+  } else {
+    tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr);
+    pStatus->bProcMemPreFileset = false;
+    if (pReader->notifyFn) {
+      STsdReaderNotifyInfo info = {0};
+      info.duration.filesetId = fid;
+      pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
+      tsdbDebug("new duration %d start notification when buffer pre-fileset, %s", fid, pReader->idStr);
+    }
+  }
+  return code;
+}
+
 static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
   SReaderStatus* pStatus = &pReader->status;
   int32_t        code = TSDB_CODE_SUCCESS;

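buildFromPreFilesetBuffer bounds the buffered scan at the edge of the current fileset's time window, so only rows that precede the new fileset are drained before any of its file blocks are returned; once the buffer is exhausted, the DURATION_START notification for the new fileset is fired. A hedged sketch of just the endKey choice follows; the window values are stand-ins, whereas in the real code they come from tsdbFidKeyRange for the fileset id.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct { int64_t skey; int64_t ekey; } STimeWindowLite;

/* For an ascending scan, buffered rows may only run up to the fileset's start key;
 * for a descending scan, down to its end key. Mirrors the endKey expression in the hunk. */
static int64_t preFilesetEndKey(bool asc, STimeWindowLite win) {
  return asc ? win.skey : win.ekey;
}

int main(void) {
  /* Stand-in window; picked only to make the example printable. */
  STimeWindowLite win = {.skey = 1000, .ekey = 1999};
  printf("asc endKey=%lld, desc endKey=%lld\n",
         (long long)preFilesetEndKey(true, win), (long long)preFilesetEndKey(false, win));
  return 0;
}
```
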
@@ -4251,22 +4345,9 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {

   if (pStatus->loadFromFile) {
     if (pStatus->bProcMemPreFileset) {
-      int32_t fid = pReader->status.pCurrentFileset->fid;
-      STimeWindow win = {0};
-      tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
-
-      int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
-      code = buildBlockFromBufferSequentially(pReader, endKey);
+      code = buildFromPreFilesetBuffer(pReader);
       if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
         return code;
-      } else {
-        pStatus->bProcMemPreFileset = false;
-        if (pReader->notifyFn) {
-          STsdReaderNotifyInfo info = {0};
-          info.duration.filesetId = fid;
-          pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
-        }
-        resetTableListIndex(pStatus);
       }
     }

@@ -4275,6 +4356,21 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
     return code;
   }

+  tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s",
+            pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr);
+  if (pStatus->bProcMemPreFileset) {
+    if (pBlock->info.rows > 0) {
+      if (pReader->notifyFn) {
+        int32_t fid = pReader->status.pCurrentFileset->fid;
+        STsdReaderNotifyInfo info = {0};
+        info.duration.filesetId = fid;
+        pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam);
+      }
+    } else {
+      pStatus->bProcMemPreFileset = false;
+    }
+  }
+
   if (pBlock->info.rows <= 0) {
     resetTableListIndex(&pReader->status);
     int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;

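This hunk is the core of the fix named in the PR title: a block coming back from buildBlockFromFiles while the pre-fileset buffer pass is still active already belongs to the new fileset, so the reader now announces it with TSD_READER_NOTIFY_NEXT_DURATION_BLOCK instead of letting it be mixed into the previous duration; an empty result simply ends the pre-fileset pass. A simplified decision function capturing that branch, with stand-in types rather than the reader's actual interface:

```c
#include <stdbool.h>
#include <stdio.h>

typedef enum { EVT_NONE, EVT_NEXT_DURATION_BLOCK } EEvent;

/* Mirrors the added branch: during the pre-fileset pass, a non-empty file block
 * raises a "next duration block" event; an empty one ends the pass. */
static EEvent onFileBlock(bool* pProcMemPreFileset, long rows) {
  if (!*pProcMemPreFileset) return EVT_NONE;
  if (rows > 0) return EVT_NEXT_DURATION_BLOCK;
  *pProcMemPreFileset = false; /* buffer preceding the fileset fully drained */
  return EVT_NONE;
}

int main(void) {
  bool procMemPreFileset = true;
  printf("event=%d\n", onFileBlock(&procMemPreFileset, 96)); /* 1: stash for next duration */
  printf("event=%d, preFileset=%d\n", onFileBlock(&procMemPreFileset, 0), procMemPreFileset);
  return 0;
}
```
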
@@ -210,12 +210,16 @@ typedef struct SReaderStatus {
   SArray*          pLDataIterArray;
   SRowMerger       merger;
   SColumnInfoData* pPrimaryTsCol;  // primary time stamp output col info data
+  // the following for preceeds fileset memory processing
+  // TODO: refactor into seperate struct
   bool             bProcMemPreFileset;
   int64_t          memTableMaxKey;
   int64_t          memTableMinKey;
   int64_t          prevFilesetStartKey;
   int64_t          prevFilesetEndKey;
   bool             bProcMemFirstFileset;
+  STableUidList    procMemUidList;
+  STableBlockScanInfo** pProcMemTableIter;
 } SReaderStatus;

 struct STsdbReader {

@@ -298,9 +298,14 @@ typedef struct STableMergeScanInfo {
   SHashObj*     mSkipTables;
   int64_t       mergeLimit;
   SSortExecInfo sortExecInfo;
-  bool          bNewFileset;
-  bool          bOnlyRetrieveBlock;
   bool          filesetDelimited;
+  bool          bNewFilesetEvent;
+  bool          bNextDurationBlockEvent;
+  int32_t       numNextDurationBlocks;
+  SSDataBlock*  nextDurationBlocks[2];
+  bool          rtnNextDurationBlocks;
+  int32_t       nextDurationBlocksIdx;
 } STableMergeScanInfo;

 typedef struct STagScanFilterContext {

@@ -3240,6 +3240,52 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock
   return TSDB_CODE_SUCCESS;
 }

+static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
+  STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
+  SStorageAPI*         pAPI = &pTaskInfo->storageAPI;
+
+  SSDataBlock* pBlock = pInfo->pReaderBlock;
+  int32_t      code = 0;
+  bool         hasNext = false;
+  STsdbReader* reader = pInfo->base.dataReader;
+
+  code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
+  if (code != 0) {
+    pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
+    qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
+    T_LONG_JMP(pTaskInfo->env, code);
+  }
+
+  if (!hasNext || isTaskKilled(pTaskInfo)) {
+    if (isTaskKilled(pTaskInfo)) {
+      qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
+      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
+    }
+    *pFinished = true;
+    return;
+  }
+
+  uint32_t status = 0;
+  code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
+  if (code != TSDB_CODE_SUCCESS) {
+    qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
+    T_LONG_JMP(pTaskInfo->env, code);
+  }
+
+  if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
+    *pFinished = true;
+    return;
+  }
+
+  // current block is filter out according to filter condition, continue load the next block
+  if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
+    *pSkipped = true;
+    return;
+  }
+  return;
+}
+
 static SSDataBlock* getBlockForTableMergeScan(void* param) {
   STableMergeScanSortSourceParam* source = param;
   SOperatorInfo*                  pOperator = source->pOperator;

@@ -3247,7 +3293,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
   SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
   SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

-  SSDataBlock* pBlock = pInfo->pReaderBlock;
+  SSDataBlock* pBlock = NULL;
   int32_t      code = 0;

   int64_t st = taosGetTimestampUs();

@@ -3255,53 +3301,56 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {

   STsdbReader* reader = pInfo->base.dataReader;
   while (true) {
-    if (!pInfo->bOnlyRetrieveBlock) {
-      code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
-      if (code != 0) {
-        pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
-        qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
-        T_LONG_JMP(pTaskInfo->env, code);
-      }
-
-      if (!hasNext || isTaskKilled(pTaskInfo)) {
-        pInfo->bNewFileset = false;
-        if (isTaskKilled(pTaskInfo)) {
-          qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
-          pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
-        }
-        break;
-      }
-
-      if (pInfo->bNewFileset) {
-        pInfo->bOnlyRetrieveBlock = true;
-        return NULL;
-      }
-    }
-
-    // process this data block based on the probabilities
-    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
-    if (!processThisBlock) {
-      continue;
-    }
-
-    uint32_t status = 0;
-    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
-    if (pInfo->bOnlyRetrieveBlock) {
-      pInfo->bOnlyRetrieveBlock = false;
-    }
-    if (code != TSDB_CODE_SUCCESS) {
-      qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
-      T_LONG_JMP(pTaskInfo->env, code);
-    }
-
-    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
-      break;
-    }
-
-    // current block is filter out according to filter condition, continue load the next block
-    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
-      continue;
-    }
-
+    if (pInfo->rtnNextDurationBlocks) {
+      qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
+             GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
+      if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
+        pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
+        ++pInfo->nextDurationBlocksIdx;
+      } else {
+        for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) {
+          blockDataDestroy(pInfo->nextDurationBlocks[i]);
+          pInfo->nextDurationBlocks[i] = NULL;
+        }
+        pInfo->rtnNextDurationBlocks = false;
+        pInfo->nextDurationBlocksIdx = 0;
+        pInfo->numNextDurationBlocks = 0;
+        continue;
+      }
+    } else {
+      bool bFinished = false;
+      bool bSkipped = false;
+      doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
+      pBlock = pInfo->pReaderBlock;
+      qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
+             GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
+      if (bFinished) {
+        pInfo->bNewFilesetEvent = false;
+        break;
+      }
+
+      if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
+        if (!bSkipped) {
+          pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
+          ++pInfo->numNextDurationBlocks;
+          if (pInfo->numNextDurationBlocks > 2) {
+            qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks);
+            pInfo->bNewFilesetEvent = false;
+            break;
+          }
+        }
+        if (pInfo->bNewFilesetEvent) {
+          pInfo->rtnNextDurationBlocks = true;
+          return NULL;
+        }
+        if (pInfo->bNextDurationBlockEvent) {
+          pInfo->bNextDurationBlockEvent = false;
+          continue;
+        }
+      }
+      if (bSkipped) continue;
+    }
+
     pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);

     if (pInfo->mergeLimit != -1) {

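The rewritten fetch loop now runs in two modes. In fetch mode it pulls blocks through doGetBlockForTableMergeScan; when a fileset or next-duration event is flagged, the just-fetched block is copied into nextDurationBlocks (at most two entries) and, on a fileset boundary, the function returns NULL so the current sort round can close. On the following round rtnNextDurationBlocks puts the loop in drain mode, returning the stashed blocks before fetching anything new. A compact stand-alone model of the stash-and-drain behavior, with plain ints standing in for SSDataBlock pointers:

```c
#include <stdbool.h>
#include <stdio.h>

/* Simplified model of the two-slot stash added to STableMergeScanInfo. */
typedef struct {
  int  blocks[2]; /* stand-ins for SSDataBlock*   */
  int  num;       /* numNextDurationBlocks        */
  int  idx;       /* nextDurationBlocksIdx        */
  bool draining;  /* rtnNextDurationBlocks        */
} SStash;

/* Stash a block that already belongs to the next duration. */
static void stash(SStash* s, int block) {
  if (s->num < 2) s->blocks[s->num++] = block;
}

/* Drain mode: hand back stashed blocks one by one before fetching anything new. */
static bool drainNext(SStash* s, int* out) {
  if (s->idx < s->num) {
    *out = s->blocks[s->idx++];
    return true;
  }
  s->num = 0;
  s->idx = 0;
  s->draining = false; /* stash empty, fall back to normal fetching */
  return false;
}

int main(void) {
  SStash s = {0};
  stash(&s, 41);     /* block fetched together with a new-fileset event */
  s.draining = true; /* next duration starts: return stashed blocks first */

  int b;
  while (drainNext(&s, &b)) printf("return stashed block %d\n", b);
  printf("stash drained, resume normal fetching (draining=%d)\n", s.draining);
  return 0;
}
```
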
@@ -3310,13 +3359,13 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {

     pOperator->resultInfo.totalRows += pBlock->info.rows;
     pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

     return pBlock;
   }

   return NULL;
 }

 SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
   int32_t tsTargetSlotId = 0;
   for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {

@@ -3348,7 +3397,13 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*

 void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
   STableMergeScanInfo* pTmsInfo = param;
-  pTmsInfo->bNewFileset = true;
+  if (type == TSD_READER_NOTIFY_DURATION_START) {
+    pTmsInfo->bNewFilesetEvent = true;
+  } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
+    pTmsInfo->bNextDurationBlockEvent = true;
+  }
+  qDebug("table merge scan receive notification. type %d, fileset %d", type, info->duration.filesetId);
+
   return;
 }

@@ -3358,7 +3413,9 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
   int32_t code = TSDB_CODE_SUCCESS;
   int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;

-  pInfo->bNewFileset = false;
+  qDebug("%s table merge scan start duration ", GET_TASKID(pTaskInfo));
+  pInfo->bNewFilesetEvent = false;
+  pInfo->bNextDurationBlockEvent = false;

   pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
   int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;

@@ -3388,6 +3445,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {

 void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
   STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  qDebug("%s table merge scan stop duration ", GET_TASKID(pTaskInfo));

   SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
   pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;

@@ -3405,6 +3464,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
   SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
   SReadHandle*   pHandle = &pInfo->base.readHandle;
   SStorageAPI*   pAPI = &pTaskInfo->storageAPI;
+  qDebug("%s table merge scan start group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);

   {
     size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

|
@ -3452,10 +3512,19 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
|
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
}
|
}
|
||||||
|
for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) {
|
||||||
|
if (pInfo->nextDurationBlocks[i]) {
|
||||||
|
blockDataDestroy(pInfo->nextDurationBlocks[i]);
|
||||||
|
pInfo->nextDurationBlocks[i] = NULL;
|
||||||
|
}
|
||||||
|
pInfo->numNextDurationBlocks = 0;
|
||||||
|
pInfo->nextDurationBlocksIdx = 0;
|
||||||
|
}
|
||||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
taosHashCleanup(pInfo->mSkipTables);
|
taosHashCleanup(pInfo->mSkipTables);
|
||||||
pInfo->mSkipTables = NULL;
|
pInfo->mSkipTables = NULL;
|
||||||
|
qDebug("%s table merge scan stop group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3535,7 +3604,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->bNewFileset) {
|
if (pInfo->bNewFilesetEvent) {
|
||||||
stopDurationForGroupTableMergeScan(pOperator);
|
stopDurationForGroupTableMergeScan(pOperator);
|
||||||
startDurationForGroupTableMergeScan(pOperator);
|
startDurationForGroupTableMergeScan(pOperator);
|
||||||
} else {
|
} else {
|
||||||
|
@@ -3566,6 +3635,13 @@ void destroyTableMergeScanOperatorInfo(void* param) {
   pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
   pTableScanInfo->base.dataReader = NULL;

+  for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) {
+    if (pTableScanInfo->nextDurationBlocks[i] != NULL) {
+      blockDataDestroy(pTableScanInfo->nextDurationBlocks[i]);
+      pTableScanInfo->nextDurationBlocks[i] = NULL;
+    }
+  }
+
   taosArrayDestroy(pTableScanInfo->sortSourceParams);
   tsortDestroySortHandle(pTableScanInfo->pSortHandle);
   pTableScanInfo->pSortHandle = NULL;