fix(query): add query involved column info

This commit is contained in:
Haojun Liao 2022-07-21 11:20:30 +08:00
parent abfa6d9162
commit ee34c3bb3e
7 changed files with 43 additions and 18 deletions

View File

@ -64,7 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle * @param SReadHandle
* @return * @return
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper);
/** /**
* Set the input data block for the stream scan. * Set the input data block for the stream scan.

View File

@ -88,7 +88,8 @@ typedef struct {
STqExecTb execTb; STqExecTb execTb;
STqExecDb execDb; STqExecDb execDb;
}; };
int32_t numOfCols; // number of out pout column, temporarily used int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper *pSchemaWrapper; // columns that are involved in query
} STqExecHandle; } STqExecHandle;
typedef struct { typedef struct {

View File

@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true, .initTqReader = true,
.version = ver, .version = ver,
}; };
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);

View File

@ -93,7 +93,8 @@ int32_t tqMetaOpen(STQ* pTq) {
.version = handle.snapshotVer, .version = handle.snapshotVer,
}; };
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols); handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols,
&handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task[i]); ASSERT(handle.execHandle.execCol.task[i]);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner);

View File

@ -164,6 +164,7 @@ typedef struct {
char* dbname; char* dbname;
int32_t tversion; int32_t tversion;
SSchemaWrapper* sw; SSchemaWrapper* sw;
SSchemaWrapper* qsw;
} SSchemaInfo; } SSchemaInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {

View File

@ -104,7 +104,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code; return code;
} }
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) {
if (msg == NULL) { if (msg == NULL) {
// TODO create raw scan // TODO create raw scan
return NULL; return NULL;
@ -138,6 +138,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
} }
} }
*pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);;
return pTaskInfo; return pTaskInfo;
} }

View File

@ -4139,32 +4139,52 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid); int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return terrno;
} }
pTaskInfo->schemaInfo.tablename = strdup(mr.me.name); SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
pSchemaInfo->tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) { if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid; tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid); metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else { } else {
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
pqSw->version = pSchemaInfo->sw->version;
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pScanNode->pScanCols, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
for(int32_t j = 0; j < pSchemaInfo->sw->nCols; ++j) {
if (pColNode->colId == pSchemaInfo->sw->pSchema[j].colId) {
pqSw->pSchema[pqSw->nCols++] = pSchemaInfo->sw->pSchema[j];
break;
}
}
}
pSchemaInfo->qsw = pqSw;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4175,8 +4195,8 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
} }
taosMemoryFree(pSchemaInfo->tablename); taosMemoryFree(pSchemaInfo->tablename);
taosMemoryFree(pSchemaInfo->sw->pSchema); tDeleteSSchemaWrapper(pSchemaInfo->sw);
taosMemoryFree(pSchemaInfo->sw); tDeleteSSchemaWrapper(pSchemaInfo->qsw);
} }
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
@ -4377,7 +4397,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
if (code) { if (code) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
@ -4397,7 +4417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
if (code) { if (code) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
@ -4479,7 +4499,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;