From c678740968059e296931ee2a3363ea91a7b27f7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 Mar 2023 19:22:43 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead.c | 6 ++++++ source/libs/executor/src/executor.c | 16 ++++++++++++++++ 3 files changed, 23 insertions(+) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 906d16ce77..29d1d0aafa 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -180,6 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr); +void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c939c11536..38ffa5023a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4990,3 +4990,9 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } + +// if failed, do nothing +void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { + taosMemoryFreeClear(pReader->idStr); + pReader->idStr = taosStrdup(idstr); +} \ No newline at end of file diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7a6e737d48..86fcd319c5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -159,12 +159,28 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +void doSetTaskId(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pStreamScanInfo = pOperator->info; + STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info; + if (pScanInfo->base.dataReader != NULL) { + tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str); + } + } else { + doSetTaskId(pOperator->pDownstream[0]); + } +} + void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { SExecTaskInfo* pTaskInfo = tinfo; pTaskInfo->id.queryId = queryId; taosMemoryFreeClear(pTaskInfo->id.str); pTaskInfo->id.str = buildTaskId(taskId, queryId); + + // set the idstr for tsdbReader + doSetTaskId(pTaskInfo->pRoot); } int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {