enh: support concurrent fetch from group cache

This commit is contained in:
dapan1121 2023-07-14 13:30:13 +08:00
parent 6763c6f98c
commit 2312f10372
11 changed files with 519 additions and 149 deletions

View File

@ -160,6 +160,8 @@ typedef struct SInterpFuncLogicNode {
typedef struct SGroupCacheLogicNode {
SLogicNode node;
bool grpColsMayBeNull;
bool grpByUid;
bool globalGrp;
SNodeList* pGroupCols;
} SGroupCacheLogicNode;
@ -441,8 +443,8 @@ typedef struct SHashJoinPhysiNode {
typedef struct SGroupCachePhysiNode {
SPhysiNode node;
bool grpColsMayBeNull;
SArray* pDownstreamKey;
bool grpByUid;
bool globalGrp;
SNodeList* pGroupCols;
} SGroupCachePhysiNode;

View File

@ -109,8 +109,8 @@ typedef struct SGcOperatorParam {
int64_t sessionId;
int32_t downstreamIdx;
bool needCache;
void* pGroupValue;
int32_t groupValueSize;
int32_t vgId;
int64_t tbUid;
} SGcOperatorParam;
typedef struct SExprSupp {
@ -158,7 +158,7 @@ typedef struct SExchangeOperatorBasicParam {
typedef struct SExchangeOperatorBatchParam {
bool multiParams;
SArray* pBatchs; // SArray<SExchangeOperatorBasicParam>
SSHashObj* pBatchs; // SExchangeOperatorBasicParam
} SExchangeOperatorBatchParam;
typedef struct SExchangeOperatorParam {
@ -717,6 +717,9 @@ uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator);
void destroyOperatorParamValue(void* pValues);
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
#ifdef __cplusplus
}
#endif

View File

@ -19,21 +19,27 @@
extern "C" {
#endif
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
#define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600
#pragma pack(push, 1)
typedef struct SGcBlkBufInfo {
void* prev;
void* next;
uint16_t pageId;
int32_t offset;
int64_t blkId;
int64_t offset;
int64_t bufSize;
void* pBuf;
uint32_t fileId;
} SGcBlkBufInfo;
#pragma pack(pop)
typedef struct SGcBufPageInfo {
int32_t pageSize;
int32_t offset;
char* data;
} SGcBufPageInfo;
typedef struct SGcVgroupCtx {
SArray* pTbList;
uint64_t lastUid;
int64_t fileSize;
uint32_t fileId;
} SGcVgroupCtx;
typedef struct SGroupCacheData {
TdThreadMutex mutex;
@ -41,8 +47,13 @@ typedef struct SGroupCacheData {
bool fetchDone;
bool needCache;
SSDataBlock* pBlock;
SGcBlkBufInfo* pFirstBlk;
SGcBlkBufInfo* pLastBlk;
SGcVgroupCtx* pVgCtx;
int32_t downstreamIdx;
int32_t vgId;
uint32_t fileId;
int64_t startBlkId;
int64_t endBlkId;
int64_t startOffset;
} SGroupCacheData;
typedef struct SGroupColInfo {
@ -62,15 +73,22 @@ typedef struct SGroupColsInfo {
} SGroupColsInfo;
typedef struct SGcNewGroupInfo {
int64_t uid;
SOperatorParam* pParam;
int32_t vgId;
int64_t uid;
SGroupCacheData* pGroup;
SOperatorParam* pParam;
} SGcNewGroupInfo;
typedef struct SGcDownstreamCtx {
SRWLatch lock;
SRWLatch grpLock;
int64_t fetchSessionId;
SArray* pNewGrpList; // SArray<SGcNewGroupInfo>
SArray* pGrpUidList;
SSHashObj* pVgTbHash;
SHashObj* pGrpHash;
SRWLatch blkLock;
SSDataBlock* pBaseBlock;
SArray* pFreeBlock;
int64_t lastBlkUid;
} SGcDownstreamCtx;
typedef struct SGcSessionCtx {
@ -78,7 +96,8 @@ typedef struct SGcSessionCtx {
bool needCache;
SGcOperatorParam* pParam;
SGroupCacheData* pGroupData;
SGcBlkBufInfo* pLastBlk;
int64_t lastBlkId;
int64_t nextOffset;
bool semInit;
tsem_t waitSem;
} SGcSessionCtx;
@ -88,14 +107,38 @@ typedef struct SGcExecInfo {
int64_t* pDownstreamBlkNum;
} SGcExecInfo;
typedef struct SGcCacheFile {
uint32_t grpNum;
uint32_t grpDone;
int64_t fileSize;
} SGcCacheFile;
typedef struct SGcReadBlkInfo {
SSDataBlock* pBlock;
int64_t nextOffset;
} SGcReadBlkInfo;
typedef struct SGcBlkCacheInfo {
SRWLatch dirtyLock;
SSHashObj* pCacheFile;
SHashObj* pDirtyBlk;
SGcBlkBufInfo* pDirtyHead;
SGcBlkBufInfo* pDirtyTail;
SHashObj* pReadBlk;
int64_t blkCacheSize;
} SGcBlkCacheInfo;
typedef struct SGroupCacheOperatorInfo {
TdThreadMutex sessionMutex;
SSHashObj* pSessionHash;
int64_t maxCacheSize;
int64_t currentBlkId;
SHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
bool globalGrp;
bool grpByUid;
SGcDownstreamCtx* pDownstreams;
SArray* pBlkBufs;
SHashObj* pBlkHash;
SGcBlkCacheInfo blkCache;
SHashObj* pGrpHash;
SGcExecInfo execInfo;
} SGroupCacheOperatorInfo;

View File

@ -36,7 +36,7 @@ static void destroyDynQueryCtrlOperator(void* param) {
taosMemoryFreeClear(param);
}
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, void* pGrpValue, int32_t grpValSize, SOperatorParam* pChild) {
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -57,8 +57,8 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
pGc->downstreamIdx = downstreamIdx;
pGc->needCache = needCache;
pGc->pGroupValue = pGrpValue;
pGc->groupValueSize = grpValSize;
pGc->vgId = vgId;
pGc->tbUid = tbUid;
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
(*ppRes)->downstreamIdx = downstreamIdx;
@ -78,15 +78,16 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i
if (NULL == pExc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExc->vgId = *pVgId;
pExc->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pExc->uidList = taosArrayInit(1, sizeof(int64_t));
if (NULL == pExc->uidList) {
pExc->multiParams = false;
pExc->basic.vgId = *pVgId;
pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
if (NULL == pExc->basic.uidList) {
taosMemoryFree(pExc);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pExc->uidList, pUid);
taosArrayPush(pExc->basic.uidList, pUid);
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
(*ppRes)->downstreamIdx = downstreamIdx;
@ -145,10 +146,10 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ
code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, leftUid, pUid0->info.bytes, pExcParam0);
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, *leftVg, *leftUid, pExcParam0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, rightUid, pUid1->info.bytes, pExcParam1);
code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, *rightVg, *rightUid, pExcParam1);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1);

View File

@ -745,24 +745,45 @@ _error:
return code;
}
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value;
int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pParam->vgId, sizeof(pParam->vgId));
int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
if (NULL == pIdx) {
qError("No exchange source for vgId: %d", pParam->vgId);
pOperator->pTaskInfo->code = TSDB_CODE_INVALID_PARA;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
qError("No exchange source for vgId: %d", pBasicParam->vgId);
return TSDB_CODE_INVALID_PARA;
}
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo);
dataInfo.index = *pIdx;
dataInfo.pSrcUidList = taosArrayDup(pParam->uidList, NULL);
dataInfo.srcOpType = pParam->srcOpType;
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
dataInfo.srcOpType = pBasicParam->srcOpType;
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
return TSDB_CODE_SUCCESS;
}
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
SExchangeOperatorBasicParam* pBasicParam = NULL;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value;
if (pParam->multiParams) {
SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorParam->value;
int32_t iter = 0;
while (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter)) {
code = addSingleExchangeSource(pOperator, pBasicParam);
if (code) {
return code;
}
}
} else {
pBasicParam = &pParam->basic;
code = addSingleExchangeSource(pOperator, pBasicParam);
}
pOperator->pOperatorParam = NULL;
return TSDB_CODE_SUCCESS;

View File

@ -61,11 +61,6 @@ static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, S
return TSDB_CODE_SUCCESS;
}
static void freeGroupCacheBufPage(void* param) {
SGcBufPageInfo* pInfo = (SGcBufPageInfo*)param;
taosMemoryFree(pInfo->data);
}
static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
char* buf = taosMemoryMalloc(pGrpCacheOperator->execInfo.downstreamNum * 32 + 100);
if (NULL == buf) {
@ -85,33 +80,155 @@ static void destroyGroupCacheOperator(void* param) {
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage);
tSimpleHashCleanup(pGrpCacheOperator->pSessionHash);
taosHashCleanup(pGrpCacheOperator->pBlkHash);
taosHashCleanup(pGrpCacheOperator->pSessionHash);
taosHashCleanup(pGrpCacheOperator->pGrpHash);
taosMemoryFreeClear(param);
}
static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) {
SGcBufPageInfo page;
page.pageSize = GROUP_CACHE_DEFAULT_PAGE_SIZE;
page.offset = 0;
page.data = taosMemoryMalloc(page.pageSize);
if (NULL == page.data) {
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId));
taosWLockLatch(&pCache->dirtyLock);
if (NULL == pCache->pDirtyHead) {
pCache->pDirtyHead = pBufInfo;
} else {
pBufInfo->prev = pCache->pDirtyTail;
pCache->pDirtyTail->next = pBufInfo;
}
pCache->pDirtyTail = pBufInfo;
taosWUnLockLatch(&pCache->dirtyLock);
int64_t blkCacheSize = atomic_add_fetch_64(&pCache->blkCacheSize, pBufInfo->bufSize);
qDebug("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), blkCacheSize);
if (pGCache->maxCacheSize > 0 && blkCacheSize > pGCache->maxCacheSize) {
//TODO
}
taosArrayPush(pBlkBufs, &page);
return TSDB_CODE_SUCCESS;
}
static void addBlkToBlkBufs(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcBlkBufInfo** ppBuf) {
*ppRes = pBlock;
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t);
pBufInfo->pBuf = taosMemoryMalloc(bufSize);
if (NULL == pBufInfo->pBuf) {
qError("group cache add block to cache failed, size:%" PRId64, bufSize);
return TSDB_CODE_OUT_OF_MEMORY;
}
blockDataToBuf(pBufInfo->pBuf, pBlock);
pBufInfo->prev = NULL;
pBufInfo->next = NULL;
pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
pBufInfo->fileId = pGroup->fileId;
pBufInfo->offset = pGroup->pVgCtx->fileSize;
pBufInfo->bufSize = bufSize;
pGroup->pVgCtx->fileSize += bufSize;
int32_t code = addBlkToDirtyBufList(pGCache, &pGCache->blkCache, pBufInfo);
return code;
}
static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) {
SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId);
return pPage->data + pBlkInfo->offset;
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
*ppDst = taosMemoryMalloc(sizeof(*pSrc));
if (NULL == *ppDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppDst)->pBlockAgg = NULL;
(*ppDst)->pDataBlock = taosArrayDup(pSrc->pDataBlock, NULL);
if (NULL == (*ppDst)->pDataBlock) {
taosMemoryFree(*ppDst);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info));
return TSDB_CODE_SUCCESS;
}
static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** ppRes) {
taosWLockLatch(&pCtx->blkLock);
if (taosArrayGetSize(pCtx->pFreeBlock) <= 0) {
taosWUnLockLatch(&pCtx->blkLock);
return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock);
}
*ppRes = taosArrayPop(pCtx->pFreeBlock);
taosWUnLockLatch(&pCtx->blkLock);
return TSDB_CODE_SUCCESS;
}
static int32_t releaseBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
taosWLockLatch(&pCtx->blkLock);
taosArrayPush(pCtx->pFreeBlock, &pBlock);
taosWUnLockLatch(&pCtx->blkLock);
return TSDB_CODE_SUCCESS;
}
static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, SGcBlkBufInfo* pBufInfo, SSDataBlock** ppRes) {
int32_t code = acquireBaseBlockFromList(&pGCache->pDownstreams[downstreamIdx], ppRes);
if (code) {
return code;
}
//TODO OPTIMIZE PERF
return blockDataFromBuf(*ppRes, pBufInfo->pBuf);
}
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkCacheInfo* pCache = &pGCache->blkCache;
SGcReadBlkInfo* pReadBlk = taosHashAcquire(pCache->pReadBlk, &blkId, sizeof(blkId));
if (pReadBlk) {
*ppRes = pReadBlk->pBlock;
*nextOffset = pReadBlk->nextOffset;
return TSDB_CODE_SUCCESS;
}
taosRLockLatch(&pCache->dirtyLock);
SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId));
if (pBufInfo) {
code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBufInfo, ppRes);
taosRUnLockLatch(&pCache->dirtyLock);
if (code) {
return code;
}
*nextOffset = pBufInfo->offset + pBufInfo->bufSize;
SGcReadBlkInfo readBlk = {.pBlock = *ppRes, .nextOffset = *nextOffset};
taosHashPut(pCache->pReadBlk, &blkId, sizeof(blkId), &readBlk, sizeof(readBlk));
return TSDB_CODE_SUCCESS;
}
taosRUnLockLatch(&pCache->dirtyLock);
//TODO READ FROM FILE
code = TSDB_CODE_INVALID_PARA;
return code;
}
static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) {
SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx;
if (NULL == pVgCtx) {
SArray* pList = taosArrayInit(10, sizeof(*pNew));
if (NULL == pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pList, pNew);
SGcVgroupCtx vgCtx = {.pTbList = pList, .lastUid = 0, .fileSize = 0, .fileId = 0};
tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx));
pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pVgCtx->pTbList, pNew);
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) {
@ -120,7 +237,7 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];
SOperatorParam* pDst = NULL;
taosWLockLatch(&pCtx->lock);
taosWLockLatch(&pCtx->grpLock);
int32_t num = taosArrayGetSize(pCtx->pNewGrpList);
if (num <= 0) {
goto _return;
@ -128,10 +245,11 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp
for (int32_t i = 0; i < num; ++i) {
SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
if (NULL == taosArrayPush(pCtx->pGrpUidList, &pNew->uid)) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = addNewGroupToVgHash(pCtx->pVgTbHash, pNew);
if (code) {
goto _return;
}
if (num > 1) {
if (0 == i) {
pDst = pNew->pParam;
@ -150,7 +268,7 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp
_return:
taosWUnLockLatch(&pCtx->lock);
taosWUnLockLatch(&pCtx->grpLock);
*ppParam = pDst;
return code;
@ -174,11 +292,18 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
if (pBlock) {
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock);
if (code) {
return code;
}
taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock);
}
}
*ppRes = pBlock;
return TSDB_CODE_SUCCESS;
return code;
}
static void notifyWaitingSessions(SArray* pWaitQueue) {
@ -193,26 +318,70 @@ static void notifyWaitingSessions(SArray* pWaitQueue) {
}
}
int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) {
pGroup->pBlock = NULL;
pGroup->fetchDone = true;
taosThreadMutexLock(&pGroup->mutex);
notifyWaitingSessions(pGroup->waitQueue);
taosArrayClear(pGroup->waitQueue);
taosThreadMutexUnlock(&pGroup->mutex);
}
static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) {
return;
}
pCtx->lastBlkUid = uid;
pGroup->pVgCtx->lastUid = uid;
int32_t i = 0;
while (true) {
SGcNewGroupInfo* pNew = taosArrayGet(pGroup->pVgCtx->pTbList, i++);
if (NULL == pNew || pNew->uid == uid) {
break;
}
handleGroupFetchDone(pNew->pGroup);
}
if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
pGroup->pVgCtx->fileId++;
pGroup->pVgCtx->fileSize = 0;
}
pGroup->fileId = pGroup->pVgCtx->fileId;
pGroup->startOffset = pGroup->pVgCtx->fileSize;
}
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
if (pGCache->grpByUid) {
SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (NULL == pGroup) {
qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.uid);
return TSDB_CODE_INVALID_PARA;
}
handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.uid);
pGroup->pBlock = pBlock;
if (pGroup->needCache) {
SGcBlkBufInfo* pNewBlk = NULL;
code = addBlkToBlkBufs(pOperator, pBlock, &pNewBlk);
SGcBlkBufInfo newBlkBuf;
code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
if (code) {
return code;
}
if (pGroup->pLastBlk) {
pGroup->pLastBlk->next = pNewBlk;
pGroup->pLastBlk = pNewBlk;
if (pGroup->endBlkId > 0) {
pGroup->endBlkId = newBlkBuf.blkId;
} else {
pGroup->pFirstBlk = pNewBlk;
pGroup->pLastBlk = pNewBlk;
pGroup->startBlkId = newBlkBuf.blkId;
pGroup->endBlkId = newBlkBuf.blkId;
}
}
@ -231,17 +400,19 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
int32_t uidNum = taosArrayGetSize(pCtx->pGrpUidList);
for (int32_t i = 0; i < uidNum; ++i) {
int64_t* pUid = taosArrayGet(pCtx->pGrpUidList, i);
SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, pUid, sizeof(*pUid));
pGroup->pBlock = NULL;
pGroup->fetchDone = true;
notifyWaitingSessions(pGroup->waitQueue);
int32_t uidNum = 0;
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGcVgroupCtx* pVgCtx = NULL;
int32_t iter = 0;
while (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter)) {
uidNum = taosArrayGetSize(pVgCtx->pTbList);
for (int32_t i = 0; i < uidNum; ++i) {
SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i);
handleGroupFetchDone(pNew->pGroup);
}
taosArrayClear(pVgCtx->pTbList);
}
taosArrayClear(pCtx->pGrpUidList);
return TSDB_CODE_SUCCESS;
}
@ -267,6 +438,7 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
}
static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGroup, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pGroup->waitQueue) {
pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
if (NULL == pGroup->waitQueue) {
@ -287,18 +459,22 @@ static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCac
tsem_wait(&pSession->waitSem);
if (pSession->pGroupData->needCache) {
if (NULL == pSession->pLastBlk) {
if (pSession->pGroupData->pFirstBlk) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk);
pSession->pLastBlk = pSession->pGroupData->pFirstBlk;
return TSDB_CODE_SUCCESS;
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
return code;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
return TSDB_CODE_SUCCESS;
}
} else if (pSession->pLastBlk->next) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next);
pSession->pLastBlk = pSession->pLastBlk->next;
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
return code;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
return TSDB_CODE_SUCCESS;
}
} else {
@ -317,18 +493,22 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
while (true) {
if (pSession->pGroupData->needCache) {
if (NULL == pSession->pLastBlk) {
if (pSession->pGroupData->pFirstBlk) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk);
pSession->pLastBlk = pSession->pGroupData->pFirstBlk;
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
goto _return;
}
} else if (pSession->pLastBlk->next) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next);
pSession->pLastBlk = pSession->pLastBlk->next;
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
goto _return;
} else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL;
goto _return;
}
} else if (pSession->pGroupData->pBlock || pSession->pGroupData->fetchDone) {
} else if (pSession->pGroupData->pBlock || atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = pSession->pGroupData->pBlock;
pSession->pGroupData->pBlock = NULL;
goto _return;
@ -370,25 +550,48 @@ _return:
}
static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
pInfo->pBlkBufs = taosArrayInit(32, sizeof(SGcBufPageInfo));
if (NULL == pInfo->pBlkBufs) {
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
SGcBlkCacheInfo* pCache = &pInfo->blkCache;
pCache->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pCache->pCacheFile) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pCache->pDirtyBlk) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pCache->pReadBlk) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return addPageToGroupCacheBuf(pInfo->pBlkBufs);
return TSDB_CODE_SUCCESS;
}
static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) {
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcOperatorParam* pGcParam) {
taosThreadMutexInit(&pGroup->mutex, NULL);
pGroup->needCache = pGcParam->needCache;
pGroup->downstreamIdx = pGcParam->downstreamIdx;
pGroup->vgId = pGcParam->vgId;
pGroup->fileId = -1;
pGroup->startBlkId = -1;
pGroup->endBlkId = -1;
pGroup->startOffset = -1;
pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
}
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcOperatorParam* pGcParam = pParam->value;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData grpData = {0};
grpData.needCache = pGcParam->needCache;
initNewGroupData(pCtx, &grpData, pGcParam);
while (true) {
if (0 != taosHashPut(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize, &grpData, sizeof(grpData))) {
if (0 != taosHashPut(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid), &grpData, sizeof(grpData))) {
if (terrno == TSDB_CODE_DUP_KEY) {
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize);
*ppGrp = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
if (*ppGrp) {
break;
}
@ -397,18 +600,20 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperato
}
}
*ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize);
*ppGrp = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
if (*ppGrp) {
SGcNewGroupInfo newGroup;
newGroup.uid = *(int64_t*)pGcParam->pGroupValue;
newGroup.pGroup = *ppGrp;
newGroup.vgId = pGcParam->vgId;
newGroup.uid = pGcParam->tbUid;
newGroup.pParam = taosArrayGet(pParam->pChildren, 0);
taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
if (NULL == taosArrayPush(pGCache->pDownstreams[pParam->downstreamIdx].pNewGrpList, &newGroup)) {
taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
taosWLockLatch(&pCtx->grpLock);
if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
taosWUnLockLatch(&pCtx->grpLock);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock);
taosWUnLockLatch(&pCtx->grpLock);
break;
}
@ -418,15 +623,18 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperato
}
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SGcSessionCtx ctx = {0};
int32_t code = 0;
SGcOperatorParam* pGcParam = pParam->value;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize);
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
if (pGroup) {
ctx.pGroupData = pGroup;
} else {
code = initGroupCacheGroupData(pOperator, pParam, &ctx.pGroupData);
code = addNewGroupData(pOperator, pParam, &ctx.pGroupData);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -434,12 +642,12 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
ctx.pParam = pGcParam;
int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
code = taosHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*ppSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
*ppSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
return TSDB_CODE_SUCCESS;
}
@ -447,7 +655,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcOperatorParam* pGcParam = pParam->value;
SGcSessionCtx* pSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
SGcSessionCtx* pSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (NULL == pSession) {
int32_t code = initGroupCacheSession(pOperator, pParam, &pSession);
if (TSDB_CODE_SUCCESS != code) {
@ -462,7 +670,7 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo
if (NULL == *ppCurrent) {
return;
}
if (tSimpleHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) {
if (taosHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) {
qError("remove session %" PRIx64 " failed", *pCurrentId);
}
@ -489,8 +697,9 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pInfo->pDownstreams[i].fetchSessionId = -1;
pInfo->pDownstreams[i].pGrpUidList = taosArrayInit(10, sizeof(int64_t));
if (NULL == pInfo->pDownstreams[i].pGrpUidList) {
pInfo->pDownstreams[i].lastBlkUid = 0;
pInfo->pDownstreams[i].pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pInfo->pDownstreams[i].pVgTbHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -498,6 +707,17 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if (NULL == pInfo->pDownstreams[i].pNewGrpList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!pInfo->globalGrp) {
pInfo->pDownstreams[i].pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pInfo->pDownstreams[i].pGrpHash == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pInfo->pDownstreams[i].pFreeBlock = taosArrayInit(10, POINTER_BYTES);
if (NULL == pInfo->pDownstreams[i].pFreeBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
@ -531,10 +751,14 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->maxCacheSize = -1;
pInfo->grpByUid = pPhyciNode->grpByUid;
pInfo->globalGrp = pPhyciNode->globalGrp;
if (!pInfo->grpByUid) {
qError("only group cache by uid is supported now");
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
goto _error;
}
if (pPhyciNode->pGroupCols) {
@ -544,18 +768,20 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
}
code = initGroupCacheBufPages(pInfo);
code = initGroupCacheBlockCache(pInfo);
if (code) {
goto _error;
}
pInfo->pBlkHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pInfo->pBlkHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
if (pInfo->globalGrp) {
pInfo->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pInfo->pGrpHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
pInfo->pSessionHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
pInfo->pSessionHash = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pInfo->pSessionHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;

View File

@ -616,28 +616,37 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
SExchangeOperatorParam* pDExc = pDst->value;
SExchangeOperatorParam* pSExc = pSrc->value;
if (!pDExc->multiParams) {
SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
if (NULL == pBatch) {
return TSDB_CODE_OUT_OF_MEMORY;
if (pSExc->basic.vgId != pDExc->basic.vgId) {
SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
if (NULL == pBatch) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBatch->multiParams = true;
pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pBatch->pBatchs) {
taosMemoryFree(pBatch);
return TSDB_CODE_OUT_OF_MEMORY;
}
tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic, sizeof(pDExc->basic));
tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic));
destroyOperatorParamValue(pDst->value);
pDst->value = pBatch;
} else {
taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList);
}
pBatch->multiParams = true;
pBatch->pBatchs = taosArrayInit(4, sizeof(SExchangeOperatorBasicParam));
if (NULL == pBatch->pBatchs) {
taosMemoryFree(pBatch);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBatch->pBatchs, &pDExc->basic);
taosArrayPush(pBatch->pBatchs, &pSExc->basic);
destroyOperatorParamValue(pDst->value);
pDst->value = pBatch;
} else {
SExchangeOperatorBatchParam* pBatch = pDst->value;
taosArrayPush(pBatch->pBatchs, &pSExc->basic);
SExchangeOperatorBasicParam* pBasic = tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
if (pBasic) {
taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
} else {
tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic));
}
}
break;
}
default:
qError("invalid optype %d for merge operator params", (*ppDst)->opType);
qError("invalid optype %d for merge operator params", pDst->opType);
return TSDB_CODE_INVALID_PARA;
}
@ -678,7 +687,7 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara
for (int32_t i = 0; i < childrenNum; ++i) {
SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i);
if (pOperator->pDownstreamParams[pChild->downstreamIdx]) {
int32_t code = mergeOperatorParams(&pOperator->pDownstreamParams[pChild->downstreamIdx], pChild);
int32_t code = mergeOperatorParams(pOperator->pDownstreamParams[pChild->downstreamIdx], pChild);
if (code) {
return code;
}

View File

@ -1183,6 +1183,8 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
}
static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull";
static const char* jkGroupCacheLogicPlanGroupByUid = "GroupByUid";
static const char* jkGroupCacheLogicPlanGlobalGroup = "GlobalGroup";
static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols";
static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
@ -1192,6 +1194,12 @@ static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGroupByUid, pNode->grpByUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGlobalGroup, pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols);
}
@ -1206,6 +1214,12 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, &pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGroupByUid, &pNode->grpByUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGlobalGroup, &pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols);
}
@ -2912,12 +2926,24 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
}
static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns";
static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull";
static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid";
static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup";
static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
const SGroupCachePhysiNode* pNode = (const SGroupCachePhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGrpColsMayBeNull, pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGroupByUid, pNode->grpByUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols);
}
@ -2928,6 +2954,15 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) {
SGroupCachePhysiNode* pNode = (SGroupCachePhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGrpColsMayBeNull, &pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGroupByUid, &pNode->grpByUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols);
}

View File

@ -3527,6 +3527,9 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
enum {
PHY_GROUP_CACHE_CODE_BASE_NODE = 1,
PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL,
PHY_GROUP_CACHE_CODE_GROUP_BY_UID,
PHY_GROUP_CACHE_CODE_GLOBAL_GROUP,
PHY_GROUP_CACHE_CODE_GROUP_COLUMNS
};
@ -3537,6 +3540,16 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS, nodeListToMsg, pNode->pGroupCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL, pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_BY_UID, pNode->grpByUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp);
}
return code;
}
@ -3553,6 +3566,15 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_GROUP_CACHE_CODE_GROUP_COLUMNS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupCols);
break;
case PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL:
code = tlvDecodeBool(pTlv, &pNode->grpColsMayBeNull);
break;
case PHY_GROUP_CACHE_CODE_GROUP_BY_UID:
code = tlvDecodeBool(pTlv, &pNode->grpByUid);
break;
case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP:
code = tlvDecodeBool(pTlv, &pNode->globalGrp);
break;
default:
break;
}

View File

@ -3231,6 +3231,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
pGrpCache->node.dynamicOp = true;
pGrpCache->grpColsMayBeNull = false;
pGrpCache->grpByUid = true;
pGrpCache->node.pChildren = pChildren;
pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) {
@ -3252,11 +3253,16 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
}
}
bool hasCond = false;
SNode* pNode = NULL;
FOREACH(pNode, pChildren) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
if (pScan->node.pConditions) {
hasCond = true;
}
pScan->node.pParent = (SLogicNode*)pGrpCache;
}
pGrpCache->globalGrp = !hasCond;
if (TSDB_CODE_SUCCESS == code) {
*ppLogic = (SLogicNode*)pGrpCache;

View File

@ -986,6 +986,8 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
}
pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
pGrpCache->grpByUid = pLogicNode->grpByUid;
pGrpCache->globalGrp = pLogicNode->globalGrp;
SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {