feat: table merge scan save work
This commit is contained in:
parent
bc180e39c9
commit
7f6ff39648
|
@ -163,6 +163,7 @@ typedef struct tExprNode {
|
|||
int32_t functionId;
|
||||
int32_t num;
|
||||
struct SFunctionNode *pFunctNode;
|
||||
int32_t functionType;
|
||||
} _function;
|
||||
|
||||
struct {
|
||||
|
|
|
@ -109,6 +109,12 @@ int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid);
|
|||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
|
||||
|
||||
typedef struct {
|
||||
int64_t uid;
|
||||
int64_t ctbNum;
|
||||
} SMetaStbStats;
|
||||
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||
|
||||
typedef struct SMetaFltParam {
|
||||
tb_uid_t suid;
|
||||
int16_t cid;
|
||||
|
|
|
@ -144,12 +144,6 @@ typedef struct SMetaInfo {
|
|||
} SMetaInfo;
|
||||
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||
|
||||
typedef struct {
|
||||
int64_t uid;
|
||||
int64_t ctbNum;
|
||||
} SMetaStbStats;
|
||||
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||
|
||||
// tsdb
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
|
||||
int tsdbClose(STsdb** pTsdb);
|
||||
|
|
|
@ -530,6 +530,17 @@ typedef struct SSysTableScanInfo {
|
|||
SLoadRemoteDataInfo loadInfo;
|
||||
} SSysTableScanInfo;
|
||||
|
||||
typedef struct STableCountScanInfo {
|
||||
SReadHandle readHandle;
|
||||
SSDataBlock* pRes;
|
||||
SExprSupp pseudoSup;
|
||||
SNode* pCondition;
|
||||
SName name;
|
||||
uint64_t suid;
|
||||
uint64_t uid;
|
||||
int8_t tableType;
|
||||
} STableCountScanInfo;
|
||||
|
||||
typedef struct SBlockDistInfo {
|
||||
SSDataBlock* pResBlock;
|
||||
STsdbReader* pHandle;
|
||||
|
|
|
@ -1283,6 +1283,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
|
||||
pExprNode->_function.functionId = pFuncNode->funcId;
|
||||
pExprNode->_function.pFunctNode = pFuncNode;
|
||||
pExprNode->_function.functionType = pFuncNode->funcType;
|
||||
|
||||
tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
|
||||
|
||||
|
|
|
@ -2543,6 +2543,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
|||
|
||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
||||
|
||||
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pHandle->meta, 0);
|
||||
|
@ -2733,6 +2735,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
|
||||
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
|
||||
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
||||
|
||||
|
|
|
@ -4788,3 +4788,83 @@ _error:
|
|||
taosMemoryFree(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// ====================================================================================================================
|
||||
// TableCountScanOperator
|
||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
|
||||
static void destoryTableCountScanOperator(void* param);
|
||||
|
||||
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pScanNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
STableCountScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
if (!pInfo || !pOperator) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->readHandle = *readHandle;
|
||||
|
||||
if (pScanNode->pScanPseudoCols != NULL) {
|
||||
SExprSupp* pSup = &pInfo->pseudoSup;
|
||||
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
||||
}
|
||||
|
||||
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1);
|
||||
pInfo->pRes = createResDataBlock(pDescNode);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
tNameAssign(&pInfo->name, &pScanNode->tableName);
|
||||
|
||||
pInfo->suid = pScanNode->suid; // 0 for super table, super table uid for child table
|
||||
pInfo->uid = pScanNode->uid; // super table uid, or child table uid
|
||||
pInfo->tableType = pScanNode->tableType;
|
||||
|
||||
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destoryTableCountScanOperator(pInfo);
|
||||
}
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableCountScanInfo* pInfo = pOperator->info;
|
||||
|
||||
SExprSupp* pSup = &pInfo->pseudoSup;
|
||||
if (pSup->numOfExprs != 1 || pSup->pExprInfo[0].pExpr->nodeType != QUERY_NODE_FUNCTION ||
|
||||
pSup->pExprInfo[0].pExpr->_function.functionType != FUNCTION_TYPE_TABLE_COUNT ) {
|
||||
qError("%s table count scan operator invalid pseduo columns", GET_TASKID(pTaskInfo));
|
||||
}
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, 0);
|
||||
if (pInfo->uid != 0) {
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, pInfo->uid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
//TODO: wxy about the return type bigint or int?
|
||||
colDataAppend(pColInfoData, 0, (char*)&ctbNum, false);
|
||||
pInfo->pRes->info.rows = 1;
|
||||
} else {
|
||||
//TODO: get table count in this vnode?
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void destoryTableCountScanOperator(void* param) {
|
||||
STableCountScanInfo* pTableCountScanInfo = param;
|
||||
blockDataDestroy(pTableCountScanInfo->pRes);
|
||||
|
||||
cleanupExprSupp(&pTableCountScanInfo->pseudoSup);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue