feat:add stream delete mark

This commit is contained in:
54liuyao 2023-01-17 16:21:51 +08:00
parent c1fffe4266
commit e03deeaed8
4 changed files with 52 additions and 0 deletions

View File

@ -110,6 +110,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
#if 0
char* streamStateSessionDump(SStreamState* pState);
char* streamStateIntervalDump(SStreamState* pState);
#endif
#ifdef __cplusplus

View File

@ -295,6 +295,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
@ -343,6 +344,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.deleteMark = pObj->deleteMark,
};
// using ast and param to build physical plan

View File

@ -4803,6 +4803,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pUpdatedMap);
#if 0
char* pBuf = streamStateIntervalDump(pInfo->pState);
qDebug("===stream===interval state%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete");

View File

@ -879,4 +879,47 @@ char* streamStateSessionDump(SStreamState* pState) {
streamStateFreeCur(pCur);
return dumpBuf;
}
char* streamStateIntervalDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
tdbTbcMoveToFirst(pCur->pCur);
SWinKey key = {0};
void* buf = NULL;
int32_t bufSize = 0;
int32_t code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
if (code != 0) {
streamStateFreeCur(pCur);
return NULL;
}
int32_t size = 2048;
char* dumpBuf = taosMemoryCalloc(size, 1);
int64_t len = 0;
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
// len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
while (1) {
tdbTbcMoveToNext(pCur->pCur);
key = (SWinKey){0};
code = streamStateGetKVByCur(pCur, &key, NULL, 0);
if (code != 0) {
streamStateFreeCur(pCur);
return dumpBuf;
}
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
// len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
}
streamStateFreeCur(pCur);
return dumpBuf;
}
#endif