add quick cmd
This commit is contained in:
parent
eddc7918d3
commit
620c7627e9
|
@ -1749,17 +1749,18 @@ typedef struct {
|
|||
} SVnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t numOfTables;
|
||||
int64_t memSize;
|
||||
int64_t l1Size;
|
||||
int64_t l2Size;
|
||||
int64_t l3Size;
|
||||
int64_t cacheSize;
|
||||
int64_t walSize;
|
||||
int64_t metaSize;
|
||||
int64_t rawDataSize;
|
||||
int64_t s3Size;
|
||||
int32_t vgId;
|
||||
int64_t numOfTables;
|
||||
int64_t memSize;
|
||||
int64_t l1Size;
|
||||
int64_t l2Size;
|
||||
int64_t l3Size;
|
||||
int64_t cacheSize;
|
||||
int64_t walSize;
|
||||
int64_t metaSize;
|
||||
int64_t rawDataSize;
|
||||
int64_t s3Size;
|
||||
const char* dbname;
|
||||
} SDbSizeStatisInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -29,10 +29,9 @@
|
|||
#include "storageapi.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
|
||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
||||
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
|
||||
__optr_explain_fn_t explain, __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
|
||||
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain,
|
||||
__optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
|
||||
SOperatorFpSet fpSet = {
|
||||
._openFn = openFn,
|
||||
.getNextFn = nextFn,
|
||||
|
@ -133,7 +132,7 @@ void releaseQueryBuf(size_t numOfTables) {
|
|||
int64_t t = getQuerySupportBufSize(numOfTables);
|
||||
|
||||
// restore value is not enough buffer available
|
||||
(void) atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
|
||||
(void)atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
|
@ -148,7 +147,7 @@ typedef struct STraverParam {
|
|||
} STraverParam;
|
||||
|
||||
// iterate the operator tree helper
|
||||
typedef ERetType (*optr_fn_t)(SOperatorInfo *pOperator, STraverParam *pParam, const char* pIdstr);
|
||||
typedef ERetType (*optr_fn_t)(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdstr);
|
||||
|
||||
void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) {
|
||||
if (pOperator == NULL) {
|
||||
|
@ -202,30 +201,30 @@ typedef struct SExtScanInfo {
|
|||
} SExtScanInfo;
|
||||
|
||||
static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
int32_t type = pOperator->operatorType;
|
||||
SExtScanInfo* pInfo = pParam->pParam;
|
||||
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->scanFlag= MAIN_SCAN;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
return OPTR_FN_RET_ABORT;
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
|
||||
if (!pInfo->inheritUsOrder) {
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
}
|
||||
pInfo->scanFlag= MAIN_SCAN;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
return OPTR_FN_RET_ABORT;
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
pInfo->order = pTableScanInfo->base.cond.order;
|
||||
pInfo->scanFlag= pTableScanInfo->base.scanFlag;
|
||||
pInfo->scanFlag = pTableScanInfo->base.scanFlag;
|
||||
return OPTR_FN_RET_ABORT;
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
|
||||
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||
pInfo->order = pTableScanInfo->base.cond.order;
|
||||
pInfo->scanFlag= pTableScanInfo->base.scanFlag;
|
||||
pInfo->scanFlag = pTableScanInfo->base.scanFlag;
|
||||
return OPTR_FN_RET_ABORT;
|
||||
} else {
|
||||
return OPTR_FN_RET_CONTINUE;
|
||||
|
@ -281,14 +280,13 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SSto
|
|||
}
|
||||
|
||||
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
|
||||
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) {
|
||||
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) {
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
const char* idstr = GET_TASKID(pTaskInfo);
|
||||
|
||||
|
||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||
SOperatorInfo* pOperator = NULL;
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||
|
@ -318,8 +316,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
|
||||
pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
|
||||
} else {
|
||||
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
|
||||
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
|
||||
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
|
||||
pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
|
@ -346,8 +344,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
return terrno;
|
||||
}
|
||||
|
||||
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
|
||||
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
|
||||
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo,
|
||||
pTagCond, pTagIndexCond, pTaskInfo);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
|
@ -373,7 +371,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||
code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
|
||||
pTaskInfo, &pOperator);
|
||||
pTaskInfo, &pOperator);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
STableListInfo* pTableListInfo = tableListCreate();
|
||||
|
@ -416,7 +414,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
}
|
||||
if (!pTagScanPhyNode->onlyMetaCtbIdx) {
|
||||
code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
|
||||
pTagIndexCond, pTaskInfo);
|
||||
pTagIndexCond, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("failed to getTableList, code: %s", tstrerror(code));
|
||||
|
@ -653,7 +651,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
void destroyOperator(SOperatorInfo* pOperator) {
|
||||
if (pOperator == NULL) {
|
||||
return;
|
||||
|
@ -735,7 +732,7 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
|
|||
qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
|
||||
switch (pDst->opType) {
|
||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
||||
SExchangeOperatorParam* pDExc = pDst->value;
|
||||
|
@ -755,17 +752,19 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
|
|||
}
|
||||
|
||||
tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);
|
||||
|
||||
int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic, sizeof(pDExc->basic));
|
||||
|
||||
int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic,
|
||||
sizeof(pDExc->basic));
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic));
|
||||
code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
|
||||
sizeof(pSExc->basic));
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
taosMemoryFree(pDst->value);
|
||||
pDst->value = pBatch;
|
||||
} else {
|
||||
|
@ -776,14 +775,16 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
|
|||
}
|
||||
} else {
|
||||
SExchangeOperatorBatchParam* pBatch = pDst->value;
|
||||
SExchangeOperatorBasicParam* pBasic = tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
|
||||
SExchangeOperatorBasicParam* pBasic =
|
||||
tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
|
||||
if (pBasic) {
|
||||
void* p = taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic));
|
||||
int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
|
||||
sizeof(pSExc->basic));
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
@ -799,9 +800,8 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
|
||||
SOperatorParam** ppParam = NULL;
|
||||
SOperatorParam** ppParam = NULL;
|
||||
SOperatorParam*** pppDownstramParam = NULL;
|
||||
switch (type) {
|
||||
case OP_GET_PARAM:
|
||||
|
@ -817,13 +817,13 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
|
|||
}
|
||||
|
||||
freeResetOperatorParams(pOperator, type, false);
|
||||
|
||||
|
||||
if (NULL == pInput) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL;
|
||||
|
||||
|
||||
if (NULL == *pppDownstramParam) {
|
||||
*pppDownstramParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
|
||||
if (NULL == *pppDownstramParam) {
|
||||
|
@ -844,7 +844,7 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
|
|||
if (childrenNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = 0; i < childrenNum; ++i) {
|
||||
SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet((*ppParam)->pChildren, i);
|
||||
if (pChild == NULL) {
|
||||
|
@ -867,19 +867,18 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
|
||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
|
||||
blockDataCheck(p, false);
|
||||
return (code == 0)? p:NULL;
|
||||
return (code == 0) ? p : NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
|
||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
|
||||
blockDataCheck(p, false);
|
||||
return (code == 0)? p:NULL;
|
||||
return (code == 0) ? p : NULL;
|
||||
}
|
||||
|
||||
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
|
||||
|
@ -921,7 +920,7 @@ int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pPa
|
|||
pOperator->pTaskInfo->code = code;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
|
||||
}
|
||||
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -931,5 +930,3 @@ int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
|
|||
}
|
||||
return pOperator->resultDataBlockId;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -73,6 +73,7 @@ typedef struct SSysTableScanInfo {
|
|||
SLimitInfo limitInfo;
|
||||
int32_t tbnameSlotId;
|
||||
STableListInfo* pTableListInfo;
|
||||
SSDataBlock* pResBlock;
|
||||
SReadHandle* pHandle;
|
||||
SStorageAPI* pAPI;
|
||||
} SSysTableScanInfo;
|
||||
|
@ -2015,40 +2016,15 @@ _end:
|
|||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) {
|
||||
static int32_t buildVgDiskUsage(SOperatorInfo* pOperator, SDbSizeStatisInfo* pStaticsInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
int32_t numOfCols = 0;
|
||||
|
||||
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
|
||||
if (pInfo->readHandle.mnd != NULL) {
|
||||
// code = buildSysUsageInfo(pInfo, pOperator->resultInfo.capacity);
|
||||
// QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
// QUERY_CHECK_CODE(code, lino, _end);
|
||||
// pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
// return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
int32_t vgId = 0;
|
||||
const char* db = NULL;
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &pStaticsInfo->dbname, &vgId, NULL, NULL);
|
||||
|
||||
SDbSizeStatisInfo staticsInfo = {0};
|
||||
|
||||
|
@ -2057,86 +2033,147 @@ static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) {
|
|||
code = pAPI->metaFn.getDBSize(pInfo->readHandle.vnode, &staticsInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = vnodeEstimateRawDataSize(pOperator, &staticsInfo);
|
||||
return vnodeEstimateRawDataSize(pOperator, &staticsInfo);
|
||||
_end:
|
||||
return code;
|
||||
}
|
||||
static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
SDbSizeStatisInfo staticsInfo = {0};
|
||||
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
const char* db = NULL;
|
||||
int32_t numOfCols = 0;
|
||||
|
||||
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
|
||||
if (pInfo->readHandle.mnd != NULL) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
SSDataBlock* pBlock = pInfo->pRes;
|
||||
|
||||
code = buildVgDiskUsage(pOperator, &staticsInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (pInfo->showRewrite) {
|
||||
SSDataBlock* pBlock = pInfo->pRes;
|
||||
SDBBlockUsageInfo usageInfo = {0};
|
||||
int32_t len = tSerializeBlockDbUsage(NULL, 0, &usageInfo);
|
||||
|
||||
code = tNameGetDbName(&sn, varDataVal(dbname));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
usageInfo.numOfSttRows = 120;
|
||||
usageInfo.numOfInmemRows = 120;
|
||||
usageInfo.numOfBlocks = 120;
|
||||
|
||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||
int32_t tempRes = tSerializeBlockDbUsage(varDataVal(p), len, &usageInfo);
|
||||
if (tempRes != len) {
|
||||
taosMemoryFree(p);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
varDataSetLen(p, len);
|
||||
|
||||
p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USAGE);
|
||||
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||
int32_t slotId = 1;
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 1);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfo, 0, p, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
taosMemoryFree(p);
|
||||
if (slotId != 0) {
|
||||
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
|
||||
int64_t v = 0;
|
||||
// colDataSetInt64(p1, 0, &v);
|
||||
}
|
||||
|
||||
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pBlock->info.rows = 1;
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
pInfo->pRes->info.rows = pBlock->info.rows;
|
||||
// code = relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, pBlock->pDataBlock, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
code = tNameFromString(&sn, staticsInfo.dbname, T_NAME_ACCT | T_NAME_DB);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = tNameGetDbName(&sn, varDataVal(dbname));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.walSize, false); // wal
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USAGE);
|
||||
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.memSize, false); // memtable
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.l1Size, false); // l1_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.l2Size, false); // l2_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.vgId, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.l3Size, false); // l3_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.walSize, false); // wal
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.s3Size, false); // s3_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.memSize, false); // memtable
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.rawDataSize, false); // estimate_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.l1Size, false); // l1_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
numOfRows += 1;
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.l2Size, false); // l2_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (numOfRows > 0) {
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.l3Size, false); // l3_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.s3Size, false); // s3_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&staticsInfo.rawDataSize, false); // estimate_size
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
numOfRows += 1;
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
|
||||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
if (numOfRows > 0) {
|
||||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
code = relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
numOfRows = 0;
|
||||
}
|
||||
|
||||
numOfRows = 0;
|
||||
blockDataDestroy(p);
|
||||
p = NULL;
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
blockDataDestroy(p);
|
||||
p = NULL;
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
setOperatorCompleted(pOperator);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
|
@ -2209,7 +2246,11 @@ _end:
|
|||
}
|
||||
return NULL;
|
||||
}
|
||||
static SSDataBlock* sysTableScanUsageRewrite(SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
static SSDataBlock* sysTableScanUsage(SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -2331,7 +2372,7 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
|||
if (dbName[0]) TAOS_UNUSED(tsnprintf(pInfo->req.db, sizeof(pInfo->req.db), "%d.%s", pInfo->accountId, dbName));
|
||||
(void)sysTableIsCondOnOneTable(pInfo->pCondition, pInfo->req.filterTb);
|
||||
}
|
||||
|
||||
bool filter = true;
|
||||
SSDataBlock* pBlock = NULL;
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||
pBlock = sysTableScanUserTables(pOperator);
|
||||
|
@ -2343,12 +2384,15 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
|||
IS_SYS_DBNAME(dbName)) {
|
||||
pBlock = sysTableScanUserSTables(pOperator);
|
||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USAGE, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||
if (pInfo->showRewrite) {
|
||||
filter = false;
|
||||
}
|
||||
pBlock = sysTableScanUsage(pOperator);
|
||||
} else { // load the meta from mnode of the given epset
|
||||
pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo);
|
||||
}
|
||||
|
||||
sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
|
||||
if (filter) sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
|
||||
if (pBlock != NULL) {
|
||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
|
||||
if (limitReached) {
|
||||
|
@ -2524,6 +2568,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
|
||||
SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
|
||||
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t num = 0;
|
||||
code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
|
@ -2540,6 +2585,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
pInfo->showRewrite = pScanPhyNode->showRewrite;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
|
||||
pInfo->pCondition = pScanNode->node.pConditions;
|
||||
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -3280,8 +3326,13 @@ static int32_t vnodeEstimateDataSizeByUid(SOperatorInfo* pOperator, STableId* id
|
|||
|
||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
|
||||
code = pReadHandle->api.tsdReader.tsdReaderOpen(pReadHandle->vnode, &cond, pList, num, NULL, (void**)&pInfo->pHandle,
|
||||
pTaskInfo->id.str, NULL);
|
||||
if (pInfo->showRewrite) {
|
||||
code = pReadHandle->api.tsdReader.tsdReaderOpen(pReadHandle->vnode, &cond, pList, num, pInfo->pRes,
|
||||
(void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);
|
||||
} else {
|
||||
code = pReadHandle->api.tsdReader.tsdReaderOpen(pReadHandle->vnode, &cond, pList, num, NULL,
|
||||
(void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);
|
||||
}
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
QUERY_CHECK_CODE(code, line, _end);
|
||||
|
||||
|
|
Loading…
Reference in New Issue