Merge branch '3.0' into feature/stream
This commit is contained in:
commit
1505667897
|
@ -116,7 +116,7 @@ typedef void *tsdbReaderT;
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||||
|
|
||||||
tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
||||||
void *pMemRef);
|
void *pMemRef);
|
||||||
|
|
|
@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
SSubmitBlkRsp* pRsp);
|
SSubmitBlkRsp* pRsp);
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
void* pMemRef);
|
void* pMemRef);
|
||||||
|
|
|
@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
|
@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi
|
||||||
}
|
}
|
||||||
|
|
||||||
if (emptyQueryTimewindow(pTsdbReadHandle)) {
|
if (emptyQueryTimewindow(pTsdbReadHandle)) {
|
||||||
return (tsdbReaderT*)pTsdbReadHandle;
|
return (tsdbReaderT)pTsdbReadHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo apply the lastkey of table check to avoid to load header file
|
// todo apply the lastkey of table check to avoid to load header file
|
||||||
|
|
|
@ -412,6 +412,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length);
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||||
|
@ -426,27 +427,57 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_NEW(level + 1, "I/O: ");
|
EXPLAIN_ROW_NEW(level + 1, "I/O: ");
|
||||||
|
|
||||||
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
||||||
|
struct STableScanAnalyzeInfo info = {0};
|
||||||
|
|
||||||
|
int32_t maxIndex = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
for(int32_t i = 0; i < nodeNum; ++i) {
|
for(int32_t i = 0; i < nodeNum; ++i) {
|
||||||
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i);
|
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i);
|
||||||
STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
|
STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
|
||||||
|
|
||||||
EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks);
|
info.totalBlocks += pScanInfo->totalBlocks;
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
info.loadBlocks += pScanInfo->loadBlocks;
|
||||||
|
info.totalRows += pScanInfo->totalRows;
|
||||||
|
info.skipBlocks += pScanInfo->skipBlocks;
|
||||||
|
info.filterTime += pScanInfo->filterTime;
|
||||||
|
info.loadBlockStatis += pScanInfo->loadBlockStatis;
|
||||||
|
info.totalCheckedRows += pScanInfo->totalCheckedRows;
|
||||||
|
info.filterOutBlocks += pScanInfo->filterOutBlocks;
|
||||||
|
|
||||||
EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks);
|
if (pScanInfo->totalRows > totalRows) {
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
totalRows = pScanInfo->totalRows;
|
||||||
|
maxIndex = i;
|
||||||
EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis);
|
}
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
|
|
||||||
EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
|
|
||||||
EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND("total_blocks=%.1f", ((double)info.totalBlocks) / nodeNum);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND("load_blocks=%.1f", ((double)info.loadBlocks) / nodeNum);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND("load_block_SMAs=%.1f", ((double)info.loadBlockStatis) / nodeNum);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND("total_rows=%.1f", ((double)info.totalRows) / nodeNum);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND("check_rows=%.1f", ((double)info.totalCheckedRows) / nodeNum);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
|
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
|
||||||
|
//Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms.
|
||||||
|
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex);
|
||||||
|
STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
|
||||||
|
|
||||||
|
EXPLAIN_ROW_NEW(level + 1, " ");
|
||||||
|
EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd",
|
||||||
|
execInfo->startupCost, execInfo->totalCost);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo {
|
||||||
typedef struct SBlockDistInfo {
|
typedef struct SBlockDistInfo {
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
void* pHandle;
|
void* pHandle;
|
||||||
|
SReadHandle readHandle;
|
||||||
|
uint64_t uid; // table uid
|
||||||
} SBlockDistInfo;
|
} SBlockDistInfo;
|
||||||
|
|
||||||
// todo remove this
|
// todo remove this
|
||||||
|
@ -740,7 +742,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
||||||
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||||
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
||||||
|
|
|
@ -2815,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
||||||
// todo add more information about exchange operation
|
// todo add more information about exchange operation
|
||||||
int32_t type = pOperator->operatorType;
|
int32_t type = pOperator->operatorType;
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
|
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
|
||||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) {
|
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
|
||||||
|
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) {
|
||||||
*order = TSDB_ORDER_ASC;
|
*order = TSDB_ORDER_ASC;
|
||||||
*scanFlag = MAIN_SCAN;
|
*scanFlag = MAIN_SCAN;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2843,7 +2844,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -4095,6 +4095,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
|
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
||||||
|
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode;
|
||||||
|
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||||
|
|
||||||
|
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
|
||||||
|
int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
} else { // Create one table group.
|
||||||
|
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
|
||||||
|
taosArrayPush(pTableListInfo->pTableList, &info);
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryTableDataCond cond = {0};
|
||||||
|
|
||||||
|
{
|
||||||
|
cond.order = TSDB_ORDER_ASC;
|
||||||
|
cond.numOfCols = 1;
|
||||||
|
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
||||||
|
if (cond.colList == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
cond.colList->colId = 1;
|
||||||
|
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
cond.colList->bytes = sizeof(TSKEY);
|
||||||
|
|
||||||
|
cond.numOfTWindows = 1;
|
||||||
|
cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow));
|
||||||
|
cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
cond.suid = pBlockNode->suid;
|
||||||
|
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||||
|
}
|
||||||
|
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
|
||||||
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
|
||||||
|
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <vnode.h>
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
@ -367,13 +368,14 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
|
||||||
|
|
||||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
||||||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
// process this data block based on the probabilities
|
// process this data block based on the probabilities
|
||||||
|
@ -396,7 +398,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
if (groupId) {
|
if (groupId) {
|
||||||
pBlock->info.groupId = *groupId;
|
pBlock->info.groupId = *groupId;
|
||||||
}
|
}
|
||||||
|
@ -600,33 +602,65 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
|
||||||
|
int32_t rowLen = 0;
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
|
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
|
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
|
uint64_t suid = mr.me.ctbEntry.suid;
|
||||||
|
metaGetTableEntryByUid(&mr, suid);
|
||||||
|
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||||
|
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return rowLen;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
SBlockDistInfo* pBlockScanInfo = pOperator->info;
|
||||||
|
|
||||||
STableBlockDistInfo blockDistInfo = {0};
|
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
||||||
blockDistInfo.maxRows = INT_MIN;
|
blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid);
|
||||||
blockDistInfo.minRows = INT_MAX;
|
|
||||||
|
|
||||||
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo);
|
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
|
||||||
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);
|
||||||
|
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
|
||||||
pBlock->info.rows = 1;
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
||||||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||||
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||||
varDataSetLen(p, len);
|
varDataSetLen(p, len);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pBlock, 1);
|
||||||
colDataAppend(pColInfo, 0, p, false);
|
colDataAppend(pColInfo, 0, p, false);
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
|
|
||||||
|
pBlock->info.rows = 1;
|
||||||
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
@ -636,7 +670,8 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
blockDataDestroy(pDistInfo->pResBlock);
|
blockDataDestroy(pDistInfo->pResBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
|
||||||
|
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
|
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -645,17 +680,16 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pHandle = dataReader;
|
pInfo->pHandle = dataReader;
|
||||||
|
pInfo->readHandle = *readHandle;
|
||||||
|
pInfo->uid = uid;
|
||||||
|
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
int32_t numOfCols = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||||
|
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
|
||||||
|
|
||||||
SColumnInfoData infoData = {0};
|
pOperator->name = "DataBlockDistScanOperator";
|
||||||
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
||||||
infoData.info.bytes = 1024;
|
|
||||||
|
|
||||||
taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData);
|
|
||||||
|
|
||||||
pOperator->name = "DataBlockInfoScanOperator";
|
|
||||||
// pOperator->operatorType = OP_TableBlockInfoScan;
|
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
@ -1837,6 +1871,9 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||||
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
|
|
||||||
|
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||||
|
|
||||||
pInfo->pTableList = pTableListInfo;
|
pInfo->pTableList = pTableListInfo;
|
||||||
pInfo->pColMatchInfo = colList;
|
pInfo->pColMatchInfo = colList;
|
||||||
pInfo->pRes = createResDataBlock(pDescNode);
|
pInfo->pRes = createResDataBlock(pDescNode);
|
||||||
|
@ -1846,11 +1883,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
|
|
||||||
pOperator->name = "TagScanOperator";
|
pOperator->name = "TagScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
|
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfExprs;
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
|
@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
|
bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t blockDistFunction(SqlFunctionCtx *pCtx);
|
int32_t blockDistFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
|
|
@ -419,7 +419,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
int32_t topBotCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
||||||
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
|
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1)));
|
code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1)));
|
||||||
|
@ -427,65 +427,6 @@ int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeL
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
|
||||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
|
||||||
|
|
||||||
if (isPartial) {
|
|
||||||
if (2 != numOfParams) {
|
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
|
||||||
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
|
||||||
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
|
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
// param1
|
|
||||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
|
||||||
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
SValueNode* pValue = (SValueNode*)pParamNode1;
|
|
||||||
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
|
|
||||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
pValue->notReserved = true;
|
|
||||||
|
|
||||||
// set result type
|
|
||||||
pFunc->node.resType =
|
|
||||||
(SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
|
||||||
} else {
|
|
||||||
if (1 != numOfParams) {
|
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
|
||||||
if (TSDB_DATA_TYPE_BINARY != para1Type) {
|
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do nothing. We can only access output of partial functions as input,
|
|
||||||
// so original input type cannot be obtained, resType will be set same
|
|
||||||
// as original function input type after merge function created.
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|
||||||
return translateTopBotImpl(pFunc, pErrBuf, len, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|
||||||
return translateTopBotImpl(pFunc, pErrBuf, len, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
@ -1735,31 +1676,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = topFunction,
|
.processFunc = topFunction,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotFinalize,
|
||||||
.combineFunc = topCombine,
|
.combineFunc = topCombine,
|
||||||
.pPartialFunc = "_top_partial",
|
.pPartialFunc = "top",
|
||||||
.pMergeFunc = "_top_merge",
|
.pMergeFunc = "top",
|
||||||
// .createMergeParaFuc = topCreateMergePara
|
.createMergeParaFuc = topBotCreateMergePara
|
||||||
},
|
|
||||||
{
|
|
||||||
.name = "_top_partial",
|
|
||||||
.type = FUNCTION_TYPE_TOP_PARTIAL,
|
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
|
||||||
.translateFunc = translateTopBotPartial,
|
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
|
||||||
.initFunc = topBotFunctionSetup,
|
|
||||||
.processFunc = topFunction,
|
|
||||||
.finalizeFunc = topBotPartialFinalize,
|
|
||||||
.combineFunc = topCombine,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
.name = "_top_merge",
|
|
||||||
.type = FUNCTION_TYPE_TOP_MERGE,
|
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
|
||||||
.translateFunc = translateTopBotMerge,
|
|
||||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
|
||||||
.initFunc = functionSetup,
|
|
||||||
.processFunc = topFunctionMerge,
|
|
||||||
.finalizeFunc = topBotMergeFinalize,
|
|
||||||
.combineFunc = topCombine,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "bottom",
|
.name = "bottom",
|
||||||
|
@ -1771,30 +1690,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = bottomFunction,
|
.processFunc = bottomFunction,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotFinalize,
|
||||||
.combineFunc = bottomCombine,
|
.combineFunc = bottomCombine,
|
||||||
.pPartialFunc = "_bottom_partial",
|
.pPartialFunc = "bottom",
|
||||||
.pMergeFunc = "_bottom_merge"
|
.pMergeFunc = "bottom",
|
||||||
},
|
.createMergeParaFuc = topBotCreateMergePara
|
||||||
{
|
|
||||||
.name = "_bottom_partial",
|
|
||||||
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
|
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
|
||||||
.translateFunc = translateTopBotPartial,
|
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
|
||||||
.initFunc = topBotFunctionSetup,
|
|
||||||
.processFunc = bottomFunction,
|
|
||||||
.finalizeFunc = topBotPartialFinalize,
|
|
||||||
.combineFunc = bottomCombine,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
.name = "_bottom_merge",
|
|
||||||
.type = FUNCTION_TYPE_BOTTOM_MERGE,
|
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
|
||||||
.translateFunc = translateTopBotMerge,
|
|
||||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
|
||||||
.initFunc = functionSetup,
|
|
||||||
.processFunc = bottomFunctionMerge,
|
|
||||||
.finalizeFunc = topBotMergeFinalize,
|
|
||||||
.combineFunc = bottomCombine,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "spread",
|
.name = "spread",
|
||||||
|
@ -2524,7 +2422,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
|
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = NULL,
|
.processFunc = NULL,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL,
|
||||||
|
.pPartialFunc = "_select_value",
|
||||||
|
.pMergeFunc = "_select_value"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "_block_dist",
|
.name = "_block_dist",
|
||||||
|
@ -2532,6 +2432,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.translateFunc = translateBlockDistFunc,
|
.translateFunc = translateBlockDistFunc,
|
||||||
.getEnvFunc = getBlockDistFuncEnv,
|
.getEnvFunc = getBlockDistFuncEnv,
|
||||||
|
.initFunc = blockDistSetup,
|
||||||
.processFunc = blockDistFunction,
|
.processFunc = blockDistFunction,
|
||||||
.finalizeFunc = blockDistFinalize
|
.finalizeFunc = blockDistFinalize
|
||||||
},
|
},
|
||||||
|
|
|
@ -67,8 +67,7 @@ typedef struct STopBotResItem {
|
||||||
|
|
||||||
typedef struct STopBotRes {
|
typedef struct STopBotRes {
|
||||||
int32_t maxSize;
|
int32_t maxSize;
|
||||||
int16_t type; // store the original input type, used in merge function
|
int16_t type;
|
||||||
int32_t numOfItems;
|
|
||||||
STopBotResItem* pItems;
|
STopBotResItem* pItems;
|
||||||
} STopBotRes;
|
} STopBotRes;
|
||||||
|
|
||||||
|
@ -2872,12 +2871,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|
||||||
// intermediate result is binary and length contains VAR header size
|
|
||||||
pEnv->calcMemSize = pFunc->node.resType.bytes;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||||
if (!functionSetup(pCtx, pResInfo)) {
|
if (!functionSetup(pCtx, pResInfo)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -2952,50 +2945,6 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) {
|
|
||||||
for (int32_t i = 0; i < pInput->numOfItems; i++) {
|
|
||||||
addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
|
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
|
||||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
|
||||||
char* data = colDataGetData(pCol, start);
|
|
||||||
STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data);
|
|
||||||
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
|
||||||
|
|
||||||
pInfo->maxSize = pInputInfo->maxSize;
|
|
||||||
pInfo->type = pInputInfo->type;
|
|
||||||
topBotTransferInfo(pCtx, pInputInfo, true);
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) {
|
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
|
||||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
|
||||||
char* data = colDataGetData(pCol, start);
|
|
||||||
STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data);
|
|
||||||
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
|
||||||
|
|
||||||
pInfo->maxSize = pInputInfo->maxSize;
|
|
||||||
pInfo->type = pInputInfo->type;
|
|
||||||
topBotTransferInfo(pCtx, pInputInfo, false);
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
|
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
|
||||||
uint16_t type = *(uint16_t*)param;
|
uint16_t type = *(uint16_t*)param;
|
||||||
|
|
||||||
|
@ -3044,8 +2993,6 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
|
|
||||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||||
pEntryInfo->numOfRes++;
|
pEntryInfo->numOfRes++;
|
||||||
// accumulate number of items for each vgroup, this info is needed for merge
|
|
||||||
pRes->numOfItems++;
|
|
||||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||||
!isTopQuery);
|
!isTopQuery);
|
||||||
} else { // replace the minimum value in the result
|
} else { // replace the minimum value in the result
|
||||||
|
@ -3144,7 +3091,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
releaseBufPage(pCtx->pBuf, pPage);
|
releaseBufPage(pCtx->pBuf, pPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) {
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
|
||||||
|
@ -3164,39 +3111,13 @@ int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMer
|
||||||
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isMerge) {
|
|
||||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||||
}
|
|
||||||
currentRow += 1;
|
currentRow += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pEntryInfo->numOfRes;
|
return pEntryInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return topBotFinalizeImpl(pCtx, pBlock, false); }
|
|
||||||
|
|
||||||
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|
||||||
return topBotFinalizeImpl(pCtx, pBlock, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
|
||||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
|
||||||
int32_t resultBytes = getTopBotInfoSize(pRes->maxSize);
|
|
||||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
|
||||||
|
|
||||||
memcpy(varDataVal(res), pRes, resultBytes);
|
|
||||||
varDataSetLen(res, resultBytes);
|
|
||||||
|
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
|
||||||
|
|
||||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
|
||||||
|
|
||||||
taosMemoryFree(res);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) {
|
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) {
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||||
|
@ -3211,8 +3132,6 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
||||||
pItem->tuplePos.pageId = -1;
|
pItem->tuplePos.pageId = -1;
|
||||||
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
||||||
pEntryInfo->numOfRes++;
|
pEntryInfo->numOfRes++;
|
||||||
// accumulate number of items for each vgroup, this info is needed for merge
|
|
||||||
pRes->numOfItems++;
|
|
||||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||||
!isTopQuery);
|
!isTopQuery);
|
||||||
} else { // replace the minimum value in the result
|
} else { // replace the minimum value in the result
|
||||||
|
@ -5004,7 +4923,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pInfo->minRows = INT32_MAX;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
const int32_t BLOCK_DIST_RESULT_ROWS = 24;
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
|
@ -5022,6 +4953,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
||||||
pDistInfo->totalRows += p1.totalRows;
|
pDistInfo->totalRows += p1.totalRows;
|
||||||
pDistInfo->numOfFiles += p1.numOfFiles;
|
pDistInfo->numOfFiles += p1.numOfFiles;
|
||||||
|
|
||||||
|
pDistInfo->defMinRows = p1.defMinRows;
|
||||||
|
pDistInfo->defMaxRows = p1.defMaxRows;
|
||||||
|
pDistInfo->rowSize = p1.rowSize;
|
||||||
|
pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks;
|
||||||
|
|
||||||
if (pDistInfo->minRows > p1.minRows) {
|
if (pDistInfo->minRows > p1.minRows) {
|
||||||
pDistInfo->minRows = p1.minRows;
|
pDistInfo->minRows = p1.minRows;
|
||||||
}
|
}
|
||||||
|
@ -5033,7 +4969,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
||||||
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5045,7 +4981,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
||||||
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
|
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
|
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
|
||||||
|
@ -5076,7 +5012,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
||||||
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
|
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
|
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
|
||||||
|
@ -5098,32 +5034,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
||||||
|
|
||||||
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
char* pData = GET_ROWCELL_INTERBUF(pResInfo);
|
STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
|
||||||
int32_t row = 0;
|
int32_t row = 0;
|
||||||
|
|
||||||
STableBlockDistInfo info = {0};
|
|
||||||
tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info);
|
|
||||||
|
|
||||||
char st[256] = {0};
|
char st[256] = {0};
|
||||||
|
double totalRawSize = pData->totalRows * pData->rowSize;
|
||||||
int32_t len =
|
int32_t len =
|
||||||
sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]",
|
sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
|
||||||
info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0),
|
pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks,
|
||||||
info.totalSize / (info.totalRows * info.rowSize * 1.0));
|
pData->totalSize * 100 / totalRawSize, '%');
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
|
|
||||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]",
|
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]",
|
||||||
info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows);
|
pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks);
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
|
|
||||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables,
|
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
||||||
info.numOfFiles, 0);
|
pData->numOfFiles, 0);
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
|
@ -5135,40 +5068,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
int32_t maxVal = 0;
|
int32_t maxVal = 0;
|
||||||
int32_t minVal = INT32_MAX;
|
int32_t minVal = INT32_MAX;
|
||||||
for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) {
|
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
||||||
if (maxVal < info.blockRowsHisto[i]) {
|
if (maxVal < pData->blockRowsHisto[i]) {
|
||||||
maxVal = info.blockRowsHisto[i];
|
maxVal = pData->blockRowsHisto[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minVal > info.blockRowsHisto[i]) {
|
if (minVal > pData->blockRowsHisto[i]) {
|
||||||
minVal = info.blockRowsHisto[i];
|
minVal = pData->blockRowsHisto[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t delta = maxVal - minVal;
|
int32_t delta = maxVal - minVal;
|
||||||
int32_t step = delta / 50;
|
int32_t step = delta / 50;
|
||||||
|
if (step == 0) {
|
||||||
|
step = 1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]);
|
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
|
||||||
int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets;
|
int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets;
|
||||||
|
|
||||||
for (int32_t i = 0; i < 20; ++i) {
|
bool singleModel = false;
|
||||||
len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1));
|
if (bucketRange == 0) {
|
||||||
|
singleModel = true;
|
||||||
|
step = 20;
|
||||||
|
bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
||||||
|
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
|
||||||
|
|
||||||
|
int32_t num = 0;
|
||||||
|
if (singleModel && pData->blockRowsHisto[i] > 0) {
|
||||||
|
num = 20;
|
||||||
|
} else {
|
||||||
|
num = (pData->blockRowsHisto[i] + step - 1) / step;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t num = (info.blockRowsHisto[i] + step - 1) / step;
|
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|');
|
int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|');
|
||||||
len += x;
|
len += x;
|
||||||
}
|
}
|
||||||
|
|
||||||
double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks;
|
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
|
||||||
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%');
|
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
|
||||||
printf("%s\n", st);
|
printf("%s\n", st);
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return row;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SDerivInfo {
|
typedef struct SDerivInfo {
|
||||||
|
|
|
@ -419,6 +419,24 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
|
||||||
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
|
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
|
||||||
|
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
|
||||||
|
strcpy(name.dbname, pStmt->dbName);
|
||||||
|
strcpy(name.tname, pStmt->tableName);
|
||||||
|
int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
||||||
pCxt->pStmt = pStmt;
|
pCxt->pStmt = pStmt;
|
||||||
switch (nodeType(pStmt)) {
|
switch (nodeType(pStmt)) {
|
||||||
|
@ -497,6 +515,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
||||||
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_DELETE_STMT:
|
case QUERY_NODE_DELETE_STMT:
|
||||||
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
|
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
|
||||||
|
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
||||||
|
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue