fix: batch fetch issue

This commit is contained in:
dapan1121 2023-07-19 19:14:30 +08:00
parent d125bb0952
commit a41888a9ef
4 changed files with 136 additions and 66 deletions

View File

@ -41,6 +41,11 @@ typedef struct SGcVgroupCtx {
uint32_t fileId;
} SGcVgroupCtx;
typedef struct SGroupSeqBlkList {
int64_t startBlkId;
int64_t endBlkId;
} SGroupSeqBlkList;
typedef struct SGroupCacheData {
TdThreadMutex mutex;
SArray* waitQueue;
@ -49,9 +54,11 @@ typedef struct SGroupCacheData {
SGcVgroupCtx* pVgCtx;
int32_t downstreamIdx;
int32_t vgId;
union {
SGroupSeqBlkList blkList;
SArray* pBlkList;
};
uint32_t fileId;
int64_t startBlkId;
int64_t endBlkId;
int64_t startOffset;
} SGroupCacheData;

View File

@ -924,18 +924,30 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa
return pResBlock;
}
static void doSetQualifiedUid(SArray* pUidList, const SArray* pUidTagList, bool* pResultList) {
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) {
taosArrayClear(pUidList);
STableKeyInfo info = {.uid = 0, .groupId = 0};
int32_t numOfTables = taosArrayGetSize(pUidTagList);
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
if (pResultList[i]) {
taosArrayPush(pUidList, &uid);
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
info.uid = uid;
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
taosArrayDestroy(pUidList);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (addUid) {
taosArrayPush(pUidList, &uid);
}
}
}
return TSDB_CODE_SUCCESS;
}
static void copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
@ -952,7 +964,7 @@ static void copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
}
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
SIdxFltStatus status, SStorageAPI* pAPI) {
SIdxFltStatus status, SStorageAPI* pAPI, bool addUid) {
if (pTagCond == NULL) {
return TSDB_CODE_SUCCESS;
}
@ -1041,7 +1053,11 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
goto end;
}
doSetQualifiedUid(pUidList, pUidTagList, (bool*)output.columnData->pData);
code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto end;
}
end:
taosHashCleanup(ctx.colHash);
@ -1059,6 +1075,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numOfTables = 0;
bool listAdded = false;
pListInfo->idInfo.suid = pScanNode->suid;
pListInfo->idInfo.tableType = pScanNode->tableType;
@ -1071,10 +1088,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
taosArrayPush(pUidList, &pScanNode->uid);
}
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI);
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
listAdded = true;
} else {
T_MD5_CTX context = {0};
@ -1115,10 +1133,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
}
}
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI);
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, tsTagFilterCache);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
listAdded = true;
// let's add the filter results into meta-cache
numOfTables = taosArrayGetSize(pUidList);
@ -1139,17 +1158,19 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
}
_end:
numOfTables = taosArrayGetSize(pUidList);
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pUidList, i), .groupId = 0};
if (!listAdded) {
numOfTables = taosArrayGetSize(pUidList);
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pUidList, i), .groupId = 0};
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
taosArrayDestroy(pUidList);
return TSDB_CODE_OUT_OF_MEMORY;
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
taosArrayDestroy(pUidList);
return TSDB_CODE_OUT_OF_MEMORY;
}
qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
}
qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
}
taosArrayDestroy(pUidList);

View File

@ -363,13 +363,18 @@ static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData*
}
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) {
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch) {
taosThreadMutexInit(&pGroup->mutex, NULL);
pGroup->downstreamIdx = downstreamIdx;
pGroup->vgId = vgId;
pGroup->fileId = -1;
pGroup->startBlkId = -1;
pGroup->endBlkId = -1;
if (batchFetch) {
pGroup->pBlkList = taosArrayInit(10, POINTER_BYTES);
} else {
pGroup->blkList.startBlkId = -1;
pGroup->blkList.endBlkId = -1;
}
pGroup->startOffset = -1;
pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
}
@ -379,7 +384,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData grpData = {0};
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId);
initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch);
while (true) {
if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
@ -415,6 +420,24 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
return TSDB_CODE_SUCCESS;
}
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) {
if (batchFetch) {
taosArrayPush(pGroup->pBlkList, &pNewBlk->blkId);
qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->pBlkList));
return TSDB_CODE_SUCCESS;
}
if (pGroup->blkList.endBlkId > 0) {
pGroup->blkList.endBlkId = pNewBlk->blkId;
} else {
pGroup->blkList.startBlkId = pNewBlk->blkId;
pGroup->blkList.endBlkId = pNewBlk->blkId;
}
qError("block added to group cache, total block num:%" PRId64, pGroup->blkList.endBlkId - pGroup->blkList.startBlkId + 1);
return TSDB_CODE_SUCCESS;
}
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
int32_t code = TSDB_CODE_SUCCESS;
@ -446,12 +469,10 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
if (code) {
return code;
}
if (pGroup->endBlkId > 0) {
pGroup->endBlkId = newBlkBuf.blkId;
} else {
pGroup->startBlkId = newBlkBuf.blkId;
pGroup->endBlkId = newBlkBuf.blkId;
code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf);
if (code) {
return code;
}
notifyWaitingSessions(pGroup->waitQueue);
@ -541,6 +562,50 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
return code;
}
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes, bool* got) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
*got = true;
if (pGCache->batchFetch) {
if (pSession->lastBlkId < 0) {
if (taosArrayGetSize(pSession->pGroupData->pBlkList) > 0) {
int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, 0);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes);
pSession->lastBlkId = *pIdx;
return code;
}
} else if (pSession->lastBlkId < taosArrayGetSize(pSession->pGroupData->pBlkList)) {
int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, pSession->lastBlkId + 1);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
return code;
}
}
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->blkList.startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
return code;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->blkList.endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
return code;
}
if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL;
return code;
}
*got = false;
return code;
}
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = pSession->pGroupData;
@ -573,44 +638,21 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr
taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
} else if (pGroup->fetchDone) {
*ppRes = NULL;
}
return code;
bool got = false;
return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
}
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
bool locked = false;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
while (true) {
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
goto _return;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
goto _return;
} else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL;
bool got = false;
code = getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
if (TSDB_CODE_SUCCESS != code || got) {
goto _return;
}
@ -723,7 +765,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
}
}
code = getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes);
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
if (NULL == ppRes) {
taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
}

View File

@ -447,6 +447,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
while (endPos == dataBlock->info.rows) {
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
dataBlock = getNextBlockFromDownstream(pOperator, whichChild);
qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0);
if (whichChild == 0) {
pJoinInfo->leftPos = 0;
pJoinInfo->pLeft = dataBlock;
@ -656,16 +657,15 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0);
pJoinInfo->leftPos = 0;
qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0);
if (pJoinInfo->pLeft == NULL) {
qError("merge join left got empty block");
if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) {
leftEmpty = true;
} else {
setMergeJoinDone(pOperator);
return false;
}
} else {
qError("merge join left got block");
}
}
@ -673,12 +673,12 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1);
pJoinInfo->rightPos = 0;
qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0);
if (pJoinInfo->pRight == NULL) {
qError("merge join right got empty block");
setMergeJoinDone(pOperator);
return false;
} else {
qError("merge join right got block");
if (leftEmpty) {
setMergeJoinDone(pOperator);
return false;