Merge branch 'enh/rocksRevert' of https://github.com/taosdata/TDengine into enh/rocksRevert
This commit is contained in:
commit
d0246a80cf
|
@ -51,18 +51,18 @@ static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
|
|||
SWinKey* pWin1 = (SWinKey*)pKey1;
|
||||
SWinKey* pWin2 = (SWinKey*)pKey2;
|
||||
|
||||
if (pWin1->ts > pWin2->ts) {
|
||||
return 1;
|
||||
} else if (pWin1->ts < pWin2->ts) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pWin1->groupId > pWin2->groupId) {
|
||||
return 1;
|
||||
} else if (pWin1->groupId < pWin2->groupId) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pWin1->ts > pWin2->ts) {
|
||||
return 1;
|
||||
} else if (pWin1->ts < pWin2->ts) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -1454,42 +1454,8 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
|
|||
return w;
|
||||
}
|
||||
|
||||
static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval,
|
||||
SWinKey* key) {
|
||||
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
||||
SWinKey next = {0};
|
||||
while (tw.ekey < mark) {
|
||||
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key);
|
||||
int32_t code = streamStateGetKVByCur(pCur, &next, NULL, 0);
|
||||
streamStateFreeCur(pCur);
|
||||
|
||||
void* chIds = taosHashGet(pPullDataMap, key, sizeof(SWinKey));
|
||||
if (chIds && pPullDataMap) {
|
||||
SArray* chAy = *(SArray**)chIds;
|
||||
int32_t size = taosArrayGetSize(chAy);
|
||||
qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i));
|
||||
}
|
||||
break;
|
||||
}
|
||||
qDebug("===stream===delete window %" PRId64, key->ts);
|
||||
int32_t codeDel = streamStateDel(pState, key);
|
||||
if (codeDel != TSDB_CODE_SUCCESS) {
|
||||
code = streamStateGetFirst(pState, key);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qDebug("===stream===stream state first key: empty-empty");
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
*key = next;
|
||||
tw = getFinalTimeWindow(key->ts, pInterval);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
static void deleteIntervalDiscBuf(SStreamState* pState, TSKEY mark) {
|
||||
//todo(liuyao) delete expired check point
|
||||
}
|
||||
|
||||
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
|
||||
|
@ -2174,7 +2140,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
|
|||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
|
||||
if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
|
||||
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
|
||||
if (streamStateCheck(pState, &key)) {
|
||||
if (!hasIntervalWindow(pState, &key)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -2609,8 +2575,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
||||
qDebug("===stream===clear semi operator");
|
||||
} else {
|
||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
|
||||
&pInfo->interval, &pInfo->delKey);
|
||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
|
||||
streamStateCommit(pInfo->pState);
|
||||
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
||||
|
@ -4848,8 +4813,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
|
||||
&pInfo->delKey);
|
||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
setOperatorCompleted(pOperator);
|
||||
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
|
||||
streamStateCommit(pInfo->pState);
|
||||
|
|
|
@ -313,16 +313,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
|
|||
return pFileState->usedBuffs;
|
||||
}
|
||||
|
||||
// void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) {
|
||||
// pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark);
|
||||
// }
|
||||
|
||||
// void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
|
||||
// *pLen = sizeof(TSKEY);
|
||||
// (*pVal) = taosMemoryCalloc(1, *pLen);
|
||||
// void* buff = *pVal;
|
||||
// taosEncodeFixedI64(&buff, pFileState->flushMark);
|
||||
// }
|
||||
void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); }
|
||||
|
||||
void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) {
|
||||
|
@ -421,7 +411,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
|||
TSKEY ts;
|
||||
sscanf(val, "%" PRId64 "", &ts);
|
||||
taosMemoryFree(val);
|
||||
if (ts < pFileState->flushMark) {
|
||||
if (ts < pFileState->deleteMark) {
|
||||
forceRemoveCheckpoint(pFileState, i);
|
||||
break;
|
||||
} else {
|
||||
|
|
|
@ -223,40 +223,47 @@
|
|||
,,n,script,./test.sh -f tsim/stream/basic0.sim -g
|
||||
,,y,script,./test.sh -f tsim/stream/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic3.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic4.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/drop_stream.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
|
||||
,,y,script,./test.sh -f tsim/stream/windowClose.sim
|
||||
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
,,y,script,./test.sh -f tsim/stream/sliding.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalRange.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
|
||||
,,y,script,./test.sh -f tsim/stream/ignoreCheckUpdate.sim
|
||||
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby.sim
|
||||
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/sliding.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/windowClose.sim
|
||||
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
|
||||
,,y,script,./test.sh -f tsim/trans/create_db.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic1.sim
|
||||
|
@ -1334,38 +1341,47 @@
|
|||
,,n,script,./test.sh -f tsim/stream/basic0.sim -g
|
||||
,,y,script,./test.sh -f tsim/stream/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic3.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic4.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/drop_stream.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
|
||||
,,y,script,./test.sh -f tsim/stream/windowClose.sim
|
||||
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
,,y,script,./test.sh -f tsim/stream/sliding.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/deleteState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalRange.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
|
||||
,,y,script,./test.sh -f tsim/stream/ignoreCheckUpdate.sim
|
||||
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby.sim
|
||||
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/sliding.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/windowClose.sim
|
||||
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic2.sim
|
||||
|
|
|
@ -213,38 +213,47 @@ rem ./test.sh -f tsim/db/alter_replica_13.sim
|
|||
./test.sh -f tsim/stream/basic0.sim -g
|
||||
./test.sh -f tsim/stream/basic1.sim
|
||||
./test.sh -f tsim/stream/basic2.sim
|
||||
./test.sh -f tsim/stream/basic3.sim
|
||||
./test.sh -f tsim/stream/basic4.sim
|
||||
./test.sh -f tsim/stream/checkStreamSTable1.sim
|
||||
./test.sh -f tsim/stream/checkStreamSTable.sim
|
||||
./test.sh -f tsim/stream/deleteInterval.sim
|
||||
./test.sh -f tsim/stream/deleteSession.sim
|
||||
./test.sh -f tsim/stream/deleteState.sim
|
||||
./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
./test.sh -f tsim/stream/distributeSession0.sim
|
||||
./test.sh -f tsim/stream/drop_stream.sim
|
||||
./test.sh -f tsim/stream/fillHistoryBasic1.sim
|
||||
./test.sh -f tsim/stream/fillHistoryBasic2.sim
|
||||
./test.sh -f tsim/stream/fillHistoryBasic3.sim
|
||||
./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
./test.sh -f tsim/stream/distributeSession0.sim
|
||||
./test.sh -f tsim/stream/session0.sim
|
||||
./test.sh -f tsim/stream/session1.sim
|
||||
./test.sh -f tsim/stream/state0.sim
|
||||
./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
./test.sh -f tsim/stream/triggerSession0.sim
|
||||
./test.sh -f tsim/stream/partitionby.sim
|
||||
./test.sh -f tsim/stream/partitionby1.sim
|
||||
./test.sh -f tsim/stream/schedSnode.sim
|
||||
./test.sh -f tsim/stream/windowClose.sim
|
||||
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
./test.sh -f tsim/stream/sliding.sim
|
||||
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
./test.sh -f tsim/stream/deleteInterval.sim
|
||||
./test.sh -f tsim/stream/deleteSession.sim
|
||||
./test.sh -f tsim/stream/deleteState.sim
|
||||
./test.sh -f tsim/stream/fillIntervalDelete0.sim
|
||||
./test.sh -f tsim/stream/fillIntervalDelete1.sim
|
||||
./test.sh -f tsim/stream/fillIntervalLinear.sim
|
||||
./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
|
||||
./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
|
||||
./test.sh -f tsim/stream/fillIntervalPrevNext.sim
|
||||
./test.sh -f tsim/stream/fillIntervalRange.sim
|
||||
./test.sh -f tsim/stream/fillIntervalValue.sim
|
||||
./test.sh -f tsim/stream/ignoreCheckUpdate.sim
|
||||
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
./test.sh -f tsim/stream/partitionby1.sim
|
||||
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
./test.sh -f tsim/stream/partitionby.sim
|
||||
./test.sh -f tsim/stream/schedSnode.sim
|
||||
./test.sh -f tsim/stream/session0.sim
|
||||
./test.sh -f tsim/stream/session1.sim
|
||||
./test.sh -f tsim/stream/sliding.sim
|
||||
./test.sh -f tsim/stream/state0.sim
|
||||
./test.sh -f tsim/stream/state1.sim
|
||||
./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
./test.sh -f tsim/stream/triggerSession0.sim
|
||||
./test.sh -f tsim/stream/udTableAndTag0.sim
|
||||
./test.sh -f tsim/stream/udTableAndTag1.sim
|
||||
./test.sh -f tsim/stream/udTableAndTag2.sim
|
||||
./test.sh -f tsim/stream/windowClose.sim
|
||||
./test.sh -f tsim/trans/lossdata1.sim
|
||||
./test.sh -f tsim/trans/create_db.sim
|
||||
./test.sh -f tsim/tmq/basic1.sim
|
||||
|
|
Loading…
Reference in New Issue