adj operator res

This commit is contained in:
54liuyao 2024-07-29 10:35:06 +08:00
parent e0f49cb148
commit d999c00227
6 changed files with 111 additions and 54 deletions

View File

@ -1002,7 +1002,7 @@ int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, S
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
#ifdef __cplusplus
}

View File

@ -295,7 +295,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
return NULL;
}
createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
if (NULL == pTaskInfo->pRoot || code != 0) {
taosMemoryFree(pTaskInfo);
return NULL;
@ -1160,7 +1160,12 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
int32_t code = tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
@ -1231,7 +1236,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
if (subType == TOPIC_SUB_TYPE__COLUMN) {
extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
if (pOperator == NULL || code != 0) {
return code;
}

View File

@ -567,7 +567,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
QUERY_CHECK_CODE(code, lino, _err);
extractQualifiedTupleByFilterResult(pBlock, p, status);
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
QUERY_CHECK_CODE(code, lino, _err);
if (pColMatchInfo != NULL) {
size_t size = taosArrayGetSize(pColMatchInfo->pList);
@ -591,18 +592,20 @@ _err:
return code;
}
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
int32_t code = TSDB_CODE_SUCCESS;
int8_t* pIndicator = (int8_t*)p->pData;
if (status == FILTER_RESULT_ALL_QUALIFIED) {
// here nothing needs to be done
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
trimDataBlock(pBlock, pBlock->info.rows, NULL);
code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
} else {
qError("unknown filter result type: %d", status);
}
return code;
}
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {

View File

@ -308,7 +308,10 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
}
}
extractQualifiedTupleByFilterResult(pBlock, p, status);
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
code = TSDB_CODE_SUCCESS;
@ -375,7 +378,10 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
}
}
extractQualifiedTupleByFilterResult(pBlock, p, status);
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
if (code != TSDB_CODE_SUCCESS) {
goto _return;
}
code = TSDB_CODE_SUCCESS;

View File

@ -1311,7 +1311,8 @@ static void destroyTableScanOperatorInfo(void* param) {
}
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
@ -2413,9 +2414,13 @@ _end:
return code;
}
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
static int32_t doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool* p = NULL;
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
@ -2445,18 +2450,27 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
code = trimDataBlock(pBlock, pBlock->info.rows, p);
QUERY_CHECK_CODE(code, lino, _end);
}
}
_end:
taosMemoryFree(p);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offset) {
static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offset) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0) {
return;
return code;
}
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
bool hasUnqualified = false;
SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0);
@ -2485,16 +2499,26 @@ static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offse
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
code = trimDataBlock(pBlock, pBlock->info.rows, p);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
taosMemoryFree(p);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
static int32_t rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
bool hasUnqualified = false;
int64_t skey = pWindow->skey;
int64_t ekey = pWindow->ekey;
@ -2531,14 +2555,24 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
code = trimDataBlock(pBlock, pBlock->info.rows, p);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("%s re-build delete datablock, start key revised to:%" PRId64 ", rows:%" PRId64, id, skey,
pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
} else {
qDebug("%s not update the delete block", id);
}
_end:
taosMemoryFree(p);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t colIdComparFn(const void* param1, const void* param2) {
@ -2657,7 +2691,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
// filter the block extracted from WAL files, according to the time window apply additional time window filter
doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
code = doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pRes->info.dataLoad = 1;
code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
@ -2676,14 +2711,19 @@ _end:
return code;
}
static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
int32_t code = TSDB_CODE_SUCCESS;
SValue val = {0};
if (hasPrimaryKey) {
doBlockDataPrimaryKeyFilter(pBlock, offset);
code = doBlockDataPrimaryKeyFilter(pBlock, offset);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
if (pBlock->info.rows < 1) {
return;
return code;
}
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
val.type = pColPk->info.type;
@ -2696,6 +2736,7 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset
}
}
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
return code;
}
static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -2720,7 +2761,7 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pResult && pResult->info.rows > 0) {
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
code = processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
if (pResult->info.rows > 0) {
(*ppRes) = pResult;
@ -3156,7 +3197,8 @@ FETCH_NEXT_BLOCK:
code = setBlockGroupIdByUid(pInfo, pDelBlock);
QUERY_CHECK_CODE(code, lino, _end);
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
code = rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
QUERY_CHECK_CODE(code, lino, _end);
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered",
GET_TASKID(pTaskInfo));
if (pDelBlock->info.rows == 0) {
@ -3479,7 +3521,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pBlock && pBlock->info.rows > 0) {
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
code = processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
(*ppRes) = pBlock;
return code;
@ -3964,7 +4006,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes);
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes);
QUERY_CHECK_CODE(code, lino, _error);
if (hasPrimaryKeyCol(pInfo)) {
@ -5626,7 +5668,8 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
}
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = 0;

View File

@ -5183,7 +5183,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0;
createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
initResultRowInfo(&pInfo->binfo.resultRowInfo);