fix: operator param issues

This commit is contained in:
dapan1121 2023-07-07 19:31:56 +08:00
parent c19c59d5e6
commit 8e1f4d1d6a
6 changed files with 38 additions and 11 deletions

View File

@ -5501,6 +5501,7 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) {
if (tEncodeI32(pEncoder, pOpParam->opType) < 0) return -1;
if (tEncodeI32(pEncoder, pOpParam->downstreamIdx) < 0) return -1;
switch (pOpParam->opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value;
@ -5528,6 +5529,7 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) {
int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) {
if (tDecodeI32(pDecoder, &pOpParam->opType) < 0) return -1;
if (tDecodeI32(pDecoder, &pOpParam->downstreamIdx) < 0) return -1;
switch (pOpParam->opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
@ -5545,6 +5547,7 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam)
} else {
pScan->pUidList = NULL;
}
pOpParam->value = pScan;
break;
}
default:

View File

@ -61,6 +61,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
pGc->groupValueSize = grpValSize;
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
(*ppRes)->downstreamIdx = downstreamIdx;
(*ppRes)->value = pGc;
return TSDB_CODE_SUCCESS;
@ -132,16 +133,22 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ
SOperatorParam* pExcParam1 = NULL;
SOperatorParam* pGcParam0 = NULL;
SOperatorParam* pGcParam1 = NULL;
int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx);
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx);
int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx);
int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx);
qDebug("start stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, *leftVg, *leftUid, *rightVg, *rightUid);
int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL);
int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = buildExchangeOperatorParam(&pExcParam1, 1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL);
code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0);
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, leftUid, pUid0->info.bytes, pExcParam0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, pUid1->pData + pUid1->info.bytes * rowIdx, pUid1->info.bytes, pExcParam1);
code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, rightUid, pUid1->info.bytes, pExcParam1);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1);
@ -165,6 +172,11 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes)
*ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam);
if (*ppRes) {
pPost->isStarted = true;
pInfo->execInfo.postBlkNum++;
pInfo->execInfo.postBlkRows += (*ppRes)->info.rows;
qDebug("join res block retrieved");
} else {
qDebug("Empty join res block retrieved");
}
}

View File

@ -159,11 +159,6 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
@ -206,6 +201,11 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
@ -434,6 +434,7 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in
}
(*ppRes)->opType = srcOpType;
(*ppRes)->downstreamIdx = 0;
(*ppRes)->value = pScan;
(*ppRes)->pChildren = NULL;
@ -762,6 +763,8 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
dataInfo.srcOpType = pParam->srcOpType;
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
pOperator->pOperatorParam = NULL;
return TSDB_CODE_SUCCESS;
}
@ -769,7 +772,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
if (OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) {
if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorParam)) {
return TSDB_CODE_SUCCESS;
}

View File

@ -647,8 +647,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
qDebug("merge join left got empty block");
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
return false;
} else {
qDebug("merge join left got block");
}
}
@ -657,8 +660,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
pJoinInfo->rightPos = 0;
if (pJoinInfo->pRight == NULL) {
qDebug("merge join right got empty block");
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
return false;
} else {
qDebug("merge join right got block");
}
}
// only the timestamp match support for ordinary table

View File

@ -640,6 +640,8 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara
pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild;
}
taosArrayClear(pOperator->pOperatorParam->pChildren);
return TSDB_CODE_SUCCESS;
}

View File

@ -815,7 +815,6 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
qTrace("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo));
}
pOperator->pOperatorParam = NULL;
return code;
}
@ -852,12 +851,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pOperator->pOperatorParam) {
int32_t code = createTableListInfoFromParam(pOperator);
pOperator->pOperatorParam = NULL;
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pInfo->currentGroupId != -1) {
pInfo->currentGroupId = 0;
return startNextGroupScan(pOperator);
}
}