save work in case it losts
This commit is contained in:
parent
662187916e
commit
c2a7404b42
|
@ -108,6 +108,8 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
|||
int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid);
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
|
||||
int64_t metaGetTbNum(SMeta* pMeta);
|
||||
|
||||
typedef struct {
|
||||
int64_t uid;
|
||||
|
|
|
@ -116,8 +116,6 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
|
|||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||
int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
||||
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
|
||||
int64_t metaGetTbNum(SMeta* pMeta);
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
|
||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
||||
|
|
|
@ -530,16 +530,32 @@ typedef struct SSysTableScanInfo {
|
|||
SLoadRemoteDataInfo loadInfo;
|
||||
} SSysTableScanInfo;
|
||||
|
||||
typedef struct STableCountScanInfo {
|
||||
SReadHandle readHandle;
|
||||
SSDataBlock* pRes;
|
||||
SExprSupp pseudoSup;
|
||||
SNode* pCondition;
|
||||
SName name;
|
||||
uint64_t suid;
|
||||
uint64_t uid;
|
||||
int8_t tableType;
|
||||
} STableCountScanInfo;
|
||||
typedef struct STableCountScanSupp {
|
||||
int16_t dbNameSlotId;
|
||||
int16_t stbNameSlotId;
|
||||
int16_t tbCountSlotId;
|
||||
|
||||
bool groupByDbName;
|
||||
bool groupByStbName;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
} STableCountScanSupp;
|
||||
|
||||
typedef struct STableCountScanOperatorInfo {
|
||||
SReadHandle readHandle;
|
||||
SSDataBlock* pRes;
|
||||
|
||||
SName tableName;
|
||||
|
||||
SNodeList* groupTags;
|
||||
SNodeList* scanCols;
|
||||
SNodeList* pseudoCols;
|
||||
|
||||
STableCountScanSupp supp;
|
||||
|
||||
int32_t currGrpIdx;
|
||||
} STableCountScanOperatorInfo;
|
||||
|
||||
typedef struct SBlockDistInfo {
|
||||
SSDataBlock* pResBlock;
|
||||
|
|
|
@ -31,7 +31,6 @@
|
|||
#include "tcompare.h"
|
||||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
#include "vnode.h"
|
||||
|
||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
@ -233,8 +232,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
|
|||
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf,
|
||||
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
|
||||
buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||
|
||||
if (p1 == NULL) {
|
||||
return NULL;
|
||||
|
@ -376,7 +375,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
|||
|
||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
|
||||
uint32_t* status) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||
|
||||
pCost->totalBlocks += 1;
|
||||
|
@ -4428,7 +4427,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
|
||||
uint32_t status = 0;
|
||||
loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
|
||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -4758,7 +4757,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
goto _error;
|
||||
}
|
||||
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
|
@ -4792,15 +4790,106 @@ _error:
|
|||
// ====================================================================================================================
|
||||
// TableCountScanOperator
|
||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
|
||||
static void destoryTableCountScanOperator(void* param);
|
||||
static void destoryTableCountScanOperator(void* param);
|
||||
static const char* GROUP_TAG_DB_NAME = "db_name";
|
||||
static const char* GROUP_TAG_STABLE_NAME = "stable_name";
|
||||
|
||||
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
|
||||
if (scanCols != NULL) {
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, scanCols) {
|
||||
if (nodeType(pNode) != QUERY_NODE_TARGET) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
STargetNode* targetNode = (STargetNode*)pNode;
|
||||
if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
|
||||
if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
|
||||
supp->dbNameSlotId = targetNode->slotId;
|
||||
} else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
|
||||
supp->stbNameSlotId = targetNode->slotId;
|
||||
}
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
|
||||
if (pseudoCols != NULL) {
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pseudoCols) {
|
||||
if (nodeType(pNode) != QUERY_NODE_TARGET) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
STargetNode* targetNode = (STargetNode*)pNode;
|
||||
if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
|
||||
if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
|
||||
supp->tbCountSlotId = targetNode->slotId;
|
||||
}
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
|
||||
if (groupTags != NULL) {
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, groupTags) {
|
||||
if (nodeType(pNode) != QUERY_NODE_COLUMN) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
SColumnNode* colNode = (SColumnNode*)pNode;
|
||||
if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
|
||||
supp->groupByDbName = true;
|
||||
}
|
||||
if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
|
||||
supp->groupByStbName = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
||||
strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
|
||||
STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
|
||||
int32_t code = 0;
|
||||
code = tblCountScanGetInputs(groupTags, tableName, supp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
|
||||
return code;
|
||||
}
|
||||
supp->dbNameSlotId = -1;
|
||||
supp->stbNameSlotId = -1;
|
||||
supp->tbCountSlotId = -1;
|
||||
|
||||
code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
|
||||
return code;
|
||||
}
|
||||
code = tblCountScanGetCountSlotId(pseudoCols, supp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
|
||||
return code;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SScanPhysiNode* pScanNode = &pTblCountScanNode->scan;
|
||||
STableCountScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SScanPhysiNode* pScanNode = &pTblCountScanNode->scan;
|
||||
STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanSupp));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
if (!pInfo || !pOperator) {
|
||||
goto _error;
|
||||
|
@ -4808,26 +4897,19 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
|
|||
|
||||
pInfo->readHandle = *readHandle;
|
||||
|
||||
if (pScanNode->pScanPseudoCols != NULL) {
|
||||
SExprSupp* pSup = &pInfo->pseudoSup;
|
||||
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
||||
}
|
||||
|
||||
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1);
|
||||
pInfo->pRes = createResDataBlock(pDescNode);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
tNameAssign(&pInfo->name, &pScanNode->tableName);
|
||||
|
||||
pInfo->suid = pScanNode->suid; // 0 for super table, super table uid for child table
|
||||
pInfo->uid = pScanNode->uid; // super table uid, or child table uid
|
||||
pInfo->tableType = pScanNode->tableType;
|
||||
getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
|
||||
pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
|
||||
pTaskInfo);
|
||||
|
||||
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
@ -4840,32 +4922,99 @@ _error:
|
|||
}
|
||||
|
||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableCountScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableCountScanOperatorInfo* pInfo = pOperator->info;
|
||||
STableCountScanSupp* pSupp = &pInfo->supp;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
// compute group id must, but output is according to scancols. output group by group
|
||||
// grouptags high priority(groupid<=>grouptag), then tablename(dbname,tableName).
|
||||
// scanCols, (grouptags cols)
|
||||
{
|
||||
// mnode, query table count of information_schema and performance_schema
|
||||
if (pInfo->readHandle.mnd != NULL) {
|
||||
if (pSupp->groupByDbName) {
|
||||
if (pInfo->currGrpIdx == 0) {
|
||||
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
|
||||
size_t infodbTableNum;
|
||||
getInfosDbMeta(NULL, &infodbTableNum);
|
||||
pRes->info.groupId = groupId;
|
||||
if (pSupp->dbNameSlotId != -1) {
|
||||
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
|
||||
//colDataAppend(colInfoData, 0, )
|
||||
}
|
||||
return NULL;
|
||||
} else if (pInfo->currGrpIdx == 1) {
|
||||
uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
|
||||
size_t perfdbTableNum;
|
||||
getPerfDbMeta(NULL, &perfdbTableNum);
|
||||
} else if (pInfo->currGrpIdx >= 2) {
|
||||
return NULL;
|
||||
}
|
||||
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
|
||||
size_t infodbTableNum;
|
||||
getInfosDbMeta(NULL, &infodbTableNum);
|
||||
size_t perfdbTableNum;
|
||||
getPerfDbMeta(NULL, &perfdbTableNum);
|
||||
// grouptags, db name
|
||||
} else {
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
{
|
||||
// get dbname
|
||||
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, 0);
|
||||
SName sn = {0};
|
||||
char varDbName[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||
|
||||
SExprSupp* pSup = &pInfo->pseudoSup;
|
||||
if (pSup->numOfExprs != 1 || pSup->pExprInfo[0].pExpr->nodeType != QUERY_NODE_FUNCTION ||
|
||||
pSup->pExprInfo[0].pExpr->_function.functionType != FUNCTION_TYPE_TABLE_COUNT ) {
|
||||
qError("%s table count scan operator invalid pseduo columns", GET_TASKID(pTaskInfo));
|
||||
tNameGetDbName(&sn, varDataVal(varDbName));
|
||||
varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
|
||||
}
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, 0);
|
||||
if (pInfo->uid != 0) {
|
||||
const char* tableName = tNameGetTableName(&pInfo->tableName);
|
||||
|
||||
{
|
||||
// grouptags only have column db_name or (no grouptags and tablename is null)
|
||||
// if (tableName == NULL | strlen(tableName) == 0)
|
||||
metaGetTbNum(pInfo->readHandle.meta);
|
||||
}
|
||||
{// no grouptags and TableName is not null, return child table count
|
||||
{tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, tableName);
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
}
|
||||
}
|
||||
{
|
||||
// grouptags have column stable_name. return (stable name, child table count)
|
||||
SArray* stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
|
||||
vnodeGetStbIdList(pInfo->readHandle.vnode, 0, stbUidList);
|
||||
if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, stbUidList) < 0) {
|
||||
qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
|
||||
taosArrayDestroy(stbUidList);
|
||||
// return failure
|
||||
}
|
||||
for (int i = 0; i < taosArrayGetSize(stbUidList); ++i) {
|
||||
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(stbUidList, i);
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, pInfo->uid, &stats);
|
||||
metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
//TODO: wxy about the return type bigint or int?
|
||||
colDataAppend(pColInfoData, 0, (char*)&ctbNum, false);
|
||||
pInfo->pRes->info.rows = 1;
|
||||
} else {
|
||||
//TODO: get table count in this vnode?
|
||||
|
||||
char varStbName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
metaGetTableNameByUid(pInfo->readHandle.meta, stbUid, varStbName);
|
||||
}
|
||||
return NULL;
|
||||
taosArrayDestroy(stbUidList);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void destoryTableCountScanOperator(void* param) {
|
||||
STableCountScanInfo* pTableCountScanInfo = param;
|
||||
STableCountScanOperatorInfo* pTableCountScanInfo = param;
|
||||
blockDataDestroy(pTableCountScanInfo->pRes);
|
||||
|
||||
cleanupExprSupp(&pTableCountScanInfo->pseudoSup);
|
||||
nodesDestroyList(pTableCountScanInfo->groupTags);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue