enh: add group cache

This commit is contained in:
dapan1121 2023-07-10 19:23:52 +08:00
parent 61c85ae4fc
commit 2d3f9c739d
2 changed files with 85 additions and 52 deletions

View File

@ -35,9 +35,14 @@ typedef struct SGcBufPageInfo {
char* data; char* data;
} SGcBufPageInfo; } SGcBufPageInfo;
typedef struct SGroupData { typedef struct SGroupCacheData {
SGcBlkBufInfo* blks; TdThreadMutex mutex;
} SGroupData; SSHashObj* waitQueue;
bool fetchDone;
int64_t fetchSessionId;
SGcBlkBufInfo* pFirstBlk;
SGcBlkBufInfo* pLastBlk;
} SGroupCacheData;
typedef struct SGroupColInfo { typedef struct SGroupColInfo {
int32_t slot; int32_t slot;
@ -57,8 +62,8 @@ typedef struct SGroupColsInfo {
typedef struct SGcSessionCtx { typedef struct SGcSessionCtx {
int32_t downstreamIdx; int32_t downstreamIdx;
bool cacheHit;
bool needCache; bool needCache;
SGroupCacheData* pGroupData;
SGcBlkBufInfo* pLastBlk; SGcBlkBufInfo* pLastBlk;
} SGcSessionCtx; } SGcSessionCtx;
@ -71,9 +76,7 @@ typedef struct SGroupCacheOperatorInfo {
SSHashObj* pSessionHash; SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo; SGroupColsInfo groupColsInfo;
SArray* pBlkBufs; SArray* pBlkBufs;
SSHashObj* pBlkHash; SHashObj* pBlkHash;
int64_t pCurrentId;
SGcSessionCtx* pCurrent;
SGcExecInfo execInfo; SGcExecInfo execInfo;
} SGroupCacheOperatorInfo; } SGroupCacheOperatorInfo;

View File

@ -87,7 +87,7 @@ static void destroyGroupCacheOperator(void* param) {
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage);
tSimpleHashCleanup(pGrpCacheOperator->pSessionHash); tSimpleHashCleanup(pGrpCacheOperator->pSessionHash);
tSimpleHashCleanup(pGrpCacheOperator->pBlkHash); taosHashCleanup(pGrpCacheOperator->pBlkHash);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -110,10 +110,21 @@ static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo
return pPage->data + pBlkInfo->offset; return pPage->data + pBlkInfo->offset;
} }
static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo** ppLastBlk) { static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
if (NULL == *ppLastBlk) { int32_t code = TSDB_CODE_SUCCESS;
return NULL; SGroupCacheOperatorInfo* pGCache = pOperator->info;
if (NULL == pSession->pLastBlk) {
if (pSession->pGroupData->pFirstBlk) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk);
pSession->pLastBlk = pSession->pGroupData->pFirstBlk;
return TSDB_CODE_SUCCESS;
} }
if (atomic_load_64(&pSession->pGroupData->fetchSessionId) == sessionId) {
getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes);
}
}
SGcBlkBufInfo* pCurr = (*ppLastBlk)->next; SGcBlkBufInfo* pCurr = (*ppLastBlk)->next;
*ppLastBlk = pCurr; *ppLastBlk = pCurr;
if (pCurr) { if (pCurr) {
@ -134,51 +145,70 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
return addPageToGroupCacheBuf(pInfo->pBlkBufs); return addPageToGroupCacheBuf(pInfo->pBlkBufs);
} }
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { static int32_t initGroupCacheGroupData(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) {
SGcSessionCtx ctx = {0}; SGroupCacheData grpData = {0};
SGroupCacheOperatorInfo* pGCache = pOperator->info; grpData.fetchSessionId = pParam->sessionId;
SGroupData* pGroup = tSimpleHashGet(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); while (true) {
if (pGroup) { if (0 != taosHashPut(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize, &grpData, sizeof(grpData))) {
ctx.cacheHit = true; if (terrno == TSDB_CODE_DUP_KEY) {
ctx.pLastBlk = pGroup->blks; *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
if (*ppGrp) {
break;
}
} else { } else {
ctx.downstreamIdx = pParam->downstreamIdx; return terrno;
ctx.needCache = pParam->needCache; }
} }
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx)); *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
if (TSDB_CODE_SUCCESS == code) { if (*ppGrp) {
*ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); break;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) {
SGcSessionCtx ctx = {0};
int32_t code = 0;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
if (pGroup) {
ctx.pGroupData = pGroup;
} else {
code = initGroupCacheGroupData(pGCache, pParam, &ctx.pGroupData);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
ctx.downstreamIdx = pParam->downstreamIdx;
ctx.needCache = pParam->needCache;
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
return TSDB_CODE_SUCCESS;
}
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
if (NULL == pCtx) { if (NULL == pCtx) {
int32_t code = initGroupCacheSession(pOperator, pParam, ppSession); int32_t code = initGroupCacheSession(pOperator, pParam, ppSession);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code; return code;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
if ((*ppSession)->pLastBlk) {
*ppRes = (SSDataBlock*)retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk);
} else { } else {
*ppRes = NULL;
}
return;
}
*ppSession = pCtx; *ppSession = pCtx;
if (pCtx->cacheHit) {
*ppRes = (SSDataBlock*)moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk);
return;
} }
*ppRes = NULL; return getBlkFromSessionCacheImpl(pOperator, pParam->sessionId, *ppSession, ppRes);
} }
static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) { static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) {
@ -195,7 +225,7 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo
static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) { static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheOperatorInfo* pGCache = pOperator->info;
destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId); //destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId);
} }
static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) { static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) {
@ -208,11 +238,11 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value; SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value;
SGcSessionCtx* pSession = NULL; SGcSessionCtx* pSession = NULL;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = getBlkFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession);
if (TSDB_CODE_SUCCESS != code) {
getFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession); pTaskInfo->code = code;
pGCache->pCurrent = pSession; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
pGCache->pCurrentId = pParam->sessionId; }
if (pRes) { if (pRes) {
return pRes; return pRes;
@ -227,7 +257,7 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++; pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++;
if (pGCache->pCurrent->needCache) { if (pSession->needCache) {
addBlkToGroupCache(pOperator, pBlock, &pRes); addBlkToGroupCache(pOperator, pBlock, &pRes);
} else { } else {
pRes = pBlock; pRes = pBlock;
@ -273,7 +303,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error; goto _error;
} }
pInfo->pBlkHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); pInfo->pBlkHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pInfo->pBlkHash == NULL) { if (pInfo->pBlkHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;