Revert "fix(stream):fill result immediately after delete data"
This commit is contained in:
parent
156e3ce109
commit
c2ad0a6ec5
|
@ -172,17 +172,10 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||||
void* curVal = NULL;
|
void* curVal = NULL;
|
||||||
int32_t curVLen = 0;
|
int32_t curVLen = 0;
|
||||||
bool hasCurKey = true;
|
|
||||||
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
|
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||||
pFillSup->cur.key = key.ts;
|
pFillSup->cur.key = key.ts;
|
||||||
pFillSup->cur.pRowVal = curVal;
|
pFillSup->cur.pRowVal = curVal;
|
||||||
} else {
|
|
||||||
qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
|
|
||||||
pFillSup->cur.key = ts;
|
|
||||||
pFillSup->cur.pRowVal = NULL;
|
|
||||||
hasCurKey = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
|
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
|
||||||
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
|
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
|
||||||
|
@ -194,10 +187,8 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
|
||||||
pFillSup->prev.key = preKey.ts;
|
pFillSup->prev.key = preKey.ts;
|
||||||
pFillSup->prev.pRowVal = preVal;
|
pFillSup->prev.pRowVal = preVal;
|
||||||
|
|
||||||
if (hasCurKey) {
|
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
|
||||||
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
|
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
|
code = pAPI->stateStore.streamStateCurNext(pState, pCur);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -750,8 +741,8 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
|
||||||
getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
|
getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
|
||||||
setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
|
setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
|
||||||
SWinKey key = {.ts = startTs, .groupId = groupId};
|
SWinKey key = {.ts = startTs, .groupId = groupId};
|
||||||
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
|
|
||||||
if (!pInfo->pFillInfo->needFill) {
|
if (!pInfo->pFillInfo->needFill) {
|
||||||
|
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
|
||||||
buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
|
buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
|
||||||
} else {
|
} else {
|
||||||
STimeRange tw = {
|
STimeRange tw = {
|
||||||
|
@ -760,27 +751,11 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE
|
||||||
.groupId = groupId,
|
.groupId = groupId,
|
||||||
};
|
};
|
||||||
taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
|
taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
|
||||||
}
|
while (key.ts <= endTs) {
|
||||||
}
|
key.ts = taosTimeAdd(key.ts, pInfo->pFillSup->interval.sliding, pInfo->pFillSup->interval.slidingUnit,
|
||||||
|
pInfo->pFillSup->interval.precision);
|
||||||
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
|
tSimpleHashPut(pInfo->pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
}
|
||||||
void* val = NULL;
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
|
|
||||||
groupId);
|
|
||||||
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
|
|
||||||
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
|
|
||||||
pAPI->stateStore.streamStateFreeCur(pCur);
|
|
||||||
qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
resetFillWindow(pWinData);
|
|
||||||
pWinData->key = key.ts;
|
|
||||||
pWinData->pRowVal = val;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -790,22 +765,20 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
|
||||||
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
||||||
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
|
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
|
||||||
int32_t size = taosArrayGetSize(pFillInfo->delRanges);
|
int32_t size = taosArrayGetSize(pFillInfo->delRanges);
|
||||||
while (pFillInfo->delIndex < size) {
|
tSimpleHashClear(pInfo->pFillSup->pResMap);
|
||||||
|
for (; pFillInfo->delIndex < size; pFillInfo->delIndex++) {
|
||||||
STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
|
STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
|
||||||
if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) {
|
if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup);
|
getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup);
|
||||||
TSKEY realEnd = range->ekey + 1;
|
|
||||||
if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) {
|
|
||||||
getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, range->groupId, &pInfo->pFillSup->next);
|
|
||||||
}
|
|
||||||
setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo);
|
setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo);
|
||||||
pFillInfo->delIndex++;
|
|
||||||
if (pInfo->pFillInfo->needFill) {
|
if (pInfo->pFillInfo->needFill) {
|
||||||
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
|
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
|
||||||
pInfo->pRes->info.id.groupId = range->groupId;
|
pInfo->pRes->info.id.groupId = range->groupId;
|
||||||
}
|
}
|
||||||
|
SWinKey key = {.ts = range->skey, .groupId = range->groupId};
|
||||||
|
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2632,7 +2632,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4.dataVersion
|
// 4.dataVersion
|
||||||
tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
|
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
|
||||||
|
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
|
@ -3086,17 +3086,15 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
// for stream
|
||||||
// for stream
|
void* buff = NULL;
|
||||||
void* buff = NULL;
|
int32_t len = 0;
|
||||||
int32_t len = 0;
|
int32_t res =
|
||||||
int32_t res =
|
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
||||||
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
||||||
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
taosMemoryFree(buff);
|
||||||
taosMemoryFree(buff);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
@ -3281,16 +3279,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
|
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
|
||||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* buff = NULL;
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t res =
|
|
||||||
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
|
||||||
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
|
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
|
||||||
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
|
||||||
taosMemoryFree(buff);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) {
|
if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) {
|
||||||
|
@ -3633,7 +3621,7 @@ int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4.dataVersion
|
// 4.dataVersion
|
||||||
tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
|
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
|
||||||
|
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
|
|
|
@ -3422,7 +3422,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
|
||||||
size_t kLen = 0;
|
size_t kLen = 0;
|
||||||
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
|
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
|
||||||
winKeyDecode((void*)&curKey, keyStr);
|
winKeyDecode((void*)&curKey, keyStr);
|
||||||
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) {
|
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
rocksdb_iter_next(pCur->iter);
|
rocksdb_iter_next(pCur->iter);
|
||||||
|
@ -3459,7 +3459,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
|
||||||
size_t kLen = 0;
|
size_t kLen = 0;
|
||||||
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
|
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
|
||||||
winKeyDecode((void*)&curKey, keyStr);
|
winKeyDecode((void*)&curKey, keyStr);
|
||||||
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
|
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) {
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
rocksdb_iter_prev(pCur->iter);
|
rocksdb_iter_prev(pCur->iter);
|
||||||
|
|
Loading…
Reference in New Issue