Merge pull request #12669 from taosdata/feature/3_liaohj
enh(query): record and return the schema version and tags version of the queried table.
This commit is contained in:
commit
14998af2b8
|
@ -95,6 +95,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param tinfo
|
||||
* @param sversion
|
||||
* @param tversion
|
||||
* @return
|
||||
*/
|
||||
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion);
|
||||
|
||||
/**
|
||||
* The main task execution function, including query on both table and multiple tables,
|
||||
* which are decided according to the tag or table name query conditions
|
||||
|
|
|
@ -62,11 +62,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
|||
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
/* META */
|
||||
case TDMT_VND_CREATE_STB:
|
||||
|
@ -123,6 +118,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
|||
break;
|
||||
}
|
||||
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
|
||||
|
||||
// commit if need
|
||||
|
|
|
@ -217,6 +217,13 @@ typedef struct SExecTaskInfo {
|
|||
int64_t owner; // if it is in execution
|
||||
int32_t code;
|
||||
uint64_t totalRows; // total number of rows
|
||||
struct {
|
||||
char *tablename;
|
||||
char *dbname;
|
||||
int32_t sversion;
|
||||
int32_t tversion;
|
||||
} schemaVer;
|
||||
|
||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||
char* sql; // query sql string
|
||||
jmp_buf env; // jump to this position when error happens.
|
||||
|
@ -689,7 +696,8 @@ SSDataBlock* loadNextDataBlock(void* param);
|
|||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||
|
||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||
int32_t type);
|
||||
SExecTaskInfo* pTaskInfo, int32_t type);
|
||||
|
||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
||||
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
|
||||
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "executor.h"
|
||||
#include <executorimpl.h>
|
||||
#include <vnode.h>
|
||||
#include "executorimpl.h"
|
||||
#include "planner.h"
|
||||
|
@ -171,3 +172,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) {
|
||||
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||
|
||||
*sversion = pTaskInfo->schemaVer.sversion;
|
||||
*tversion = pTaskInfo->schemaVer.tversion;
|
||||
strcpy(dbName, pTaskInfo->schemaVer.dbname);
|
||||
strcpy(tableName, pTaskInfo->schemaVer.tablename);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <executorimpl.h>
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
|
@ -1831,12 +1832,6 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
|
|||
|
||||
pCtx[i].resultInfo = pEntry;
|
||||
pCtx[i].scanFlag = stage;
|
||||
|
||||
// set the timestamp output buffer for top/bottom/diff query
|
||||
// int32_t fid = pCtx[i].functionId;
|
||||
// if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) {
|
||||
// if (i > 0) pCtx[i].pTsOutput = pCtx[i-1].pOutput;
|
||||
// }
|
||||
}
|
||||
|
||||
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
|
||||
|
@ -4635,6 +4630,28 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
|
|||
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
|
||||
void extractTableSchemaVersion(SReadHandle *pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pHandle->meta, 0);
|
||||
metaGetTableEntryByUid(&mr, uid);
|
||||
|
||||
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
|
||||
|
||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
|
||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||
metaGetTableEntryByUid(&mr, suid);
|
||||
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
|
||||
} else {
|
||||
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
}
|
||||
|
||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
|
@ -4648,6 +4665,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return NULL;
|
||||
}
|
||||
|
||||
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
|
@ -4685,7 +4703,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
|
||||
pScanPhyNode->node.pConditions, pOperatorDumy);
|
||||
|
@ -4700,7 +4718,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
|
||||
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
||||
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||
|
@ -4723,7 +4741,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* colList =
|
||||
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
|
||||
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
SOperatorInfo* pOperator =
|
||||
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
||||
|
@ -4805,7 +4823,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||
|
@ -4996,7 +5014,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
|||
}
|
||||
|
||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||
int32_t type) {
|
||||
SExecTaskInfo* pTaskInfo, int32_t type) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
||||
if (pList == NULL) {
|
||||
|
@ -5004,10 +5022,16 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
return NULL;
|
||||
}
|
||||
|
||||
const char* tname = pTaskInfo->schemaVer.tablename;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||
|
||||
if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) &&
|
||||
strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) {
|
||||
pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName);
|
||||
}
|
||||
|
||||
SColMatchInfo c = {0};
|
||||
c.output = true;
|
||||
c.colId = pColNode->colId;
|
||||
|
@ -5201,6 +5225,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
|||
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
|
||||
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
|
||||
|
||||
taosMemoryFree(pTaskInfo->schemaVer.dbname);
|
||||
taosMemoryFree(pTaskInfo->schemaVer.tablename);
|
||||
taosMemoryFreeClear(pTaskInfo->sql);
|
||||
taosMemoryFreeClear(pTaskInfo->id.str);
|
||||
taosMemoryFreeClear(pTaskInfo);
|
||||
|
|
|
@ -460,7 +460,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -13,7 +13,7 @@ class TDTestCase:
|
|||
"wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143}
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def prepare_datas(self):
|
||||
tdSql.execute(
|
||||
|
|
Loading…
Reference in New Issue