reload state window state
This commit is contained in:
parent
26375e834a
commit
71b000f08c
|
@ -4057,13 +4057,20 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
|
|||
|
||||
bool compareStateKey(void* data, void* key) {
|
||||
if (!data || !key) {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
SStateKeys* stateKey = (SStateKeys*)key;
|
||||
stateKey->pData = (char*)key + sizeof(SStateKeys);
|
||||
return compareVal(data, stateKey);
|
||||
}
|
||||
|
||||
bool compareWinStateKey(SStateKeys* left, SStateKeys* right) {
|
||||
if (!left || !right) {
|
||||
return false;
|
||||
}
|
||||
return compareVal(left->pData, right);
|
||||
}
|
||||
|
||||
void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
|
||||
SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
|
||||
int32_t size = pAggSup->resultRowSize;
|
||||
|
@ -4086,10 +4093,14 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|||
pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size);
|
||||
pCurWin->pStateKey =
|
||||
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
|
||||
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
|
||||
pCurWin->pStateKey->type = pAggSup->stateKeyType;
|
||||
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
|
||||
pCurWin->pStateKey->isNull = false;
|
||||
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
|
||||
pCurWin->pStateKey->type = pAggSup->stateKeyType;
|
||||
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
|
||||
pCurWin->pStateKey->isNull = false;
|
||||
pCurWin->winInfo.sessionWin.groupId = groupId;
|
||||
pCurWin->winInfo.sessionWin.win.skey = ts;
|
||||
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
||||
qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, pCurWin->winInfo.sessionWin.win.ekey);
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -4241,6 +4252,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
qDebug("===stream=== stream state agg");
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
|
@ -4340,6 +4352,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
void streamStateReleaseState(SOperatorInfo* pOperator) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (downstream->fpSet.releaseStreamStateFn) {
|
||||
|
@ -4365,6 +4378,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
|
|||
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
|
||||
if (pNextWin->isOutput && pStDeleted) {
|
||||
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, pNextWin->sessionWin.groupId);
|
||||
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
|
||||
}
|
||||
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
|
||||
|
@ -4383,19 +4397,28 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
|
||||
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
|
||||
int32_t num = size / sizeof(SSessionKey);
|
||||
qDebug("===stream=== reload state. get result count:%d", num);
|
||||
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
||||
ASSERT(size == num * sizeof(SSessionKey));
|
||||
if (!pInfo->pSeUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
}
|
||||
if (!pInfo->pSeDeleted && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SStateWindowInfo curInfo = {0};
|
||||
SStateWindowInfo nextInfo = {0};
|
||||
SStateWindowInfo dummy = {0};
|
||||
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i);
|
||||
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
|
||||
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
|
||||
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
|
||||
bool cpRes = compareWinStateKey(curInfo.pStateKey,nextInfo.pStateKey);
|
||||
qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);
|
||||
if (cpRes) {
|
||||
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeDeleted);
|
||||
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
|
||||
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
|
|
|
@ -581,6 +581,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
streamSchedExec(pTask);
|
||||
} else {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
|
|
|
@ -729,6 +729,7 @@ void streamStateFreeVal(void* val) {
|
|||
|
||||
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
|
||||
#ifdef USE_ROCKSDB
|
||||
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId);
|
||||
return streamStateSessionPut_rocksdb(pState, key, value, vLen);
|
||||
#else
|
||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||
|
|
Loading…
Reference in New Issue