From 9dd306e1ef80ec21863d0ef7c4e06c2899a346e6 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Nov 2023 13:56:16 +0800 Subject: [PATCH 01/27] enhance: duration notification --- include/libs/executor/storageapi.h | 14 ++++++++++++++ source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 12 ++++++++++++ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 +++ source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 ++ source/libs/executor/src/scanoperator.c | 9 +++++++-- 6 files changed, 39 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index d5f1da957d..b45103f94a 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -165,6 +165,18 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ +typedef enum { + TSD_READER_NOTIFY_DURATION +} ETsdReaderNotifyType; + +typedef union { + struct { + int32_t fileSetId; + } duration; +} STsdReaderNotifyInfo; + +typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param); + typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, @@ -183,6 +195,8 @@ typedef struct TsdReader { int32_t (*tsdReaderGetDataBlockDistInfo)(); int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); + + void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); } TsdReader; typedef struct SStoreCacheReader { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6a0c991be4..db01f7d995 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -191,6 +191,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); +void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); //====================================================================================================================== int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6169014d9f..2412639722 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -241,6 +241,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.fileSetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); + } *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -4055,6 +4060,8 @@ _err: return code; } +void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) + void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; @@ -5025,3 +5032,8 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } + +void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { + pReader->notifyFn = notifyFn; + pReader->notifyParam = param; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 60e6e6960a..fb32190115 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -22,6 +22,7 @@ extern "C" { #include "tsdbDataFileRW.h" #include "tsdbUtil2.h" +#include "storageapi.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -227,6 +228,8 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; + TsdReaderNotifyCbFn notifyFn; + void* notifyParam; }; typedef struct SBrinRecordIter { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a6673917bf..1729e3ab0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -60,6 +60,8 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdSetQueryTableList = tsdbSetTableList2; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; + + pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; } void initMetadataAPI(SStoreMeta* pMeta) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 28832ffec8..3d2053c0c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3284,7 +3284,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + uInfo("getBlockForTableMergeScan retrieved one block"); return pBlock; } @@ -3320,6 +3320,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* return 0; } +void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { + uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); + return; +} + int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3368,7 +3373,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - + pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = param; ps->onlyRef = false; From 1a7ee3643e7021d916e6c84b6becf4f844159634 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 22 Nov 2023 14:09:09 +0800 Subject: [PATCH 02/27] fix: pass compilation --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2412639722..f3c5d9ec93 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4060,8 +4060,6 @@ _err: return code; } -void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) - void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; From 644dbad920c575c06ed00388c3a0dd0158448531 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 28 Nov 2023 08:28:49 +0800 Subject: [PATCH 03/27] Revert "fix: pass compilation" This reverts commit 1a7ee3643e7021d916e6c84b6becf4f844159634. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index f3c5d9ec93..2412639722 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4060,6 +4060,8 @@ _err: return code; } +void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) + void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; From 3331e25aafc3efa07825df72e753e7c210c6442f Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 28 Nov 2023 08:29:05 +0800 Subject: [PATCH 04/27] Revert "enhance: duration notification" This reverts commit 9dd306e1ef80ec21863d0ef7c4e06c2899a346e6. --- include/libs/executor/storageapi.h | 14 -------------- source/dnode/vnode/inc/vnode.h | 1 - source/dnode/vnode/src/tsdb/tsdbRead2.c | 12 ------------ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 --- source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 -- source/libs/executor/src/scanoperator.c | 9 ++------- 6 files changed, 2 insertions(+), 39 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b45103f94a..d5f1da957d 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -165,18 +165,6 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ -typedef enum { - TSD_READER_NOTIFY_DURATION -} ETsdReaderNotifyType; - -typedef union { - struct { - int32_t fileSetId; - } duration; -} STsdReaderNotifyInfo; - -typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param); - typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, @@ -195,8 +183,6 @@ typedef struct TsdReader { int32_t (*tsdReaderGetDataBlockDistInfo)(); int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); - - void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); } TsdReader; typedef struct SStoreCacheReader { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index db01f7d995..6a0c991be4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -191,7 +191,6 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); -void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); //====================================================================================================================== int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2412639722..6169014d9f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -241,11 +241,6 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - if (pReader->notifyFn) { - STsdReaderNotifyInfo info = {0}; - info.duration.fileSetId = fid; - pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); - } *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -4060,8 +4055,6 @@ _err: return code; } -void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) - void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; @@ -5032,8 +5025,3 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } - -void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { - pReader->notifyFn = notifyFn; - pReader->notifyParam = param; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index fb32190115..60e6e6960a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -22,7 +22,6 @@ extern "C" { #include "tsdbDataFileRW.h" #include "tsdbUtil2.h" -#include "storageapi.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -228,8 +227,6 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; - TsdReaderNotifyCbFn notifyFn; - void* notifyParam; }; typedef struct SBrinRecordIter { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 1729e3ab0b..a6673917bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -60,8 +60,6 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdSetQueryTableList = tsdbSetTableList2; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; - - pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; } void initMetadataAPI(SStoreMeta* pMeta) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3d2053c0c0..28832ffec8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3284,7 +3284,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - uInfo("getBlockForTableMergeScan retrieved one block"); + return pBlock; } @@ -3320,11 +3320,6 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* return 0; } -void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { - uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); - return; -} - int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3373,7 +3368,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = param; ps->onlyRef = false; From df4f7d6956e229599559a17ffaada117ff3c73a8 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 28 Nov 2023 10:31:18 +0800 Subject: [PATCH 05/27] Revert "Revert "enhance: duration notification"" This reverts commit 3331e25aafc3efa07825df72e753e7c210c6442f. --- include/libs/executor/storageapi.h | 14 ++++++++++++++ source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 12 ++++++++++++ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 +++ source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 ++ source/libs/executor/src/scanoperator.c | 9 +++++++-- 6 files changed, 39 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index d5f1da957d..b45103f94a 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -165,6 +165,18 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ +typedef enum { + TSD_READER_NOTIFY_DURATION +} ETsdReaderNotifyType; + +typedef union { + struct { + int32_t fileSetId; + } duration; +} STsdReaderNotifyInfo; + +typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param); + typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, @@ -183,6 +195,8 @@ typedef struct TsdReader { int32_t (*tsdReaderGetDataBlockDistInfo)(); int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); + + void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); } TsdReader; typedef struct SStoreCacheReader { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6a0c991be4..db01f7d995 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -191,6 +191,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); +void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); //====================================================================================================================== int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6169014d9f..2412639722 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -241,6 +241,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.fileSetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); + } *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -4055,6 +4060,8 @@ _err: return code; } +void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) + void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; @@ -5025,3 +5032,8 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } + +void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { + pReader->notifyFn = notifyFn; + pReader->notifyParam = param; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 60e6e6960a..fb32190115 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -22,6 +22,7 @@ extern "C" { #include "tsdbDataFileRW.h" #include "tsdbUtil2.h" +#include "storageapi.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -227,6 +228,8 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; + TsdReaderNotifyCbFn notifyFn; + void* notifyParam; }; typedef struct SBrinRecordIter { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a6673917bf..1729e3ab0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -60,6 +60,8 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdSetQueryTableList = tsdbSetTableList2; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; + + pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; } void initMetadataAPI(SStoreMeta* pMeta) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 28832ffec8..3d2053c0c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3284,7 +3284,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + uInfo("getBlockForTableMergeScan retrieved one block"); return pBlock; } @@ -3320,6 +3320,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* return 0; } +void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { + uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); + return; +} + int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3368,7 +3373,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - + pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = param; ps->onlyRef = false; From cdb7eb462de681913b8c2ee2fee3e6f4fb742835 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 28 Nov 2023 10:31:18 +0800 Subject: [PATCH 06/27] Revert "Revert "fix: pass compilation"" This reverts commit 644dbad920c575c06ed00388c3a0dd0158448531. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2412639722..f3c5d9ec93 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4060,8 +4060,6 @@ _err: return code; } -void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) - void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; From 89cd2b6f17ee681594e18800f4e9a1723d45f75e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 28 Nov 2023 14:39:08 +0800 Subject: [PATCH 07/27] enhance: tsdb output duration order --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 97 +++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 + 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index f3c5d9ec93..d71f1729b6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); +static void resetTableListIndex(SReaderStatus* pStatus); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -241,11 +242,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - if (pReader->notifyFn) { - STsdReaderNotifyInfo info = {0}; - info.duration.fileSetId = fid; - pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); - } + *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -436,6 +433,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } + pReader->bDurationOrder = true; + tsdbInitReaderLock(pReader); *ppReader = pReader; @@ -2550,6 +2549,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { + pReader->status.bProcMemPreFileset = true; + resetTableListIndex(&pReader->status); break; } } @@ -2931,7 +2932,7 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) { return code; } -static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { +static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -2948,13 +2949,12 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { if (!hasNexTable) { return TSDB_CODE_SUCCESS; } - pBlockScanInfo = pStatus->pTableIter; + continue; } initMemDataIterator(*pBlockScanInfo, pReader); initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); - int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4350,6 +4350,70 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { return pBlock->info.rows > 0; } +static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + + if (pStatus->loadFromFile) { + if (pStatus->bProcMemPreFileset) { + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow win = {0}; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); + + code = buildBlockFromBufferSequentially(pReader, win.skey); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } else { + pStatus->bProcMemPreFileset = false; + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.fileSetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); + } + } + } + + code = buildBlockFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pBlock->info.rows <= 0) { + resetTableListIndex(&pReader->status); + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + } else { // no data in files, let's try the buffer + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + return code; +} + +static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + + if (pStatus->loadFromFile) { + code = buildBlockFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pBlock->info.rows <= 0) { + resetTableListIndex(&pReader->status); + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + } else { // no data in files, let's try the buffer + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + return code; +} + static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; @@ -4367,19 +4431,10 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { return tsdbReadRowsCountOnly(pReader); } - - if (pStatus->loadFromFile) { - code = buildBlockFromFiles(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (pBlock->info.rows <= 0) { - resetTableListIndex(&pReader->status); - code = buildBlockFromBufferSequentially(pReader); - } - } else { // no data in files, let's try the buffer - code = buildBlockFromBufferSequentially(pReader); + if (!pReader->bDurationOrder) { + code = doTsdbNextDataBlockFilesFirst(pReader); + } else { + code = doTsdbNextDataBlockDurationOrder(pReader); } *hasNext = pBlock->info.rows > 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index fb32190115..2d5b6f38a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -206,6 +206,7 @@ typedef struct SReaderStatus { SArray* pLDataIterArray; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data + bool bProcMemPreFileset; } SReaderStatus; struct STsdbReader { @@ -228,6 +229,7 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; + bool bDurationOrder; // duration by duration output TsdReaderNotifyCbFn notifyFn; void* notifyParam; }; From 4fb0f83f070ef2ea03717f4d132b8e31b7741238 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 28 Nov 2023 14:50:50 +0800 Subject: [PATCH 08/27] fix: core dump without resetTableListIndex --- include/libs/executor/storageapi.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b45103f94a..2c9b8a4e5e 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -166,7 +166,7 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ typedef enum { - TSD_READER_NOTIFY_DURATION + TSD_READER_NOTIFY_DURATION_START } ETsdReaderNotifyType; typedef union { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d71f1729b6..710a076edc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4369,8 +4369,9 @@ static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) { if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; info.duration.fileSetId = fid; - pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); + pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); } + resetTableListIndex(pStatus); } } From 5a75d744213839c7180761f75f8bc80ed67c21cd Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 28 Nov 2023 15:58:33 +0800 Subject: [PATCH 09/27] fix:mem table has data in the fileset duration --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 72 +++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 + 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 710a076edc..492739e407 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -59,6 +59,7 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static void resetTableListIndex(SReaderStatus* pStatus); +static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -2549,8 +2550,19 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { - pReader->status.bProcMemPreFileset = true; - resetTableListIndex(&pReader->status); + if (pReader->bDurationOrder) { + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow win = {0}; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); + + if ((ASCENDING_TRAVERSE(pReader->info.order) && + win.skey >= pReader->status.memTableMinKey && win.skey <= pReader->status.memTableMaxKey) || + (!ASCENDING_TRAVERSE(pReader->info.order) && + win.ekey >= pReader->status.memTableMinKey && win.ekey <= pReader->status.memTableMaxKey)) { + pReader->status.bProcMemPreFileset = true; + resetTableListIndex(&pReader->status); + } + } break; } } @@ -4243,6 +4255,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); pReader->pReadSnap = NULL; + if (pReader->bDurationOrder) { + pReader->status.memTableMinKey = INT64_MAX; + pReader->status.memTableMaxKey = INT64_MIN; + } pReader->flag = READER_STATUS_SUSPEND; tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, @@ -4314,6 +4330,10 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { } } + if (pReader->bDurationOrder) { + getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); + } + pReader->flag = READER_STATUS_NORMAL; tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); @@ -4899,6 +4919,54 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT return code; } +static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t rows = 0; + + SReaderStatus* pStatus = &pReader->status; + + int32_t iter = 0; + int64_t maxKey = INT64_MIN; + int64_t minKey = INT64_MAX; + + pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter); + while (pStatus->pTableIter != NULL) { + STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; + + STbData* d = NULL; + if (pReader->pReadSnap->pMem != NULL) { + d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->info.suid, pBlockScanInfo->uid); + if (d != NULL) { + if (d->maxKey > maxKey) { + maxKey = d->maxKey; + } + if (d->minKey < minKey) { + minKey = d->minKey; + } + } + } + + STbData* di = NULL; + if (pReader->pReadSnap->pIMem != NULL) { + di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->info.suid, pBlockScanInfo->uid); + if (di != NULL) { + if (d->maxKey > maxKey) { + maxKey = d->maxKey; + } + if (d->minKey < minKey) { + minKey = d->minKey; + } + } + } + + // current table is exhausted, let's try the next table + pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter); + } + + *pMaxKey = maxKey; + *pMinKey = minKey; +} + int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; int64_t rows = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 2d5b6f38a6..31bd9b5c78 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -207,6 +207,8 @@ typedef struct SReaderStatus { SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data bool bProcMemPreFileset; + int64_t memTableMaxKey; + int64_t memTableMinKey; } SReaderStatus; struct STsdbReader { From b2fa6209d90f5fd09272e565eb9c9884790fb005 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 29 Nov 2023 08:09:00 +0800 Subject: [PATCH 10/27] fix: descending traverse wrong endkey --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 710a076edc..462f385172 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4361,7 +4361,8 @@ static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) { STimeWindow win = {0}; tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); - code = buildBlockFromBufferSequentially(pReader, win.skey); + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; + code = buildBlockFromBufferSequentially(pReader, endKey); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } else { From 8a79df1113fbda4d06834053c2375d1640d893da Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 29 Nov 2023 10:15:43 +0800 Subject: [PATCH 11/27] enhance: prepare duration for next fileset --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 39 ++++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 ++ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 20dcb4b52c..cbfbe782f6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2509,6 +2509,33 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) } } +static void prepareDurationForNextFileSet(STsdbReader* pReader) { + if (pReader->status.bProcMemFirstFileset) { + pReader->status.prevFilesetStartKey = INT64_MIN; + pReader->status.prevFilesetEndKey = INT64_MAX; + pReader->status.bProcMemFirstFileset = false; + } + + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow winFid = {0}; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); + + bool bProcMemPreFileset = false; + if (ASCENDING_TRAVERSE(pReader->info.order)) { + bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey < pReader->status.memTableMinKey); + } else { + bProcMemPreFileset = !(winFid.ekey > pReader->status.memTableMaxKey || pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); + } + + if (bProcMemPreFileset) { + pReader->status.bProcMemPreFileset = true; + resetTableListIndex(&pReader->status); + } + + pReader->status.prevFilesetStartKey = winFid.skey; + pReader->status.prevFilesetEndKey = winFid.ekey; +} + static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) { SReaderStatus* pStatus = &pReader->status; pBlockNum->numOfBlocks = 0; @@ -2551,17 +2578,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { if (pReader->bDurationOrder) { - int32_t fid = pReader->status.pCurrentFileset->fid; - STimeWindow win = {0}; - tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); - - if ((ASCENDING_TRAVERSE(pReader->info.order) && - win.skey >= pReader->status.memTableMinKey && win.skey <= pReader->status.memTableMaxKey) || - (!ASCENDING_TRAVERSE(pReader->info.order) && - win.ekey >= pReader->status.memTableMinKey && win.ekey <= pReader->status.memTableMaxKey)) { - pReader->status.bProcMemPreFileset = true; - resetTableListIndex(&pReader->status); - } + prepareDurationForNextFileSet(pReader); } break; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 31bd9b5c78..c7331e1913 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -209,6 +209,9 @@ typedef struct SReaderStatus { bool bProcMemPreFileset; int64_t memTableMaxKey; int64_t memTableMinKey; + int64_t prevFilesetStartKey; + int64_t prevFilesetEndKey; + bool bProcMemFirstFileset; } SReaderStatus; struct STsdbReader { From 4691a7767a8005eebbb7cd683a208f93f4f3cf67 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 29 Nov 2023 10:29:40 +0800 Subject: [PATCH 12/27] restore buildBlockFromBufferSequentially back --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cbfbe782f6..b999607503 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2519,7 +2519,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { int32_t fid = pReader->status.pCurrentFileset->fid; STimeWindow winFid = {0}; tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); - + bool bProcMemPreFileset = false; if (ASCENDING_TRAVERSE(pReader->info.order)) { bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey < pReader->status.memTableMinKey); @@ -2978,7 +2978,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en if (!hasNexTable) { return TSDB_CODE_SUCCESS; } - continue; + pBlockScanInfo = pStatus->pTableIter; } initMemDataIterator(*pBlockScanInfo, pReader); From 09e314c3ae6f087adc798292d6dccaea8b7ce0bf Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 29 Nov 2023 12:26:32 +0800 Subject: [PATCH 13/27] fix: pStatus->pTableIter is set to null when getting mem table time range --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b999607503..e590358267 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2520,18 +2520,15 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { STimeWindow winFid = {0}; tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); - bool bProcMemPreFileset = false; if (ASCENDING_TRAVERSE(pReader->info.order)) { - bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey < pReader->status.memTableMinKey); } else { - bProcMemPreFileset = !(winFid.ekey > pReader->status.memTableMaxKey || pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !(winFid.ekey > pReader->status.memTableMaxKey || pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); } - - if (bProcMemPreFileset) { - pReader->status.bProcMemPreFileset = true; + + if (pReader->status.bProcMemPreFileset) { resetTableListIndex(&pReader->status); } - pReader->status.prevFilesetStartKey = winFid.skey; pReader->status.prevFilesetEndKey = winFid.ekey; } @@ -2978,7 +2975,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en if (!hasNexTable) { return TSDB_CODE_SUCCESS; } - pBlockScanInfo = pStatus->pTableIter; + continue; } initMemDataIterator(*pBlockScanInfo, pReader); @@ -4947,9 +4944,9 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t int64_t maxKey = INT64_MIN; int64_t minKey = INT64_MAX; - pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter); - while (pStatus->pTableIter != NULL) { - STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; + void* pHashIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter); + while (pHashIter!= NULL) { + STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pHashIter; STbData* d = NULL; if (pReader->pReadSnap->pMem != NULL) { @@ -4978,7 +4975,7 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t } // current table is exhausted, let's try the next table - pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter); + pHashIter = tSimpleHashIterate(pStatus->pTableMap, pHashIter, &iter); } *pMaxKey = maxKey; From 1a3e280ea878b491d82694b1935a8f6140595f2e Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 29 Nov 2023 13:50:53 +0800 Subject: [PATCH 14/27] fix: use imem maxKey/minKey --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e590358267..0320265f7f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4965,11 +4965,11 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t if (pReader->pReadSnap->pIMem != NULL) { di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->info.suid, pBlockScanInfo->uid); if (di != NULL) { - if (d->maxKey > maxKey) { - maxKey = d->maxKey; + if (di->maxKey > maxKey) { + maxKey = di->maxKey; } - if (d->minKey < minKey) { - minKey = d->minKey; + if (di->minKey < minKey) { + minKey = di->minKey; } } } From 80ee5bfb3e11cbce9759c390da3e3834b41d3bf7 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 30 Nov 2023 11:08:18 +0800 Subject: [PATCH 15/27] fix: the initial value of first fileset start / end key --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 13 +++++++------ source/libs/executor/src/scanoperator.c | 1 - 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0320265f7f..e7018638a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2521,9 +2521,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); if (ASCENDING_TRAVERSE(pReader->info.order)) { - pReader->status.bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey-1 < pReader->status.memTableMinKey); } else { - pReader->status.bProcMemPreFileset = !(winFid.ekey > pReader->status.memTableMaxKey || pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !(winFid.ekey+1 > pReader->status.memTableMaxKey || pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); } if (pReader->status.bProcMemPreFileset) { @@ -3911,6 +3911,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; + if (pReader->bDurationOrder) { + getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); + pReader->status.bProcMemFirstFileset = true; + } + initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); @@ -4344,10 +4349,6 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { } } - if (pReader->bDurationOrder) { - getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); - } - pReader->flag = READER_STATUS_NORMAL; tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3d2053c0c0..8af3765f87 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3284,7 +3284,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - uInfo("getBlockForTableMergeScan retrieved one block"); return pBlock; } From 82629c26581708c4c0674604bd9a9570a20f8609 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 1 Dec 2023 15:11:43 +0800 Subject: [PATCH 16/27] fix: notify when there are no memory pre fileset --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 853d0d7374..ecc1351698 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2514,6 +2514,15 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { if (pReader->status.bProcMemPreFileset) { resetTableListIndex(&pReader->status); } + + if (!pReader->status.bProcMemFirstFileset) { + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.fileSetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); + } + } + pReader->status.prevFilesetStartKey = winFid.skey; pReader->status.prevFilesetEndKey = winFid.ekey; } From bff88e0639db40dec27c1bb9512c8e048b7b10d1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 1 Dec 2023 16:27:36 +0800 Subject: [PATCH 17/27] fix: set duration order api --- include/libs/executor/storageapi.h | 1 + source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index baa5ef8100..84fe62f836 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -183,6 +183,7 @@ typedef struct TsdReader { int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); + void (*tsdSetDurationOrder)(void* pReader); void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); } TsdReader; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index cdf3fb6a2a..876e8767e8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -191,6 +191,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); +void tsdbSetDurationOrder(STsdbReader* pReader); void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); //====================================================================================================================== diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index ecc1351698..42790a9e7a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -423,7 +423,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - pReader->bDurationOrder = true; + pReader->bDurationOrder = false; tsdbInitReaderLock(pReader); tsem_init(&pReader->resumeAfterSuspend, 0, 0); @@ -5136,6 +5136,10 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } +void tsdbSetDurationOrder(STsdbReader* pReader) { + pReader->bDurationOrder = true; +} + void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { pReader->notifyFn = notifyFn; pReader->notifyParam = param; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 1729e3ab0b..fee823083c 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -61,6 +61,7 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdSetQueryTableList = tsdbSetTableList2; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; + pReader->tsdSetDurationOrder = (void (*)(void*))tsdbSetDurationOrder; pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; } From f2b38f5a4a8e47ef55c75b19f5584c339e4f18a1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 1 Dec 2023 16:51:02 +0800 Subject: [PATCH 18/27] refactor: start/stop duration from start/stop table merge scan --- source/libs/executor/src/scanoperator.c | 90 ++++++++++++++----------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 34f35c3ba4..b88bf1ff56 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3327,9 +3327,56 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); + return; } +int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; + + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); + + tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); + tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); + + STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + param->pOperator = pOperator; + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = param; + ps->onlyRef = false; + tsortAddSource(pInfo->pSortHandle, ps); + + if (numOfTable == 1) { + tsortSetSingleTableMerge(pInfo->pSortHandle); + } else { + code = tsortOpen(pInfo->pSortHandle); + } + return code; +} + +void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + + SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); + pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; + pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; + pInfo->sortExecInfo.loops += sortExecInfo.loops; + pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; + pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; + + tsortDestroySortHandle(pInfo->pSortHandle); + pInfo->pSortHandle = NULL; +} + int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3353,43 +3400,14 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { tSimpleHashClear(pInfo->mTableNumRows); - size_t szRow = blockDataGetRowSize(pInfo->pResBlock); -// if (pInfo->mergeLimit != -1) { -// pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, -// NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024); -// } else - { - pInfo->sortBufSize = 2048 * pInfo->bufPageSize; - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); - - tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); - tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); - } - - tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); - - // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; - - STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); - param->pOperator = pOperator; STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); + pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader); pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = param; - ps->onlyRef = false; - tsortAddSource(pInfo->pSortHandle, ps); - int32_t code = TSDB_CODE_SUCCESS; - if (numOfTable == 1) { - tsortSetSingleTableMerge(pInfo->pSortHandle); - } else { - code = tsortOpen(pInfo->pSortHandle); - } + int32_t code = startDurationForGroupTableMergeScan(pOperator); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); @@ -3403,21 +3421,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); - pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; - pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; - pInfo->sortExecInfo.loops += sortExecInfo.loops; - pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; - pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; + stopDurationForGroupTableMergeScan(pOperator); if (pInfo->base.dataReader != NULL) { pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; } - tsortDestroySortHandle(pInfo->pSortHandle); - pInfo->pSortHandle = NULL; - resetLimitInfoForNextGroup(&pInfo->limitInfo); taosHashCleanup(pInfo->mSkipTables); pInfo->mSkipTables = NULL; From 3f8b2e826fdd775cda9217cf5083aaa4b9ab77d6 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 4 Dec 2023 10:49:40 +0800 Subject: [PATCH 19/27] fix: add new duration processing to table merge scan --- source/libs/executor/inc/executorInt.h | 2 ++ source/libs/executor/src/scanoperator.c | 45 ++++++++++++++++++------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e29583d8fc..a3ec2cca74 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -297,6 +297,8 @@ typedef struct STableMergeScanInfo { SHashObj* mSkipTables; int64_t mergeLimit; SSortExecInfo sortExecInfo; + bool bNewDuration; + SSDataBlock* pNewDurationBlock; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b88bf1ff56..e8213d0597 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3242,6 +3242,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { + if (pInfo->pNewDurationBlock) { + SSDataBlock* pNewDurationBlock = pInfo->pNewDurationBlock; + pInfo->pNewDurationBlock = NULL; + return pNewDurationBlock; + } + code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); @@ -3267,7 +3273,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); - // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); @@ -3290,6 +3295,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + if (pInfo->bNewDuration) { + pInfo->pNewDurationBlock = pBlock; + return NULL; + } return pBlock; } @@ -3327,7 +3337,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); - + STableMergeScanInfo* pTmsInfo = param; + pTmsInfo->bNewDuration = true; return; } @@ -3337,6 +3348,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; + pInfo->bNewDuration = false; + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, @@ -3354,7 +3367,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { ps->param = param; ps->onlyRef = false; tsortAddSource(pInfo->pSortHandle, ps); - + if (numOfTable == 1) { tsortSetSingleTableMerge(pInfo->pSortHandle); } else { @@ -3510,17 +3523,23 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { - // Data of this group are all dumped, let's try the next group - stopGroupTableMergeScan(pOperator); - if (pInfo->tableEndIndex >= tableListSize - 1) { - setOperatorCompleted(pOperator); - break; - } + if (pInfo->bNewDuration) { + stopDurationForGroupTableMergeScan(pOperator); + startDurationForGroupTableMergeScan(pOperator); - pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; - startGroupTableMergeScan(pOperator); - resetLimitInfoForNextGroup(&pInfo->limitInfo); + } else { + // Data of this group are all dumped, let's try the next group + stopGroupTableMergeScan(pOperator); + if (pInfo->tableEndIndex >= tableListSize - 1) { + setOperatorCompleted(pOperator); + break; + } + + pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + startGroupTableMergeScan(pOperator); + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } } } From 07e43fe56b92113504d5ca0746bcc8465d1bc470 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 4 Dec 2023 14:54:15 +0800 Subject: [PATCH 20/27] fix: two intervals are intesected or not --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index ecc1351698..a17d664232 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2506,9 +2506,11 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); if (ASCENDING_TRAVERSE(pReader->info.order)) { - pReader->status.bProcMemPreFileset = !(pReader->status.prevFilesetStartKey > pReader->status.memTableMaxKey || winFid.skey-1 < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey || + (winFid.skey-1) < pReader->status.memTableMinKey); } else { - pReader->status.bProcMemPreFileset = !(winFid.ekey+1 > pReader->status.memTableMaxKey || pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); + pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) || + pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); } if (pReader->status.bProcMemPreFileset) { From 8c290587b78249373dc990f51aa3058efe15a856 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 4 Dec 2023 16:19:07 +0800 Subject: [PATCH 21/27] fix: typo error --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 725679f079..ee50526883 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2517,7 +2517,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { resetTableListIndex(&pReader->status); } - if (!pReader->status.bProcMemFirstFileset) { + if (!pReader->status.bProcMemPreFileset) { if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; info.duration.fileSetId = fid; From 538f6fc722a6ca0a9d4be92ea13b8af2ced415c2 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 5 Dec 2023 09:51:14 +0800 Subject: [PATCH 22/27] fix: no block after new duration --- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/scanoperator.c | 49 ++++++++++++------------- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a3ec2cca74..b190cd815c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -298,7 +298,7 @@ typedef struct STableMergeScanInfo { int64_t mergeLimit; SSortExecInfo sortExecInfo; bool bNewDuration; - SSDataBlock* pNewDurationBlock; + bool bOnlyRetrieveBlock; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e8213d0597..bb3cf7b937 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3242,29 +3242,28 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { - if (pInfo->pNewDurationBlock) { - SSDataBlock* pNewDurationBlock = pInfo->pNewDurationBlock; - pInfo->pNewDurationBlock = NULL; - return pNewDurationBlock; - } + if (!pInfo->bOnlyRetrieveBlock) { + code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); + if (code != 0) { + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, code); + } - code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); - if (code != 0) { - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); - } + if (!hasNext || isTaskKilled(pTaskInfo)) { + pInfo->bNewDuration = false; + if (isTaskKilled(pTaskInfo)) { + qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + } + break; + } - if (!hasNext) { - break; + if (pInfo->bNewDuration) { + pInfo->bOnlyRetrieveBlock = true; + return NULL; + } } - - if (isTaskKilled(pTaskInfo)) { - qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - break; - } - // process this data block based on the probabilities bool processThisBlock = processBlockWithProbability(&pInfo->sample); if (!processThisBlock) { @@ -3273,6 +3272,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); + if (pInfo->bOnlyRetrieveBlock) { + pInfo->bOnlyRetrieveBlock = false; + } if (code != TSDB_CODE_SUCCESS) { qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); @@ -3295,11 +3297,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - if (pInfo->bNewDuration) { - pInfo->pNewDurationBlock = pBlock; - return NULL; - } + return pBlock; } @@ -3336,7 +3334,6 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* } void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { - uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); STableMergeScanInfo* pTmsInfo = param; pTmsInfo->bNewDuration = true; return; From be6daee280cda58e441fc2e55b315fbaf7146f53 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 5 Dec 2023 11:01:57 +0800 Subject: [PATCH 23/27] feature: switch duration mode randomly --- source/libs/executor/src/scanoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6f2de2fbe2..9a556a4ea1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3423,7 +3423,11 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader); + int32_t r = taosRand() % 2; + if (r == 1) { + uInfo("zsl: set duration order"); + pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader); + } pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); int32_t code = startDurationForGroupTableMergeScan(pOperator); From 8ff3f8e25ef0f3d0c414c1fed4e4849b2d44bb26 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 6 Dec 2023 08:48:29 +0800 Subject: [PATCH 24/27] enhance: add duration order change by file /tmp/duration --- source/libs/executor/src/scanoperator.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9a556a4ea1..3fc228503a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#define ALLOW_FORBID_FUNC #include "executorInt.h" #include "filter.h" @@ -3423,10 +3424,17 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - int32_t r = taosRand() % 2; + int r = 1; + FILE* f = fopen("/tmp/duration", "r"); + if (f) { + fscanf(f, "%d", &r); + fclose(f); + } if (r == 1) { - uInfo("zsl: set duration order"); + uInfo("zsl: DURATION ORDER"); pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader); + } else { + uInfo("zsl: NO DURATION"); } pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); From a2e7c78d27b20583ca5f435a5e429e2d463596c2 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 6 Dec 2023 09:39:38 +0800 Subject: [PATCH 25/27] refactor: rename variables --- include/libs/executor/storageapi.h | 4 ++-- include/libs/nodes/plannodes.h | 1 + source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 22 +++++++++++----------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 +- source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 +- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/scanoperator.c | 12 ++++++------ 8 files changed, 24 insertions(+), 23 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 3f5bd7cf23..32b60b55ae 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -158,7 +158,7 @@ typedef enum { typedef union { struct { - int32_t fileSetId; + int32_t filesetId; } duration; } STsdReaderNotifyInfo; @@ -183,7 +183,7 @@ typedef struct TsdReader { int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); - void (*tsdSetDurationOrder)(void* pReader); + void (*tsdSetFilesetDelimited)(void* pReader); void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); } TsdReader; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e29750d8a0..ce6ea32e2e 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -118,6 +118,7 @@ typedef struct SScanLogicNode { bool igLastNull; bool groupOrderScan; bool onlyMetaCtbIdx; // for tag scan with no tbname + bool filesetDelimited; // returned blocks delimited by fileset } SScanLogicNode; typedef struct SJoinLogicNode { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index bd840c5975..74d62da5c5 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -171,7 +171,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); -void tsdbSetDurationOrder(STsdbReader* pReader); +void tsdbSetFilesetDelimited(STsdbReader* pReader); void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c108bbe03e..823e2e1786 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -423,7 +423,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - pReader->bDurationOrder = false; + pReader->bFilesetDelimited = false; tsdbInitReaderLock(pReader); tsem_init(&pReader->resumeAfterSuspend, 0, 0); @@ -2520,7 +2520,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { if (!pReader->status.bProcMemPreFileset) { if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; - info.duration.fileSetId = fid; + info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); } } @@ -2570,7 +2570,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { - if (pReader->bDurationOrder) { + if (pReader->bFilesetDelimited) { prepareDurationForNextFileSet(pReader); } break; @@ -3915,7 +3915,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - if (pReader->bDurationOrder) { + if (pReader->bFilesetDelimited) { getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); pReader->status.bProcMemFirstFileset = true; } @@ -4227,7 +4227,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); pReader->pReadSnap = NULL; - if (pReader->bDurationOrder) { + if (pReader->bFilesetDelimited) { pReader->status.memTableMinKey = INT64_MAX; pReader->status.memTableMaxKey = INT64_MIN; } @@ -4342,7 +4342,7 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { return pBlock->info.rows > 0; } -static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) { +static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; int32_t code = TSDB_CODE_SUCCESS; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; @@ -4361,7 +4361,7 @@ static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) { pStatus->bProcMemPreFileset = false; if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; - info.duration.fileSetId = fid; + info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); } resetTableListIndex(pStatus); @@ -4425,10 +4425,10 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { return tsdbReadRowsCountOnly(pReader); } - if (!pReader->bDurationOrder) { + if (!pReader->bFilesetDelimited) { code = doTsdbNextDataBlockFilesFirst(pReader); } else { - code = doTsdbNextDataBlockDurationOrder(pReader); + code = doTsdbNextDataBlockFilesetDelimited(pReader); } *hasNext = pBlock->info.rows > 0; @@ -5152,8 +5152,8 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } -void tsdbSetDurationOrder(STsdbReader* pReader) { - pReader->bDurationOrder = true; +void tsdbSetFilesetDelimited(STsdbReader* pReader) { + pReader->bFilesetDelimited = true; } void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index c5904ea03e..f4fa7bc2c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -230,7 +230,7 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; - bool bDurationOrder; // duration by duration output + bool bFilesetDelimited; // duration by duration output TsdReaderNotifyCbFn notifyFn; void* notifyParam; }; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index f495b0a3e0..6b7b778cd5 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -61,7 +61,7 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdSetQueryTableList = tsdbSetTableList2; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; - pReader->tsdSetDurationOrder = (void (*)(void*))tsdbSetDurationOrder; + pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited; pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 4fa1dcdf72..c91701788d 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -297,7 +297,7 @@ typedef struct STableMergeScanInfo { SHashObj* mSkipTables; int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool bNewDuration; + bool bNewFileset; bool bOnlyRetrieveBlock; } STableMergeScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3fc228503a..9911366166 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3261,7 +3261,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } if (!hasNext || isTaskKilled(pTaskInfo)) { - pInfo->bNewDuration = false; + pInfo->bNewFileset = false; if (isTaskKilled(pTaskInfo)) { qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); @@ -3269,7 +3269,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { break; } - if (pInfo->bNewDuration) { + if (pInfo->bNewFileset) { pInfo->bOnlyRetrieveBlock = true; return NULL; } @@ -3345,7 +3345,7 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { STableMergeScanInfo* pTmsInfo = param; - pTmsInfo->bNewDuration = true; + pTmsInfo->bNewFileset = true; return; } @@ -3355,7 +3355,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; - pInfo->bNewDuration = false; + pInfo->bNewFileset = false; pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -3432,7 +3432,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { } if (r == 1) { uInfo("zsl: DURATION ORDER"); - pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader); + pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); } else { uInfo("zsl: NO DURATION"); } @@ -3541,7 +3541,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { - if (pInfo->bNewDuration) { + if (pInfo->bNewFileset) { stopDurationForGroupTableMergeScan(pOperator); startDurationForGroupTableMergeScan(pOperator); From ac531c6d7f4369213f04f37833db6de68aebfa0c Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 6 Dec 2023 10:45:12 +0800 Subject: [PATCH 26/27] enhance: add fileset delimited front end --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/executorInt.h | 2 ++ source/libs/executor/src/scanoperator.c | 19 +++++++------------ source/libs/nodes/src/nodesCloneFuncs.c | 2 ++ source/libs/nodes/src/nodesCodeFuncs.c | 15 ++++++++++++++- source/libs/nodes/src/nodesMsgFuncs.c | 7 ++++++- source/libs/planner/src/planOptimizer.c | 1 + source/libs/planner/src/planPhysiCreater.c | 1 + source/libs/planner/src/planSpliter.c | 2 ++ source/libs/planner/src/planUtil.c | 1 + 10 files changed, 37 insertions(+), 14 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ce6ea32e2e..b99a97a194 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -433,6 +433,7 @@ typedef struct STableScanPhysiNode { int8_t igExpired; bool assignBlockUid; int8_t igCheckUpdate; + bool filesetDelimited; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c91701788d..cba26c46b5 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -270,6 +270,7 @@ typedef struct STableScanInfo { bool hasGroupByTag; bool countOnly; // TsdReader readerAPI; + bool filesetDelimited; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -299,6 +300,7 @@ typedef struct STableMergeScanInfo { SSortExecInfo sortExecInfo; bool bNewFileset; bool bOnlyRetrieveBlock; + bool filesetDelimited; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9911366166..7d06f5dab6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -894,7 +894,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } - + if (pInfo->filesetDelimited) { + pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); + } if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; } @@ -1086,6 +1088,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->countOnly = true; } + pInfo->filesetDelimited = pTableScanNode->filesetDelimited; + taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL); @@ -3424,17 +3428,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - int r = 1; - FILE* f = fopen("/tmp/duration", "r"); - if (f) { - fscanf(f, "%d", &r); - fclose(f); - } - if (r == 1) { - uInfo("zsl: DURATION ORDER"); + if (pInfo->filesetDelimited) { pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); - } else { - uInfo("zsl: NO DURATION"); } pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); @@ -3544,7 +3539,6 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { if (pInfo->bNewFileset) { stopDurationForGroupTableMergeScan(pOperator); startDurationForGroupTableMergeScan(pOperator); - } else { // Data of this group are all dumped, let's try the next group stopGroupTableMergeScan(pOperator); @@ -3683,6 +3677,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); + pInfo->filesetDelimited = pTableScanNode->filesetDelimited; setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index ce23928268..97438b84a6 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -423,6 +423,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(igLastNull); COPY_SCALAR_FIELD(groupOrderScan); COPY_SCALAR_FIELD(onlyMetaCtbIdx); + COPY_SCALAR_FIELD(filesetDelimited); return TSDB_CODE_SUCCESS; } @@ -650,6 +651,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(igExpired); + COPY_SCALAR_FIELD(filesetDelimited); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index f3087dd5d4..d26cdcf401 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -677,6 +677,7 @@ static const char* jkScanLogicPlanDataRequired = "DataRequired"; static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanGroupTags = "GroupTags"; static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; +static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -721,6 +722,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited); + } return code; } @@ -768,7 +772,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited); + } return code; } @@ -1830,6 +1836,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags"; static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; +static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1898,6 +1905,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanFilesetDelimited, pNode->filesetDelimited); + } return code; } @@ -1969,6 +1979,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanFilesetDelimited, &pNode->filesetDelimited); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 9804f2075b..d6eb3360aa 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2167,7 +2167,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate); } - + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueBool(pEncoder, pNode->filesetDelimited); + } return code; } @@ -2246,6 +2248,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj) if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueBool(pDecoder, &pNode->filesetDelimited); + } return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e71e18d37d..aa3181e166 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1400,6 +1400,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS pScan->node.outputTsOrder = order; if (TSDB_SUPER_TABLE == pScan->tableType) { pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 598bce3133..d1fbd0681d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -622,6 +622,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->igExpired = pScanLogicNode->igExpired; pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; + pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited; int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 43bd8a5589..9b9f701e2b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -692,6 +692,7 @@ static void stbSplSetTableMergeScan(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; if (NULL != pScan->pGroupTags) { pScan->groupSort = true; } @@ -1241,6 +1242,7 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu SNodeList* pMergeKeys = NULL; if (TSDB_CODE_SUCCESS == code) { pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; + pMergeScan->filesetDelimited = true; pMergeScan->node.pChildren = pChildren; splSetParent((SLogicNode*)pMergeScan); code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 1c7c937b7f..2da270e42d 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -164,6 +164,7 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel pScan->scanType = SCAN_TYPE_TABLE; } else if (TSDB_SUPER_TABLE == pScan->tableType) { pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; } if (TSDB_NORMAL_TABLE != pScan->tableType && TSDB_CHILD_TABLE != pScan->tableType) { From 1ba1fbfabc9403f9ce5d7b111b99689227cd21e2 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 7 Dec 2023 10:45:26 +0800 Subject: [PATCH 27/27] fix: remove allow_forbid_func define --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7d06f5dab6..a92770b1f9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define ALLOW_FORBID_FUNC #include "executorInt.h" #include "filter.h" @@ -3228,6 +3227,7 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); } else { *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows; + nRows = *(int64_t*)pNum; } if (nRows >= pInfo->mergeLimit) {