feature(query):add lastrow scan operator.
This commit is contained in:
parent
a55f40a1a7
commit
d8c778e468
|
@ -46,6 +46,7 @@ target_sources(
|
||||||
"src/tsdb/tsdbReaderWriter.c"
|
"src/tsdb/tsdbReaderWriter.c"
|
||||||
"src/tsdb/tsdbUtil.c"
|
"src/tsdb/tsdbUtil.c"
|
||||||
"src/tsdb/tsdbSnapshot.c"
|
"src/tsdb/tsdbSnapshot.c"
|
||||||
|
"src/tsdb/tsdbCacheRead.c"
|
||||||
|
|
||||||
# tq
|
# tq
|
||||||
"src/tq/tq.c"
|
"src/tq/tq.c"
|
||||||
|
|
|
@ -285,6 +285,12 @@ typedef struct STagScanInfo {
|
||||||
SNode* pFilterNode; // filter info,
|
SNode* pFilterNode; // filter info,
|
||||||
} STagScanInfo;
|
} STagScanInfo;
|
||||||
|
|
||||||
|
typedef struct SLastrowScanInfo {
|
||||||
|
SSDataBlock *pRes;
|
||||||
|
STableListInfo *pTableList;
|
||||||
|
SReadHandle readHandle;
|
||||||
|
} SLastrowScanInfo;
|
||||||
|
|
||||||
typedef enum EStreamScanMode {
|
typedef enum EStreamScanMode {
|
||||||
STREAM_SCAN_FROM_READERHANDLE = 1,
|
STREAM_SCAN_FROM_READERHANDLE = 1,
|
||||||
STREAM_SCAN_FROM_RES,
|
STREAM_SCAN_FROM_RES,
|
||||||
|
|
|
@ -2343,3 +2343,68 @@ _error:
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLastrowScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
|
||||||
|
if (size == 0) {
|
||||||
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if it is a group by tbname
|
||||||
|
if (size == taosHashGetSize(pInfo->pTableList->map)) {
|
||||||
|
// fetch last row for each table
|
||||||
|
} else {
|
||||||
|
//todo fetch the result for each group
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
|
||||||
|
SLastrowScanInfo* pInfo = (SLastrowScanInfo*) param;
|
||||||
|
blockDataDestroy(pInfo->pRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createLastrowScanOperator(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||||
|
STableListInfo* pTableList, SExecTaskInfo* pTaskInfo) {
|
||||||
|
|
||||||
|
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pTableList = pTableList;
|
||||||
|
pInfo->readHandle = *readHandle;
|
||||||
|
// pInfo->pRes = createResDataBlock();
|
||||||
|
|
||||||
|
pOperator->name = "LastrowScanOperator";
|
||||||
|
// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
pOperator->cost.openCost = 0;
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue