Merge pull request #12546 from taosdata/feature/3.0_liaohj
refactor(query): do some internal refactor.
This commit is contained in:
commit
3a1c12f8fd
|
@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
|
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
|
||||||
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
||||||
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
||||||
|
|
||||||
|
|
|
@ -2055,6 +2055,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
// it is a reserved column for scalar function, and no data in this column yet.
|
||||||
|
if (pSrc->pData == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
for (int32_t j = 0; j < totalRows; ++j) {
|
for (int32_t j = 0; j < totalRows; ++j) {
|
||||||
if (rowRes[j] == 0) {
|
if (rowRes[j] == 0) {
|
||||||
|
@ -4372,9 +4377,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->limit = *pLimit;
|
pInfo->limit = *pLimit;
|
||||||
pInfo->slimit = *pSlimit;
|
pInfo->slimit = *pSlimit;
|
||||||
pInfo->curOffset = pLimit->offset;
|
pInfo->curOffset = pLimit->offset;
|
||||||
pInfo->curSOffset = pSlimit->offset;
|
pInfo->curSOffset = pSlimit->offset;
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
|
|
|
@ -291,20 +291,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
|
||||||
|
|
||||||
// this is to handle the tbname
|
// this is to handle the tbname
|
||||||
if (fmIsScanPseudoColumnFunc(functionId)) {
|
if (fmIsScanPseudoColumnFunc(functionId)) {
|
||||||
struct SScalarFuncExecFuncs fpSet = {0};
|
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
|
||||||
fmGetScalarFuncExecFuncs(functionId, &fpSet);
|
|
||||||
|
|
||||||
SColumnInfoData infoData = {0};
|
|
||||||
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
infoData.info.bytes = sizeof(uint64_t);
|
|
||||||
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
|
||||||
|
|
||||||
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
|
|
||||||
SScalarParam srcParam = {
|
|
||||||
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
|
|
||||||
|
|
||||||
SScalarParam param = {.columnData = pColInfoData};
|
|
||||||
fpSet.process(&srcParam, 1, ¶m);
|
|
||||||
} else { // these are tags
|
} else { // these are tags
|
||||||
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
@ -316,6 +303,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
|
||||||
|
struct SScalarFuncExecFuncs fpSet = {0};
|
||||||
|
fmGetScalarFuncExecFuncs(functionId, &fpSet);
|
||||||
|
|
||||||
|
SColumnInfoData infoData = {0};
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
infoData.info.bytes = sizeof(uint64_t);
|
||||||
|
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||||
|
|
||||||
|
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
|
||||||
|
SScalarParam srcParam = {
|
||||||
|
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
|
||||||
|
|
||||||
|
SScalarParam param = {.columnData = pColInfoData};
|
||||||
|
fpSet.process(&srcParam, 1, ¶m);
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
|
|
Loading…
Reference in New Issue