From 5aeb1ec2ad75de2e08ff6042b61d78c238464fd1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Oct 2024 21:10:15 +0800 Subject: [PATCH] add interface --- source/libs/stream/inc/streamBackendRocksdb.h | 17 +-- source/libs/stream/src/streamBackendRocksdb.c | 102 +++++++++++++----- source/libs/stream/src/streamSessionState.c | 24 +++-- source/libs/stream/src/tstreamFileState.c | 2 +- 4 files changed, 96 insertions(+), 49 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 567d9de949..c4cf6a47cd 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -80,7 +80,7 @@ typedef struct { TdThreadRwlock chkpDirLock; int64_t dataWritten; - void* pMeta; + void* pMeta; int8_t removeAllFiles; } STaskDbWrapper; @@ -153,7 +153,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); -void taskDbSetClearFileFlag(void* pTaskDb); +void taskDbSetClearFileFlag(void* pTaskDb); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); @@ -191,7 +191,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId); int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur); -int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey, + void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); @@ -255,11 +256,11 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); -int32_t bkdMgtCreate(char* path, SBkdMgt **bm); -int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); -int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); -int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); -void bkdMgtDestroy(SBkdMgt* bm); +int32_t bkdMgtCreate(char* path, SBkdMgt** bm); +int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); +int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); +void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* id); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8a487ffeae..d469580d04 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3291,13 +3291,16 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v SStateKey sKey = {.key = *key, .opNum = pState->number}; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); - if (code != 0) { - return code; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen); + } else { + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size); + taosMemoryFree(dst); } - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size); - - taosMemoryFree(dst); return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -3311,7 +3314,11 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** taosMemoryFree(tVal); return code; } - + if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) { + *pVal = tVal; + *pVLen = tValLen; + return code; + } code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); return code; @@ -3580,6 +3587,13 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v taosMemoryFree(tVal); return code; } + + if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) { + *pVal = tVal; + *pVLen = tValLen; + return code; + } + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); @@ -3600,6 +3614,11 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k } char* dst = NULL; size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen); + return code; + } + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; @@ -3617,7 +3636,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo void* tmp = NULL; int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, &tmp, &vLen); if (code == 0 && key->win.skey == resKey.win.skey) { *key = resKey; @@ -3856,7 +3875,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con return pCur; } -int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { +int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey, + void** pVal, int32_t* pVLen) { if (!pCur) { return -1; } @@ -3890,13 +3910,27 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return -1; } + char* tVal = val; + size_t tVlen = len; + if (pVal != NULL) { - *pVal = (char*)val; + if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) { + int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen); + if (code != 0) { + taosMemoryFree(val); + return code; + } + taosMemoryFree(val); + *pVal = (char*)tVal; + } else { + *pVal = (char*)tVal; + } } else { taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = len; + if (pVLen != NULL) *pVLen = tVlen; + *pKey = pKTmp->key; return 0; } @@ -4085,7 +4119,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey)); SSessionKey resKey = *key; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -4094,7 +4128,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes if (c > 0) { streamStateCurNext_rocksdb(pCur); - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -4102,7 +4136,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } } else if (c < 0) { streamStateCurPrev(pState, pCur); - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -4132,7 +4166,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { @@ -4149,7 +4183,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); } - code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, *pVLen); @@ -4176,7 +4210,7 @@ void streamStateSessionClear_rocksdb(SStreamState* pState) { SSessionKey delKey = {0}; void* buf = NULL; int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { memset(buf, 0, size); // refactor later @@ -4204,7 +4238,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); @@ -4224,7 +4258,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); } taosMemoryFreeClear(*pVal); - code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { @@ -4251,6 +4285,10 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons int code = 0; char* dst = NULL; size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); + return code; + } code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; @@ -4277,7 +4315,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void #endif // parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - int code = 0; + int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); return code; } @@ -4289,12 +4327,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi } int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { - int code = 0; + int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { - int code = 0; + int code = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } @@ -4437,15 +4475,21 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { - char buf[128] = {0}; + int32_t code = 0; + char buf[128] = {0}; - char* dst = NULL; - size_t size = 0; - int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); - if (code != 0) { - return code; + char* dst = NULL; + size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + dst = val; + size = vlen; + return -1; + } else { + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); + if (code != 0) { + return code; + } } - int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf); char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 7e3d8d59f9..bb8ea6c03c 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -284,7 +284,7 @@ _end: int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t* pWinCode) { - SWinKey* pTmpkey = pKey; + SWinKey* pTmpkey = pKey; SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode); } @@ -343,7 +343,8 @@ _end: return code; } -int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { +int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); @@ -353,7 +354,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v } pNewPos->needFree = true; pNewPos->beFlushed = true; - void* pBuff = NULL; + void* pBuff = NULL; (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); if ((*pWinCode) != TSDB_CODE_SUCCESS) { goto _end; @@ -575,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; - int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); + int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, *ppCur, &key, NULL, NULL); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { if (!(*ppCur)) { @@ -653,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS SSessionKey key = {0}; void* pVal = NULL; int len = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); + int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateFreeCur(pCur); return pBuffCur; @@ -667,7 +668,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS } streamStateCurPrev(pFileStore, pCur); while (1) { - code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateCurNext(pFileStore, pCur); return pCur; @@ -710,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void *pKey = *(SSessionKey*)(pPos->pKey); } else { void* pData = NULL; - code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, &pData, pVLen); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); @@ -915,7 +916,7 @@ _end: int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen); streamStateFreeCur(pCur); if (code == TSDB_CODE_SUCCESS) { return code; @@ -923,7 +924,7 @@ int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey); } - code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen); streamStateFreeCur(pCur); return code; } @@ -1060,7 +1061,8 @@ _end: return code; } -int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { +int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, + int32_t* pVLen) { SSessionKey* pWinKey = pKey; const TSKEY gap = 0; int32_t code = TSDB_CODE_SUCCESS; @@ -1098,7 +1100,7 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey QUERY_CHECK_CODE(code, lino, _end); } qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, - pWinKey->win.ekey, code_file); + pWinKey->win.ekey, code_file); } } else { code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 7237f23671..6a102743cd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SSessionKey key = {0}; - winRes = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen); + winRes = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &vlen); if (winRes != TSDB_CODE_SUCCESS) { break; }