enh: add batch table scan
This commit is contained in:
parent
c22619ab67
commit
294376de0b
|
@ -2044,6 +2044,7 @@ typedef struct SOperatorParam {
|
||||||
} SOperatorParam;
|
} SOperatorParam;
|
||||||
|
|
||||||
typedef struct STableScanOperatorParam {
|
typedef struct STableScanOperatorParam {
|
||||||
|
bool tableSeq;
|
||||||
SArray* pUidList;
|
SArray* pUidList;
|
||||||
} STableScanOperatorParam;
|
} STableScanOperatorParam;
|
||||||
|
|
||||||
|
|
|
@ -451,6 +451,7 @@ typedef struct SGroupCachePhysiNode {
|
||||||
typedef struct SStbJoinDynCtrlBasic {
|
typedef struct SStbJoinDynCtrlBasic {
|
||||||
int32_t vgSlot[2];
|
int32_t vgSlot[2];
|
||||||
int32_t uidSlot[2];
|
int32_t uidSlot[2];
|
||||||
|
bool batchJoin;
|
||||||
} SStbJoinDynCtrlBasic;
|
} SStbJoinDynCtrlBasic;
|
||||||
|
|
||||||
typedef struct SDynQueryCtrlPhysiNode {
|
typedef struct SDynQueryCtrlPhysiNode {
|
||||||
|
|
|
@ -5470,6 +5470,7 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) {
|
||||||
switch (pOpParam->opType) {
|
switch (pOpParam->opType) {
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
|
||||||
STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value;
|
STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value;
|
||||||
|
if (tEncodeI8(pEncoder, pScan->tableSeq) < 0) return -1;
|
||||||
int32_t uidNum = taosArrayGetSize(pScan->pUidList);
|
int32_t uidNum = taosArrayGetSize(pScan->pUidList);
|
||||||
if (tEncodeI32(pEncoder, uidNum) < 0) return -1;
|
if (tEncodeI32(pEncoder, uidNum) < 0) return -1;
|
||||||
for (int32_t m = 0; m < uidNum; ++m) {
|
for (int32_t m = 0; m < uidNum; ++m) {
|
||||||
|
@ -5499,6 +5500,7 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam)
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
|
||||||
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
|
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
|
||||||
if (NULL == pScan) return -1;
|
if (NULL == pScan) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, (int8_t*)&pScan->tableSeq) < 0) return -1;
|
||||||
int32_t uidNum = 0;
|
int32_t uidNum = 0;
|
||||||
int64_t uid = 0;
|
int64_t uid = 0;
|
||||||
if (tDecodeI32(pDecoder, &uidNum) < 0) return -1;
|
if (tDecodeI32(pDecoder, &uidNum) < 0) return -1;
|
||||||
|
|
|
@ -26,9 +26,24 @@ typedef struct SDynQueryCtrlExecInfo {
|
||||||
int64_t postBlkRows;
|
int64_t postBlkRows;
|
||||||
} SDynQueryCtrlExecInfo;
|
} SDynQueryCtrlExecInfo;
|
||||||
|
|
||||||
|
typedef struct SStbJoinTableList {
|
||||||
|
void *pNext;
|
||||||
|
int64_t uidNum;
|
||||||
|
int64_t readIdx;
|
||||||
|
int32_t *pLeftVg;
|
||||||
|
int64_t *pLeftUid;
|
||||||
|
int32_t *pRightVg;
|
||||||
|
int64_t *pRightUid;
|
||||||
|
} SStbJoinTableList;
|
||||||
|
|
||||||
typedef struct SStbJoinPrevJoinCtx {
|
typedef struct SStbJoinPrevJoinCtx {
|
||||||
SSDataBlock* pLastBlk;
|
SSDataBlock* pLastBlk;
|
||||||
int32_t lastRow;
|
int32_t lastRow;
|
||||||
|
bool joinBuild;
|
||||||
|
SSHashObj* leftVg;
|
||||||
|
SSHashObj* rightVg;
|
||||||
|
int64_t tableNum;
|
||||||
|
SStbJoinTableList* pListHead;
|
||||||
} SStbJoinPrevJoinCtx;
|
} SStbJoinPrevJoinCtx;
|
||||||
|
|
||||||
typedef struct SStbJoinPostJoinCtx {
|
typedef struct SStbJoinPostJoinCtx {
|
||||||
|
|
|
@ -152,6 +152,7 @@ typedef struct SSortMergeJoinOperatorParam {
|
||||||
typedef struct SExchangeOperatorBasicParam {
|
typedef struct SExchangeOperatorBasicParam {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t srcOpType;
|
int32_t srcOpType;
|
||||||
|
bool tableSeq;
|
||||||
SArray* uidList;
|
SArray* uidList;
|
||||||
} SExchangeOperatorBasicParam;
|
} SExchangeOperatorBasicParam;
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,12 @@ static void destroyDynQueryCtrlOperator(void* param) {
|
||||||
SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
|
SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
|
||||||
qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64,
|
qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64,
|
||||||
pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows);
|
pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows);
|
||||||
|
|
||||||
|
tSimpleHashClear(pDyn->stbJoin.ctx.prev.leftVg);
|
||||||
|
tSimpleHashClear(pDyn->stbJoin.ctx.prev.rightVg);
|
||||||
|
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftVg);
|
||||||
|
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightVg);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +47,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
|
||||||
if (NULL == *ppRes) {
|
if (NULL == *ppRes) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
if (pChild) {
|
||||||
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
|
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
|
||||||
if (NULL == *ppRes) {
|
if (NULL == *ppRes) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -48,6 +55,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
|
||||||
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
|
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
|
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
|
||||||
if (NULL == pGc) {
|
if (NULL == pGc) {
|
||||||
|
@ -80,6 +88,7 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i
|
||||||
|
|
||||||
pExc->multiParams = false;
|
pExc->multiParams = false;
|
||||||
pExc->basic.vgId = *pVgId;
|
pExc->basic.vgId = *pVgId;
|
||||||
|
pExc->basic.tableSeq = true;
|
||||||
pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
|
pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
|
||||||
if (NULL == pExc->basic.uidList) {
|
if (NULL == pExc->basic.uidList) {
|
||||||
|
@ -95,6 +104,49 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
|
||||||
|
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
||||||
|
if (NULL == *ppRes) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
(*ppRes)->pChildren = NULL;
|
||||||
|
|
||||||
|
SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
|
||||||
|
if (NULL == pExc) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pExc->multiParams = true;
|
||||||
|
pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||||
|
if (NULL == pExc->pBatchs) {
|
||||||
|
taosMemoryFree(pExc);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExchangeOperatorBasicParam basic;
|
||||||
|
basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
|
|
||||||
|
int32_t iter = 0;
|
||||||
|
void* p = NULL;
|
||||||
|
while (p = tSimpleHashIterate(pVg, p, &iter)) {
|
||||||
|
int32_t* pVgId = tSimpleHashGetKey(p, NULL);
|
||||||
|
SArray* pUidList = *(SArray**)p;
|
||||||
|
basic.vgId = *pVgId;
|
||||||
|
basic.uidList = pUidList;
|
||||||
|
basic.tableSeq = false;
|
||||||
|
|
||||||
|
tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
|
||||||
|
}
|
||||||
|
|
||||||
|
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
|
||||||
|
(*ppRes)->downstreamIdx = downstreamIdx;
|
||||||
|
(*ppRes)->value = pExc;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
|
static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
|
||||||
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
||||||
if (NULL == *ppRes) {
|
if (NULL == *ppRes) {
|
||||||
|
@ -123,7 +175,7 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) {
|
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) {
|
||||||
int32_t rowIdx = pPrev->lastRow + 1;
|
int32_t rowIdx = pPrev->lastRow + 1;
|
||||||
SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]);
|
SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]);
|
||||||
SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]);
|
SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]);
|
||||||
|
@ -156,14 +208,60 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) {
|
||||||
|
int64_t rowIdx = pPrev->pListHead->readIdx;
|
||||||
|
SOperatorParam* pExcParam0 = NULL;
|
||||||
|
SOperatorParam* pExcParam1 = NULL;
|
||||||
|
SOperatorParam* pGcParam0 = NULL;
|
||||||
|
SOperatorParam* pGcParam1 = NULL;
|
||||||
|
int32_t* leftVg = pPrev->pListHead->pLeftVg + rowIdx;
|
||||||
|
int64_t* leftUid = pPrev->pListHead->pLeftUid + rowIdx;
|
||||||
|
int32_t* rightVg = pPrev->pListHead->pRightVg + rowIdx;
|
||||||
|
int64_t* rightUid = pPrev->pListHead->pRightUid + rowIdx;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64,
|
||||||
|
rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
|
||||||
|
|
||||||
|
if (pPrev->leftVg) {
|
||||||
|
code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftVg);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightVg);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tSimpleHashCleanup(pPrev->leftVg);
|
||||||
|
tSimpleHashCleanup(pPrev->rightVg);
|
||||||
|
pPrev->leftVg = NULL;
|
||||||
|
pPrev->rightVg = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
||||||
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
||||||
SOperatorParam* pParam = NULL;
|
SOperatorParam* pParam = NULL;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int32_t code = buildStbJoinOperatorParam(pInfo, pPrev, &pParam);
|
if (pInfo->stbJoin.basic.batchJoin) {
|
||||||
|
code = buildSeqBatchStbJoinOperatorParam(pInfo, pPrev, &pParam);
|
||||||
|
} else {
|
||||||
|
code = buildSeqStbJoinOperatorParam(pInfo, pPrev, &pParam);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
pOperator->pTaskInfo->code = code;
|
pOperator->pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
|
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
|
||||||
|
@ -181,7 +279,7 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void seqJoinWithSeqRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
||||||
|
@ -201,7 +299,7 @@ static void seqJoinWithSeqRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
||||||
|
@ -220,20 +318,155 @@ static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStbJoin->ctx.prev.pLastBlk) {
|
if (pStbJoin->ctx.prev.pLastBlk) {
|
||||||
seqJoinWithSeqRetrieve(pOperator, ppRes);
|
seqJoinLaunchRetrieve(pOperator, ppRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) {
|
SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
|
||||||
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
|
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return pRes;
|
||||||
|
}
|
||||||
|
|
||||||
seqJoinContinueRetrieve(pOperator, &pRes);
|
seqJoinContinueRetrieve(pOperator, &pRes);
|
||||||
if (pRes) {
|
if (pRes) {
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
|
||||||
|
if (NULL == pBlock) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->execInfo.prevBlkNum++;
|
||||||
|
pInfo->execInfo.prevBlkRows += pBlock->info.rows;
|
||||||
|
|
||||||
|
pStbJoin->ctx.prev.pLastBlk = pBlock;
|
||||||
|
pStbJoin->ctx.prev.lastRow = -1;
|
||||||
|
|
||||||
|
seqJoinLaunchRetrieve(pOperator, &pRes);
|
||||||
|
if (pRes) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t addToJoinHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
|
||||||
|
SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
|
||||||
|
if (NULL == ppArray) {
|
||||||
|
SArray* pArray = taosArrayInit(10, valSize);
|
||||||
|
if (NULL == pArray) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
if (NULL == taosArrayPush(pArray, pVal)) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == taosArrayPush(*ppArray, pVal)) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void freeStbJoinTableList(SStbJoinTableList* pList) {
|
||||||
|
if (NULL == pList) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosMemoryFree(pList->pLeftVg);
|
||||||
|
taosMemoryFree(pList->pLeftUid);
|
||||||
|
taosMemoryFree(pList->pRightVg);
|
||||||
|
taosMemoryFree(pList->pRightUid);
|
||||||
|
taosMemoryFree(pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
|
||||||
|
SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList));
|
||||||
|
if (NULL == pNew) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
|
||||||
|
pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
|
||||||
|
pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
|
||||||
|
pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
|
||||||
|
if (NULL == pNew->pLeftVg || NULL == pNew->pLeftUid || NULL == pNew->pRightVg || NULL == pNew->pRightUid) {
|
||||||
|
freeStbJoinTableList(pNew);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
|
||||||
|
memcpy(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
|
||||||
|
memcpy(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
|
||||||
|
memcpy(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
|
||||||
|
|
||||||
|
pNew->readIdx = 0;
|
||||||
|
pNew->uidNum = rows;
|
||||||
|
|
||||||
|
if (*ppHead) {
|
||||||
|
pNew->pNext = *ppHead;
|
||||||
|
} else {
|
||||||
|
pNew->pNext = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppHead = pNew;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
|
SColumnInfoData* pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
|
||||||
|
SColumnInfoData* pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
|
||||||
|
SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
|
||||||
|
SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
|
||||||
|
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
|
||||||
|
int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
|
||||||
|
int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
|
||||||
|
|
||||||
|
code = addToJoinHash(pStbJoin->ctx.prev.leftVg, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
code = addToJoinHash(pStbJoin->ctx.prev.rightVg, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = appendStbJoinTableList(&pStbJoin->ctx.prev.pListHead, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
pOperator->pTaskInfo->code = code;
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void buildStbJoinVgList(SOperatorInfo* pOperator) {
|
||||||
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
|
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
|
||||||
if (NULL == pBlock) {
|
if (NULL == pBlock) {
|
||||||
|
@ -243,18 +476,85 @@ SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) {
|
||||||
pInfo->execInfo.prevBlkNum++;
|
pInfo->execInfo.prevBlkNum++;
|
||||||
pInfo->execInfo.prevBlkRows += pBlock->info.rows;
|
pInfo->execInfo.prevBlkRows += pBlock->info.rows;
|
||||||
|
|
||||||
pStbJoin->ctx.prev.pLastBlk = pBlock;
|
doBuildStbJoinHash(pOperator, pBlock);
|
||||||
pStbJoin->ctx.prev.lastRow = -1;
|
}
|
||||||
|
|
||||||
seqJoinWithSeqRetrieve(pOperator, &pRes);
|
pStbJoin->ctx.prev.joinBuild = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
|
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
|
||||||
|
SStbJoinTableList* pNode = pPrev->pListHead;
|
||||||
|
|
||||||
|
while (pNode) {
|
||||||
|
if (pNode->readIdx >= pNode->uidNum) {
|
||||||
|
pPrev->pListHead = pNode->pNext;
|
||||||
|
freeStbJoinTableList(pNode);
|
||||||
|
pNode = pPrev->pListHead;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
seqJoinLaunchPostJoin(pOperator, ppRes);
|
||||||
|
if (*ppRes) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppRes = NULL;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) {
|
||||||
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
|
SSDataBlock* pRes = NULL;
|
||||||
|
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pStbJoin->ctx.prev.joinBuild) {
|
||||||
|
buildStbJoinVgList(pOperator);
|
||||||
|
if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftVg) <= 0 || tSimpleHashGetSize(pStbJoin->ctx.prev.rightVg) <= 0) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
seqJoinContinueRetrieve(pOperator, &pRes);
|
||||||
if (pRes) {
|
if (pRes) {
|
||||||
break;
|
return pRes;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
seqBatchJoinLaunchRetrieve(pOperator, &pRes);
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void freeVgTableList(void* ptr) {
|
||||||
|
taosArrayDestroy(*(SArray**)ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t initBatchStbJoinVgHash(SStbJoinPrevJoinCtx* pPrev) {
|
||||||
|
pPrev->leftVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||||
|
if (NULL == pPrev->leftVg) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
tSimpleHashSetFreeFp(pPrev->leftVg, freeVgTableList);
|
||||||
|
|
||||||
|
pPrev->rightVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||||
|
if (NULL == pPrev->rightVg) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
tSimpleHashSetFreeFp(pPrev->rightVg, freeVgTableList);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||||
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
|
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
|
SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
|
||||||
|
@ -276,7 +576,16 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
|
||||||
switch (pInfo->qType) {
|
switch (pInfo->qType) {
|
||||||
case DYN_QTYPE_STB_HASH:
|
case DYN_QTYPE_STB_HASH:
|
||||||
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
|
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
|
||||||
nextFp = getResFromStbJoin;
|
pInfo->stbJoin.basic.batchJoin = false;
|
||||||
|
if (pInfo->stbJoin.basic.batchJoin) {
|
||||||
|
code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
nextFp = seqBatchStableJoin;
|
||||||
|
} else {
|
||||||
|
nextFp = seqStableJoin;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
|
qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
|
||||||
|
|
|
@ -42,6 +42,7 @@ typedef struct SSourceDataInfo {
|
||||||
const char* taskId;
|
const char* taskId;
|
||||||
SArray* pSrcUidList;
|
SArray* pSrcUidList;
|
||||||
int32_t srcOpType;
|
int32_t srcOpType;
|
||||||
|
bool tableSeq;
|
||||||
} SSourceDataInfo;
|
} SSourceDataInfo;
|
||||||
|
|
||||||
static void destroyExchangeOperatorInfo(void* param);
|
static void destroyExchangeOperatorInfo(void* param);
|
||||||
|
@ -417,7 +418,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
|
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
|
||||||
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
||||||
if (NULL == *ppRes) {
|
if (NULL == *ppRes) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -432,6 +433,7 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in
|
||||||
if (NULL == pScan->pUidList) {
|
if (NULL == pScan->pUidList) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
pScan->tableSeq = tableSeq;
|
||||||
|
|
||||||
(*ppRes)->opType = srcOpType;
|
(*ppRes)->opType = srcOpType;
|
||||||
(*ppRes)->downstreamIdx = 0;
|
(*ppRes)->downstreamIdx = 0;
|
||||||
|
@ -472,7 +474,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
||||||
req.queryId = pTaskInfo->id.queryId;
|
req.queryId = pTaskInfo->id.queryId;
|
||||||
req.execId = pSource->execId;
|
req.execId = pSource->execId;
|
||||||
if (pDataInfo->pSrcUidList) {
|
if (pDataInfo->pSrcUidList) {
|
||||||
int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
|
int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
|
||||||
taosArrayDestroy(pDataInfo->pSrcUidList);
|
taosArrayDestroy(pDataInfo->pSrcUidList);
|
||||||
pDataInfo->pSrcUidList = NULL;
|
pDataInfo->pSrcUidList = NULL;
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
@ -759,6 +761,8 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
|
||||||
dataInfo.index = *pIdx;
|
dataInfo.index = *pIdx;
|
||||||
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
|
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
|
||||||
dataInfo.srcOpType = pBasicParam->srcOpType;
|
dataInfo.srcOpType = pBasicParam->srcOpType;
|
||||||
|
dataInfo.tableSeq = pBasicParam->tableSeq;
|
||||||
|
|
||||||
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
|
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -797,9 +797,14 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
qError("add total %d dynamic tables to scan, exist num:%" PRId64, num, (int64_t)taosArrayGetSize(pListInfo->pTableList));
|
qError("add total %d dynamic tables to scan, tableSeq:%d, exist num:%" PRId64, num, pParam->tableSeq, (int64_t)taosArrayGetSize(pListInfo->pTableList));
|
||||||
|
|
||||||
|
if (pParam->tableSeq) {
|
||||||
pListInfo->oneTableForEachGroup = true;
|
pListInfo->oneTableForEachGroup = true;
|
||||||
|
} else {
|
||||||
|
pListInfo->oneTableForEachGroup = false;
|
||||||
|
pListInfo->numOfOuputGroups = 1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
|
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
|
||||||
|
@ -846,7 +851,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||||
if (result != NULL) {
|
if (result != NULL) {
|
||||||
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
|
if (pOperator->dynamicTask) {
|
||||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
||||||
result->info.id.groupId = pKeyInfo->uid;
|
result->info.id.groupId = pKeyInfo->uid;
|
||||||
}
|
}
|
||||||
|
@ -885,7 +890,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||||
if (result != NULL) {
|
if (result != NULL) {
|
||||||
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
|
if (pOperator->dynamicTask) {
|
||||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
||||||
result->info.id.groupId = pKeyInfo->uid;
|
result->info.id.groupId = pKeyInfo->uid;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue