enh: add group cache func

This commit is contained in:
dapan1121 2023-07-11 19:34:03 +08:00
parent 2d3f9c739d
commit 185071fae9
5 changed files with 317 additions and 73 deletions

View File

@ -443,6 +443,7 @@ typedef struct SGroupCachePhysiNode {
SPhysiNode node;
bool grpColsMayBeNull;
SArray* pDownstreamKey;
bool grpByUid;
SNodeList* pGroupCols;
} SGroupCachePhysiNode;

View File

@ -517,6 +517,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F)
#define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730)
#define TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x0731)
#define TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0732)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)

View File

@ -37,9 +37,10 @@ typedef struct SGcBufPageInfo {
typedef struct SGroupCacheData {
TdThreadMutex mutex;
SSHashObj* waitQueue;
SArray* waitQueue;
bool fetchDone;
int64_t fetchSessionId;
bool needCache;
SSDataBlock* pBlock;
SGcBlkBufInfo* pFirstBlk;
SGcBlkBufInfo* pLastBlk;
} SGroupCacheData;
@ -60,11 +61,26 @@ typedef struct SGroupColsInfo {
char* pData;
} SGroupColsInfo;
// Describes one newly created group that still has to be handed to a downstream.
typedef struct SGcNewGroupInfo {
// uid of the group; filled from the leading int64_t of the group value when the group is created
int64_t uid;
// operator parameter taken from pOperator->pDownstreamParams[] for the group's downstream
SOperatorParam* pParam;
} SGcNewGroupInfo;
// Per-downstream bookkeeping of the group cache operator (one entry per downstream index).
typedef struct SGcDownstreamCtx {
// latch taken in write mode around updates of the group lists below
SRWLatch lock;
// id of the session currently fetching from this downstream; -1 means no session has claimed it
int64_t fetchSessionId;
SArray* pNewGrpList; // SArray<SGcNewGroupInfo>
// array of int64_t group uids
SArray* pGrpUidList;
} SGcDownstreamCtx;
// State of one reader session of the group cache operator.
typedef struct SGcSessionCtx {
// index into the operator's downstream array this session reads from
int32_t downstreamIdx;
// copied from the session's operator parameter
bool needCache;
// operator parameter the session was created with
SGcOperatorParam* pParam;
// group this session reads blocks of
SGroupCacheData* pGroupData;
// last cached block already returned to this session; NULL until the first cached block is returned
SGcBlkBufInfo* pLastBlk;
// true once waitSem has been initialised
bool semInit;
// semaphore the session blocks on while another session fetches from the downstream
tsem_t waitSem;
} SGcSessionCtx;
typedef struct SGcExecInfo {
@ -72,9 +88,17 @@ typedef struct SGcExecInfo {
int64_t* pDownstreamBlkNum;
} SGcExecInfo;
typedef struct SGcNewGroupInfo {
int64_t uid;
} SGcNewGroupInfo;
typedef struct SGroupCacheOperatorInfo {
TdThreadMutex sessionMutex;
SGcNewGroupInfo newGroup;
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
bool grpByUid;
SGcDownstreamCtx* pDownstreams;
SArray* pBlkBufs;
SHashObj* pBlkHash;
SGcExecInfo execInfo;

View File

@ -110,29 +110,218 @@ static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo
return pPage->data + pBlkInfo->offset;
}
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
/**
 * Append the operator's pending new-group uid to the uid list of one downstream.
 *
 * The downstream's latch is held in write mode for the duration of the update and is
 * released on every path (the previous code referenced an undefined `pParam`, never
 * released the latch, and instead unlocked the unrelated session mutex).
 *
 * @param pOperator      group cache operator
 * @param downstreamIdx  index of the downstream whose uid list is extended
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the list could not grow
 */
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[downstreamIdx];
  int32_t                  code = TSDB_CODE_SUCCESS;

  taosWLockLatch(&pCtx->lock);
  if (NULL == taosArrayPush(pCtx->pGrpUidList, &pGCache->newGroup.uid)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    // the pending uid has been consumed; reset it so it is not appended twice
    pGCache->newGroup.uid = 0;
  }
  taosWUnLockLatch(&pCtx->lock);

  return code;
}
/**
 * Fetch one block from the given downstream operator.
 *
 * If the downstream has a new-group list, the pending new group is first registered
 * with the downstream. A successfully fetched block bumps the per-downstream block
 * counter in the execution info.
 *
 * @param pOperator      group cache operator
 * @param downstreamIdx  index of the downstream to read from
 * @param ppRes          out: fetched block, or NULL when the downstream returned nothing
 * @return TSDB_CODE_SUCCESS, or the error reported while registering the new group
 */
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  if (pGCache->pDownstreams[downstreamIdx].pNewGrpList) {
    // appendNewGroupToDownstream() takes two arguments and reads the pending uid itself
    int32_t code = appendNewGroupToDownstream(pOperator, downstreamIdx);
    if (code) {
      return code;
    }
  }

  SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, downstreamIdx);
  if (pBlock) {
    pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
  }

  *ppRes = pBlock;
  return TSDB_CODE_SUCCESS;
}
// Placeholder for adding a block to the group cache: nothing is copied or stored here,
// the input block pointer is simply handed back through ppRes.
// NOTE(review): handleGroupCacheRetrievedBlk assigns this function's result to an error
// code and passes a SGcBlkBufInfo** as the third argument, which does not match this
// void / SSDataBlock** signature — reconcile before relying on it.
static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) {
*ppRes = pBlock;
}
/**
 * Post the wait semaphore of every session stored in the wait queue.
 *
 * @param pWaitQueue  array of SGcSessionCtx pointers; may be NULL or empty, in which
 *                    case nothing happens
 */
static void notifyWaitingSessions(SArray* pWaitQueue) {
  if (NULL == pWaitQueue) {
    return;
  }

  // an empty queue yields a zero count, so the loop body is simply never entered
  int32_t waiterNum = taosArrayGetSize(pWaitQueue);
  int32_t idx = 0;
  while (idx < waiterNum) {
    SGcSessionCtx* pWaiter = taosArrayGetP(pWaitQueue, idx);
    tsem_post(&pWaiter->waitSem);
    ++idx;
  }
}
/**
 * Route a block just fetched from a downstream to the group it belongs to.
 *
 * The group is looked up in the block hash by the block's uid. The block is recorded as
 * the group's current block, optionally appended to the group's cached block list, and
 * all sessions waiting on the group are woken up.
 *
 * @param pOperator      group cache operator
 * @param pBlock         block returned by the downstream (must not be NULL)
 * @param pSession       session that performed the fetch
 * @param continueFetch  out: set to false when the block belongs to the fetching
 *                       session's own group, i.e. the fetch loop can stop
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA when grouping is not by uid or the
 *         block's uid has no registered group; or the error from caching the block
 */
int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  if (!pGCache->grpByUid) {
    return TSDB_CODE_INVALID_PARA;
  }

  SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
  if (NULL == pGroup) {
    // a block for an unknown group cannot be routed; fail instead of dereferencing NULL
    return TSDB_CODE_INVALID_PARA;
  }

  pGroup->pBlock = pBlock;

  if (pGroup->needCache) {
    SGcBlkBufInfo* pNewBlk = NULL;
    // NOTE(review): addBlkToGroupCache is currently declared void with a SSDataBlock**
    // out-parameter; this call site expects an error code and a SGcBlkBufInfo**.
    code = addBlkToGroupCache(pOperator, pBlock, &pNewBlk);
    if (code) {
      return code;
    }

    // append to the group's singly linked list of cached blocks
    if (pGroup->pLastBlk) {
      pGroup->pLastBlk->next = pNewBlk;
    } else {
      pGroup->pFirstBlk = pNewBlk;
    }
    pGroup->pLastBlk = pNewBlk;
  }

  notifyWaitingSessions(pGroup->waitQueue);

  if (pGroup == pSession->pGroupData) {
    *continueFetch = false;
  }

  return TSDB_CODE_SUCCESS;
}
/**
 * Called when the downstream returned no more blocks for the fetching session.
 *
 * Marks the session's group as completely fetched and wakes every session waiting on
 * that group so they can observe the end of data. The previous body called
 * notifyWaitingSessions() without its argument and fell off the end of a non-void
 * function whose result the caller uses.
 *
 * NOTE(review): only the fetching session's own group is handled here; if other groups
 * share this downstream, their waiters may also need to be marked done and notified.
 *
 * @param pOperator  group cache operator (unused for now)
 * @param pSession   session that hit the end of the downstream
 * @return TSDB_CODE_SUCCESS
 */
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
  SGroupCacheData* pGroup = pSession->pGroupData;

  // waiters enqueue themselves under the group mutex, so update and notify under it too
  taosThreadMutexLock(&pGroup->mutex);
  pGroup->fetchDone = true;
  notifyWaitingSessions(pGroup->waitQueue);
  taosThreadMutexUnlock(&pGroup->mutex);

  return TSDB_CODE_SUCCESS;
}
/**
 * Keep fetching blocks from the session's downstream until one belongs to the session's
 * own group, the downstream is exhausted, or an error occurs.
 *
 * The loop previously declared a second `code` inside its body that shadowed the outer
 * one, so errors from the per-block handlers were neither seen by the loop condition
 * nor returned to the caller.
 *
 * @param pOperator  group cache operator
 * @param pSession   session performing the fetch
 * @param ppRes      out: last block fetched, or NULL when the downstream is exhausted
 * @return TSDB_CODE_SUCCESS or the first error reported by the fetch or the handlers
 */
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  bool    continueFetch = true;
  int32_t code = TSDB_CODE_SUCCESS;

  while (continueFetch && TSDB_CODE_SUCCESS == code) {
    code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    if (NULL == *ppRes) {
      // downstream exhausted
      code = handleDownstreamFetchDone(pOperator, pSession);
      break;
    }

    code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch);
  }

  return code;
}
/**
 * Park a session until the fetching session has produced data for its group, then pick
 * up the next block for the session.
 *
 * Must be entered with the group mutex held; the mutex is released before blocking and
 * on every early error return.
 *
 * Changes over the previous version: the semaphore is initialised before the session is
 * published in the wait queue, and a failed enqueue is reported instead of being
 * ignored (an un-enqueued session would otherwise block forever).
 *
 * @param pGCache   group cache operator info
 * @param pGroup    group the session waits on
 * @param pSession  waiting session
 * @param ppRes     out: next block for the session, or NULL at end of data
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when woken up without any data
 */
static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGroup, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  if (NULL == pGroup->waitQueue) {
    pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pGroup->waitQueue) {
      taosThreadMutexUnlock(&pSession->pGroupData->mutex);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // the semaphore must be usable before anybody can find this session in the queue
  if (!pSession->semInit) {
    tsem_init(&pSession->waitSem, 0, 0);
    pSession->semInit = true;
  }

  if (NULL == taosArrayPush(pGroup->waitQueue, &pSession)) {
    taosThreadMutexUnlock(&pSession->pGroupData->mutex);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosThreadMutexUnlock(&pSession->pGroupData->mutex);

  tsem_wait(&pSession->waitSem);

  if (pSession->pGroupData->needCache) {
    if (NULL == pSession->pLastBlk) {
      if (pSession->pGroupData->pFirstBlk) {
        *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk);
        pSession->pLastBlk = pSession->pGroupData->pFirstBlk;
        return TSDB_CODE_SUCCESS;
      } else if (pGroup->fetchDone) {
        *ppRes = NULL;
        return TSDB_CODE_SUCCESS;
      }
    } else if (pSession->pLastBlk->next) {
      *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next);
      pSession->pLastBlk = pSession->pLastBlk->next;
      return TSDB_CODE_SUCCESS;
    }
  } else {
    *ppRes = pSession->pGroupData->pBlock;
    return TSDB_CODE_SUCCESS;
  }

  // NOTE(review): `sessionId` and `pOperator` are not parameters of this function, and
  // the result of the fetch below is discarded before the error return — needs rework.
  if (atomic_load_64(&pSession->pGroupData->fetchSessionId) == sessionId) {
    getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes);
  }

  qError("no block retrieved from downstream and waked up");
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SGcBlkBufInfo* pCurr = (*ppLastBlk)->next;
*ppLastBlk = pCurr;
if (pCurr) {
SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId);
return pPage->data + pCurr->offset;
// Produce the next block for a session.
//
// Each loop iteration first tries to serve the session from data already held by its
// group; failing that, the session either becomes/continues as the fetcher of its
// downstream or waits for the current fetcher. The loop runs at most twice: the first
// pass without the group mutex, the second pass with it held so the checks are repeated
// under the lock before the session goes to sleep.
//
// pOperator  group cache operator
// sessionId  id of the calling session
// pSession   context of the calling session
// ppRes      out: block for the session
// Returns TSDB_CODE_SUCCESS or the error from fetching/waiting.
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
// tracks whether this function currently holds the group mutex
bool locked = false;
while (true) {
if (pSession->pGroupData->needCache) {
// cached group: walk the group's block list, remembering the session's position
if (NULL == pSession->pLastBlk) {
if (pSession->pGroupData->pFirstBlk) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk);
pSession->pLastBlk = pSession->pGroupData->pFirstBlk;
goto _return;
}
} else if (pSession->pLastBlk->next) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next);
pSession->pLastBlk = pSession->pLastBlk->next;
goto _return;
}
} else if (pSession->pGroupData->pBlock || pSession->pGroupData->fetchDone) {
// uncached group: hand out the group's current block once and clear it
*ppRes = pSession->pGroupData->pBlock;
pSession->pGroupData->pBlock = NULL;
goto _return;
}
// NOTE(review): this unconditional return makes everything below unreachable and
// returns NULL from an int32_t function; it looks like a leftover line from the
// previous implementation — confirm and remove.
return NULL;
// fetch from the downstream if this session already owns it, or claims it by swapping -1 for its id
if ((atomic_load_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId) == sessionId)
|| (-1 == atomic_val_compare_exchange_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId, -1, sessionId))) {
// do not hold the group mutex while pulling from the downstream
if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
locked = false;
}
code = getCacheBlkFromDownstreamOperator(pOperator, pSession, ppRes);
goto _return;
}
// second pass: nothing available under the lock either, so wait for the fetcher;
// groupCacheSessionWait releases the group mutex itself
if (locked) {
code = groupCacheSessionWait(pGCache, pSession->pGroupData, pSession, ppRes);
locked = false;
if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
break;
}
// first pass found nothing: take the group mutex and re-check everything once more
taosThreadMutexLock(&pSession->pGroupData->mutex);
locked = true;
};
_return:
// release the group mutex if an exit path left it held
if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
}
return code;
}
@ -145,9 +334,11 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
return addPageToGroupCacheBuf(pInfo->pBlkBufs);
}
static int32_t initGroupCacheGroupData(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) {
static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData grpData = {0};
grpData.fetchSessionId = pParam->sessionId;
grpData.needCache = pParam->needCache;
while (true) {
if (0 != taosHashPut(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize, &grpData, sizeof(grpData))) {
if (terrno == TSDB_CODE_DUP_KEY) {
@ -162,6 +353,16 @@ static int32_t initGroupCacheGroupData(SGroupCacheOperatorInfo* pGCache, SGcOper
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
if (*ppGrp) {
SGcNewGroupInfo newGroup;
newGroup.uid = *(int64_t*)pParam->pGroupValue;
newGroup.pParam = pOperator->pDownstreamParams[pParam->downstreamIdx];
taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
if (NULL == taosArrayPush(pGCache->pDownstreams[pParam->downstreamIdx].pNewGrpList, &newGroup)) {
taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
break;
}
}
@ -177,14 +378,15 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato
if (pGroup) {
ctx.pGroupData = pGroup;
} else {
code = initGroupCacheGroupData(pGCache, pParam, &ctx.pGroupData);
code = initGroupCacheGroupData(pOperator, pParam, &ctx.pGroupData);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
ctx.downstreamIdx = pParam->downstreamIdx;
ctx.needCache = pParam->needCache;
taosThreadMutexUnlock(&pGCache->sessionMutex);
ctx.pParam = pParam;
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx));
if (TSDB_CODE_SUCCESS != code) {
@ -196,19 +398,20 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato
return TSDB_CODE_SUCCESS;
}
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
if (NULL == pCtx) {
int32_t code = initGroupCacheSession(pOperator, pParam, ppSession);
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcOperatorParam* pGcParam = pOperator->pOperatorParam->value;
SGcSessionCtx* pSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (NULL == pSession) {
int32_t code = initGroupCacheSession(pOperator, pGcParam, &pSession);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} else {
*ppSession = pCtx;
taosThreadMutexUnlock(&pGCache->sessionMutex);
}
return getBlkFromSessionCacheImpl(pOperator, pParam->sessionId, *ppSession, ppRes);
return getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes);
}
static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) {
@ -223,48 +426,16 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo
*pCurrentId = 0;
}
static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
//destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId);
}
static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) {
*ppRes = pBlock;
}
SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SSDataBlock* getBlkFromGroupCache(struct SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value;
SGcSessionCtx* pSession = NULL;
SSDataBlock* pRes = NULL;
int32_t code = getBlkFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession);
int32_t code = getBlkFromSessionCache(pOperator, &pRes);
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pRes) {
return pRes;
}
while (true) {
SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, pSession->downstreamIdx);
if (NULL == pBlock) {
setCurrentGroupCacheDone(pOperator);
break;
}
pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++;
if (pSession->needCache) {
addBlkToGroupCache(pOperator, pBlock, &pRes);
} else {
pRes = pBlock;
}
break;
}
return pRes;
}
@ -278,6 +449,39 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
/**
 * Allocate and initialise one SGcDownstreamCtx per downstream of the operator.
 *
 * The array is zero-initialised so the latch and all pointers start in a defined state
 * (it was previously obtained with a plain malloc, leaving `lock` and `pNewGrpList`
 * holding garbage). `pNewGrpList` is now allocated here as well, because group creation
 * pushes SGcNewGroupInfo entries to it unconditionally.
 *
 * On failure the partially initialised array stays attached to the operator info.
 * NOTE(review): presumably the operator's destroy path releases it — confirm.
 *
 * @param pOperator  group cache operator whose info receives the contexts
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pInfo = pOperator->info;

  pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams));
  if (NULL == pInfo->pDownstreams) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i];

    // -1 marks the downstream as not yet claimed by any fetching session
    pCtx->fetchSessionId = -1;

    pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
    if (NULL == pCtx->pNewGrpList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pCtx->pGrpUidList = taosArrayInit(10, sizeof(int64_t));
    if (NULL == pCtx->pGrpUidList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
/**
 * Parameterised "get next" entry point of the group cache operator.
 *
 * Takes the session mutex, installs the caller's operator parameter and then fetches
 * the next block. The session mutex is released further down the call chain on the
 * normal path; on a parameter error it is released here before the long jump so the
 * operator is not left locked.
 *
 * @param pOperator  group cache operator
 * @param pParam     operator parameter for this call
 * @return next block for the session identified by the parameter, or NULL
 */
SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;

  taosThreadMutexLock(&pGCache->sessionMutex);

  int32_t code = setOperatorParams(pOperator, pParam);
  if (TSDB_CODE_SUCCESS != code) {
    // do not leave the session mutex held across the non-local exit
    taosThreadMutexUnlock(&pGCache->sessionMutex);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  return getBlkFromGroupCache(pOperator);
}
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
@ -293,10 +497,18 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->grpByUid = pPhyciNode->grpByUid;
if (!pInfo->grpByUid) {
qError("only group cache by uid is supported now");
return TSDB_CODE_INVALID_PARA;
}
if (pPhyciNode->pGroupCols) {
code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);
if (code) {
goto _error;
}
}
code = initGroupCacheBufPages(pInfo);
if (code) {
@ -320,12 +532,17 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
code = initGroupCacheDownstreamCtx(pOperator);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
code = initGroupCacheExecInfo(pOperator);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlkFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL);
return pOperator;

View File

@ -406,6 +406,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in g
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR, "Geometry not support in this operator")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")