feat: add group cache operator

This commit is contained in:
dapan1121 2023-07-03 13:42:06 +08:00
parent ad6f88e63f
commit 5971395dcf
5 changed files with 178 additions and 45 deletions

View File

@ -439,6 +439,7 @@ typedef struct SHashJoinPhysiNode {
typedef struct SGroupCachePhysiNode {
SPhysiNode node;
bool grpColsMayBeNull;
SArray* pDownstreamKey;
SNodeList* pGroupCols;
} SGroupCachePhysiNode;

View File

@ -21,15 +21,20 @@ extern "C" {
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
typedef struct SGcSessionRes {
} SGcSessionRes;
typedef struct SGcSessionCtx {
SOperatorInfo* pDownstream;
bool cacheHit;
bool needCache;
SGcBlkBufInfo* pLastBlk;
} SGcSessionCtx;
typedef struct SGcOperatorParam {
int64_t sessionId;
bool newFetch;
void* pGroupValue;
int32_t groupValueSize;
SOperatorBasicParam basic;
int64_t sessionId;
int32_t downstreamKey;
bool needCache;
void* pGroupValue;
int32_t groupValueSize;
} SGcOperatorParam;
#pragma pack(push, 1)
@ -66,13 +71,20 @@ typedef struct SGroupColsInfo {
char* pData;
} SGroupColsInfo;
typedef struct SGroupCacheOperatorInfo {
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
SArray* pBlkBufs;
SSHashObj* pBlkHash;
typedef struct SGcDownstreamInfo {
SSHashObj* pKey2Idx;
SOperatorInfo** ppDownStream;
int32_t downStreamNum;
} SGcDownstreamInfo;
typedef struct SGroupCacheOperatorInfo {
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
SArray* pBlkBufs;
SSHashObj* pBlkHash;
SGcDownstreamInfo downstreamInfo;
int64_t pCurrentId;
SGcSessionCtx* pCurrent;
} SGroupCacheOperatorInfo;
#ifdef __cplusplus

View File

@ -27,9 +27,19 @@ typedef struct SOperatorCostInfo {
struct SOperatorInfo;
typedef struct SOperatorParam {
typedef struct SOperatorBasicParam {
bool newExec;
} SOperatorBasicParam;
typedef struct SOperatorSpecParam {
int32_t opType;
void* value;
} SOperatorSpecParam;
typedef struct SOperatorParam {
SOperatorBasicParam basic;
int32_t opNum;
SOperatorSpecParam* pOpParams;
} SOperatorParam;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
@ -150,6 +160,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
// clang-format on
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,

View File

@ -72,7 +72,10 @@ static void destroyGroupCacheOperator(void* param) {
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage);
taosMemoryFree(pGrpCacheOperator->ppDownStream);
tSimpleHashCleanup(pGrpCacheOperator->pSessionHash);
tSimpleHashCleanup(pGrpCacheOperator->pBlkHash);
tSimpleHashCleanup(pGrpCacheOperator->downstreamInfo.pKey2Idx);
taosMemoryFree(pGrpCacheOperator->downstreamInfo.ppDownStream);
taosMemoryFreeClear(param);
}
@ -90,6 +93,26 @@ static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) {
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) {
SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId);
return pPage->data + pBlkInfo->offset;
}
static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo** ppLastBlk) {
if (NULL == *ppLastBlk) {
return NULL;
}
SGcBlkBufInfo* pCurr = (*ppLastBlk)->next;
*ppLastBlk = pCurr;
if (pCurr) {
SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId);
return pPage->data + pCurr->offset;
}
return NULL;
}
static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
pInfo->pBlkBufs = taosArrayInit(32, sizeof(SBufPageInfo));
if (NULL == pInfo->pBlkBufs) {
@ -99,52 +122,132 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
return addPageToGroupCacheBuf(pInfo->pBlkBufs);
}
static int32_t getFromSessionCache(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes) {
static int32_t initGroupCacheDownstreamInfo(SGroupCachePhysiNode* pPhyciNode, SOperatorInfo** pDownstream, int32_t numOfDownstream, SGcDownstreamInfo* pInfo) {
pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES);
if (NULL == pInfo->ppDownStream) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES);
pInfo->pKey2Idx = tSimpleHashInit(numOfDownstream, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pInfo->pKey2Idx) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfDownstream; ++i) {
int32_t keyValue = taosArrayGet(pPhyciNode, i);
tSimpleHashPut(pInfo->pKey2Idx, &keyValue, sizeof(keyValue), &i, sizeof(i));
}
return TSDB_CODE_SUCCESS;
}
static int32_t initGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) {
SGcSessionCtx ctx = {0};
SGroupData* pGroup = tSimpleHashGet(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
if (pGroup) {
ctx.cacheHit = true;
ctx.pLastBlk = pGroup->blks;
} else {
int32_t* pIdx = tSimpleHashGet(pGCache->downstreamInfo.pKey2Idx, &pParam->downstreamKey, sizeof(pParam->downstreamKey));
if (NULL == pIdx) {
qError("Invalid downstream key value: %d", pParam->downstreamKey);
return TSDB_CODE_INVALID_PARA;
}
ctx.pDownstream = pGCache->downstreamInfo.ppDownStream[*pIdx];
ctx.needCache = pParam->needCache;
}
return TSDB_CODE_SUCCESS;
}
static void getFromSessionCache(SExecTaskInfo* pTaskInfo, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) {
if (pParam->basic.newExec) {
int32_t code = initGroupCacheSession(pGCache, pParam, ppSession);
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if ((*ppSession)->pLastBlk) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk);
} else {
*ppRes = NULL;
}
return;
}
SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
if (NULL == pCtx) {
qError("session %" PRIx64 " not exists", pParam->sessionId);
pTaskInfo->code = TSDB_CODE_INVALID_PARA;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
*ppSession = pCtx;
if (pCtx->cacheHit) {
*ppRes = moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk);
return;
}
*ppRes = NULL;
}
static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) {
if (NULL == *ppCurrent) {
return;
}
if (tSimpleHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) {
qError("remove session %" PRIx64 " failed", *pCurrentId);
}
*ppCurrent = NULL;
*pCurrentId = 0;
}
static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId);
}
static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) {
*ppRes = pBlock;
}
static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperatorParam* param) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
SGcSessionCtx* pSession = NULL;
SSDataBlock* pRes = NULL;
SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param)
if (NULL == pParam || pOperator->status == OP_EXEC_DONE) {
int32_t code = TSDB_CODE_SUCCESS;
SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param);
if (NULL == pParam) {
return NULL;
}
code = getFromSessionCache(pGCache, pParam, &pRes, );
getFromSessionCache(pTaskInfo, pGCache, pParam, &pRes, &pSession);
pGCache->pCurrent = pSession;
pGCache->pCurrentId = pParam->sessionId;
if (pRes) {
return pRes;
}
while (true) {
SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream);
SSDataBlock* pBlock = pSession->pDownstream->fpSet.getNextExtFn(pSession->pDownstream, param);
if (NULL == pBlock) {
setHJoinDone(pOperator);
setCurrentGroupCacheDone(pOperator);
break;
}
code = launchBlockHashJoin(pOperator, pBlock);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
if (pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows > 0) {
break;
if (pGCache->pCurrent->needCache) {
addBlkToGroupCache(pOperator, pBlock, &pRes);
}
break;
}
return (pRes->info.rows > 0) ? pRes : NULL;
return pRes;
}
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
@ -180,12 +283,10 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES);
if (NULL == pInfo->ppDownStream) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = initGroupCacheDownstreamInfo(pPhyciNode, pDownstream, numOfDownstream, &pInfo->downstreamInfo);
if (code) {
goto _error;
}
memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, getFromGroupCache, NULL);

View File

@ -523,7 +523,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
} else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo);
//pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo);
} else {
terrno = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = terrno;
@ -593,9 +593,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
}
void *getOperatorParam(int32_t opType, SOperatorParam* param) {
if (NULL == param || opType != param->opType) {
if (NULL == param) {
return NULL;
}
return param->value;
for (int32_t i = 0; i < param->opNum; ++i) {
if (opType == param->pOpParams[i].opType) {
memcpy(&param->pOpParams[i], param, sizeof(param->basic));
return &param->pOpParams[i];
}
}
return NULL;
}