fix: fix bugs

This commit is contained in:
dapan1121 2023-07-14 19:36:56 +08:00
parent aca9c5fe19
commit 188df1d675
13 changed files with 216 additions and 140 deletions

View File

@ -162,6 +162,7 @@ typedef struct SGroupCacheLogicNode {
bool grpColsMayBeNull;
bool grpByUid;
bool globalGrp;
bool enableCache;
SNodeList* pGroupCols;
} SGroupCacheLogicNode;
@ -445,6 +446,7 @@ typedef struct SGroupCachePhysiNode {
bool grpColsMayBeNull;
bool grpByUid;
bool globalGrp;
bool enableCache;
SNodeList* pGroupCols;
} SGroupCachePhysiNode;

View File

@ -108,7 +108,6 @@ typedef struct SExchangeOpStopInfo {
typedef struct SGcOperatorParam {
int64_t sessionId;
int32_t downstreamIdx;
bool needCache;
int32_t vgId;
int64_t tbUid;
} SGcOperatorParam;

View File

@ -45,7 +45,6 @@ typedef struct SGroupCacheData {
TdThreadMutex mutex;
SArray* waitQueue;
bool fetchDone;
bool needCache;
SSDataBlock* pBlock;
SGcVgroupCtx* pVgCtx;
int32_t downstreamIdx;
@ -89,17 +88,19 @@ typedef struct SGcDownstreamCtx {
SSDataBlock* pBaseBlock;
SArray* pFreeBlock;
int64_t lastBlkUid;
SHashObj* pSessions;
SHashObj* pWaitSessions;
} SGcDownstreamCtx;
typedef struct SGcSessionCtx {
int32_t downstreamIdx;
bool needCache;
SGcOperatorParam* pParam;
SGroupCacheData* pGroupData;
int64_t lastBlkId;
int64_t nextOffset;
bool semInit;
tsem_t waitSem;
bool newFetch;
} SGcSessionCtx;
typedef struct SGcExecInfo {
@ -113,11 +114,6 @@ typedef struct SGcCacheFile {
int64_t fileSize;
} SGcCacheFile;
typedef struct SGcReadBlkInfo {
SSDataBlock* pBlock;
int64_t nextOffset;
} SGcReadBlkInfo;
typedef struct SGcBlkCacheInfo {
SRWLatch dirtyLock;
SSHashObj* pCacheFile;
@ -132,10 +128,10 @@ typedef struct SGroupCacheOperatorInfo {
TdThreadMutex sessionMutex;
int64_t maxCacheSize;
int64_t currentBlkId;
SHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
bool globalGrp;
bool grpByUid;
bool enableCache;
SGcDownstreamCtx* pDownstreams;
SGcBlkCacheInfo blkCache;
SHashObj* pGrpHash;

View File

@ -66,6 +66,7 @@ typedef struct SOperatorInfo {
int16_t resultDataBlockId;
bool blocking; // block operator or not
bool transparent;
bool dynamicTask;
uint8_t status; // denote if current operator is completed
char* name; // name, for debug purpose
void* info; // extension attribution

View File

@ -36,7 +36,7 @@ static void destroyDynQueryCtrlOperator(void* param) {
taosMemoryFreeClear(param);
}
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) {
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -56,7 +56,6 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
pGc->downstreamIdx = downstreamIdx;
pGc->needCache = needCache;
pGc->vgId = vgId;
pGc->tbUid = tbUid;
@ -146,10 +145,10 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ
code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, *leftVg, *leftUid, pExcParam0);
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, *rightVg, *rightUid, pExcParam1);
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1);

View File

@ -80,7 +80,6 @@ static void destroyGroupCacheOperator(void* param) {
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosHashCleanup(pGrpCacheOperator->pSessionHash);
taosHashCleanup(pGrpCacheOperator->pGrpHash);
taosMemoryFreeClear(param);
@ -164,12 +163,10 @@ static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** pp
return TSDB_CODE_SUCCESS;
}
static int32_t releaseBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
taosWLockLatch(&pCtx->blkLock);
taosArrayPush(pCtx->pFreeBlock, &pBlock);
taosWUnLockLatch(&pCtx->blkLock);
return TSDB_CODE_SUCCESS;
}
@ -182,15 +179,9 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3
return blockDataFromBuf(*ppRes, pBufInfo->pBuf);
}
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) {
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkCacheInfo* pCache = &pGCache->blkCache;
SGcReadBlkInfo* pReadBlk = taosHashAcquire(pCache->pReadBlk, &blkId, sizeof(blkId));
if (pReadBlk) {
*ppRes = pReadBlk->pBlock;
*nextOffset = pReadBlk->nextOffset;
return TSDB_CODE_SUCCESS;
}
taosRLockLatch(&pCache->dirtyLock);
SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId));
@ -202,8 +193,8 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
}
*nextOffset = pBufInfo->offset + pBufInfo->bufSize;
SGcReadBlkInfo readBlk = {.pBlock = *ppRes, .nextOffset = *nextOffset};
taosHashPut(pCache->pReadBlk, &blkId, sizeof(blkId), &readBlk, sizeof(readBlk));
taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES);
return TSDB_CODE_SUCCESS;
}
taosRUnLockLatch(&pCache->dirtyLock);
@ -279,9 +270,11 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
SOperatorParam* pDownstreamParam = NULL;
SSDataBlock* pBlock = NULL;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
if (code) {
return code;
if (pGCache->enableCache) {
code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
if (code) {
return code;
}
}
if (pDownstreamParam) {
@ -290,7 +283,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]);
}
if (pBlock) {
if (pBlock && pGCache->enableCache) {
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock);
@ -359,41 +352,36 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
if (pGCache->grpByUid) {
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (NULL == pGroup) {
qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.uid);
return TSDB_CODE_INVALID_PARA;
}
handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.uid);
pGroup->pBlock = pBlock;
if (pGroup->needCache) {
SGcBlkBufInfo newBlkBuf;
code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
if (code) {
return code;
}
if (pGroup->endBlkId > 0) {
pGroup->endBlkId = newBlkBuf.blkId;
} else {
pGroup->startBlkId = newBlkBuf.blkId;
pGroup->endBlkId = newBlkBuf.blkId;
}
}
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId));
if (NULL == pGroup) {
qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId);
return TSDB_CODE_INVALID_PARA;
}
handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId);
notifyWaitingSessions(pGroup->waitQueue);
if (pGroup == pSession->pGroupData) {
*continueFetch = false;
}
return TSDB_CODE_SUCCESS;
SGcBlkBufInfo newBlkBuf;
code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
if (code) {
return code;
}
if (pGroup->endBlkId > 0) {
pGroup->endBlkId = newBlkBuf.blkId;
} else {
pGroup->startBlkId = newBlkBuf.blkId;
pGroup->endBlkId = newBlkBuf.blkId;
}
return TSDB_CODE_INVALID_PARA;
notifyWaitingSessions(pGroup->waitQueue);
if (pGroup == pSession->pGroupData) {
pSession->lastBlkId = newBlkBuf.blkId;
pSession->nextOffset = newBlkBuf.offset + newBlkBuf.bufSize;
*continueFetch = false;
}
return TSDB_CODE_SUCCESS;
}
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
@ -413,13 +401,16 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes
taosArrayClear(pVgCtx->pTbList);
}
taosHashClear(pCtx->pWaitSessions);
return TSDB_CODE_SUCCESS;
}
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
bool continueFetch = true;
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
while (continueFetch && TSDB_CODE_SUCCESS == code) {
int32_t code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes);
if (TSDB_CODE_SUCCESS != code) {
@ -429,15 +420,46 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
if (NULL == *ppRes) {
code = handleDownstreamFetchDone(pOperator, pSession);
break;
} else {
} else if (pGCache->enableCache) {
code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch);
} else {
continueFetch = false;
}
}
if (!pGCache->enableCache) {
return code;
}
if (!continueFetch) {
SGcSessionCtx** ppWaitCtx = taosHashIterate(pCtx->pWaitSessions, NULL);
if (ppWaitCtx) {
taosHashCancelIterate(pCtx->pWaitSessions, ppWaitCtx);
int64_t* pSessionId = taosHashGetKey(ppWaitCtx, NULL);
if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, *pSessionId)) {
qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SGcSessionCtx* pWaitCtx = *ppWaitCtx;
pWaitCtx->newFetch = true;
taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
tsem_post(&pWaitCtx->waitSem);
return code;
}
}
if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, -1)) {
qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
return code;
}
static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGroup, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = pSession->pGroupData;
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pGroup->waitQueue) {
pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
@ -456,77 +478,73 @@ static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCac
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES);
tsem_wait(&pSession->waitSem);
if (pSession->pGroupData->needCache) {
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
return code;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
return TSDB_CODE_SUCCESS;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
return code;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
return TSDB_CODE_SUCCESS;
}
} else {
*ppRes = pSession->pGroupData->pBlock;
return TSDB_CODE_SUCCESS;
if (pSession->newFetch) {
pSession->newFetch = false;
return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
}
qError("no block retrieved from downstream and waked up");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
}
return code;
}
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
bool locked = false;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
while (true) {
if (pSession->pGroupData->needCache) {
if (pGCache->enableCache) {
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
goto _return;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
goto _return;
} else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL;
goto _return;
}
} else if (pSession->pGroupData->pBlock || atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = pSession->pGroupData->pBlock;
pSession->pGroupData->pBlock = NULL;
goto _return;
}
if ((atomic_load_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId) == sessionId)
|| (-1 == atomic_val_compare_exchange_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId, -1, sessionId))) {
if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId)
|| (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) {
if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
locked = false;
}
code = getCacheBlkFromDownstreamOperator(pOperator, pSession, ppRes);
code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
goto _return;
}
if (locked) {
code = groupCacheSessionWait(pGCache, pSession->pGroupData, pSession, ppRes);
code = groupCacheSessionWait(pOperator, pCtx, sessionId, pSession, ppRes);
locked = false;
if (TSDB_CODE_SUCCESS != code) {
goto _return;
@ -570,7 +588,6 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcOperatorParam* pGcParam) {
taosThreadMutexInit(&pGroup->mutex, NULL);
pGroup->needCache = pGcParam->needCache;
pGroup->downstreamIdx = pGcParam->downstreamIdx;
pGroup->vgId = pGcParam->vgId;
pGroup->fileId = -1;
@ -606,7 +623,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
newGroup.pGroup = *ppGrp;
newGroup.vgId = pGcParam->vgId;
newGroup.uid = pGcParam->tbUid;
newGroup.pParam = taosArrayGet(pParam->pChildren, 0);
newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);
taosWLockLatch(&pCtx->grpLock);
if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
@ -622,6 +639,14 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) {
pSession->pParam = pGcParam;
pSession->downstreamIdx = pGcParam->downstreamIdx;
pSession->pGroupData = pGroup;
pSession->lastBlkId = -1;
pSession->nextOffset = -1;
}
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SGcSessionCtx ctx = {0};
@ -631,51 +656,50 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
if (pGroup) {
ctx.pGroupData = pGroup;
} else {
code = addNewGroupData(pOperator, pParam, &ctx.pGroupData);
if (NULL == pGroup) {
code = addNewGroupData(pOperator, pParam, &pGroup);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
ctx.pParam = pGcParam;
code = taosHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
initGroupCacheSessionCtx(&ctx, pGcParam, pGroup);
code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*ppSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
*ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
return TSDB_CODE_SUCCESS;
}
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcOperatorParam* pGcParam = pParam->value;
SGcSessionCtx* pSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
SGcSessionCtx* pSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (NULL == pSession) {
int32_t code = initGroupCacheSession(pOperator, pParam, &pSession);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} else if (pGCache->enableCache) {
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (ppBlock) {
releaseBaseBlockToList(pCtx, *ppBlock);
taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
}
}
return getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes);
}
code = getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes);
if (NULL == ppRes) {
taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
}
static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) {
if (NULL == *ppCurrent) {
return;
}
if (taosHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) {
qError("remove session %" PRIx64 " failed", *pCurrentId);
}
*ppCurrent = NULL;
*pCurrentId = 0;
return code;
}
static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
@ -690,7 +714,7 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pInfo = pOperator->info;
pInfo->pDownstreams = taosMemoryMalloc(pOperator->numOfDownstream * sizeof(*pInfo->pDownstreams));
pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams));
if (NULL == pInfo->pDownstreams) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -713,11 +737,21 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pInfo->pDownstreams[i].pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pInfo->pDownstreams[i].pSessions == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInfo->pDownstreams[i].pFreeBlock = taosArrayInit(10, POINTER_BYTES);
if (NULL == pInfo->pDownstreams[i].pFreeBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInfo->pDownstreams[i].pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pInfo->pDownstreams[i].pWaitSessions == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
@ -754,6 +788,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->maxCacheSize = -1;
pInfo->grpByUid = pPhyciNode->grpByUid;
pInfo->globalGrp = pPhyciNode->globalGrp;
pInfo->enableCache = pPhyciNode->enableCache;
if (!pInfo->grpByUid) {
qError("only group cache by uid is supported now");
@ -781,12 +816,6 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
}
pInfo->pSessionHash = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pInfo->pSessionHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (TSDB_CODE_SUCCESS != code) {
goto _error;

View File

@ -642,6 +642,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
return TSDB_CODE_SUCCESS;
}
static void setMergeJoinDone(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
pOperator->pDownstreamParams[0] = NULL;
pOperator->pDownstreamParams[1] = NULL;
}
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
@ -651,7 +657,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
qDebug("merge join left got empty block");
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
setMergeJoinDone(pOperator);
return false;
} else {
qDebug("merge join left got block");
@ -664,7 +670,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
pJoinInfo->rightPos = 0;
if (pJoinInfo->pRight == NULL) {
qDebug("merge join right got empty block");
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
setMergeJoinDone(pOperator);
return false;
} else {
qDebug("merge join right got block");
@ -729,6 +735,9 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE && (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1])) {
return NULL;
}
SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes);
@ -746,6 +755,9 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
if (pOperator->status == OP_EXEC_DONE) {
break;
}
}
return (pRes->info.rows > 0) ? pRes : NULL;
}

View File

@ -799,8 +799,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
qDebug("add total %d dynamic tables to scan", num);
taosArrayClear(pListInfo->pTableList);
taosHashClear(pListInfo->map);
pListInfo->oneTableForEachGroup = true;
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
@ -838,6 +837,10 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) {
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
result->info.id.groupId = pKeyInfo->uid;
}
return result;
}
@ -851,14 +854,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
if (pOperator->pOperatorParam) {
pOperator->dynamicTask = true;
int32_t code = createTableListInfoFromParam(pOperator);
pOperator->pOperatorParam = NULL;
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pInfo->currentGroupId != -1) {
if (pOperator->status == OP_EXEC_DONE) {
pInfo->currentGroupId = 0;
return startNextGroupScan(pOperator);
}
@ -922,11 +925,19 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) {
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
result->info.id.groupId = pKeyInfo->uid;
}
return result;
}
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
if (pOperator->dynamicTask) {
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
taosHashClear(pInfo->base.pTableListInfo->map);
}
return NULL;
}

View File

@ -537,6 +537,10 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc
static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(grpColsMayBeNull);
COPY_SCALAR_FIELD(grpByUid);
COPY_SCALAR_FIELD(globalGrp);
COPY_SCALAR_FIELD(enableCache);
CLONE_NODE_LIST_FIELD(pGroupCols);
return TSDB_CODE_SUCCESS;
}

View File

@ -1185,6 +1185,7 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull";
static const char* jkGroupCacheLogicPlanGroupByUid = "GroupByUid";
static const char* jkGroupCacheLogicPlanGlobalGroup = "GlobalGroup";
static const char* jkGroupCacheLogicPlanEnableCache = "EnableCache";
static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols";
static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
@ -1200,6 +1201,9 @@ static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGlobalGroup, pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanEnableCache, pNode->enableCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols);
}
@ -1220,6 +1224,9 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGlobalGroup, &pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanEnableCache, &pNode->enableCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols);
}
@ -2929,6 +2936,7 @@ static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns";
static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull";
static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid";
static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup";
static const char* jkGroupCachePhysiPlanEnableCache = "EnableCache";
static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
@ -2944,6 +2952,9 @@ static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanEnableCache, pNode->enableCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols);
}
@ -2963,6 +2974,9 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanEnableCache, &pNode->enableCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols);
}

View File

@ -3530,6 +3530,7 @@ enum {
PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL,
PHY_GROUP_CACHE_CODE_GROUP_BY_UID,
PHY_GROUP_CACHE_CODE_GLOBAL_GROUP,
PHY_GROUP_CACHE_CODE_ENABLE_CACHE,
PHY_GROUP_CACHE_CODE_GROUP_COLUMNS
};
@ -3549,6 +3550,9 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_ENABLE_CACHE, pNode->enableCache);
}
return code;
}
@ -3575,6 +3579,9 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP:
code = tlvDecodeBool(pTlv, &pNode->globalGrp);
break;
case PHY_GROUP_CACHE_CODE_ENABLE_CACHE:
code = tlvDecodeBool(pTlv, &pNode->enableCache);
break;
default:
break;
}

View File

@ -3263,7 +3263,8 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
pScan->node.pParent = (SLogicNode*)pGrpCache;
}
pGrpCache->globalGrp = !hasCond;
pGrpCache->enableCache = pGrpCache->globalGrp;
if (TSDB_CODE_SUCCESS == code) {
*ppLogic = (SLogicNode*)pGrpCache;
} else {

View File

@ -988,6 +988,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
pGrpCache->grpByUid = pLogicNode->grpByUid;
pGrpCache->globalGrp = pLogicNode->globalGrp;
pGrpCache->enableCache = pLogicNode->enableCache;
SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {