fix: fix bugs
This commit is contained in:
parent
18c2400c75
commit
779224f0a6
|
@ -166,6 +166,8 @@ typedef struct SGroupCacheLogicNode {
|
||||||
typedef struct SDynQueryCtrlLogicNode {
|
typedef struct SDynQueryCtrlLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
EDynQueryType qType;
|
EDynQueryType qType;
|
||||||
|
SNodeList* pVgList;
|
||||||
|
SNodeList* pUidList;
|
||||||
} SDynQueryCtrlLogicNode;
|
} SDynQueryCtrlLogicNode;
|
||||||
|
|
||||||
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
|
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
|
||||||
|
|
|
@ -292,8 +292,25 @@ int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
|
return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
|
||||||
} else {
|
} else {
|
||||||
return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, false);
|
int32_t colBytes = pColumnInfoData->info.bytes;
|
||||||
|
int32_t colOffset = currentRow * colBytes;
|
||||||
|
uint32_t num = 1;
|
||||||
|
|
||||||
|
void* pStart = pColumnInfoData->pData + colOffset;
|
||||||
|
memcpy(pStart, pData, colBytes);
|
||||||
|
colOffset += num * colBytes;
|
||||||
|
|
||||||
|
while (num < numOfRows) {
|
||||||
|
int32_t maxNum = num << 1;
|
||||||
|
int32_t tnum = maxNum > numOfRows ? (numOfRows - num) : num;
|
||||||
|
|
||||||
|
memcpy(pColumnInfoData->pData + colOffset, pStart, tnum * colBytes);
|
||||||
|
colOffset += tnum * colBytes;
|
||||||
|
num += tnum;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
|
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
|
||||||
|
@ -2470,4 +2487,4 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList
|
||||||
|
|
||||||
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||||
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,13 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct SDynQueryCtrlExecInfo {
|
||||||
|
int64_t prevBlkNum;
|
||||||
|
int64_t prevBlkRows;
|
||||||
|
int64_t postBlkNum;
|
||||||
|
int64_t postBlkRows;
|
||||||
|
} SDynQueryCtrlExecInfo;
|
||||||
|
|
||||||
typedef struct SStbJoinPrevJoinCtx {
|
typedef struct SStbJoinPrevJoinCtx {
|
||||||
SSDataBlock* pLastBlk;
|
SSDataBlock* pLastBlk;
|
||||||
int32_t lastRow;
|
int32_t lastRow;
|
||||||
|
@ -39,7 +46,8 @@ typedef struct SStbJoinDynCtrlInfo {
|
||||||
} SStbJoinDynCtrlInfo;
|
} SStbJoinDynCtrlInfo;
|
||||||
|
|
||||||
typedef struct SDynQueryCtrlOperatorInfo {
|
typedef struct SDynQueryCtrlOperatorInfo {
|
||||||
EDynQueryType qType;
|
EDynQueryType qType;
|
||||||
|
SDynQueryCtrlExecInfo execInfo;
|
||||||
union {
|
union {
|
||||||
SStbJoinDynCtrlInfo stbJoin;
|
SStbJoinDynCtrlInfo stbJoin;
|
||||||
};
|
};
|
||||||
|
|
|
@ -94,6 +94,7 @@ typedef struct SHJoinOperatorInfo {
|
||||||
SArray* pRowBufs;
|
SArray* pRowBufs;
|
||||||
SNode* pCond;
|
SNode* pCond;
|
||||||
SSHashObj* pKeyHash;
|
SSHashObj* pKeyHash;
|
||||||
|
bool keyHashBuilt;
|
||||||
SHJoinCtx ctx;
|
SHJoinCtx ctx;
|
||||||
} SHJoinOperatorInfo;
|
} SHJoinOperatorInfo;
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,7 @@ typedef struct SOperatorInfo {
|
||||||
SOperatorParam** pDownstreamParams;
|
SOperatorParam** pDownstreamParams;
|
||||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||||
|
int32_t numOfRealDownstream;
|
||||||
SOperatorFpSet fpSet;
|
SOperatorFpSet fpSet;
|
||||||
} SOperatorInfo;
|
} SOperatorInfo;
|
||||||
|
|
||||||
|
@ -170,6 +171,7 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32
|
||||||
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
|
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
|
||||||
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
|
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
|
||||||
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx);
|
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx);
|
||||||
|
SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx);
|
||||||
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx);
|
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx);
|
||||||
|
|
||||||
SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
|
SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
|
||||||
|
|
|
@ -30,7 +30,9 @@
|
||||||
int64_t gSessionId = 0;
|
int64_t gSessionId = 0;
|
||||||
|
|
||||||
static void destroyDynQueryCtrlOperator(void* param) {
|
static void destroyDynQueryCtrlOperator(void* param) {
|
||||||
SDynQueryCtrlOperatorInfo* pDynCtrlOperator = (SDynQueryCtrlOperatorInfo*)param;
|
SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
|
||||||
|
qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64,
|
||||||
|
pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,6 +84,7 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i
|
||||||
|
|
||||||
pExc->pChild = pChild;
|
pExc->pChild = pChild;
|
||||||
pExc->vgId = *pVgId;
|
pExc->vgId = *pVgId;
|
||||||
|
pExc->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
pExc->uidList = taosArrayInit(1, sizeof(int64_t));
|
pExc->uidList = taosArrayInit(1, sizeof(int64_t));
|
||||||
if (NULL == pExc->uidList) {
|
if (NULL == pExc->uidList) {
|
||||||
taosMemoryFree(pExc);
|
taosMemoryFree(pExc);
|
||||||
|
@ -203,10 +206,13 @@ static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
||||||
|
|
||||||
if (pPost->isStarted) {
|
if (pPost->isStarted) {
|
||||||
|
qDebug("%s dynQueryCtrl retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
|
||||||
*ppRes = getNextBlockFromDownstream(pOperator, 1);
|
*ppRes = getNextBlockFromDownstream(pOperator, 1);
|
||||||
if (NULL == *ppRes) {
|
if (NULL == *ppRes) {
|
||||||
pPost->isStarted = false;
|
pPost->isStarted = false;
|
||||||
} else {
|
} else {
|
||||||
|
pInfo->execInfo.postBlkNum++;
|
||||||
|
pInfo->execInfo.postBlkRows += (*ppRes)->info.rows;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,6 +238,9 @@ SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->execInfo.prevBlkNum++;
|
||||||
|
pInfo->execInfo.prevBlkRows += pBlock->info.rows;
|
||||||
|
|
||||||
pStbJoin->ctx.prev.pLastBlk = pBlock;
|
pStbJoin->ctx.prev.pLastBlk = pBlock;
|
||||||
pStbJoin->ctx.prev.lastRow = -1;
|
pStbJoin->ctx.prev.lastRow = -1;
|
||||||
|
|
||||||
|
@ -264,7 +273,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
|
||||||
pInfo->qType = pPhyciNode->qType;
|
pInfo->qType = pPhyciNode->qType;
|
||||||
switch (pInfo->qType) {
|
switch (pInfo->qType) {
|
||||||
case DYN_QTYPE_STB_HASH:
|
case DYN_QTYPE_STB_HASH:
|
||||||
memcpy(&pInfo->stbJoin, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
|
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
|
||||||
nextFp = getResFromStbJoin;
|
nextFp = getResFromStbJoin;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -161,7 +161,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||||
|
@ -455,7 +455,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
||||||
}
|
}
|
||||||
|
|
||||||
pDataInfo->status = EX_SOURCE_DATA_STARTED;
|
pDataInfo->status = EX_SOURCE_DATA_STARTED;
|
||||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
|
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
|
||||||
pDataInfo->startTime = taosGetTimestampUs();
|
pDataInfo->startTime = taosGetTimestampUs();
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,11 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato
|
||||||
ctx.downstreamIdx = pParam->downstreamIdx;
|
ctx.downstreamIdx = pParam->downstreamIdx;
|
||||||
ctx.needCache = pParam->needCache;
|
ctx.needCache = pParam->needCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx));
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -200,7 +205,7 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pSession->downstreamIdx);
|
SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, pSession->downstreamIdx);
|
||||||
if (NULL == pBlock) {
|
if (NULL == pBlock) {
|
||||||
setCurrentGroupCacheDone(pOperator);
|
setCurrentGroupCacheDone(pOperator);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -126,7 +126,7 @@ static int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pTable->valBitMapSize = colNum / sizeof(int8_t) + ((colNum % sizeof(int8_t)) ? 1 : 0);
|
pTable->valBitMapSize = BitmapLen(colNum);
|
||||||
pTable->valBufSize += pTable->valBitMapSize;
|
pTable->valBufSize += pTable->valBitMapSize;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -330,7 +330,7 @@ static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, i
|
||||||
buildValIdx++;
|
buildValIdx++;
|
||||||
}
|
}
|
||||||
buildIdx++;
|
buildIdx++;
|
||||||
} else if (0 == i) {
|
} else if (0 == r) {
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot);
|
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot);
|
||||||
|
|
||||||
|
@ -369,6 +369,9 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator,
|
||||||
|
|
||||||
pRes->info.rows = resNum;
|
pRes->info.rows = resNum;
|
||||||
pCtx->rowRemains = pCtx->pBuildRow ? true : false;
|
pCtx->rowRemains = pCtx->pBuildRow ? true : false;
|
||||||
|
if (!pCtx->rowRemains) {
|
||||||
|
pCtx->probeIdx++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -415,7 +418,9 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pJoin->ctx.pBuildRow) {
|
if (pJoin->ctx.pBuildRow) {
|
||||||
appendHJoinResToBlock(pOperator, pRes);
|
appendHJoinResToBlock(pOperator, pRes);
|
||||||
return;
|
if (pRes->info.rows >= pRes->info.capacity) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) {
|
for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) {
|
||||||
|
@ -425,10 +430,12 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) {
|
||||||
pCtx->pBuildRow = pGroup->rows;
|
pCtx->pBuildRow = pGroup->rows;
|
||||||
appendHJoinResToBlock(pOperator, pRes);
|
appendHJoinResToBlock(pOperator, pRes);
|
||||||
if (pRes->info.rows >= pRes->info.capacity) {
|
if (pRes->info.rows >= pRes->info.capacity) {
|
||||||
break;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pCtx->rowRemains = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
|
static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
|
||||||
|
@ -489,6 +496,7 @@ static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t r
|
||||||
|
|
||||||
char *pData = NULL;
|
char *pData = NULL;
|
||||||
size_t bufLen = pTable->valBitMapSize;
|
size_t bufLen = pTable->valBitMapSize;
|
||||||
|
memset(pTable->valData, 0, pTable->valBitMapSize);
|
||||||
for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) {
|
for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) {
|
||||||
if (pTable->valCols[i].keyCol) {
|
if (pTable->valCols[i].keyCol) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -693,7 +701,9 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pJoin->pKeyHash) {
|
if (!pJoin->keyHashBuilt) {
|
||||||
|
pJoin->keyHashBuilt = true;
|
||||||
|
|
||||||
code = buildHJoinKeyHash(pOperator);
|
code = buildHJoinKeyHash(pOperator);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
|
@ -299,6 +299,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
pOperator->numOfRealDownstream = newDownstreams ? 1 : 2;
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
||||||
|
|
||||||
memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
|
memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
|
||||||
p->numOfDownstream = num;
|
p->numOfDownstream = num;
|
||||||
|
p->numOfRealDownstream = num;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -561,7 +562,7 @@ void destroyOperator(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->pDownstream != NULL) {
|
if (pOperator->pDownstream != NULL) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfRealDownstream; ++i) {
|
||||||
destroyOperator(pOperator->pDownstream[i]);
|
destroyOperator(pOperator->pDownstream[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,15 +647,29 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
|
SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) {
|
||||||
if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) {
|
if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) {
|
||||||
return pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]);
|
qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
|
||||||
|
SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]);
|
||||||
|
if (clearParam) {
|
||||||
|
pOperator->pDownstreamParams[idx] = NULL;
|
||||||
|
}
|
||||||
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
|
return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||||
|
return getNextBlockFromDownstreamImpl(pOperator, idx, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||||
|
return getNextBlockFromDownstreamImpl(pOperator, idx, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
|
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
|
||||||
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
|
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
|
||||||
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam);
|
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam);
|
||||||
|
|
|
@ -361,6 +361,7 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(groupAction);
|
COPY_SCALAR_FIELD(groupAction);
|
||||||
COPY_SCALAR_FIELD(inputTsOrder);
|
COPY_SCALAR_FIELD(inputTsOrder);
|
||||||
COPY_SCALAR_FIELD(outputTsOrder);
|
COPY_SCALAR_FIELD(outputTsOrder);
|
||||||
|
COPY_SCALAR_FIELD(dynamicOp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -544,6 +545,8 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache
|
||||||
static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) {
|
static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) {
|
||||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||||
COPY_SCALAR_FIELD(qType);
|
COPY_SCALAR_FIELD(qType);
|
||||||
|
CLONE_NODE_LIST_FIELD(pVgList);
|
||||||
|
CLONE_NODE_LIST_FIELD(pUidList);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -562,6 +565,7 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
|
||||||
CLONE_NODE_LIST_FIELD(pChildren);
|
CLONE_NODE_LIST_FIELD(pChildren);
|
||||||
COPY_SCALAR_FIELD(inputTsOrder);
|
COPY_SCALAR_FIELD(inputTsOrder);
|
||||||
COPY_SCALAR_FIELD(outputTsOrder);
|
COPY_SCALAR_FIELD(outputTsOrder);
|
||||||
|
COPY_SCALAR_FIELD(dynamicOp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2941,6 +2941,10 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType";
|
static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType";
|
||||||
|
static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]";
|
||||||
|
static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]";
|
||||||
|
static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]";
|
||||||
|
static const char* jkDynQueryCtrlPhysiPlanUidSlot1 = "UidSlot[1]";
|
||||||
|
|
||||||
static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj;
|
const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj;
|
||||||
|
@ -2949,6 +2953,25 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType);
|
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
switch (pNode->qType) {
|
||||||
|
case DYN_QTYPE_STB_HASH: {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot1, pNode->stbJoin.vgSlot[1]);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanUidSlot0, pNode->stbJoin.uidSlot[0]);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1]);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2959,6 +2982,29 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code);
|
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
switch (pNode->qType) {
|
||||||
|
case DYN_QTYPE_STB_HASH: {
|
||||||
|
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0], code);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot1, pNode->stbJoin.vgSlot[1], code);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanUidSlot0, pNode->stbJoin.uidSlot[0], code);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1], code);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3570,7 +3570,11 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1,
|
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1,
|
||||||
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE
|
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE,
|
||||||
|
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0,
|
||||||
|
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1,
|
||||||
|
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0,
|
||||||
|
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1,
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -3580,6 +3584,25 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, pNode->qType);
|
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, pNode->qType);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
switch (pNode->qType) {
|
||||||
|
case DYN_QTYPE_STB_HASH: {
|
||||||
|
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, pNode->stbJoin.vgSlot[1]);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0, pNode->stbJoin.uidSlot[0]);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1, pNode->stbJoin.uidSlot[1]);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3596,6 +3619,18 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE:
|
case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE:
|
||||||
code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType));
|
code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType));
|
||||||
break;
|
break;
|
||||||
|
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0:
|
||||||
|
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[0], sizeof(pNode->stbJoin.vgSlot[0]));
|
||||||
|
break;
|
||||||
|
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1:
|
||||||
|
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[1], sizeof(pNode->stbJoin.vgSlot[1]));
|
||||||
|
break;
|
||||||
|
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0:
|
||||||
|
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.uidSlot[0], sizeof(pNode->stbJoin.uidSlot[0]));
|
||||||
|
break;
|
||||||
|
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1:
|
||||||
|
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.uidSlot[1], sizeof(pNode->stbJoin.uidSlot[1]));
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3339,6 +3339,26 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pDynCtrl->pVgList = nodesMakeList();
|
||||||
|
if (NULL == pDynCtrl->pVgList) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pDynCtrl->pUidList = nodesMakeList();
|
||||||
|
if (NULL == pDynCtrl->pUidList) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SJoinLogicNode* pHJoin = (SJoinLogicNode*)pPrev;
|
||||||
|
nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 0));
|
||||||
|
nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 2));
|
||||||
|
nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 1));
|
||||||
|
nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 3));
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev);
|
nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev);
|
||||||
|
|
|
@ -1009,11 +1009,33 @@ static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDynCtrl->qType = pLogicNode->qType;
|
SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
||||||
|
SNodeList* pVgList = NULL;
|
||||||
|
SNodeList* pUidList = NULL;
|
||||||
|
int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pVgList, &pVgList);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pUidList, &pUidList);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SNode* pNode = NULL;
|
||||||
|
int32_t i = 0;
|
||||||
|
FOREACH(pNode, pVgList) {
|
||||||
|
pDynCtrl->stbJoin.vgSlot[i] = ((SColumnNode*)pNode)->slotId;
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
i = 0;
|
||||||
|
FOREACH(pNode, pUidList) {
|
||||||
|
pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId;
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pDynCtrl->qType = pLogicNode->qType;
|
||||||
|
|
||||||
*pPhyNode = (SPhysiNode*)pDynCtrl;
|
*pPhyNode = (SPhysiNode*)pDynCtrl;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SRewritePrecalcExprsCxt {
|
typedef struct SRewritePrecalcExprsCxt {
|
||||||
|
|
Loading…
Reference in New Issue