enh: cache necessary file for seq mode

This commit is contained in:
dapan1121 2023-07-26 19:17:59 +08:00
parent 4e5fbb52ee
commit e83a46b88f
11 changed files with 271 additions and 208 deletions

View File

@ -169,7 +169,7 @@ typedef struct SGroupCacheLogicNode {
} SGroupCacheLogicNode;
typedef struct SDynQueryCtrlStbJoin {
bool batchJoin;
bool batchFetch;
SNodeList* pVgList;
SNodeList* pUidList;
} SDynQueryCtrlStbJoin;
@ -460,7 +460,7 @@ typedef struct SGroupCachePhysiNode {
} SGroupCachePhysiNode;
typedef struct SStbJoinDynCtrlBasic {
bool batchJoin;
bool batchFetch;
int32_t vgSlot[2];
int32_t uidSlot[2];
} SStbJoinDynCtrlBasic;

View File

@ -37,17 +37,19 @@ typedef struct SStbJoinTableList {
} SStbJoinTableList;
typedef struct SStbJoinPrevJoinCtx {
SSDataBlock* pLastBlk;
int32_t lastRow;
bool joinBuild;
SSHashObj* leftVg;
SSHashObj* rightVg;
SSHashObj* leftHash;
SSHashObj* rightHash;
SSHashObj* tableTimes;
SSHashObj* onceTable;
int64_t tableNum;
SStbJoinTableList* pListHead;
} SStbJoinPrevJoinCtx;
// State of the post-join stage of the sequential stable-table join.
typedef struct SStbJoinPostJoinCtx {
// Checked before pulling a block from downstream 1; reset to false once that
// downstream returns NULL.
bool isStarted;
// Right-table uid currently being joined; maintained by
// updatePostJoinRightTableUid(). tableNeedCache() compares it with
// rightNextUid to decide whether the right table should be cached.
int64_t rightCurrUid;
// Right-table uid that follows rightCurrUid in the table list; set to 0 when
// the list has no further entry (0 is also treated as "not set yet").
int64_t rightNextUid;
} SStbJoinPostJoinCtx;
typedef struct SStbJoinDynCtrlCtx {

View File

@ -110,6 +110,7 @@ typedef struct SGcOperatorParam {
int32_t downstreamIdx;
int32_t vgId;
int64_t tbUid;
bool needCache;
} SGcOperatorParam;
typedef struct SExprSupp {

View File

@ -75,6 +75,7 @@ typedef struct SGroupCacheData {
TdThreadMutex mutex;
SArray* waitQueue;
bool fetchDone;
bool needCache;
SSDataBlock* pBlock;
SGcVgroupCtx* pVgCtx;
int32_t downstreamIdx;

View File

@ -39,19 +39,28 @@ static void destroyDynQueryCtrlOperator(void* param) {
qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64,
pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows);
if (pDyn->stbJoin.ctx.prev.leftVg) {
tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.leftVg, freeVgTableList);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftVg);
if (pDyn->stbJoin.basic.batchFetch) {
if (pDyn->stbJoin.ctx.prev.leftHash) {
tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.leftHash, freeVgTableList);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftHash);
}
if (pDyn->stbJoin.ctx.prev.rightVg) {
tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightVg, freeVgTableList);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightVg);
if (pDyn->stbJoin.ctx.prev.rightHash) {
tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightHash, freeVgTableList);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightHash);
}
}
if (pDyn->stbJoin.ctx.prev.tableTimes) {
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.tableTimes);
}
if (pDyn->stbJoin.ctx.prev.onceTable) {
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable);
}
taosMemoryFreeClear(param);
}
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) {
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -77,6 +86,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
pGc->downstreamIdx = downstreamIdx;
pGc->vgId = vgId;
pGc->tbUid = tbUid;
pGc->needCache = needCache;
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
(*ppRes)->downstreamIdx = downstreamIdx;
@ -85,7 +95,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) {
static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -189,41 +199,21 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
if (batchFetch) {
return true;
}
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) {
int32_t rowIdx = pPrev->lastRow + 1;
SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]);
SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]);
SColumnInfoData* pUid0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[0]);
SColumnInfoData* pUid1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[1]);
SOperatorParam* pExcParam0 = NULL;
SOperatorParam* pExcParam1 = NULL;
SOperatorParam* pGcParam0 = NULL;
SOperatorParam* pGcParam1 = NULL;
int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx);
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx);
int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx);
int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx);
if (rightTable) {
return pPost->rightCurrUid == pPost->rightNextUid;
}
qError("start stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, *leftVg, *leftUid, *rightVg, *rightUid);
uint32_t* num = tSimpleHashGet(pPrev->tableTimes, &uid, sizeof(uid));
int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, false, pGcParam0, pGcParam1);
}
return code;
return (NULL == num) ? false : true;
}
static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) {
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
int64_t rowIdx = pPrev->pListHead->readIdx;
SOperatorParam* pExcParam0 = NULL;
SOperatorParam* pExcParam1 = NULL;
@ -238,24 +228,31 @@ static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInf
qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64,
rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
if (pPrev->leftVg) {
code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftVg);
if (pInfo->stbJoin.basic.batchFetch) {
if (pPrev->leftHash) {
code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash);
if (TSDB_CODE_SUCCESS == code) {
code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightVg);
code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightHash);
}
if (TSDB_CODE_SUCCESS == code) {
tSimpleHashCleanup(pPrev->leftVg);
tSimpleHashCleanup(pPrev->rightVg);
pPrev->leftVg = NULL;
pPrev->rightVg = NULL;
tSimpleHashCleanup(pPrev->leftHash);
tSimpleHashCleanup(pPrev->rightHash);
pPrev->leftHash = NULL;
pPrev->rightHash = NULL;
}
}
} else {
code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid);
if (TSDB_CODE_SUCCESS == code) {
code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0);
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, tableNeedCache(*leftUid, pPrev, pPost, false, pInfo->stbJoin.basic.batchFetch), pExcParam0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1);
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, tableNeedCache(*rightUid, pPrev, pPost, true, pInfo->stbJoin.basic.batchFetch), pExcParam1);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1);
@ -270,13 +267,7 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes)
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
SOperatorParam* pParam = NULL;
int32_t code = TSDB_CODE_SUCCESS;
if (pInfo->stbJoin.basic.batchJoin) {
code = buildSeqBatchStbJoinOperatorParam(pInfo, pPrev, &pParam);
} else {
code = buildSeqStbJoinOperatorParam(pInfo, pPrev, &pParam);
}
int32_t code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
@ -294,34 +285,16 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes)
}
}
static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post;
while (true) {
if ((pPrev->lastRow + 1) >= pPrev->pLastBlk->info.rows) {
*ppRes = NULL;
pPrev->pLastBlk = NULL;
if (!pPost->isStarted) {
return;
}
seqJoinLaunchPostJoin(pOperator, ppRes);
pPrev->lastRow++;
if (*ppRes) {
break;
}
}
}
qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
if (pPost->isStarted) {
qDebug("%s dynQueryCtrl retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
*ppRes = getNextBlockFromDownstream(pOperator, 1);
if (NULL == *ppRes) {
pPost->isStarted = false;
@ -330,50 +303,9 @@ static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDat
pInfo->execInfo.postBlkRows += (*ppRes)->info.rows;
return;
}
}
if (pStbJoin->ctx.prev.pLastBlk) {
seqJoinLaunchRetrieve(pOperator, ppRes);
}
}
SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SSDataBlock* pRes = NULL;
if (pOperator->status == OP_EXEC_DONE) {
return pRes;
}
seqJoinContinueRetrieve(pOperator, &pRes);
if (pRes) {
return pRes;
}
while (true) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (NULL == pBlock) {
pOperator->status = OP_EXEC_DONE;
break;
}
pInfo->execInfo.prevBlkNum++;
pInfo->execInfo.prevBlkRows += pBlock->info.rows;
pStbJoin->ctx.prev.pLastBlk = pBlock;
pStbJoin->ctx.prev.lastRow = -1;
seqJoinLaunchRetrieve(pOperator, &pRes);
if (pRes) {
break;
}
}
return pRes;
}
static FORCE_INLINE int32_t addToJoinHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
if (NULL == ppArray) {
SArray* pArray = taosArrayInit(10, valSize);
@ -396,6 +328,37 @@ static FORCE_INLINE int32_t addToJoinHash(SSHashObj* pHash, void* pKey, int32_t
return TSDB_CODE_SUCCESS;
}
/*
 * Record one more occurrence of the table key `pKey` in the reference-count
 * hash `pHash`, keeping `pOnceHash` in sync as the set of keys that have been
 * seen exactly once.
 *
 * Counter states stored in `pHash`:
 *   - absent      : first occurrence -> stored as 1 and key added to pOnceHash
 *   - 1           : second occurrence -> key dropped from pOnceHash, counter 2
 *   - UINT32_MAX  : counter is reset to 0
 *   - 0           : left untouched
 *   - otherwise   : incremented
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when either hash insertion reports a
 * failure, TSDB_CODE_SUCCESS otherwise.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  uint32_t* pTimes = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL != pTimes) {
    // Key already known: update its counter in place.
    if (UINT32_MAX == *pTimes) {
      *pTimes = 0;
    } else if (0 != *pTimes) {
      if (1 == *pTimes) {
        // No longer a "seen once" table.
        tSimpleHashRemove(pOnceHash, pKey, keySize);
      }
      ++(*pTimes);
    }
    return TSDB_CODE_SUCCESS;
  }

  // First occurrence of this key.
  uint32_t firstSeen = 1;
  if (0 != tSimpleHashPut(pHash, pKey, keySize, &firstSeen, sizeof(firstSeen))) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (0 != tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0)) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
static void freeStbJoinTableList(SStbJoinTableList* pList) {
if (NULL == pList) {
return;
@ -440,7 +403,7 @@ static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows,
return TSDB_CODE_SUCCESS;
}
static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
@ -449,17 +412,28 @@ static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
if (pStbJoin->basic.batchFetch) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
code = addToJoinHash(pStbJoin->ctx.prev.leftVg, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
if (TSDB_CODE_SUCCESS != code) {
break;
}
code = addToJoinHash(pStbJoin->ctx.prev.rightVg, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
code = addToJoinTableHash(pStbJoin->ctx.prev.tableTimes, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
if (TSDB_CODE_SUCCESS != code) {
break;
}
@ -478,7 +452,64 @@ static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static void buildStbJoinVgList(SOperatorInfo* pOperator) {
/*
 * Advance the right-table uid window stored in the post-join context.
 *
 * rightCurrUid takes over the previously computed rightNextUid; when no next
 * uid has been computed yet (rightNextUid == 0) it is taken from the first
 * right uid of the head node of the table list. rightNextUid is then set to
 * the uid at position readIdx + 1, walking on to following list nodes when
 * the current node is exhausted, or to 0 when no further uid exists.
 *
 * An empty table list (pListHead == NULL) is tolerated: any pending next uid
 * is still promoted and rightNextUid is cleared, instead of dereferencing a
 * NULL node.
 */
static void updatePostJoinRightTableUid(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pNode = pPrev->pListHead;

  if (NULL == pNode) {
    // Nothing was queued (e.g. the build side produced no rows).
    if (pPost->rightNextUid) {
      pPost->rightCurrUid = pPost->rightNextUid;
    }
    pPost->rightNextUid = 0;
    return;
  }

  int64_t* rightUid = pNode->pRightUid;
  int64_t  readIdx = pNode->readIdx + 1;

  if (pPost->rightNextUid) {
    pPost->rightCurrUid = pPost->rightNextUid;
  } else {
    pPost->rightCurrUid = *rightUid;
  }

  // Look for the uid that follows the current read position.
  while (true) {
    if (readIdx < pNode->uidNum) {
      pPost->rightNextUid = *(rightUid + readIdx);
      return;
    }

    // Current node exhausted: continue at the start of the next node.
    pNode = pNode->pNext;
    if (NULL == pNode) {
      pPost->rightNextUid = 0;
      return;
    }

    rightUid = pNode->pRightUid;
    readIdx = 0;
  }
}
/*
 * Finish the table reference bookkeeping once all build-side blocks have been
 * consumed.
 *
 * First primes the right-table uid window, then reduces `tableTimes` to the
 * tables referenced more than once:
 *   - when every table is in `onceTable` (same size), `tableTimes` is simply
 *     cleared;
 *   - otherwise each key found in `onceTable` is removed from `tableTimes`.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  updatePostJoinRightTableUid(pStbJoin);

  SSHashObj* pTimesHash = pStbJoin->ctx.prev.tableTimes;
  SSHashObj* pOnceHash = pStbJoin->ctx.prev.onceTable;

  if (tSimpleHashGetSize(pTimesHash) == tSimpleHashGetSize(pOnceHash)) {
    // Every table was referenced exactly once.
    tSimpleHashClear(pTimesHash);
    return;
  }

  int32_t   iter = 0;
  uint64_t* pUid = NULL;
  for (pUid = tSimpleHashIterate(pOnceHash, pUid, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pOnceHash, pUid, &iter)) {
    tSimpleHashRemove(pTimesHash, pUid, sizeof(*pUid));
  }

  qError("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes));

  // debug only: every remaining counter is expected to be greater than 1
  iter = 0;
  uint32_t* pCount = NULL;
  for (pCount = tSimpleHashIterate(pTimesHash, pCount, &iter); NULL != pCount;
       pCount = tSimpleHashIterate(pTimesHash, pCount, &iter)) {
    ASSERT(*pCount > 1);
  }
}
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
@ -491,13 +522,15 @@ static void buildStbJoinVgList(SOperatorInfo* pOperator) {
pInfo->execInfo.prevBlkNum++;
pInfo->execInfo.prevBlkRows += pBlock->info.rows;
doBuildStbJoinHash(pOperator, pBlock);
doBuildStbJoinTableHash(pOperator, pBlock);
}
postProcessStbJoinTableHash(pOperator);
pStbJoin->ctx.prev.joinBuild = true;
}
static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
@ -523,8 +556,7 @@ static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** p
return;
}
SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) {
SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SSDataBlock* pRes = NULL;
@ -534,8 +566,8 @@ SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) {
}
if (!pStbJoin->ctx.prev.joinBuild) {
buildStbJoinVgList(pOperator);
if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftVg) <= 0 || tSimpleHashGetSize(pStbJoin->ctx.prev.rightVg) <= 0) {
buildStbJoinTableList(pOperator);
if (pInfo->execInfo.prevBlkRows <= 0) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
@ -546,18 +578,28 @@ SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) {
return pRes;
}
seqBatchJoinLaunchRetrieve(pOperator, &pRes);
seqJoinLaunchRetrieve(pOperator, &pRes);
return pRes;
}
int32_t initBatchStbJoinVgHash(SStbJoinPrevJoinCtx* pPrev) {
pPrev->leftVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pPrev->leftVg) {
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
if (batchFetch) {
pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pPrev->leftHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pPrev->rightHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pPrev->rightVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pPrev->rightVg) {
pPrev->tableTimes = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (NULL == pPrev->tableTimes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (NULL == pPrev->onceTable) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -586,15 +628,11 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
switch (pInfo->qType) {
case DYN_QTYPE_STB_HASH:
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
if (pInfo->stbJoin.basic.batchJoin) {
code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev);
code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
nextFp = seqBatchStableJoin;
} else {
nextFp = seqStableJoin;
}
break;
default:
qError("unsupported dynamic query ctrl type: %d", pInfo->qType);

View File

@ -556,22 +556,25 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat
}
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch) {
/*
 * Fill in a freshly created group cache entry.
 *
 * pCtx          downstream context; its pVgTbHash is searched by vgId
 * pGroup        entry to initialise
 * downstreamIdx index of the downstream the group belongs to
 * vgId          vgroup id stored in the entry and used for the pVgCtx lookup
 * batchFetch    not read by this function
 * needCache     stored as-is in the entry
 */
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) {
  // Plain bookkeeping fields.
  pGroup->vgId = vgId;
  pGroup->downstreamIdx = downstreamIdx;
  pGroup->needCache = needCache;

  // -1 presumably marks "no file / offset assigned yet" -- TODO confirm.
  pGroup->fileId = -1;
  pGroup->startOffset = -1;

  // Resources owned by the entry.
  taosThreadMutexInit(&pGroup->mutex, NULL);
  pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic));

  // Result of the hash lookup is stored unchecked.
  pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
}
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
SGcOperatorParam* pGcParam = pParam->value;
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData grpData = {0};
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch);
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache);
while (true) {
if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
@ -623,12 +626,16 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
int64_t newBlkIdx = 0;
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId));
if (NULL == pGroup) {
if (pGCache->batchFetch) {
SGcOperatorParam fakeGcParam = {0};
SOperatorParam fakeParam = {0};
fakeGcParam.needCache = true;
fakeParam.downstreamIdx = pSession->downstreamIdx;
fakeParam.value = &fakeGcParam;
code = addNewGroupData(pOperator, &fakeParam, &pGroup, -1, pBlock->info.id.groupId);
if (TSDB_CODE_SUCCESS != code) {
return code;
@ -646,21 +653,30 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
}
}
if (pGroup->needCache) {
qError("add block to group cache");
SGcBlkBufInfo newBlkBuf;
code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
if (code) {
return code;
}
int64_t blkIdx = 0;
code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &blkIdx);
code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &newBlkIdx);
if (code) {
return code;
}
} else {
qError("no need to add block to group cache");
pGroup->pBlock = pBlock;
}
notifyWaitingSessions(pGroup->waitQueue);
if (pGroup == pSession->pGroupData) {
pSession->lastBlkId = blkIdx;
if (pGroup->needCache) {
pSession->lastBlkId = newBlkIdx;
}
*continueFetch = false;
}
@ -747,6 +763,7 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
SGroupCacheOperatorInfo* pGCache = pOperator->info;
*got = true;
if (pSession->pGroupData->needCache) {
SGcBlkList* pBlkList = &pSession->pGroupData->blkList;
taosRLockLatch(&pBlkList->lock);
int64_t blkNum = taosArrayGetSize(pBlkList->pList);
@ -766,6 +783,10 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
return code;
}
taosRUnLockLatch(&pBlkList->lock);
} else if (pSession->pGroupData->pBlock) {
*ppRes = pSession->pGroupData->pBlock;
pSession->pGroupData->pBlock = NULL;
}
if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL;
@ -929,7 +950,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} else {
} else if (pSession->pGroupData->needCache) {
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (ppBlock) {
releaseBaseBlockToList(pCtx, *ppBlock);

View File

@ -550,7 +550,7 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache
static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(qType);
COPY_SCALAR_FIELD(stbJoin.batchJoin);
COPY_SCALAR_FIELD(stbJoin.batchFetch);
CLONE_NODE_LIST_FIELD(stbJoin.pVgList);
CLONE_NODE_LIST_FIELD(stbJoin.pUidList);
return TSDB_CODE_SUCCESS;

View File

@ -1228,7 +1228,7 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) {
}
static const char* jkDynQueryCtrlLogicPlanQueryType = "QueryType";
static const char* jkDynQueryCtrlLogicPlanStbJoinBatchJoin = "BatchJoin";
static const char* jkDynQueryCtrlLogicPlanStbJoinBatchFetch = "BatchFetch";
static const char* jkDynQueryCtrlLogicPlanStbJoinVgList = "VgroupList";
static const char* jkDynQueryCtrlLogicPlanStbJoinUidList = "UidList";
@ -1240,7 +1240,7 @@ static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, pNode->stbJoin.batchJoin);
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchFetch, pNode->stbJoin.batchFetch);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, pNode->stbJoin.pVgList);
@ -1260,7 +1260,7 @@ static int32_t jsonToLogicDynQueryCtrlNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetBoolValue(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, &pNode->stbJoin.batchJoin);
tjsonGetBoolValue(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchFetch, &pNode->stbJoin.batchFetch);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, &pNode->stbJoin.pVgList);
@ -3005,7 +3005,7 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) {
}
static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType";
static const char* jkDynQueryCtrlPhysiPlanBatchJoin = "BatchJoin";
static const char* jkDynQueryCtrlPhysiPlanBatchFetch = "BatchFetch";
static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]";
static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]";
static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]";
@ -3021,7 +3021,7 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
switch (pNode->qType) {
case DYN_QTYPE_STB_HASH: {
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, pNode->stbJoin.batchJoin);
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanBatchFetch, pNode->stbJoin.batchFetch);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]);
}
@ -3055,7 +3055,7 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) {
case DYN_QTYPE_STB_HASH: {
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, &pNode->stbJoin.batchJoin);
code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanBatchFetch, &pNode->stbJoin.batchFetch);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0], code);

View File

@ -3608,7 +3608,7 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) {
enum {
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1,
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE,
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN,
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_FETCH,
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0,
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1,
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0,
@ -3625,7 +3625,7 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode
if (TSDB_CODE_SUCCESS == code) {
switch (pNode->qType) {
case DYN_QTYPE_STB_HASH: {
code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN, pNode->stbJoin.batchJoin);
code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_FETCH, pNode->stbJoin.batchFetch);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]);
}
@ -3660,8 +3660,8 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType));
break;
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN:
code = tlvDecodeBool(pTlv, &pNode->stbJoin.batchJoin);
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_FETCH:
code = tlvDecodeBool(pTlv, &pNode->stbJoin.batchFetch);
break;
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0:
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[0], sizeof(pNode->stbJoin.vgSlot[0]));

View File

@ -3352,7 +3352,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
}
pDynCtrl->qType = DYN_QTYPE_STB_HASH;
pDynCtrl->stbJoin.batchJoin = true;
pDynCtrl->stbJoin.batchFetch = true;
if (TSDB_CODE_SUCCESS == code) {
pDynCtrl->node.pChildren = nodesMakeList();

View File

@ -1026,7 +1026,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList*
pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId;
++i;
}
pDynCtrl->stbJoin.batchJoin = pLogicNode->stbJoin.batchJoin;
pDynCtrl->stbJoin.batchFetch = pLogicNode->stbJoin.batchFetch;
}
return code;