add interface

This commit is contained in:
yihaoDeng 2024-10-10 21:10:15 +08:00 committed by Jing Sima
parent 06121e6c9d
commit 5aeb1ec2ad
4 changed files with 96 additions and 49 deletions

View File

@ -80,7 +80,7 @@ typedef struct {
TdThreadRwlock chkpDirLock;
int64_t dataWritten;
void* pMeta;
void* pMeta;
int8_t removeAllFiles;
} STaskDbWrapper;
@ -153,7 +153,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
void* taskDbAddRef(void* pTaskDb);
void taskDbRemoveRef(void* pTaskDb);
void taskDbSetClearFileFlag(void* pTaskDb);
void taskDbSetClearFileFlag(void* pTaskDb);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);
@ -191,7 +191,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId);
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
@ -255,11 +256,11 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
int32_t bkdMgtCreate(char* path, SBkdMgt **bm);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t bkdMgtCreate(char* path, SBkdMgt** bm);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
const char* id);

View File

@ -3291,13 +3291,16 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
SStateKey sKey = {.key = *key, .opNum = pState->number};
char* dst = NULL;
size_t size = 0;
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen);
} else {
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
taosMemoryFree(dst);
}
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
taosMemoryFree(dst);
return code;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
@ -3311,7 +3314,11 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void**
taosMemoryFree(tVal);
return code;
}
if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
*pVal = tVal;
*pVLen = tValLen;
return code;
}
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code;
@ -3580,6 +3587,13 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v
taosMemoryFree(tVal);
return code;
}
if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
*pVal = tVal;
*pVLen = tValLen;
return code;
}
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
@ -3600,6 +3614,11 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
}
char* dst = NULL;
size_t size = 0;
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen);
return code;
}
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
@ -3617,7 +3636,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
void* tmp = NULL;
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, &tmp, &vLen);
if (code == 0 && key->win.skey == resKey.win.skey) {
*key = resKey;
@ -3856,7 +3875,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
return pCur;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
@ -3890,13 +3910,27 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
return -1;
}
char* tVal = val;
size_t tVlen = len;
if (pVal != NULL) {
*pVal = (char*)val;
if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
if (code != 0) {
taosMemoryFree(val);
return code;
}
taosMemoryFree(val);
*pVal = (char*)tVal;
} else {
*pVal = (char*)tVal;
}
} else {
taosMemoryFree(val);
}
if (pVLen != NULL) *pVLen = len;
if (pVLen != NULL) *pVLen = tVlen;
*pKey = pKTmp->key;
return 0;
}
@ -4085,7 +4119,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey));
SSessionKey resKey = *key;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
@ -4094,7 +4128,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
if (c > 0) {
streamStateCurNext_rocksdb(pCur);
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
@ -4102,7 +4136,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
}
} else if (c < 0) {
streamStateCurPrev(pState, pCur);
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
@ -4132,7 +4166,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
}
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
@ -4149,7 +4183,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, *pVLen);
@ -4176,7 +4210,7 @@ void streamStateSessionClear_rocksdb(SStreamState* pState) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
// refactor later
@ -4204,7 +4238,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
}
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize);
@ -4224,7 +4258,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
}
taosMemoryFreeClear(*pVal);
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
if (code == 0) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
@ -4251,6 +4285,10 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons
int code = 0;
char* dst = NULL;
size_t size = 0;
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
return code;
}
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
@ -4277,7 +4315,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void
#endif
// parname cfg
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
int code = 0;
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
return code;
}
@ -4289,12 +4327,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
}
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
return code;
}
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
return code;
}
@ -4437,15 +4475,21 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
char buf[128] = {0};
int32_t code = 0;
char buf[128] = {0};
char* dst = NULL;
size_t size = 0;
int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
if (code != 0) {
return code;
char* dst = NULL;
size_t size = 0;
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
dst = val;
size = vlen;
return -1;
} else {
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
if (code != 0) {
return code;
}
}
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);

View File

@ -284,7 +284,7 @@ _end:
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
SWinKey* pTmpkey = pKey;
SWinKey* pTmpkey = pKey;
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
}
@ -343,7 +343,8 @@ _end:
return code;
}
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
@ -353,7 +354,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
}
pNewPos->needFree = true;
pNewPos->beFlushed = true;
void* pBuff = NULL;
void* pBuff = NULL;
(*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if ((*pWinCode) != TSDB_CODE_SUCCESS) {
goto _end;
@ -575,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur)
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
SStreamStateCur** ppCur) {
SSessionKey key = {.groupId = groupId};
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, *ppCur, &key, NULL, NULL);
if (taosArrayGetSize(pWinStates) > 0 &&
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
if (!(*ppCur)) {
@ -653,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
SSessionKey key = {0};
void* pVal = NULL;
int len = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len);
int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
if (code == TSDB_CODE_FAILED) {
streamStateFreeCur(pCur);
return pBuffCur;
@ -667,7 +668,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
}
streamStateCurPrev(pFileStore, pCur);
while (1) {
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len);
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
if (code == TSDB_CODE_FAILED) {
streamStateCurNext(pFileStore, pCur);
return pCur;
@ -710,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
*pKey = *(SSessionKey*)(pPos->pKey);
} else {
void* pData = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, &pData, pVLen);
if (taosArrayGetSize(pWinStates) > 0 &&
(code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
transformCursor(pCur->pStreamFileState, pCur);
@ -915,7 +916,7 @@ _end:
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS) {
return code;
@ -923,7 +924,7 @@ int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void**
pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
return code;
}
@ -1060,7 +1061,8 @@ _end:
return code;
}
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
int32_t* pVLen) {
SSessionKey* pWinKey = pKey;
const TSKEY gap = 0;
int32_t code = TSDB_CODE_SUCCESS;
@ -1098,7 +1100,7 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
QUERY_CHECK_CODE(code, lino, _end);
}
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
pWinKey->win.ekey, code_file);
}
} else {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);

View File

@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
void* pVal = NULL;
int32_t vlen = 0;
SSessionKey key = {0};
winRes = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
winRes = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &vlen);
if (winRes != TSDB_CODE_SUCCESS) {
break;
}