feat(stream):optimize state window disc buff

This commit is contained in:
54liuyao 2022-10-20 09:04:51 +08:00
parent 67ed0cf313
commit 368f5134c5
3 changed files with 36 additions and 28 deletions

View File

@ -68,9 +68,10 @@ int32_t streamStateSessionClear(SStreamState* pState);
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);

View File

@ -3594,9 +3594,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
pKey->win.skey = startTs; pKey->win.skey = startTs;
pKey->win.ekey = endTs; pKey->win.ekey = endTs;
pKey->groupId = groupId; pKey->groupId = groupId;
SStreamStateCur* pCur = streamStateSessionGetCur(pAggSup->pState, pKey); int32_t code = streamStateSessionGetKey(pAggSup->pState, pKey, pKey);
int32_t code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
streamStateFreeCur(pCur);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_KEY_INVALID(pKey); SET_SESSION_WIN_KEY_INVALID(pKey);
} }

View File

@ -526,7 +526,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn);
} }
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -544,7 +544,7 @@ SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSess
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (c > 0) return pCur; if (c >= 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) { if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
@ -572,7 +572,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (c > 0) return pCur; if (c < 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) { if (tdbTbcMoveToNext(pCur->pCur) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
@ -630,7 +630,7 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe
streamStateCurPrev(pState, pCur); streamStateCurPrev(pState, pCur);
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS && sessionKeyCmpr(key, &tmpKey) == 0) { if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
resKey = tmpKey; resKey = tmpKey;
} else { } else {
break; break;
@ -640,9 +640,28 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe
return streamStateSessionGetRanomCur(pState, &resKey); return streamStateSessionGetRanomCur(pState, &resKey);
} }
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
SSessionKey resKey = *key;
int32_t res = -1;
while (1) {
SSessionKey tmpKey = *key;
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
res = 0;
resKey = tmpKey;
} else {
break;
}
streamStateCurPrev(pState, pCur);
}
*curKey = resKey;
return res;
}
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
// todo refactor // todo refactor
SStreamStateCur* pCur = streamStateSessionGetCur(pState, key); SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
int32_t size = *pVLen; int32_t size = *pVLen;
void* tmp = NULL; void* tmp = NULL;
*pVal = tdbRealloc(NULL, size); *pVal = tdbRealloc(NULL, size);
@ -659,7 +678,7 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key,
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
// todo refactor // todo refactor
int32_t res = TSDB_CODE_SUCCESS; int32_t res = 0;
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
void* tmp = tdbRealloc(NULL, valSize); void* tmp = tdbRealloc(NULL, valSize);
@ -667,21 +686,14 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
return -1; return -1;
} }
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) { if (code == 0) {
memcpy(tmp, *pVal, valSize); if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
*pVal = tmp; memcpy(tmp, *pVal, valSize);
streamStateFreeCur(pCur); goto _end;
return res; }
}
streamStateFreeCur(pCur);
streamStateSessionPut(pState, key, NULL, 0);
pCur = streamStateSessionGetRanomCur(pState, key);
streamStateCurPrev(pState, pCur);
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
@ -689,11 +701,9 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
} }
} }
streamStateFreeCur(pCur); streamStateCurNext(pState, pCur);
*key = tmpKey;
pCur = streamStateSessionSeekKeyNext(pState, key);
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) { if (code == 0) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
@ -708,7 +718,6 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
_end: _end:
*pVal = tmp; *pVal = tmp;
streamStateSessionDel(pState, &tmpKey);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
} }