fix: dynamic query ctrl compile issues
This commit is contained in:
parent
b371b7e6be
commit
18c2400c75
|
@ -105,6 +105,15 @@ typedef struct SExchangeOpStopInfo {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
} SExchangeOpStopInfo;
|
} SExchangeOpStopInfo;
|
||||||
|
|
||||||
|
typedef struct SGcOperatorParam {
|
||||||
|
SOperatorParam* pChild;
|
||||||
|
int64_t sessionId;
|
||||||
|
int32_t downstreamIdx;
|
||||||
|
bool needCache;
|
||||||
|
void* pGroupValue;
|
||||||
|
int32_t groupValueSize;
|
||||||
|
} SGcOperatorParam;
|
||||||
|
|
||||||
typedef struct SExprSupp {
|
typedef struct SExprSupp {
|
||||||
SExprInfo* pExprInfo;
|
SExprInfo* pExprInfo;
|
||||||
int32_t numOfExprs; // the number of scalar expression in group operator
|
int32_t numOfExprs; // the number of scalar expression in group operator
|
||||||
|
|
|
@ -21,15 +21,6 @@ extern "C" {
|
||||||
|
|
||||||
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
|
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
|
||||||
|
|
||||||
typedef struct SGcOperatorParam {
|
|
||||||
SOperatorParam* pChild;
|
|
||||||
int64_t sessionId;
|
|
||||||
int32_t downstreamIdx;
|
|
||||||
bool needCache;
|
|
||||||
void* pGroupValue;
|
|
||||||
int32_t groupValueSize;
|
|
||||||
} SGcOperatorParam;
|
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct SGcBlkBufInfo {
|
typedef struct SGcBlkBufInfo {
|
||||||
void* next;
|
void* next;
|
||||||
|
|
|
@ -154,6 +154,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
|
||||||
|
|
||||||
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
||||||
|
|
|
@ -141,9 +141,9 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ
|
||||||
SOperatorParam* pGcParam0 = NULL;
|
SOperatorParam* pGcParam0 = NULL;
|
||||||
SOperatorParam* pGcParam1 = NULL;
|
SOperatorParam* pGcParam1 = NULL;
|
||||||
|
|
||||||
int32_t code = buildExchangeOperatorParam(&pExcParam0, pVg0->pData + pVg0->info.bytes * rowIdx, pUid0->pData + pUid0->info.bytes * rowIdx, NULL);
|
int32_t code = buildExchangeOperatorParam(&pExcParam0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildExchangeOperatorParam(&pExcParam1, pVg1->pData + pVg1->info.bytes * rowIdx, pUid1->pData + pUid1->info.bytes * rowIdx, NULL);
|
code = buildExchangeOperatorParam(&pExcParam1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0);
|
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0);
|
||||||
|
|
|
@ -539,7 +539,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
||||||
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
|
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
|
||||||
|
|
||||||
int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
|
int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
|
||||||
qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
|
qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,8 +70,7 @@ typedef struct SMJoinOperatorInfo {
|
||||||
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
||||||
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
|
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
|
||||||
static void destroyMergeJoinOperator(void* param);
|
static void destroyMergeJoinOperator(void* param);
|
||||||
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
|
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
|
||||||
SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
|
|
||||||
|
|
||||||
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
|
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
|
||||||
SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond;
|
SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond;
|
||||||
|
@ -203,8 +202,8 @@ SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorIn
|
||||||
if (p) {
|
if (p) {
|
||||||
p[0] = pDownstream[0];
|
p[0] = pDownstream[0];
|
||||||
p[1] = pDownstream[0];
|
p[1] = pDownstream[0];
|
||||||
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0);
|
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0);
|
||||||
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 1);
|
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
return p;
|
return p;
|
||||||
|
|
|
@ -535,7 +535,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
|
||||||
pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo);
|
pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
|
||||||
//pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo);
|
pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
pTaskInfo->code = terrno;
|
pTaskInfo->code = terrno;
|
||||||
|
|
Loading…
Reference in New Issue