enh: remove group cache

This commit is contained in:
dapan1121 2023-07-28 18:00:33 +08:00
parent e83a46b88f
commit 9ed1417bba
12 changed files with 563 additions and 184 deletions

View File

@ -24,6 +24,8 @@ typedef struct SDynQueryCtrlExecInfo {
int64_t prevBlkRows; int64_t prevBlkRows;
int64_t postBlkNum; int64_t postBlkNum;
int64_t postBlkRows; int64_t postBlkRows;
int64_t leftCacheNum;
int64_t rightCacheNum;
} SDynQueryCtrlExecInfo; } SDynQueryCtrlExecInfo;
typedef struct SStbJoinTableList { typedef struct SStbJoinTableList {
@ -40,7 +42,8 @@ typedef struct SStbJoinPrevJoinCtx {
bool joinBuild; bool joinBuild;
SSHashObj* leftHash; SSHashObj* leftHash;
SSHashObj* rightHash; SSHashObj* rightHash;
SSHashObj* tableTimes; SSHashObj* leftCache;
SSHashObj* rightCache;
SSHashObj* onceTable; SSHashObj* onceTable;
int64_t tableNum; int64_t tableNum;
SStbJoinTableList* pListHead; SStbJoinTableList* pListHead;
@ -48,6 +51,11 @@ typedef struct SStbJoinPrevJoinCtx {
typedef struct SStbJoinPostJoinCtx { typedef struct SStbJoinPostJoinCtx {
bool isStarted; bool isStarted;
bool leftNeedCache;
bool rightNeedCache;
int32_t leftVgId;
int32_t rightVgId;
int64_t leftCurrUid;
int64_t rightCurrUid; int64_t rightCurrUid;
int64_t rightNextUid; int64_t rightNextUid;
} SStbJoinPostJoinCtx; } SStbJoinPostJoinCtx;
@ -58,13 +66,13 @@ typedef struct SStbJoinDynCtrlCtx {
} SStbJoinDynCtrlCtx; } SStbJoinDynCtrlCtx;
typedef struct SStbJoinDynCtrlInfo { typedef struct SStbJoinDynCtrlInfo {
SStbJoinDynCtrlBasic basic; SDynQueryCtrlExecInfo execInfo;
SStbJoinDynCtrlCtx ctx; SStbJoinDynCtrlBasic basic;
SStbJoinDynCtrlCtx ctx;
} SStbJoinDynCtrlInfo; } SStbJoinDynCtrlInfo;
typedef struct SDynQueryCtrlOperatorInfo { typedef struct SDynQueryCtrlOperatorInfo {
EDynQueryType qType; EDynQueryType qType;
SDynQueryCtrlExecInfo execInfo;
union { union {
SStbJoinDynCtrlInfo stbJoin; SStbJoinDynCtrlInfo stbJoin;
}; };

View File

@ -113,6 +113,12 @@ typedef struct SGcOperatorParam {
bool needCache; bool needCache;
} SGcOperatorParam; } SGcOperatorParam;
typedef struct SGcNotifyOperatorParam {
int32_t downstreamIdx;
int32_t vgId;
int64_t tbUid;
} SGcNotifyOperatorParam;
typedef struct SExprSupp { typedef struct SExprSupp {
SExprInfo* pExprInfo; SExprInfo* pExprInfo;
int32_t numOfExprs; // the number of scalar expression in group operator int32_t numOfExprs; // the number of scalar expression in group operator
@ -168,6 +174,11 @@ typedef struct SExchangeOperatorParam {
SExchangeOperatorBasicParam basic; SExchangeOperatorBasicParam basic;
} SExchangeOperatorParam; } SExchangeOperatorParam;
typedef struct SExchangeSrcIndex {
int32_t srcIdx;
int32_t inUseIdx;
} SExchangeSrcIndex;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SSHashObj* pHashSources; SSHashObj* pHashSources;

View File

@ -24,7 +24,7 @@ extern "C" {
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct SGcBlkBufBasic { typedef struct SGcBlkBufBasic {
uint32_t fileId; int32_t fileId;
int64_t blkId; int64_t blkId;
int64_t offset; int64_t offset;
int64_t bufSize; int64_t bufSize;
@ -36,9 +36,15 @@ typedef struct SGroupCacheFileFd {
TdFilePtr fd; TdFilePtr fd;
} SGroupCacheFileFd; } SGroupCacheFileFd;
typedef struct SGroupCacheFileInfo {
uint32_t groupNum;
bool deleted;
SGroupCacheFileFd fd;
} SGroupCacheFileInfo;
typedef struct SGcFileCacheCtx { typedef struct SGcFileCacheCtx {
int64_t fileSize; int64_t fileSize;
uint32_t fileId; int32_t fileId;
SHashObj* pCacheFile; SHashObj* pCacheFile;
int32_t baseNameLen; int32_t baseNameLen;
char baseFilename[256]; char baseFilename[256];
@ -61,6 +67,7 @@ typedef struct SGcDownstreamCtx {
} SGcDownstreamCtx; } SGcDownstreamCtx;
typedef struct SGcVgroupCtx { typedef struct SGcVgroupCtx {
int32_t id;
SArray* pTbList; SArray* pTbList;
uint64_t lastBlkUid; uint64_t lastBlkUid;
SGcFileCacheCtx fileCtx; SGcFileCacheCtx fileCtx;
@ -81,7 +88,7 @@ typedef struct SGroupCacheData {
int32_t downstreamIdx; int32_t downstreamIdx;
int32_t vgId; int32_t vgId;
SGcBlkList blkList; SGcBlkList blkList;
uint32_t fileId; int32_t fileId;
int64_t startOffset; int64_t startOffset;
} SGroupCacheData; } SGroupCacheData;
@ -124,7 +131,7 @@ typedef struct SGcBlkBufInfo {
void* next; void* next;
void* pBuf; void* pBuf;
SGcDownstreamCtx* pCtx; SGcDownstreamCtx* pCtx;
SGroupCacheData* pGroup; int64_t groupId;
} SGcBlkBufInfo; } SGcBlkBufInfo;
typedef struct SGcExecInfo { typedef struct SGcExecInfo {

View File

@ -20,6 +20,11 @@
extern "C" { extern "C" {
#endif #endif
typedef enum SOperatorParamType{
OP_GET_PARAM = 1,
OP_NOTIFY_PARAM
} SOperatorParamType;
typedef struct SOperatorCostInfo { typedef struct SOperatorCostInfo {
double openCost; double openCost;
double totalCost; double totalCost;
@ -36,7 +41,7 @@ typedef void (*__optr_close_fn_t)(void* param);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_get_ext_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); typedef SSDataBlock* (*__optr_get_ext_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param);
typedef SSDataBlock* (*__optr_notify_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); typedef int32_t (*__optr_notify_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param);
typedef void (*__optr_state_fn_t)(struct SOperatorInfo* pOptr); typedef void (*__optr_state_fn_t)(struct SOperatorInfo* pOptr);
typedef struct SOperatorFpSet { typedef struct SOperatorFpSet {
@ -74,8 +79,10 @@ typedef struct SOperatorInfo {
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
SResultInfo resultInfo; SResultInfo resultInfo;
SOperatorParam* pOperatorParam; SOperatorParam* pOperatorGetParam;
SOperatorParam** pDownstreamParams; SOperatorParam* pOperatorNotifyParam;
SOperatorParam** pDownstreamGetParams;
SOperatorParam** pDownstreamNotifyParams;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
int32_t numOfRealDownstream; int32_t numOfRealDownstream;
@ -171,6 +178,7 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32
void* pInfo, SExecTaskInfo* pTaskInfo); void* pInfo, SExecTaskInfo* pTaskInfo);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator); int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx); SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx);
SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx); SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx);
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx); int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx);

View File

@ -36,8 +36,9 @@ void freeVgTableList(void* ptr) {
static void destroyDynQueryCtrlOperator(void* param) { static void destroyDynQueryCtrlOperator(void* param) {
SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); pDyn->stbJoin.execInfo.prevBlkNum, pDyn->stbJoin.execInfo.prevBlkRows, pDyn->stbJoin.execInfo.postBlkNum,
pDyn->stbJoin.execInfo.postBlkRows, pDyn->stbJoin.execInfo.leftCacheNum, pDyn->stbJoin.execInfo.rightCacheNum);
if (pDyn->stbJoin.basic.batchFetch) { if (pDyn->stbJoin.basic.batchFetch) {
if (pDyn->stbJoin.ctx.prev.leftHash) { if (pDyn->stbJoin.ctx.prev.leftHash) {
@ -48,13 +49,16 @@ static void destroyDynQueryCtrlOperator(void* param) {
tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightHash, freeVgTableList); tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightHash, freeVgTableList);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightHash); tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightHash);
} }
} } else {
if (pDyn->stbJoin.ctx.prev.leftCache) {
if (pDyn->stbJoin.ctx.prev.tableTimes) { tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftCache);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.tableTimes); }
} if (pDyn->stbJoin.ctx.prev.rightCache) {
if (pDyn->stbJoin.ctx.prev.onceTable) { tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightCache);
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable); }
if (pDyn->stbJoin.ctx.prev.onceTable) {
tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable);
}
} }
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
@ -95,6 +99,31 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = NULL;
SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
if (NULL == pGc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pGc->downstreamIdx = downstreamIdx;
pGc->vgId = vgId;
pGc->tbUid = tbUid;
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
(*ppRes)->downstreamIdx = downstreamIdx;
(*ppRes)->value = pGc;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) { if (NULL == *ppRes) {
@ -199,6 +228,29 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
(*ppRes)->value = NULL;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) { static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
if (batchFetch) { if (batchFetch) {
return true; return true;
@ -208,11 +260,54 @@ static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev,
return pPost->rightCurrUid == pPost->rightNextUid; return pPost->rightCurrUid == pPost->rightNextUid;
} }
uint32_t* num = tSimpleHashGet(pPrev->tableTimes, &uid, sizeof(uid)); uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
return (NULL == num) ? false : true; return (NULL == num) ? false : true;
} }
static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
SStbJoinTableList* pNode = pPrev->pListHead;
int32_t* leftVgId = pNode->pLeftVg + pNode->readIdx;
int32_t* rightVgId = pNode->pRightVg + pNode->readIdx;
int64_t* leftUid = pNode->pLeftUid + pNode->readIdx;
int64_t* rightUid = pNode->pRightUid + pNode->readIdx;
int64_t readIdx = pNode->readIdx + 1;
int64_t rightPrevUid = pPost->rightCurrUid;
pPost->leftCurrUid = *leftUid;
pPost->rightCurrUid = *rightUid;
pPost->leftVgId = *leftVgId;
pPost->rightVgId = *rightVgId;
while (true) {
if (readIdx < pNode->uidNum) {
pPost->rightNextUid = *(rightUid + readIdx);
break;
}
pNode = pNode->pNext;
if (NULL == pNode) {
pPost->rightNextUid = 0;
break;
}
rightUid = pNode->pRightUid;
readIdx = 0;
}
pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
if (pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0);
pStbJoin->execInfo.rightCacheNum++;
}
}
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) { static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
int64_t rowIdx = pPrev->pListHead->readIdx; int64_t rowIdx = pPrev->pListHead->readIdx;
SOperatorParam* pExcParam0 = NULL; SOperatorParam* pExcParam0 = NULL;
@ -228,6 +323,8 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64,
rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid); rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
updatePostJoinCurrTableInfo(&pInfo->stbJoin);
if (pInfo->stbJoin.basic.batchFetch) { if (pInfo->stbJoin.basic.batchFetch) {
if (pPrev->leftHash) { if (pPrev->leftHash) {
code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash); code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash);
@ -249,10 +346,10 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, tableNeedCache(*leftUid, pPrev, pPost, false, pInfo->stbJoin.basic.batchFetch), pExcParam0); code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pExcParam0);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, tableNeedCache(*rightUid, pPrev, pPost, true, pInfo->stbJoin.basic.batchFetch), pExcParam1); code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pExcParam1);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1); code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1);
@ -261,7 +358,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
} }
static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
@ -277,17 +374,67 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes)
*ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam); *ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam);
if (*ppRes) { if (*ppRes) {
pPost->isStarted = true; pPost->isStarted = true;
pInfo->execInfo.postBlkNum++; pStbJoin->execInfo.postBlkNum++;
pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
qError("join res block retrieved"); qError("join res block retrieved");
} else { } else {
qError("Empty join res block retrieved"); qError("Empty join res block retrieved");
} }
} }
static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
SOperatorParam* pGcParam = NULL;
SOperatorParam* pMergeJoinParam = NULL;
int32_t downstreamId = leftTable ? 0 : 1;
int32_t vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
int64_t uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
qError("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
pPost->isStarted = false;
if (pStbJoin->basic.batchFetch) {
return;
}
if (pPost->leftNeedCache) {
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
if (--(*num) <= 0) {
tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
notifySeqJoinTableCacheEnd(pOperator, pPost, true);
}
}
if (!pPost->rightNeedCache) {
void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
if (NULL != v) {
tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
notifySeqJoinTableCacheEnd(pOperator, pPost, false);
}
}
}
static FORCE_INLINE void seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post; SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post;
SStbJoinPrevJoinCtx* pPrev = &pInfo->stbJoin.ctx.prev;
if (!pPost->isStarted) { if (!pPost->isStarted) {
return; return;
@ -297,10 +444,11 @@ static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDat
*ppRes = getNextBlockFromDownstream(pOperator, 1); *ppRes = getNextBlockFromDownstream(pOperator, 1);
if (NULL == *ppRes) { if (NULL == *ppRes) {
pPost->isStarted = false; handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin);
pPrev->pListHead->readIdx++;
} else { } else {
pInfo->execInfo.postBlkNum++; pInfo->stbJoin.execInfo.postBlkNum++;
pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
return; return;
} }
} }
@ -428,14 +576,14 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc
break; break;
} }
} }
} } else {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
for (int32_t i = 0; i < pBlock->info.rows; ++i) { code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); if (TSDB_CODE_SUCCESS != code) {
break;
code = addToJoinTableHash(pStbJoin->ctx.prev.tableTimes, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid)); }
if (TSDB_CODE_SUCCESS != code) {
break;
} }
} }
@ -452,59 +600,33 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc
} }
} }
static void updatePostJoinRightTableUid(SStbJoinDynCtrlInfo* pStbJoin) {
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
SStbJoinTableList* pNode = pPrev->pListHead;
int64_t* rightUid = pNode->pRightUid;
int64_t readIdx = pNode->readIdx + 1;
if (pPost->rightNextUid) {
pPost->rightCurrUid = pPost->rightNextUid;
} else {
pPost->rightCurrUid = *rightUid;
}
while (true) {
if (readIdx < pNode->uidNum) {
pPost->rightNextUid = *(rightUid + readIdx);
return;
}
pNode = pNode->pNext;
if (NULL == pNode) {
pPost->rightNextUid = 0;
return;
}
rightUid = pNode->pRightUid;
readIdx = 0;
}
}
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) { static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
updatePostJoinRightTableUid(pStbJoin); if (pStbJoin->basic.batchFetch) {
return;
}
if (tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) { if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
tSimpleHashClear(pStbJoin->ctx.prev.tableTimes); tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
return; return;
} }
uint64_t* pUid = NULL; uint64_t* pUid = NULL;
int32_t iter = 0; int32_t iter = 0;
while (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter)) { while (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter)) {
tSimpleHashRemove(pStbJoin->ctx.prev.tableTimes, pUid, sizeof(*pUid)); tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
} }
qError("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes)); pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
qError("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
// debug only // debug only
iter = 0; iter = 0;
uint32_t* num = NULL; uint32_t* num = NULL;
while (num = tSimpleHashIterate(pStbJoin->ctx.prev.tableTimes, num, &iter)) { while (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter)) {
ASSERT(*num > 1); ASSERT(*num > 1);
} }
} }
@ -519,8 +641,8 @@ static void buildStbJoinTableList(SOperatorInfo* pOperator) {
break; break;
} }
pInfo->execInfo.prevBlkNum++; pStbJoin->execInfo.prevBlkNum++;
pInfo->execInfo.prevBlkRows += pBlock->info.rows; pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
doBuildStbJoinTableHash(pOperator, pBlock); doBuildStbJoinTableHash(pOperator, pBlock);
} }
@ -530,7 +652,7 @@ static void buildStbJoinTableList(SOperatorInfo* pOperator) {
pStbJoin->ctx.prev.joinBuild = true; pStbJoin->ctx.prev.joinBuild = true;
} }
static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
@ -544,12 +666,13 @@ static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes)
continue; continue;
} }
seqJoinLaunchPostJoin(pOperator, ppRes); seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
pPrev->pListHead->readIdx++;
if (*ppRes) { if (*ppRes) {
return; return;
} }
handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin);
pPrev->pListHead->readIdx++;
} }
*ppRes = NULL; *ppRes = NULL;
@ -567,18 +690,18 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
if (!pStbJoin->ctx.prev.joinBuild) { if (!pStbJoin->ctx.prev.joinBuild) {
buildStbJoinTableList(pOperator); buildStbJoinTableList(pOperator);
if (pInfo->execInfo.prevBlkRows <= 0) { if (pStbJoin->execInfo.prevBlkRows <= 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
} }
seqJoinContinueRetrieve(pOperator, &pRes); seqJoinContinueCurrRetrieve(pOperator, &pRes);
if (pRes) { if (pRes) {
return pRes; return pRes;
} }
seqJoinLaunchRetrieve(pOperator, &pRes); seqJoinLaunchNewRetrieve(pOperator, &pRes);
return pRes; return pRes;
} }
@ -592,15 +715,19 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
if (NULL == pPrev->rightHash) { if (NULL == pPrev->rightHash) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} } else {
pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
pPrev->tableTimes = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); if (NULL == pPrev->leftCache) {
if (NULL == pPrev->tableTimes) { return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; }
} pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); if (NULL == pPrev->rightCache) {
if (NULL == pPrev->onceTable) { return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; }
pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (NULL == pPrev->onceTable) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -70,6 +70,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
return; return;
} }
SSourceDataInfo* pDataInfo = NULL;
while (1) { while (1) {
qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
@ -79,7 +81,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
} }
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
continue; continue;
} }
@ -99,12 +101,21 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
// todo // todo
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; if (NULL != pDataInfo->pSrcUidList) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
", totalRows:%" PRIu64 ", try next %d/%" PRIzu, code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, if (code != TSDB_CODE_SUCCESS) {
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp);
taosMemoryFreeClear(pDataInfo->pRsp); goto _error;
}
} else {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
taosMemoryFreeClear(pDataInfo->pRsp);
}
break; break;
} }
@ -134,7 +145,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY; pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -289,7 +300,8 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
for (int32_t i = 0; i < numOfSources; ++i) { for (int32_t i = 0; i < numOfSources; ++i) {
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
taosArrayPush(pInfo->pSources, pNode); taosArrayPush(pInfo->pSources, pNode);
tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &i, sizeof(i)); SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
} }
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
@ -749,21 +761,32 @@ _error:
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) { int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId)); SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
if (NULL == pIdx) { if (NULL == pIdx) {
qError("No exchange source for vgId: %d", pBasicParam->vgId); qError("No exchange source for vgId: %d", pBasicParam->vgId);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
SSourceDataInfo dataInfo = {0}; if (pIdx->inUseIdx < 0) {
dataInfo.status = EX_SOURCE_DATA_NOT_READY; SSourceDataInfo dataInfo = {0};
dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.index = *pIdx; dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo);
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.index = pIdx->srcIdx;
dataInfo.srcOpType = pBasicParam->srcOpType; dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
dataInfo.tableSeq = pBasicParam->tableSeq; dataInfo.srcOpType = pBasicParam->srcOpType;
dataInfo.tableSeq = pBasicParam->tableSeq;
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
} else {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
}
pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
pDataInfo->srcOpType = pBasicParam->srcOpType;
pDataInfo->tableSeq = pBasicParam->tableSeq;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -773,9 +796,9 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SExchangeOperatorBasicParam* pBasicParam = NULL; SExchangeOperatorBasicParam* pBasicParam = NULL;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value; SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
if (pParam->multiParams) { if (pParam->multiParams) {
SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorParam->value; SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
int32_t iter = 0; int32_t iter = 0;
while (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter)) { while (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter)) {
code = addSingleExchangeSource(pOperator, pBasicParam); code = addSingleExchangeSource(pOperator, pBasicParam);
@ -788,7 +811,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
code = addSingleExchangeSource(pOperator, pBasicParam); code = addSingleExchangeSource(pOperator, pBasicParam);
} }
pOperator->pOperatorParam = NULL; pOperator->pOperatorGetParam = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -797,7 +820,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorParam)) { if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -27,6 +27,17 @@
#include "ttypes.h" #include "ttypes.h"
#include "groupcache.h" #include "groupcache.h"
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
if (pFileInfo->fd.fd) {
taosCloseFile(&pFileInfo->fd.fd);
pFileInfo->fd.fd = NULL;
taosThreadMutexDestroy(&pFileInfo->fd.mutex);
}
pFileInfo->deleted = true;
}
static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) { static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) {
pCols->colNum = LIST_LENGTH(pList); pCols->colNum = LIST_LENGTH(pList);
pCols->withNull = grpColsMayBeNull; pCols->withNull = grpColsMayBeNull;
@ -96,7 +107,7 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) { static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, SGroupCacheFileFd** ppFd, bool* pDeleted) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pFileCtx->pCacheFile) { if (NULL == pFileCtx->pCacheFile) {
pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
@ -105,22 +116,29 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, uint32_t fileId,
} }
} }
SGroupCacheFileFd* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) { if (NULL == pTmp) {
sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", fileId);
SGroupCacheFileFd newVgFd = {0}; SGroupCacheFileInfo newFile = {0};
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
}
code = initOpenCacheFile(pTmp, pFileCtx->baseFilename); if (pTmp->deleted) {
*pDeleted = true;
return TSDB_CODE_SUCCESS;
}
if (NULL == pTmp->fd.fd) {
code = initOpenCacheFile(&pTmp->fd, pFileCtx->baseFilename);
if (code) { if (code) {
return code; return code;
} }
} }
taosThreadMutexLock(&pTmp->mutex); taosThreadMutexLock(&pTmp->fd.mutex);
*ppFd = pTmp; *ppFd = &pTmp->fd;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -131,17 +149,53 @@ static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheFileFd *pFd; SGroupCacheFileFd *pFd = NULL;
SGcFileCacheCtx* pFileCtx = NULL; SGcFileCacheCtx* pFileCtx = NULL;
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
int64_t lastGroupId = 0;
SGroupCacheData* pGroup = NULL;
while (NULL != pHead) { while (NULL != pHead) {
pFileCtx = pGCache->batchFetch ? &pHead->pCtx->fileCtx : &pHead->pGroup->pVgCtx->fileCtx; if (pGCache->batchFetch) {
pFileCtx = &pHead->pCtx->fileCtx;
} else {
if (pHead->groupId != lastGroupId) {
if (NULL != pGroup) {
taosHashRelease(pGrpHash, pGroup);
}
pGroup = taosHashAcquire(pGrpHash, &pHead->groupId, sizeof(pHead->groupId));
lastGroupId = pHead->groupId;
}
code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd); if (NULL == pGroup) {
qTrace("group %" PRIu64 " in downstream %d may already be deleted, skip write", pHead->groupId, pHead->pCtx->id);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
continue;
}
pFileCtx = &pGroup->pVgCtx->fileCtx;
}
bool deleted = false;
code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd, &deleted);
if (code) { if (code) {
goto _return; goto _return;
} }
if (deleted) {
qTrace("FileId:%d-%d-%d already be deleted, skip write",
pCtx->id, pGroup->vgId, pHead->basic.fileId);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
continue;
}
int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
if (ret == -1) { if (ret == -1) {
releaseFdToFileCtx(pFd); releaseFdToFileCtx(pFd);
@ -158,8 +212,8 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
releaseFdToFileCtx(pFd); releaseFdToFileCtx(pFd);
qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64,
pHead->basic.fileId, pHead->basic.blkId, pHead->basic.bufSize, pHead->basic.offset); pCtx->id, pGroup->vgId, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset);
int64_t blkId = pHead->basic.blkId; int64_t blkId = pHead->basic.blkId;
pHead = pHead->next; pHead = pHead->next;
@ -167,15 +221,18 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
} }
_return: _return:
if (NULL != pGroup) {
taosHashRelease(pGrpHash, pGroup);
}
atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);
return code; return code;
} }
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) { if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -195,7 +252,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
} }
pCache->pDirtyTail = pBufInfo; pCache->pDirtyTail = pBufInfo;
if (pGCache->maxCacheSize > 0 && pCache->blkCacheSize > pGCache->maxCacheSize) { if (pGCache->maxCacheSize >= 0 && pCache->blkCacheSize > pGCache->maxCacheSize) {
if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) { if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) {
pWriteHead = pCache->pDirtyHead; pWriteHead = pCache->pDirtyHead;
SGcBlkBufInfo* pTmp = pCache->pDirtyHead; SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
@ -220,11 +277,20 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
} }
static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx) { static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) {
if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
return; return;
} }
if (removeCheck) {
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
if (0 == pFileInfo->groupNum) {
removeGroupCacheFile(pFileInfo);
qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId);
//taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
}
}
pFileCtx->fileId++; pFileCtx->fileId++;
pFileCtx->fileSize = 0; pFileCtx->fileSize = 0;
} }
@ -248,13 +314,13 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB
pBufInfo->basic.bufSize = bufSize; pBufInfo->basic.bufSize = bufSize;
pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize); pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize);
pBufInfo->pCtx = pCtx; pBufInfo->pCtx = pCtx;
pBufInfo->pGroup = pGroup; pBufInfo->groupId = pBlock->info.id.groupId;
if (pGCache->batchFetch) { if (pGCache->batchFetch) {
groupCacheSwitchNewFile(pFileCtx); groupCacheSwitchNewFile(pFileCtx, pCtx->id, pGroup->vgId, false);
} }
int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo); int32_t code = addBlkToDirtyBufList(pGCache, pCtx, &pGCache->blkCache, pBufInfo);
return code; return code;
} }
@ -343,10 +409,16 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3
static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) { static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
SGroupCacheFileFd *pFileFd = NULL; SGroupCacheFileFd *pFileFd = NULL;
SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx; SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx;
int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd); bool deleted = false;
int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd, &deleted);
if (code) { if (code) {
return code; return code;
} }
if (deleted) {
qError("FileId:%d-%d-%d already be deleted, skip read", pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
int32_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET); int32_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET);
if (ret == -1) { if (ret == -1) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
@ -366,8 +438,8 @@ static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheDa
goto _return; goto _return;
} }
qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, qTrace("FileId:%d-%d-%d blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64,
pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset); pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset);
_return: _return:
@ -406,7 +478,7 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
pVgCtx->pTbList = pTbList; pVgCtx->pTbList = pTbList;
pVgCtx->id = vgId;
snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d",
tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId); tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId);
pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0; pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0;
@ -491,7 +563,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
} }
if (pBlock) { if (pBlock) {
qError("%s group cache retrieved block with groupId: %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); qError("%s blk retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId);
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
@ -530,6 +602,31 @@ static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) {
taosThreadMutexUnlock(&pGroup->mutex); taosThreadMutexUnlock(&pGroup->mutex);
} }
static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) {
if (NULL == pFileCtx->pCacheFile) {
pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pFileCtx->pCacheFile) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) {
sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId);
SGroupCacheFileInfo newFile = {0};
newFile.groupNum = 1;
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = &newFile;
} else {
pTmp->groupNum++;
}
qTrace("FileId:%d-%d-%d add groupNum to %u", downstreamId, vgId, fileId, pTmp->groupNum);
return TSDB_CODE_SUCCESS;
}
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastBlkUid == uid) { if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastBlkUid == uid) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -547,11 +644,17 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat
handleGroupFetchDone(pNew->pGroup); handleGroupFetchDone(pNew->pGroup);
} }
groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx); groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true);
pGroup->fileId = pGroup->pVgCtx->fileCtx.fileId; pGroup->fileId = pGroup->pVgCtx->fileCtx.fileId;
pGroup->startOffset = pGroup->pVgCtx->fileCtx.fileSize; pGroup->startOffset = pGroup->pVgCtx->fileCtx.fileSize;
qTrace("FileId:%d-%d-%d add groupNum for group %" PRIu64, pGroup->downstreamIdx, pGroup->vgId, pGroup->pVgCtx->fileCtx.fileId, uid);
if (pGroup->needCache) {
return addFileRefTableNum(&pGroup->pVgCtx->fileCtx, pGroup->pVgCtx->fileCtx.fileId, pGroup->downstreamIdx, pGroup->vgId);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -576,10 +679,12 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache); initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache);
qError("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);
while (true) { while (true) {
if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
if (terrno == TSDB_CODE_DUP_KEY) { if (terrno == TSDB_CODE_DUP_KEY) {
*ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
if (*ppGrp) { if (*ppGrp) {
break; break;
} }
@ -588,7 +693,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
} }
} }
*ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
if (*ppGrp && pParam->pChildren) { if (*ppGrp && pParam->pChildren) {
SGcNewGroupInfo newGroup; SGcNewGroupInfo newGroup;
newGroup.pGroup = *ppGrp; newGroup.pGroup = *ppGrp;
@ -641,7 +746,7 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
return code; return code;
} }
} else { } else {
qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId); qError("group %" PRIu64 " not found in group hash", pBlock->info.id.groupId);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
} }
@ -919,7 +1024,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
if (NULL == pGroup) { if (NULL == pGroup) {
code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid); code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -980,6 +1085,32 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void freeRemoveGroupCacheData(void* p) {
SGroupCacheData* pGroup = p;
if (pGroup->vgId >= 0) {
SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx;
if (pGroup->fileId >= 0) {
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
uint32_t remainNum = atomic_sub_fetch_32(&pFileInfo->groupNum, 1);
qTrace("FileId:%d-%d-%d sub group num to %u", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId, remainNum);
if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) {
removeGroupCacheFile(pFileInfo);
qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId);
//taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
}
}
}
taosArrayDestroy(pGroup->waitQueue);
taosArrayDestroy(pGroup->blkList.pList);
qTrace("group removed");
}
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pInfo = pOperator->info; SGroupCacheOperatorInfo* pInfo = pOperator->info;
pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams)); pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams));
@ -1012,6 +1143,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if (pCtx->pGrpHash == NULL) { if (pCtx->pGrpHash == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosHashSetFreeFp(pCtx->pGrpHash, freeRemoveGroupCacheData);
} }
pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
@ -1038,7 +1170,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { static SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam); int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam);
@ -1050,6 +1182,20 @@ SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam*
return pBlock; return pBlock;
} }
static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam* pParam) {
SGcNotifyOperatorParam* pGcParam = pParam->value;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pGcParam->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
qTrace("try to remove group %" PRIu64, pGcParam->tbUid);
if (taosHashRemove(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid))) {
qError("failed to remove group %" PRIu64 " in vgId %d downstreamIdx %d", pGcParam->tbUid, pGcParam->vgId, pGcParam->downstreamIdx);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
@ -1095,6 +1241,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
taosHashSetFreeFp(pInfo->pGrpHash, freeRemoveGroupCacheData);
} }
code = appendDownstream(pOperator, pDownstream, numOfDownstream); code = appendDownstream(pOperator, pDownstream, numOfDownstream);
@ -1112,7 +1259,9 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error; goto _error;
} }
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, groupCacheTableCacheEnd);
qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch);
return pOperator; return pOperator;

View File

@ -648,8 +648,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
static void setMergeJoinDone(SOperatorInfo* pOperator) { static void setMergeJoinDone(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
pOperator->pDownstreamParams[0] = NULL; pOperator->pDownstreamGetParams[0] = NULL;
pOperator->pDownstreamParams[1] = NULL; pOperator->pDownstreamGetParams[1] = NULL;
} }
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
@ -667,7 +667,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
} }
if (pJoinInfo->pLeft == NULL) { if (pJoinInfo->pLeft == NULL) {
if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) { if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initParam) {
leftEmpty = true; leftEmpty = true;
} else { } else {
setMergeJoinDone(pOperator); setMergeJoinDone(pOperator);
@ -773,7 +773,7 @@ void resetMergeJoinOperator(struct SOperatorInfo* pOperator) {
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info; SMJoinOperatorInfo* pJoinInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
if (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1]) { if (NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); qError("total merge join res rows:%" PRId64, pJoinInfo->resRows);
return NULL; return NULL;
} else { } else {

View File

@ -654,59 +654,74 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
} }
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
if (NULL == pParam) { SOperatorParam** ppParam = NULL;
pOperator->pOperatorParam = NULL; SOperatorParam*** pppDownstramParam = NULL;
taosMemoryFreeClear(pOperator->pDownstreamParams); switch (type) {
case OP_GET_PARAM:
ppParam = &pOperator->pOperatorGetParam;
pppDownstramParam = &pOperator->pDownstreamGetParams;
break;
case OP_NOTIFY_PARAM:
ppParam = &pOperator->pOperatorNotifyParam;
pppDownstramParam = &pOperator->pDownstreamNotifyParams;
break;
default:
return TSDB_CODE_INVALID_PARA;
}
if (NULL == pInput) {
*ppParam = NULL;
taosMemoryFreeClear(*pppDownstramParam);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pOperator->pOperatorParam = (pParam->opType == pOperator->operatorType) ? pParam : NULL; *ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL;
if (NULL == pOperator->pDownstreamParams) { if (NULL == *pppDownstramParam) {
pOperator->pDownstreamParams = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES); *pppDownstramParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
if (NULL == pOperator->pDownstreamParams) { if (NULL == *pppDownstramParam) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
if (NULL == pOperator->pOperatorParam) { if (NULL == *ppParam) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pOperator->pDownstreamParams[i] = pParam; (*pppDownstramParam)[i] = pInput;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
memset(pOperator->pDownstreamParams, 0, pOperator->numOfDownstream * POINTER_BYTES); memset(*pppDownstramParam, 0, pOperator->numOfDownstream * POINTER_BYTES);
int32_t childrenNum = taosArrayGetSize(pOperator->pOperatorParam->pChildren); int32_t childrenNum = taosArrayGetSize((*ppParam)->pChildren);
if (childrenNum <= 0) { if (childrenNum <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < childrenNum; ++i) { for (int32_t i = 0; i < childrenNum; ++i) {
SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i); SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet((*ppParam)->pChildren, i);
if (pOperator->pDownstreamParams[pChild->downstreamIdx]) { if ((*pppDownstramParam)[pChild->downstreamIdx]) {
int32_t code = mergeOperatorParams(pOperator->pDownstreamParams[pChild->downstreamIdx], pChild); int32_t code = mergeOperatorParams((*pppDownstramParam)[pChild->downstreamIdx], pChild);
if (code) { if (code) {
return code; return code;
} }
} else { } else {
pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild; (*pppDownstramParam)[pChild->downstreamIdx] = pChild;
} }
} }
taosArrayClear(pOperator->pOperatorParam->pChildren); taosArrayClear((*ppParam)->pChildren);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) { SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) {
if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) { if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name); qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]); SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamGetParams[idx]);
if (clearParam) { if (clearParam) {
pOperator->pDownstreamParams[idx] = NULL; pOperator->pDownstreamGetParams[idx] = NULL;
} }
return pBlock; return pBlock;
} }
@ -725,7 +740,7 @@ SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
int32_t code = setOperatorParams(pOperator, pParam); int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code; pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
@ -733,6 +748,29 @@ SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorP
return pOperator->fpSet.getNextFn(pOperator); return pOperator->fpSet.getNextFn(pOperator);
} }
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM);
if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) {
code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam);
}
if (TSDB_CODE_SUCCESS == code) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
if (pOperator->pDownstreamNotifyParams[i]) {
code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
}
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
return code;
}
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) { int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
if (pOperator->transparent) { if (pOperator->transparent) {
return getOperatorResultBlockId(pOperator->pDownstream[idx], 0); return getOperatorResultBlockId(pOperator->pDownstream[idx], 0);

View File

@ -787,7 +787,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = 0; int32_t code = 0;
STableListInfo* pListInfo = pInfo->base.pTableListInfo; STableListInfo* pListInfo = pInfo->base.pTableListInfo;
STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam->value; STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorGetParam->value;
int32_t num = taosArrayGetSize(pParam->pUidList); int32_t num = taosArrayGetSize(pParam->pUidList);
if (num <= 0) { if (num <= 0) {
qError("empty table scan uid list"); qError("empty table scan uid list");
@ -916,10 +916,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
if (pOperator->pOperatorParam) { if (pOperator->pOperatorGetParam) {
pOperator->dynamicTask = true; pOperator->dynamicTask = true;
int32_t code = createTableListInfoFromParam(pOperator); int32_t code = createTableListInfoFromParam(pOperator);
pOperator->pOperatorParam = NULL; pOperator->pOperatorGetParam = NULL;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);

View File

@ -3246,7 +3246,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
pGrpCache->node.dynamicOp = true; pGrpCache->node.dynamicOp = true;
pGrpCache->grpColsMayBeNull = false; pGrpCache->grpColsMayBeNull = false;
pGrpCache->grpByUid = true; pGrpCache->grpByUid = true;
pGrpCache->batchFetch = true; pGrpCache->batchFetch = false;
pGrpCache->node.pChildren = pChildren; pGrpCache->node.pChildren = pChildren;
pGrpCache->node.pTargets = nodesMakeList(); pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) { if (NULL == pGrpCache->node.pTargets) {
@ -3352,7 +3352,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
} }
pDynCtrl->qType = DYN_QTYPE_STB_HASH; pDynCtrl->qType = DYN_QTYPE_STB_HASH;
pDynCtrl->stbJoin.batchFetch = true; pDynCtrl->stbJoin.batchFetch = false;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pDynCtrl->node.pChildren = nodesMakeList(); pDynCtrl->node.pChildren = nodesMakeList();

View File

@ -488,6 +488,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
} }
int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) { int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) {
#if 0
if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryExecDone, true, false)) { if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryExecDone, true, false)) {
QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d", ctx->queryExecDone); QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d", ctx->queryExecDone);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
@ -496,6 +497,10 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd); QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
#else
ctx->queryExecDone = false;
ctx->queryEnd = false;
#endif
dsReset(ctx->sinkHandle); dsReset(ctx->sinkHandle);
@ -503,11 +508,14 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1); if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1);
QW_TASK_DLOG("the %dth dynamic task exec started", ctx->dynExecId++); QW_TASK_DLOG("the %dth dynamic task exec started, continue running", ctx->dynExecId++);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_TASK_DLOG("the %dth dynamic task exec started", ctx->dynExecId++);
QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -959,7 +967,7 @@ _return:
} }
if (!rsped) { if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen); qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else { } else {
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);