From 9dd306e1ef80ec21863d0ef7c4e06c2899a346e6 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Nov 2023 13:56:16 +0800 Subject: [PATCH] enhance: duration notification --- include/libs/executor/storageapi.h | 14 ++++++++++++++ source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 12 ++++++++++++ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 +++ source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 ++ source/libs/executor/src/scanoperator.c | 9 +++++++-- 6 files changed, 39 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index d5f1da957d..b45103f94a 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -165,6 +165,18 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ +typedef enum { + TSD_READER_NOTIFY_DURATION +} ETsdReaderNotifyType; + +typedef union { + struct { + int32_t fileSetId; + } duration; +} STsdReaderNotifyInfo; + +typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param); + typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, @@ -183,6 +195,8 @@ typedef struct TsdReader { int32_t (*tsdReaderGetDataBlockDistInfo)(); int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); + + void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); } TsdReader; typedef struct SStoreCacheReader { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6a0c991be4..db01f7d995 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -191,6 +191,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); +void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); //====================================================================================================================== int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6169014d9f..2412639722 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -241,6 +241,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.fileSetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); + } *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -4055,6 +4060,8 @@ _err: return code; } +void tsdbReaderSetNotifyFn(STsdbReader* pReader, ) + void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; @@ -5025,3 +5032,8 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } + +void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { + pReader->notifyFn = notifyFn; + pReader->notifyParam = param; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 60e6e6960a..fb32190115 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -22,6 +22,7 @@ extern "C" { #include "tsdbDataFileRW.h" #include "tsdbUtil2.h" +#include "storageapi.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -227,6 +228,8 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; + TsdReaderNotifyCbFn notifyFn; + void* notifyParam; }; typedef struct SBrinRecordIter { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a6673917bf..1729e3ab0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -60,6 +60,8 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdSetQueryTableList = tsdbSetTableList2; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; + + pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; } void initMetadataAPI(SStoreMeta* pMeta) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 28832ffec8..3d2053c0c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3284,7 +3284,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + uInfo("getBlockForTableMergeScan retrieved one block"); return pBlock; } @@ -3320,6 +3320,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* return 0; } +void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { + uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); + return; +} + int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3368,7 +3373,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - + pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = param; ps->onlyRef = false;