refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-03-19 19:22:43 +08:00
parent 7e712a14b6
commit c678740968
3 changed files with 23 additions and 0 deletions

View File

@ -180,6 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr); SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);

View File

@ -4990,3 +4990,9 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
} }
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
} }
// if failed, do nothing
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr);
}

View File

@ -159,12 +159,28 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
void doSetTaskId(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pStreamScanInfo = pOperator->info;
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
if (pScanInfo->base.dataReader != NULL) {
tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str);
}
} else {
doSetTaskId(pOperator->pDownstream[0]);
}
}
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
SExecTaskInfo* pTaskInfo = tinfo; SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
pTaskInfo->id.str = buildTaskId(taskId, queryId); pTaskInfo->id.str = buildTaskId(taskId, queryId);
// set the idstr for tsdbReader
doSetTaskId(pTaskInfo->pRoot);
} }
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {