enhance: optimize select from ins_tags where table_name='tablename' without iterating through meta tdb
This commit is contained in:
parent
b880d460ec
commit
99bc329384
|
@ -92,6 +92,7 @@ typedef struct SMetaEntry SMetaEntry;
|
|||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
|
||||
int32_t metaReadNext(SMetaReader *pReader);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
|
|
|
@ -110,7 +110,6 @@ int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq
|
|||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
||||
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
|
||||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
|
||||
int64_t metaGetTbNum(SMeta* pMeta);
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
|
|
|
@ -41,7 +41,12 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S
|
|||
|
||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||
|
||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smr, const char* dbname,
|
||||
const char* tableName, int32_t* pNumOfRows,
|
||||
const SSDataBlock* dataBlock);
|
||||
|
||||
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock);
|
||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||
#if 0
|
||||
if (pInfo->sampleRatio == 1) {
|
||||
return true;
|
||||
|
@ -2330,6 +2335,47 @@ int32_t convertTagDataToStr(char* str, int type, void* buf, int32_t bufSize, int
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool sysTableIsOperatorCondOnOneTable(SNode* pCond, char* condTable) {
|
||||
SOperatorNode* node = (SOperatorNode*)pCond;
|
||||
if (node->opType == OP_TYPE_EQUAL) {
|
||||
if (nodeType(node->pLeft) == QUERY_NODE_COLUMN &&
|
||||
strcasecmp(nodesGetNameFromColumnNode(node->pLeft), "table_name") == 0 &&
|
||||
nodeType(node->pRight) == QUERY_NODE_VALUE) {
|
||||
SValueNode* pValue = (SValueNode*)node->pRight;
|
||||
if (pValue->node.type == TSDB_DATA_TYPE_NCHAR || pValue->node.type == TSDB_DATA_TYPE_VARCHAR ||
|
||||
pValue->node.type == TSDB_DATA_TYPE_BINARY) {
|
||||
char* value = nodesGetStrValueFromNode(pValue);
|
||||
strncpy(condTable, value, TSDB_TABLE_NAME_LEN);
|
||||
taosMemoryFree(value);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
|
||||
if (nodeType(pCond) == QUERY_NODE_LOGIC_CONDITION) {
|
||||
SLogicConditionNode* node = (SLogicConditionNode*)pCond;
|
||||
if (LOGIC_COND_TYPE_AND == node->condType) {
|
||||
SListCell* cell = node->pParameterList->pHead;
|
||||
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
|
||||
SNode* pChild = cell->pNode;
|
||||
if (QUERY_NODE_OPERATOR == nodeType(pChild)) {
|
||||
if (sysTableIsOperatorCondOnOneTable(pChild, condTable)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
cell = cell->pNext;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (QUERY_NODE_OPERATOR == nodeType(pCond)) {
|
||||
return sysTableIsOperatorCondOnOneTable(pCond, condTable);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
|
@ -2337,13 +2383,12 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
|
||||
blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
|
@ -2355,10 +2400,32 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
tNameGetDbName(&sn, varDataVal(dbname));
|
||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||
|
||||
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
|
||||
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||
char condTableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
// optimize when sql like where table_name='tablename' and xxx.
|
||||
if (sysTableIsCondOnOneTable(pInfo->pCondition, condTableName)) {
|
||||
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tableName, condTableName);
|
||||
|
||||
SMetaReader smr = {0};
|
||||
metaReaderInit(&smr, pInfo->readHandle.meta, 0);
|
||||
metaGetTableEntryByName(&smr, condTableName);
|
||||
sysTableUserTagsFillOneTableTags(pInfo, &smr, dbname, tableName, &numOfRows, dataBlock);
|
||||
metaReaderClear(&smr);
|
||||
if (numOfRows > 0) {
|
||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
|
||||
numOfRows = 0;
|
||||
}
|
||||
blockDataDestroy(dataBlock);
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||
}
|
||||
|
||||
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
|
||||
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
|
||||
continue;
|
||||
|
@ -2381,100 +2448,12 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(stableName, smr.me.name);
|
||||
sysTableUserTagsFillOneTableTags(pInfo, &smr, dbname, tableName, &numOfRows, dataBlock);
|
||||
|
||||
int32_t numOfTags = smr.me.stbEntry.schemaTag.nCols;
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
SColumnInfoData* pColInfoData = NULL;
|
||||
|
||||
// table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||
colDataAppend(pColInfoData, numOfRows, tableName, false);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||
colDataAppend(pColInfoData, numOfRows, dbname, false);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
colDataAppend(pColInfoData, numOfRows, stableName, false);
|
||||
|
||||
// tag name
|
||||
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tagName, smr.me.stbEntry.schemaTag.pSchema[i].name);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
colDataAppend(pColInfoData, numOfRows, tagName, false);
|
||||
|
||||
// tag type
|
||||
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
char tagTypeStr[VARSTR_HEADER_SIZE + 32];
|
||||
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
|
||||
if (tagType == TSDB_DATA_TYPE_VARCHAR) {
|
||||
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
|
||||
} else if (tagType == TSDB_DATA_TYPE_NCHAR) {
|
||||
tagTypeLen +=
|
||||
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
}
|
||||
varDataSetLen(tagTypeStr, tagTypeLen);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
|
||||
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId;
|
||||
char* tagData = NULL;
|
||||
uint32_t tagLen = 0;
|
||||
|
||||
if (tagType == TSDB_DATA_TYPE_JSON) {
|
||||
tagData = (char*)pInfo->pCur->mr.me.ctbEntry.pTags;
|
||||
} else {
|
||||
bool exist = tTagGet((STag*)pInfo->pCur->mr.me.ctbEntry.pTags, &tagVal);
|
||||
if (exist) {
|
||||
if (IS_VAR_DATA_TYPE(tagType)) {
|
||||
tagData = (char*)tagVal.pData;
|
||||
tagLen = tagVal.nData;
|
||||
} else {
|
||||
tagData = (char*)&tagVal.i64;
|
||||
tagLen = tDataTypes[tagType].bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char* tagVarChar = NULL;
|
||||
if (tagData != NULL) {
|
||||
if (tagType == TSDB_DATA_TYPE_JSON) {
|
||||
char* tagJson = parseTagDatatoJson(tagData);
|
||||
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
|
||||
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
|
||||
varDataSetLen(tagVarChar, strlen(tagJson));
|
||||
taosMemoryFree(tagJson);
|
||||
} else {
|
||||
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
|
||||
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
|
||||
tagVarChar = taosMemoryMalloc(bufSize);
|
||||
int32_t len = -1;
|
||||
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
|
||||
varDataSetLen(tagVarChar, len);
|
||||
}
|
||||
}
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||
colDataAppend(pColInfoData, numOfRows, tagVarChar,
|
||||
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
||||
taosMemoryFree(tagVarChar);
|
||||
++numOfRows;
|
||||
}
|
||||
metaReaderClear(&smr);
|
||||
|
||||
if (numOfRows >= pOperator->resultInfo.capacity) {
|
||||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataCleanup(p);
|
||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
|
||||
numOfRows = 0;
|
||||
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
|
@ -2484,19 +2463,11 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (numOfRows > 0) {
|
||||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataCleanup(p);
|
||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
|
||||
numOfRows = 0;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
||||
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||
blockDataDestroy(dataBlock);
|
||||
if (ret != 0) {
|
||||
metaCloseTbCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
|
@ -2507,6 +2478,111 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) {
|
||||
dataBlock->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, dataBlock->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataCleanup(dataBlock);
|
||||
}
|
||||
|
||||
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smr, const char* dbname,
|
||||
const char* tableName, int32_t* pNumOfRows,
|
||||
const SSDataBlock* dataBlock) {
|
||||
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(stableName, (*smr).me.name);
|
||||
|
||||
int32_t numOfRows = *pNumOfRows;
|
||||
|
||||
int32_t numOfTags = (*smr).me.stbEntry.schemaTag.nCols;
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
SColumnInfoData* pColInfoData = NULL;
|
||||
|
||||
// table name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
|
||||
colDataAppend(pColInfoData, numOfRows, tableName, false);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
|
||||
colDataAppend(pColInfoData, numOfRows, dbname, false);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
|
||||
colDataAppend(pColInfoData, numOfRows, stableName, false);
|
||||
|
||||
// tag name
|
||||
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tagName, (*smr).me.stbEntry.schemaTag.pSchema[i].name);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
|
||||
colDataAppend(pColInfoData, numOfRows, tagName, false);
|
||||
|
||||
// tag type
|
||||
int8_t tagType = (*smr).me.stbEntry.schemaTag.pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
|
||||
char tagTypeStr[VARSTR_HEADER_SIZE + 32];
|
||||
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
|
||||
if (tagType == TSDB_DATA_TYPE_VARCHAR) {
|
||||
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)((*smr).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
|
||||
} else if (tagType == TSDB_DATA_TYPE_NCHAR) {
|
||||
tagTypeLen +=
|
||||
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)(((*smr).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
}
|
||||
varDataSetLen(tagTypeStr, tagTypeLen);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
|
||||
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = (*smr).me.stbEntry.schemaTag.pSchema[i].colId;
|
||||
char* tagData = NULL;
|
||||
uint32_t tagLen = 0;
|
||||
|
||||
if (tagType == TSDB_DATA_TYPE_JSON) {
|
||||
tagData = (char*)pInfo->pCur->mr.me.ctbEntry.pTags;
|
||||
} else {
|
||||
bool exist = tTagGet((STag*)pInfo->pCur->mr.me.ctbEntry.pTags, &tagVal);
|
||||
if (exist) {
|
||||
if (IS_VAR_DATA_TYPE(tagType)) {
|
||||
tagData = (char*)tagVal.pData;
|
||||
tagLen = tagVal.nData;
|
||||
} else {
|
||||
tagData = (char*)&tagVal.i64;
|
||||
tagLen = tDataTypes[tagType].bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char* tagVarChar = NULL;
|
||||
if (tagData != NULL) {
|
||||
if (tagType == TSDB_DATA_TYPE_JSON) {
|
||||
char* tagJson = parseTagDatatoJson(tagData);
|
||||
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
|
||||
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
|
||||
varDataSetLen(tagVarChar, strlen(tagJson));
|
||||
taosMemoryFree(tagJson);
|
||||
} else {
|
||||
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
|
||||
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
|
||||
tagVarChar = taosMemoryMalloc(bufSize);
|
||||
int32_t len = -1;
|
||||
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
|
||||
varDataSetLen(tagVarChar, len);
|
||||
}
|
||||
}
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
|
||||
colDataAppend(pColInfoData, numOfRows, tagVarChar,
|
||||
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
||||
taosMemoryFree(tagVarChar);
|
||||
++numOfRows;
|
||||
}
|
||||
|
||||
*pNumOfRows = numOfRows;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
|
|
Loading…
Reference in New Issue