Merge pull request #18803 from taosdata/ly_stream
fix:stream update data
This commit is contained in:
commit
6bc2be0806
|
@ -863,19 +863,20 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
|
||||||
|
|
||||||
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
||||||
SArray* res = (SArray*)data;
|
SArray* res = (SArray*)data;
|
||||||
SWinKey* pos = taosArrayGet(res, index);
|
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||||
SResKeyPos* pData = (SResKeyPos*)pKey;
|
SResKeyPos* pRKey = (SResKeyPos*)pKey;
|
||||||
if (*(int64_t*)pData->key == pos->ts) {
|
if (pRKey->groupId > pDataPos->groupId) {
|
||||||
if (pData->groupId > pos->groupId) {
|
|
||||||
return 1;
|
return 1;
|
||||||
} else if (pData->groupId < pos->groupId) {
|
} else if (pRKey->groupId < pDataPos->groupId) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*(int64_t*)pRKey->key > pDataPos->ts) {
|
||||||
|
return 1;
|
||||||
|
} else if (*(int64_t*)pRKey->key < pDataPos->ts){
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else if (*(int64_t*)pData->key > pos->ts) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
||||||
|
@ -1400,19 +1401,21 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
||||||
|
|
||||||
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
||||||
SArray* res = (SArray*)data;
|
SArray* res = (SArray*)data;
|
||||||
SWinKey* pos = taosArrayGet(res, index);
|
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||||
SWinKey* pData = (SWinKey*)pKey;
|
SWinKey* pWKey = (SWinKey*)pKey;
|
||||||
if (pData->ts == pos->ts) {
|
|
||||||
if (pData->groupId > pos->groupId) {
|
if (pWKey->groupId > pDataPos->groupId) {
|
||||||
return 1;
|
return 1;
|
||||||
} else if (pData->groupId < pos->groupId) {
|
} else if (pWKey->groupId < pDataPos->groupId) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWKey->ts > pDataPos->ts) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWKey->ts < pDataPos->ts) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else if (pData->ts > pos->ts) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
||||||
|
|
Loading…
Reference in New Issue