add interface

This commit is contained in:
yihaoDeng 2024-10-10 21:28:13 +08:00 committed by Jing Sima
parent 5aeb1ec2ad
commit 193220aa88
2 changed files with 8 additions and 4 deletions

View File

@ -3307,7 +3307,7 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void**
int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
char* tVal;
char* tVal = NULL;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen);
if (code != 0) {
@ -3569,6 +3569,10 @@ int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, c
int code = 0;
char* dst = NULL;
size_t size = 0;
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen);
return code;
}
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;

View File

@ -228,17 +228,17 @@ void *backendOpen() {
memset(&key, 0, sizeof(key));
char *val = NULL;
int32_t vlen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key);
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]);
pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key);
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
ASSERT(vlen == strlen("Value"));
ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]);