enh: support parallel group cache fetch

This commit is contained in:
dapan1121 2023-07-12 11:29:54 +08:00
parent a42bd9e478
commit 6763c6f98c
5 changed files with 157 additions and 69 deletions

View File

@ -106,7 +106,6 @@ typedef struct SExchangeOpStopInfo {
} SExchangeOpStopInfo;
typedef struct SGcOperatorParam {
SOperatorParam* pChild;
int64_t sessionId;
int32_t downstreamIdx;
bool needCache;
@ -151,10 +150,20 @@ typedef struct SLimitInfo {
typedef struct SSortMergeJoinOperatorParam {
} SSortMergeJoinOperatorParam;
typedef struct SExchangeOperatorParam {
typedef struct SExchangeOperatorBasicParam {
int32_t vgId;
int32_t srcOpType;
SArray* uidList;
} SExchangeOperatorBasicParam;
typedef struct SExchangeOperatorBatchParam {
bool multiParams;
SArray* pBatchs; // SArray<SExchangeOperatorBasicParam>
} SExchangeOperatorBatchParam;
typedef struct SExchangeOperatorParam {
bool multiParams;
SExchangeOperatorBasicParam basic;
} SExchangeOperatorParam;
typedef struct SExchangeInfo {

View File

@ -88,13 +88,8 @@ typedef struct SGcExecInfo {
int64_t* pDownstreamBlkNum;
} SGcExecInfo;
typedef struct SGcNewGroupInfo {
int64_t uid;
} SGcNewGroupInfo;
typedef struct SGroupCacheOperatorInfo {
TdThreadMutex sessionMutex;
SGcNewGroupInfo newGroup;
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
bool grpByUid;

View File

@ -516,6 +516,10 @@ bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
return ((SExecTaskInfo*)tinfo)->dynamicTask;
}
void destroyOperatorParamValue(void* pValues) {
}
void destroyOperatorParam(SOperatorParam* pParam) {
if (NULL == pParam) {
return;
@ -524,7 +528,6 @@ void destroyOperatorParam(SOperatorParam* pParam) {
//TODO
}
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
destroyOperatorParam(((SExecTaskInfo*)tinfo)->pOpParam);
((SExecTaskInfo*)tinfo)->pOpParam = pParam;

View File

@ -105,38 +105,73 @@ static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) {
return TSDB_CODE_SUCCESS;
}
static void addBlkToBlkBufs(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcBlkBufInfo** ppBuf) {
*ppRes = pBlock;
}
static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) {
SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId);
return pPage->data + pBlkInfo->offset;
}
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx) {
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];
taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
if (NULL == taosArrayPush(pCtx->pGrpUidList, &pGCache->newGroup.uid)) {
return TSDB_CODE_OUT_OF_MEMORY;
SOperatorParam* pDst = NULL;
taosWLockLatch(&pCtx->lock);
int32_t num = taosArrayGetSize(pCtx->pNewGrpList);
if (num <= 0) {
goto _return;
}
for (int32_t i = 0; i < num; ++i) {
SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
if (NULL == taosArrayPush(pCtx->pGrpUidList, &pNew->uid)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
if (num > 1) {
if (0 == i) {
pDst = pNew->pParam;
} else {
code = mergeOperatorParams(pDst, pNew->pParam);
if (code) {
goto _return;
}
}
} else {
pDst = pNew->pParam;
}
}
taosArrayClear(pCtx->pNewGrpList);
pGCache->newGroup.uid = 0;
taosThreadMutexUnlock(&pGCache->sessionMutex);
_return:
taosWUnLockLatch(&pCtx->lock);
*ppParam = pDst;
return TSDB_CODE_SUCCESS;
return code;
}
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SOperatorParam* pDownstreamParam = NULL;
SSDataBlock* pBlock = NULL;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
if (pGCache->pDownstreams[downstreamIdx].pNewGrpList) {
code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pGCache->newGroup.uid);
if (code) {
return code;
}
code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
if (code) {
return code;
}
SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, downstreamIdx);
if (pDownstreamParam) {
pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextExtFn(pOperator->pDownstream[downstreamIdx], pDownstreamParam);
} else {
pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]);
}
if (pBlock) {
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
}
@ -146,10 +181,6 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
return TSDB_CODE_SUCCESS;
}
static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) {
*ppRes = pBlock;
}
static void notifyWaitingSessions(SArray* pWaitQueue) {
if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) {
return;
@ -171,7 +202,7 @@ int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBloc
if (pGroup->needCache) {
SGcBlkBufInfo* pNewBlk = NULL;
code = addBlkToGroupCache(pOperator, pBlock, &pNewBlk);
code = addBlkToBlkBufs(pOperator, pBlock, &pNewBlk);
if (code) {
return code;
}
@ -197,7 +228,21 @@ int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBloc
}
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
notifyWaitingSessions();
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
int32_t uidNum = taosArrayGetSize(pCtx->pGrpUidList);
for (int32_t i = 0; i < uidNum; ++i) {
int64_t* pUid = taosArrayGet(pCtx->pGrpUidList, i);
SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, pUid, sizeof(*pUid));
pGroup->pBlock = NULL;
pGroup->fetchDone = true;
notifyWaitingSessions(pGroup->waitQueue);
}
taosArrayClear(pCtx->pGrpUidList);
return TSDB_CODE_SUCCESS;
}
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
@ -334,15 +379,16 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
return addPageToGroupCacheBuf(pInfo->pBlkBufs);
}
static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) {
static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcOperatorParam* pGcParam = pParam->value;
SGroupCacheData grpData = {0};
grpData.needCache = pParam->needCache;
grpData.needCache = pGcParam->needCache;
while (true) {
if (0 != taosHashPut(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize, &grpData, sizeof(grpData))) {
if (0 != taosHashPut(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize, &grpData, sizeof(grpData))) {
if (terrno == TSDB_CODE_DUP_KEY) {
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize);
if (*ppGrp) {
break;
}
@ -351,11 +397,12 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOpera
}
}
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize);
if (*ppGrp) {
SGcNewGroupInfo newGroup;
newGroup.uid = *(int64_t*)pParam->pGroupValue;
newGroup.pParam = pOperator->pDownstreamParams[pParam->downstreamIdx];
newGroup.uid = *(int64_t*)pGcParam->pGroupValue;
newGroup.pParam = taosArrayGet(pParam->pChildren, 0);
taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
if (NULL == taosArrayPush(pGCache->pDownstreams[pParam->downstreamIdx].pNewGrpList, &newGroup)) {
taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
@ -370,11 +417,12 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOpera
return TSDB_CODE_SUCCESS;
}
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) {
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
SGcSessionCtx ctx = {0};
int32_t code = 0;
SGcOperatorParam* pGcParam = pParam->value;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize);
if (pGroup) {
ctx.pGroupData = pGroup;
} else {
@ -383,32 +431,28 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato
return code;
}
}
taosThreadMutexUnlock(&pGCache->sessionMutex);
ctx.pParam = pParam;
ctx.pParam = pGcParam;
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx));
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
*ppSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
return TSDB_CODE_SUCCESS;
}
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes) {
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcOperatorParam* pGcParam = pOperator->pOperatorParam->value;
SGcOperatorParam* pGcParam = pParam->value;
SGcSessionCtx* pSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (NULL == pSession) {
int32_t code = initGroupCacheSession(pOperator, pGcParam, &pSession);
int32_t code = initGroupCacheSession(pOperator, pParam, &pSession);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} else {
taosThreadMutexUnlock(&pGCache->sessionMutex);
}
return getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes);
@ -426,19 +470,6 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo
*pCurrentId = 0;
}
SSDataBlock* getBlkFromGroupCache(struct SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pRes = NULL;
int32_t code = getBlkFromSessionCache(pOperator, &pRes);
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
return pRes;
}
static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pInfo = pOperator->info;
pInfo->execInfo.downstreamNum = pOperator->numOfDownstream;
@ -462,23 +493,26 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if (NULL == pInfo->pDownstreams[i].pGrpUidList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInfo->pDownstreams[i].pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
if (NULL == pInfo->pDownstreams[i].pNewGrpList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
taosThreadMutexLock(&pGCache->sessionMutex);
SSDataBlock* pBlock = NULL;
int32_t code = setOperatorParams(pOperator, pParam);
int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
return getBlkFromGroupCache(pOperator);
return pBlock;
}
@ -542,7 +576,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlkFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL);
return pOperator;

View File

@ -605,6 +605,46 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return TSDB_CODE_SUCCESS;
}
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
if (pDst->opType != pSrc->opType) {
qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
return TSDB_CODE_INVALID_PARA;
}
switch (pDst->opType) {
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangeOperatorParam* pDExc = pDst->value;
SExchangeOperatorParam* pSExc = pSrc->value;
if (!pDExc->multiParams) {
SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
if (NULL == pBatch) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBatch->multiParams = true;
pBatch->pBatchs = taosArrayInit(4, sizeof(SExchangeOperatorBasicParam));
if (NULL == pBatch->pBatchs) {
taosMemoryFree(pBatch);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBatch->pBatchs, &pDExc->basic);
taosArrayPush(pBatch->pBatchs, &pSExc->basic);
destroyOperatorParamValue(pDst->value);
pDst->value = pBatch;
} else {
SExchangeOperatorBatchParam* pBatch = pDst->value;
taosArrayPush(pBatch->pBatchs, &pSExc->basic);
}
break;
}
default:
qError("invalid optype %d for merge operator params", (*ppDst)->opType);
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
if (NULL == pParam) {
pOperator->pOperatorParam = NULL;
@ -637,7 +677,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara
for (int32_t i = 0; i < childrenNum; ++i) {
SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i);
pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild;
if (pOperator->pDownstreamParams[pChild->downstreamIdx]) {
int32_t code = mergeOperatorParams(&pOperator->pDownstreamParams[pChild->downstreamIdx], pChild);
if (code) {
return code;
}
} else {
pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild;
}
}
taosArrayClear(pOperator->pOperatorParam->pChildren);