enh(query): record and return the schema version and tags version of current queried table.

This commit is contained in:
Haojun Liao 2022-05-18 20:39:00 +08:00
parent fba964778b
commit 47e6e95866
5 changed files with 67 additions and 7 deletions

View File

@ -95,6 +95,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
/**
*
* @param tinfo
* @param sversion
* @param tversion
* @return
*/
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion);
/**
* The main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions

View File

@ -217,6 +217,13 @@ typedef struct SExecTaskInfo {
int64_t owner; // if it is in execution
int32_t code;
uint64_t totalRows; // total number of rows
struct {
char *tablename;
char *dbname;
int32_t sversion;
int32_t tversion;
} schemaVer;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
@ -670,7 +677,8 @@ SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type);
SExecTaskInfo* pTaskInfo, int32_t type);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);

View File

@ -14,6 +14,7 @@
*/
#include "executor.h"
#include <executorimpl.h>
#include <vnode.h>
#include "executorimpl.h"
#include "planner.h"
@ -171,3 +172,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return TSDB_CODE_SUCCESS;
}
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
*sversion = pTaskInfo->schemaVer.sversion;
*tversion = pTaskInfo->schemaVer.tversion;
strcpy(dbName, pTaskInfo->schemaVer.dbname);
strcpy(tableName, pTaskInfo->schemaVer.tablename);
return 0;
}

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@ -4653,6 +4654,26 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
void extractTableSchemaVersion(SReadHandle *pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, uid);
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
} else {
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver;
}
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
int32_t type = nodeType(pPhyNode);
@ -4666,6 +4687,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
@ -4703,7 +4725,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy);
@ -4718,7 +4740,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfOutputCols = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
@ -4741,7 +4763,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfOutputCols = 0;
SArray* colList =
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
@ -4823,7 +4845,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0;
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
@ -5014,7 +5036,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
}
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type) {
SExecTaskInfo* pTaskInfo, int32_t type) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
if (pList == NULL) {
@ -5022,10 +5044,16 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return NULL;
}
const char* tname = pTaskInfo->schemaVer.tablename;
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) &&
strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) {
pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName);
}
SColMatchInfo c = {0};
c.output = true;
c.colId = pColNode->colId;
@ -5219,6 +5247,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
taosMemoryFree(pTaskInfo->schemaVer.dbname);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);

View File

@ -455,7 +455,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {