Merge pull request #17121 from taosdata/szhou/fixbugs
enhance: optimize when select from ins_tags when table_name='xxxx'
This commit is contained in:
commit
e6ac8ed3a9
|
@ -92,6 +92,7 @@ typedef struct SMetaEntry SMetaEntry;
|
|||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
|
||||
int32_t metaReadNext(SMetaReader *pReader);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
|
|
|
@ -110,7 +110,6 @@ int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq
|
|||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
||||
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
|
||||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
|
||||
int64_t metaGetTbNum(SMeta* pMeta);
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
|
|
|
@ -41,6 +41,11 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S
|
|||
|
||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||
|
||||
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smr, const char* dbname,
|
||||
const char* tableName, int32_t* pNumOfRows,
|
||||
const SSDataBlock* dataBlock);
|
||||
|
||||
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock);
|
||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||
#if 0
|
||||
if (pInfo->sampleRatio == 1) {
|
||||
|
@ -2399,6 +2404,48 @@ int32_t convertTagDataToStr(char* str, int type, void* buf, int32_t bufSize, int
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool sysTableIsOperatorCondOnOneTable(SNode* pCond, char* condTable) {
|
||||
SOperatorNode* node = (SOperatorNode*)pCond;
|
||||
if (node->opType == OP_TYPE_EQUAL) {
|
||||
if (nodeType(node->pLeft) == QUERY_NODE_COLUMN &&
|
||||
strcasecmp(nodesGetNameFromColumnNode(node->pLeft), "table_name") == 0 &&
|
||||
nodeType(node->pRight) == QUERY_NODE_VALUE) {
|
||||
SValueNode* pValue = (SValueNode*)node->pRight;
|
||||
if (pValue->node.type == TSDB_DATA_TYPE_NCHAR || pValue->node.type == TSDB_DATA_TYPE_VARCHAR ||
|
||||
pValue->node.type == TSDB_DATA_TYPE_BINARY) {
|
||||
char* value = nodesGetStrValueFromNode(pValue);
|
||||
strncpy(condTable, value, TSDB_TABLE_NAME_LEN);
|
||||
taosMemoryFree(value);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
|
||||
if (pCond == NULL) {
|
||||
return false;
|
||||
}
|
||||
if (nodeType(pCond) == QUERY_NODE_LOGIC_CONDITION) {
|
||||
SLogicConditionNode* node = (SLogicConditionNode*)pCond;
|
||||
if (LOGIC_COND_TYPE_AND == node->condType) {
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, node->pParameterList) {
|
||||
if (QUERY_NODE_OPERATOR == nodeType(pChild) && sysTableIsOperatorCondOnOneTable(pChild, condTable)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (QUERY_NODE_OPERATOR == nodeType(pCond)) {
|
||||
return sysTableIsOperatorCondOnOneTable(pCond, condTable);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
|
@ -2406,13 +2453,12 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
|
||||
blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
|
@ -2424,10 +2470,32 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
tNameGetDbName(&sn, varDataVal(dbname));
|
||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||
|
||||
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
|
||||
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||
char condTableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
// optimize when sql like where table_name='tablename' and xxx.
|
||||
if (pInfo->pCondition && sysTableIsCondOnOneTable(pInfo->pCondition, condTableName)) {
|
||||
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tableName, condTableName);
|
||||
|
||||
SMetaReader smr = {0};
|
||||
metaReaderInit(&smr, pInfo->readHandle.meta, 0);
|
||||
metaGetTableEntryByName(&smr, condTableName);
|
||||
sysTableUserTagsFillOneTableTags(pInfo, &smr, dbname, tableName, &numOfRows, dataBlock);
|
||||
metaReaderClear(&smr);
|
||||
if (numOfRows > 0) {
|
||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
|
||||
numOfRows = 0;
|
||||
}
|
||||
blockDataDestroy(dataBlock);
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||
}
|
||||
|
||||
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
|
||||
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
|
||||
continue;
|
||||
|
@ -2450,49 +2518,94 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(stableName, smr.me.name);
|
||||
sysTableUserTagsFillOneTableTags(pInfo, &smr, dbname, tableName, &numOfRows, dataBlock);
|
||||
|
||||
int32_t numOfTags = smr.me.stbEntry.schemaTag.nCols;
|
||||
metaReaderClear(&smr);
|
||||
|
||||
if (numOfRows >= pOperator->resultInfo.capacity) {
|
||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
|
||||
numOfRows = 0;
|
||||
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfRows > 0) {
|
||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
|
||||
numOfRows = 0;
|
||||
}
|
||||
|
||||
blockDataDestroy(dataBlock);
|
||||
if (ret != 0) {
|
||||
metaCloseTbCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) {
|
||||
dataBlock->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, dataBlock->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataCleanup(dataBlock);
|
||||
}
|
||||
|
||||
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smr, const char* dbname,
|
||||
const char* tableName, int32_t* pNumOfRows,
|
||||
const SSDataBlock* dataBlock) {
|
||||
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(stableName, (*smr).me.name);
|
||||
|
||||
int32_t numOfRows = *pNumOfRows;
|
||||
|
||||
int32_t numOfTags = (*smr).me.stbEntry.schemaTag.nCols;
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
SColumnInfoData* pColInfoData = NULL;
|
||||
|
||||
// table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
|
||||
colDataAppend(pColInfoData, numOfRows, tableName, false);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
|
||||
colDataAppend(pColInfoData, numOfRows, dbname, false);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
|
||||
colDataAppend(pColInfoData, numOfRows, stableName, false);
|
||||
|
||||
// tag name
|
||||
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tagName, smr.me.stbEntry.schemaTag.pSchema[i].name);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
STR_TO_VARSTR(tagName, (*smr).me.stbEntry.schemaTag.pSchema[i].name);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
|
||||
colDataAppend(pColInfoData, numOfRows, tagName, false);
|
||||
|
||||
// tag type
|
||||
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
int8_t tagType = (*smr).me.stbEntry.schemaTag.pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
|
||||
char tagTypeStr[VARSTR_HEADER_SIZE + 32];
|
||||
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
|
||||
if (tagType == TSDB_DATA_TYPE_VARCHAR) {
|
||||
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
|
||||
(int32_t)((*smr).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
|
||||
} else if (tagType == TSDB_DATA_TYPE_NCHAR) {
|
||||
tagTypeLen +=
|
||||
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
(int32_t)(((*smr).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
}
|
||||
varDataSetLen(tagTypeStr, tagTypeLen);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
|
||||
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId;
|
||||
tagVal.cid = (*smr).me.stbEntry.schemaTag.pSchema[i].colId;
|
||||
char* tagData = NULL;
|
||||
uint32_t tagLen = 0;
|
||||
|
||||
|
@ -2528,52 +2641,16 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
varDataSetLen(tagVarChar, len);
|
||||
}
|
||||
}
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
|
||||
colDataAppend(pColInfoData, numOfRows, tagVarChar,
|
||||
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
||||
taosMemoryFree(tagVarChar);
|
||||
++numOfRows;
|
||||
}
|
||||
metaReaderClear(&smr);
|
||||
|
||||
if (numOfRows >= pOperator->resultInfo.capacity) {
|
||||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
*pNumOfRows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataCleanup(p);
|
||||
numOfRows = 0;
|
||||
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfRows > 0) {
|
||||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataCleanup(p);
|
||||
numOfRows = 0;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
||||
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||
if (ret != 0) {
|
||||
metaCloseTbCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
||||
|
|
Loading…
Reference in New Issue