enhance: duration notification

This commit is contained in:
shenglian zhou 2023-11-22 13:56:16 +08:00
parent c4a1398050
commit 9dd306e1ef
6 changed files with 39 additions and 2 deletions

View File

@ -165,6 +165,18 @@ typedef struct {
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
typedef enum {
TSD_READER_NOTIFY_DURATION
} ETsdReaderNotifyType;
typedef union {
struct {
int32_t fileSetId;
} duration;
} STsdReaderNotifyInfo;
typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param);
typedef struct TsdReader {
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
@ -183,6 +195,8 @@ typedef struct TsdReader {
int32_t (*tsdReaderGetDataBlockDistInfo)();
int64_t (*tsdReaderGetNumOfInMemRows)();
void (*tsdReaderNotifyClosing)();
void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
} TsdReader;
typedef struct SStoreCacheReader {

View File

@ -191,6 +191,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
//======================================================================================================================
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);

View File

@ -241,6 +241,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey,
pReader->info.window.ekey, pReader->idStr);
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.fileSetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam);
}
*hasNext = true;
return TSDB_CODE_SUCCESS;
}
@ -4055,6 +4060,8 @@ _err:
return code;
}
void tsdbReaderSetNotifyFn(STsdbReader* pReader, )
void tsdbReaderClose2(STsdbReader* pReader) {
if (pReader == NULL) {
return;
@ -5025,3 +5032,8 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
}
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
pReader->notifyFn = notifyFn;
pReader->notifyParam = param;
}

View File

@ -22,6 +22,7 @@ extern "C" {
#include "tsdbDataFileRW.h"
#include "tsdbUtil2.h"
#include "storageapi.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
@ -227,6 +228,8 @@ struct STsdbReader {
SBlockInfoBuf blockInfoBuf;
EContentData step;
STsdbReader* innerReader[2];
TsdReaderNotifyCbFn notifyFn;
void* notifyParam;
};
typedef struct SBrinRecordIter {

View File

@ -60,6 +60,8 @@ void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdSetQueryTableList = tsdbSetTableList2;
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2;
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;
}
void initMetadataAPI(SStoreMeta* pMeta) {

View File

@ -3284,7 +3284,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
uInfo("getBlockForTableMergeScan retrieved one block");
return pBlock;
}
@ -3320,6 +3320,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
return 0;
}
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId);
return;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3368,7 +3373,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = param;
ps->onlyRef = false;