enh: add batch table scan
This commit is contained in:
parent
04fea1e38f
commit
c45359c989
|
@ -162,14 +162,20 @@ typedef struct SGroupCacheLogicNode {
|
|||
bool grpColsMayBeNull;
|
||||
bool grpByUid;
|
||||
bool globalGrp;
|
||||
bool batchFetch;
|
||||
SNodeList* pGroupCols;
|
||||
} SGroupCacheLogicNode;
|
||||
|
||||
typedef struct SDynQueryCtrlLogicNode {
|
||||
SLogicNode node;
|
||||
EDynQueryType qType;
|
||||
typedef struct SDynQueryCtrlStbJoin {
|
||||
bool batchJoin;
|
||||
SNodeList* pVgList;
|
||||
SNodeList* pUidList;
|
||||
} SDynQueryCtrlStbJoin;
|
||||
|
||||
typedef struct SDynQueryCtrlLogicNode {
|
||||
SLogicNode node;
|
||||
EDynQueryType qType;
|
||||
SDynQueryCtrlStbJoin stbJoin;
|
||||
} SDynQueryCtrlLogicNode;
|
||||
|
||||
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
|
||||
|
@ -445,13 +451,14 @@ typedef struct SGroupCachePhysiNode {
|
|||
bool grpColsMayBeNull;
|
||||
bool grpByUid;
|
||||
bool globalGrp;
|
||||
bool batchFetch;
|
||||
SNodeList* pGroupCols;
|
||||
} SGroupCachePhysiNode;
|
||||
|
||||
typedef struct SStbJoinDynCtrlBasic {
|
||||
bool batchJoin;
|
||||
int32_t vgSlot[2];
|
||||
int32_t uidSlot[2];
|
||||
bool batchJoin;
|
||||
} SStbJoinDynCtrlBasic;
|
||||
|
||||
typedef struct SDynQueryCtrlPhysiNode {
|
||||
|
|
|
@ -582,7 +582,6 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
|
|||
switch (pInfo->qType) {
|
||||
case DYN_QTYPE_STB_HASH:
|
||||
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
|
||||
pInfo->stbJoin.basic.batchJoin = true;
|
||||
if (pInfo->stbJoin.basic.batchJoin) {
|
||||
code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
|
@ -280,6 +280,8 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
|
|||
}
|
||||
|
||||
if (pBlock) {
|
||||
qError("%s group cache retrieved block with groupId: %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId);
|
||||
|
||||
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
|
||||
if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
|
||||
code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock);
|
||||
|
@ -342,6 +344,60 @@ static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData*
|
|||
pGroup->startOffset = pGroup->pVgCtx->fileSize;
|
||||
}
|
||||
|
||||
|
||||
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) {
|
||||
taosThreadMutexInit(&pGroup->mutex, NULL);
|
||||
pGroup->downstreamIdx = downstreamIdx;
|
||||
pGroup->vgId = vgId;
|
||||
pGroup->fileId = -1;
|
||||
pGroup->startBlkId = -1;
|
||||
pGroup->endBlkId = -1;
|
||||
pGroup->startOffset = -1;
|
||||
pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
|
||||
}
|
||||
|
||||
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
|
||||
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
||||
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
|
||||
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
|
||||
SGroupCacheData grpData = {0};
|
||||
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId);
|
||||
|
||||
while (true) {
|
||||
if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
|
||||
if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
*ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid));
|
||||
if (*ppGrp) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
*ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid));
|
||||
if (*ppGrp && pParam->pChildren) {
|
||||
SGcNewGroupInfo newGroup;
|
||||
newGroup.pGroup = *ppGrp;
|
||||
newGroup.vgId = vgId;
|
||||
newGroup.uid = uid;
|
||||
newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);
|
||||
|
||||
taosWLockLatch(&pCtx->grpLock);
|
||||
if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
|
||||
taosWUnLockLatch(&pCtx->grpLock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosWUnLockLatch(&pCtx->grpLock);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
||||
|
@ -594,58 +650,6 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) {
|
||||
taosThreadMutexInit(&pGroup->mutex, NULL);
|
||||
pGroup->downstreamIdx = downstreamIdx;
|
||||
pGroup->vgId = vgId;
|
||||
pGroup->fileId = -1;
|
||||
pGroup->startBlkId = -1;
|
||||
pGroup->endBlkId = -1;
|
||||
pGroup->startOffset = -1;
|
||||
pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
|
||||
}
|
||||
|
||||
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
|
||||
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
||||
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
|
||||
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
|
||||
SGroupCacheData grpData = {0};
|
||||
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId);
|
||||
|
||||
while (true) {
|
||||
if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
|
||||
if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
*ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid));
|
||||
if (*ppGrp) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
*ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid));
|
||||
if (*ppGrp && pParam->pChildren) {
|
||||
SGcNewGroupInfo newGroup;
|
||||
newGroup.pGroup = *ppGrp;
|
||||
newGroup.vgId = vgId;
|
||||
newGroup.uid = uid;
|
||||
newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);
|
||||
|
||||
taosWLockLatch(&pCtx->grpLock);
|
||||
if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
|
||||
taosWUnLockLatch(&pCtx->grpLock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosWUnLockLatch(&pCtx->grpLock);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) {
|
||||
pSession->pParam = pGcParam;
|
||||
pSession->downstreamIdx = pGcParam->downstreamIdx;
|
||||
|
@ -664,7 +668,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
|
|||
|
||||
SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
|
||||
if (NULL == pGroup) {
|
||||
code = addNewGroupData(pOperator, pParam, &pGroup, pGcParam->vgId, pGcParam->tbUid);
|
||||
code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
@ -733,6 +737,11 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
|
|||
if (NULL == pInfo->pDownstreams[i].pVgTbHash) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (pInfo->batchFetch) {
|
||||
SGcVgroupCtx vgCtx = {.pTbList = NULL, .lastUid = 0, .fileSize = 0, .fileId = i};
|
||||
int32_t defaultVg = -1;
|
||||
tSimpleHashPut(pInfo->pDownstreams[i].pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx));
|
||||
}
|
||||
|
||||
pInfo->pDownstreams[i].pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
|
||||
if (NULL == pInfo->pDownstreams[i].pNewGrpList) {
|
||||
|
@ -795,7 +804,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
|||
pInfo->maxCacheSize = -1;
|
||||
pInfo->grpByUid = pPhyciNode->grpByUid;
|
||||
pInfo->globalGrp = pPhyciNode->globalGrp;
|
||||
pInfo->batchFetch = false;
|
||||
pInfo->batchFetch = pPhyciNode->batchFetch;
|
||||
|
||||
if (!pInfo->grpByUid) {
|
||||
qError("only group cache by uid is supported now");
|
||||
|
|
|
@ -852,8 +852,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
if (pOperator->dynamicTask) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
||||
result->info.id.groupId = pKeyInfo->uid;
|
||||
result->info.id.groupId = result->info.id.uid;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -891,8 +890,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
if (pOperator->dynamicTask) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
||||
result->info.id.groupId = pKeyInfo->uid;
|
||||
result->info.id.groupId = result->info.id.uid;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -540,6 +540,7 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache
|
|||
COPY_SCALAR_FIELD(grpColsMayBeNull);
|
||||
COPY_SCALAR_FIELD(grpByUid);
|
||||
COPY_SCALAR_FIELD(globalGrp);
|
||||
COPY_SCALAR_FIELD(batchFetch);
|
||||
CLONE_NODE_LIST_FIELD(pGroupCols);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -547,8 +548,9 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache
|
|||
static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
COPY_SCALAR_FIELD(qType);
|
||||
CLONE_NODE_LIST_FIELD(pVgList);
|
||||
CLONE_NODE_LIST_FIELD(pUidList);
|
||||
COPY_SCALAR_FIELD(stbJoin.batchJoin);
|
||||
CLONE_NODE_LIST_FIELD(stbJoin.pVgList);
|
||||
CLONE_NODE_LIST_FIELD(stbJoin.pUidList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1228,6 +1228,9 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkDynQueryCtrlLogicPlanQueryType = "QueryType";
|
||||
static const char* jkDynQueryCtrlLogicPlanStbJoinBatchJoin = "BatchJoin";
|
||||
static const char* jkDynQueryCtrlLogicPlanStbJoinVgList = "VgroupList";
|
||||
static const char* jkDynQueryCtrlLogicPlanStbJoinUidList = "UidList";
|
||||
|
||||
static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SDynQueryCtrlLogicNode* pNode = (const SDynQueryCtrlLogicNode*)pObj;
|
||||
|
@ -1236,6 +1239,15 @@ static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, pNode->stbJoin.batchJoin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, pNode->stbJoin.pVgList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkDynQueryCtrlLogicPlanStbJoinUidList, pNode->stbJoin.pUidList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1247,6 +1259,15 @@ static int32_t jsonToLogicDynQueryCtrlNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType, code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetBoolValue(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, &pNode->stbJoin.batchJoin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, &pNode->stbJoin.pVgList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkDynQueryCtrlLogicPlanStbJoinUidList, &pNode->stbJoin.pUidList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2929,6 +2950,7 @@ static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns";
|
|||
static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull";
|
||||
static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid";
|
||||
static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup";
|
||||
static const char* jkGroupCachePhysiPlanBatchFetch = "BatchFetch";
|
||||
|
||||
|
||||
static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -2944,6 +2966,9 @@ static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanBatchFetch, pNode->batchFetch);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols);
|
||||
}
|
||||
|
@ -2963,6 +2988,9 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanBatchFetch, &pNode->batchFetch);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols);
|
||||
}
|
||||
|
@ -2970,6 +2998,7 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType";
|
||||
static const char* jkDynQueryCtrlPhysiPlanBatchJoin = "BatchJoin";
|
||||
static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]";
|
||||
static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]";
|
||||
static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]";
|
||||
|
@ -2985,7 +3014,10 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
switch (pNode->qType) {
|
||||
case DYN_QTYPE_STB_HASH: {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]);
|
||||
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, pNode->stbJoin.batchJoin);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot1, pNode->stbJoin.vgSlot[1]);
|
||||
}
|
||||
|
@ -3015,6 +3047,9 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) {
|
|||
switch (pNode->qType) {
|
||||
case DYN_QTYPE_STB_HASH: {
|
||||
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, &pNode->stbJoin.batchJoin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0], code);
|
||||
}
|
||||
|
|
|
@ -3530,6 +3530,7 @@ enum {
|
|||
PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL,
|
||||
PHY_GROUP_CACHE_CODE_GROUP_BY_UID,
|
||||
PHY_GROUP_CACHE_CODE_GLOBAL_GROUP,
|
||||
PHY_GROUP_CACHE_CODE_BATCH_FETCH,
|
||||
PHY_GROUP_CACHE_CODE_GROUP_COLUMNS
|
||||
};
|
||||
|
||||
|
@ -3549,6 +3550,9 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_BATCH_FETCH, pNode->batchFetch);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -3575,6 +3579,9 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP:
|
||||
code = tlvDecodeBool(pTlv, &pNode->globalGrp);
|
||||
break;
|
||||
case PHY_GROUP_CACHE_CODE_BATCH_FETCH:
|
||||
code = tlvDecodeBool(pTlv, &pNode->batchFetch);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -3587,6 +3594,7 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
enum {
|
||||
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1,
|
||||
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0,
|
||||
|
@ -3603,7 +3611,10 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
switch (pNode->qType) {
|
||||
case DYN_QTYPE_STB_HASH: {
|
||||
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]);
|
||||
code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN, pNode->stbJoin.batchJoin);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, pNode->stbJoin.vgSlot[1]);
|
||||
}
|
||||
|
@ -3635,6 +3646,9 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE:
|
||||
code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType));
|
||||
break;
|
||||
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN:
|
||||
code = tlvDecodeBool(pTlv, &pNode->stbJoin.batchJoin);
|
||||
break;
|
||||
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0:
|
||||
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[0], sizeof(pNode->stbJoin.vgSlot[0]));
|
||||
break;
|
||||
|
|
|
@ -3232,6 +3232,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
|
|||
pGrpCache->node.dynamicOp = true;
|
||||
pGrpCache->grpColsMayBeNull = false;
|
||||
pGrpCache->grpByUid = true;
|
||||
pGrpCache->batchFetch = true;
|
||||
pGrpCache->node.pChildren = pChildren;
|
||||
pGrpCache->node.pTargets = nodesMakeList();
|
||||
if (NULL == pGrpCache->node.pTargets) {
|
||||
|
@ -3337,6 +3338,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
|
|||
}
|
||||
|
||||
pDynCtrl->qType = DYN_QTYPE_STB_HASH;
|
||||
pDynCtrl->stbJoin.batchJoin = true;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDynCtrl->node.pChildren = nodesMakeList();
|
||||
|
@ -3346,24 +3348,24 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDynCtrl->pVgList = nodesMakeList();
|
||||
if (NULL == pDynCtrl->pVgList) {
|
||||
pDynCtrl->stbJoin.pVgList = nodesMakeList();
|
||||
if (NULL == pDynCtrl->stbJoin.pVgList) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDynCtrl->pUidList = nodesMakeList();
|
||||
if (NULL == pDynCtrl->pUidList) {
|
||||
pDynCtrl->stbJoin.pUidList = nodesMakeList();
|
||||
if (NULL == pDynCtrl->stbJoin.pUidList) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SJoinLogicNode* pHJoin = (SJoinLogicNode*)pPrev;
|
||||
nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 0));
|
||||
nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 2));
|
||||
nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 1));
|
||||
nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 3));
|
||||
nodesListStrictAppend(pDynCtrl->stbJoin.pUidList, nodesListGetNode(pHJoin->node.pTargets, 0));
|
||||
nodesListStrictAppend(pDynCtrl->stbJoin.pUidList, nodesListGetNode(pHJoin->node.pTargets, 2));
|
||||
nodesListStrictAppend(pDynCtrl->stbJoin.pVgList, nodesListGetNode(pHJoin->node.pTargets, 1));
|
||||
nodesListStrictAppend(pDynCtrl->stbJoin.pVgList, nodesListGetNode(pHJoin->node.pTargets, 3));
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev);
|
||||
|
|
|
@ -988,6 +988,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
|
|||
pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
|
||||
pGrpCache->grpByUid = pLogicNode->grpByUid;
|
||||
pGrpCache->globalGrp = pLogicNode->globalGrp;
|
||||
pGrpCache->batchFetch = pLogicNode->batchFetch;
|
||||
SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1002,21 +1003,15 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
SDynQueryCtrlPhysiNode* pDynCtrl =
|
||||
(SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
|
||||
if (NULL == pDynCtrl) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
|
||||
SDynQueryCtrlPhysiNode* pDynCtrl) {
|
||||
SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
||||
SNodeList* pVgList = NULL;
|
||||
SNodeList* pUidList = NULL;
|
||||
int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pVgList, &pVgList);
|
||||
int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pVgList, &pVgList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pUidList, &pUidList);
|
||||
code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SNode* pNode = NULL;
|
||||
|
@ -1030,10 +1025,32 @@ static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p
|
|||
pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId;
|
||||
++i;
|
||||
}
|
||||
pDynCtrl->stbJoin.batchJoin = pLogicNode->stbJoin.batchJoin;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SDynQueryCtrlPhysiNode* pDynCtrl =
|
||||
(SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
|
||||
if (NULL == pDynCtrl) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
switch (pLogicNode->qType) {
|
||||
case DYN_QTYPE_STB_HASH:
|
||||
code = updateDynQueryCtrlStbJoinInfo(pCxt, pChildren, pLogicNode, pDynCtrl);
|
||||
break;
|
||||
default:
|
||||
planError("Invalid dyn query ctrl type:%d", pLogicNode->qType);
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDynCtrl->qType = pLogicNode->qType;
|
||||
|
||||
*pPhyNode = (SPhysiNode*)pDynCtrl;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue