diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 204cd7bfc7..25c6ef18a9 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -46,6 +46,7 @@ target_sources( "src/tsdb/tsdbReaderWriter.c" "src/tsdb/tsdbUtil.c" "src/tsdb/tsdbSnapshot.c" + "src/tsdb/tsdbCacheRead.c" # tq "src/tq/tq.c" diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4332b23477..392966bb78 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -285,6 +285,12 @@ typedef struct STagScanInfo { SNode* pFilterNode; // filter info, } STagScanInfo; +typedef struct SLastrowScanInfo { + SSDataBlock *pRes; + STableListInfo *pTableList; + SReadHandle readHandle; +} SLastrowScanInfo; + typedef enum EStreamScanMode { STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_RES, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fc7e2b3a6a..095282f188 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2343,3 +2343,68 @@ _error: taosMemoryFree(pOperator); return NULL; } + +static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SLastrowScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList); + if (size == 0) { + setTaskStatus(pTaskInfo, TASK_COMPLETED); + return NULL; + } + + // check if it is a group by tbname + if (size == taosHashGetSize(pInfo->pTableList->map)) { + // fetch last row for each table + } else { + //todo fetch the result for each group + + } + + return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes; +} + +static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { + SLastrowScanInfo* pInfo = (SLastrowScanInfo*) param; + blockDataDestroy(pInfo->pRes); +} + +SOperatorInfo* createLastrowScanOperator(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + STableListInfo* pTableList, SExecTaskInfo* pTaskInfo) { + + SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pTableList = pTableList; + pInfo->readHandle = *readHandle; +// pInfo->pRes = createResDataBlock(); + + pOperator->name = "LastrowScanOperator"; +// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + initResultSizeInfo(pOperator, 1024); + + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, + NULL, NULL, NULL); + pOperator->cost.openCost = 0; + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +}