Merge pull request #11338 from taosdata/feature/qnode

feature/qnode
This commit is contained in:
dapan1121 2022-04-09 14:38:58 +08:00 committed by GitHub
commit f207646a64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 19 deletions

View File

@ -3297,6 +3297,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
int8_t* rowRes = NULL; int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pBlock); SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows); blockDataEnsureCapacity(px, pBlock->info.rows);
@ -3306,19 +3307,25 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
if (keep) {
colDataAssign(pDst, pSrc, pBlock->info.rows);
numOfRow = pBlock->info.rows;
} else if (NULL != rowRes) {
numOfRow = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
numOfRow = 0; if (colDataIsNull_s(pSrc, j)) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataAppendNULL(pDst, numOfRow);
if (rowRes[j] == 0) { } else {
continue; colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
}
numOfRow += 1;
} }
} else {
if (colDataIsNull_s(pSrc, j)) { numOfRow = 0;
colDataAppendNULL(pDst, numOfRow);
} else {
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
}
numOfRow += 1;
} }
*pSrc = *pDst; *pSrc = *pDst;

View File

@ -597,6 +597,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
int8_t* rowRes = NULL; int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pInfo->pRes); SSDataBlock* px = createOneDataBlock(pInfo->pRes);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows); blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
@ -607,14 +608,21 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
numOfRow = 0; if (keep) {
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { colDataAssign(pDest, pSrc, pInfo->pRes->info.rows);
if (rowRes[j] == 0) { numOfRow = pInfo->pRes->info.rows;
continue; } else if (NULL != rowRes) {
} numOfRow = 0;
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1; numOfRow += 1;
}
} else {
numOfRow = 0;
} }
} }

View File

@ -1797,6 +1797,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
} else { } else {
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
out.columnData->info.type = type; out.columnData->info.type = type;
out.columnData->info.bytes = tDataTypes[type].bytes;
// todo refactor the convert // todo refactor the convert
int32_t code = doConvertDataType(var, &out); int32_t code = doConvertDataType(var, &out);
@ -1804,6 +1805,8 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
qError("convert value to type[%d] failed", type); qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
memcpy(fi->data, out.columnData->pData, out.columnData->info.bytes);
} }
} }
@ -3638,6 +3641,10 @@ int32_t fltGetDataFromSlotId(void *param, int32_t id, void **data) {
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) { int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) {
if (NULL == info) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false); return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false);
} }
@ -3691,6 +3698,10 @@ _return:
} }
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
if (NULL == info) {
return false;
}
if (info->scalarMode) { if (info->scalarMode) {
SScalarParam output = {0}; SScalarParam output = {0};