fix issue

This commit is contained in:
54liuyao 2024-09-24 16:58:46 +08:00
parent a0e7791e49
commit db50c9230a
6 changed files with 25 additions and 28 deletions

View File

@ -145,6 +145,7 @@ void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey)
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen); int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen);
void streamFileStateGroupCurNext(SStreamStateCur* pCur); void streamFileStateGroupCurNext(SStreamStateCur* pCur);
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen); int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen);
SSHashObj* getGroupIdCache(SStreamFileState* pFileState);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -3632,7 +3632,7 @@ FETCH_NEXT_BLOCK:
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
code = copyGetResultBlock(pInfo->pUpdateRes, pBlock); code = copyGetResultBlock(pInfo->pUpdateRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
pInfo->pUpdateInfo->maxDataVersion = pBlock->info.version; pInfo->pUpdateInfo->maxDataVersion = -1;
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
} break; } break;

View File

@ -942,14 +942,6 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi
} }
static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
TSKEY prevWKey = INT64_MIN;
TSKEY nextWKey = INT64_MIN;
if (hasPrevWindow(pFillSup)) {
prevWKey = pFillSup->prev.key;
}
if (hasNextWindow(pFillSup)) {
nextWKey = pFillSup->next.key;
}
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
@ -960,39 +952,32 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF
case TSDB_FILL_NULL_F: case TSDB_FILL_NULL_F:
case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: { case TSDB_FILL_SET_VALUE_F: {
if (ts != pFillSup->cur.key) { if (ts == pFillSup->cur.key) {
pFillInfo->pos = FILL_POS_START;
pFillInfo->needFill = false;
} else {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
} else { copyNonFillValueInfo(pFillSup, pFillInfo);
pFillInfo->needFill = false;
pFillInfo->pos = FILL_POS_START;
goto _end;
} }
copyNonFillValueInfo(pFillSup, pFillInfo);
} break; } break;
case TSDB_FILL_PREV: { case TSDB_FILL_PREV: {
if (ts != pFillSup->cur.key) { if (ts == pFillSup->cur.key) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_START;
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); pFillInfo->needFill = false;
} else if (hasPrevWindow(pFillSup)) { } else if (hasPrevWindow(pFillSup)) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
pFillInfo->pResRow = &pFillSup->prev;
} else { } else {
pFillInfo->needFill = false; pFillInfo->needFill = false;
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_INVALID;
goto _end;
} }
pFillInfo->pResRow = &pFillSup->prev;
} break; } break;
default: default:
qError("%s failed at line %d since invalid fill type", __func__, __LINE__); qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
break; break;
} }
_end:
if (ts != pFillSup->cur.key) {
pFillInfo->pos = FILL_POS_INVALID;
}
} }
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {

View File

@ -4306,7 +4306,6 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr
char buf[128] = {0}; char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)&groupId, buf); int32_t klen = ginitDict[i].enFunc((void*)&groupId, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, klen)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, klen)) {
streamStateFreeCur(pCur);
return ; return ;
} }
// skip ttl expired data // skip ttl expired data

View File

@ -561,6 +561,12 @@ SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) {
SStreamStateCur* pCur = createStateCursor(pState->pFileState); SStreamStateCur* pCur = createStateCursor(pState->pFileState);
pCur->hashIter = 0; pCur->hashIter = 0;
pCur->pHashData = NULL; pCur->pHashData = NULL;
SSHashObj* pMap = getGroupIdCache(pState->pFileState);
pCur->pHashData = tSimpleHashIterate(pMap, pCur->pHashData, &pCur->hashIter);
if (pCur->pHashData == NULL) {
pCur->hashIter = -1;
streamStateParTagSeekKeyNext_rocksdb(pState, INT64_MIN, pCur);
}
return pCur; return pCur;
} }
@ -569,7 +575,7 @@ void streamStateGroupCurNext(SStreamStateCur* pCur) {
} }
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) { int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
if (pVal == NULL) { if (pVal != NULL) {
return -1; return -1;
} }
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen); return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);

View File

@ -1144,6 +1144,7 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState; SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;
if (pCur->hashIter == -1) { if (pCur->hashIter == -1) {
streamStateCurNext(pFileState->pFileStore, pCur); streamStateCurNext(pFileState->pFileStore, pCur);
return;
} }
SSHashObj* pHash = pFileState->pGroupIdMap; SSHashObj* pHash = pFileState->pGroupIdMap;
@ -1151,6 +1152,7 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
if (!pCur->pHashData) { if (!pCur->pHashData) {
pCur->hashIter = -1; pCur->hashIter = -1;
streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur); streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
return;
} }
int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL); int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
pCur->minGpId = TMIN(pCur->minGpId, gpId); pCur->minGpId = TMIN(pCur->minGpId, gpId);
@ -1164,3 +1166,7 @@ int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, voi
} }
return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL); return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
} }
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
return pFileState->pGroupIdMap;
}