diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5a2cdffa92..29dad17b31 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1199,9 +1199,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows); qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", - (pRow->numOfRows+pBlock->info.rows), - pBlock->info.capacity, GET_TASKID(pTaskInfo)); - // todo set the pOperator->resultInfo size + (pRow->numOfRows + pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo)); + // todo set the pOperator->resultInfo size } pGroupResInfo->index += 1; @@ -1242,7 +1241,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } - tdbFree(tbname); + streamFreeVal(tbname); } void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, @@ -2596,6 +2595,7 @@ int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResul } int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { + qWarn("write to stream state"); streamStatePut(pState, pKey, pResult, resSize); return TSDB_CODE_SUCCESS; } @@ -2633,7 +2633,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } - tdbFree(tbname); + streamFreeVal(tbname); } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.id.groupId != pKey->groupId) { @@ -2726,7 +2726,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } - tdbFree(tbname); + streamFreeVal(tbname); } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.id.groupId != pKey->groupId) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 2a33e3527a..0ce5390d50 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -140,7 +140,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); if (pBlock == NULL) { - if (pInfo->totalInputRows == 0 && (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) { + if (pInfo->totalInputRows == 0 && + (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) { setOperatorCompleted(pOperator); return NULL; } @@ -342,8 +343,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -381,12 +382,13 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: if (pInfo != NULL) { destroyFillOperatorInfo(pInfo); } @@ -843,9 +845,9 @@ static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill int32_t slotId = GET_DEST_SLOT_ID(pFillCol); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId); SFillInfo tmpInfo = { - .currentKey = ts, - .order = TSDB_ORDER_ASC, - .interval = pFillSup->interval, + .currentKey = ts, + .order = TSDB_ORDER_ASC, + .interval = pFillSup->interval, }; bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pColData, pBlock->info.rows); if (!filled) { @@ -886,9 +888,9 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; SFillInfo tmp = { - .currentKey = pFillInfo->current, - .order = TSDB_ORDER_ASC, - .interval = pFillSup->interval, + .currentKey = pFillInfo->current, + .order = TSDB_ORDER_ASC, + .interval = pFillSup->interval, }; int32_t slotId = GET_DEST_SLOT_ID(pFillCol); @@ -1049,7 +1051,7 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_ char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); - tdbFree(tbname); + streamFreeVal(tbname); } pBlock->info.rows++; @@ -1209,7 +1211,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { return NULL; } blockDataCleanup(pInfo->pRes); - if (hasRemainCalc(pInfo->pFillInfo) || (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true )) { + if (hasRemainCalc(pInfo->pFillInfo) || + (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); if (pInfo->pRes->info.rows > 0) { printDataBlock(pInfo->pRes, "stream fill"); @@ -1373,8 +1376,8 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pResRow = NULL; - if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F - || pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) { + if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F || + pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) { pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData)); pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); @@ -1476,7 +1479,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pInfo->srcRowIndex = 0; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1484,7 +1488,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi } return pOperator; - _error: +_error: destroyStreamFillOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 0fc2cdb46a..9537a76bd6 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -966,7 +966,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { void* tbname = NULL; if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - tdbFree(tbname); + streamFreeVal(tbname); } } taosArrayDestroy(pParInfo->rowIds); @@ -1118,7 +1118,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->scalarSup.pExprInfo != NULL) { projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, - pInfo->scalarSup.numOfExprs, NULL); + pInfo->scalarSup.numOfExprs, NULL); } taosHashClear(pInfo->pPartitions); doStreamHashPartitionImpl(pInfo, pBlock); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 40b9597643..2a69f12639 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1361,7 +1361,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); varDataSetLen(tbname, strlen(varDataVal(tbname))); - tdbFree(parTbname); + streamFreeVal(parTbname); } appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, tbname[0] == 0 ? NULL : tbname); @@ -1608,8 +1608,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, - pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); + qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, + pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, + pInfo->tqReader->pWalReader->curVersion); pTaskInfo->streamInfo.returned = 1; return pResult; } else { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index bfdf5ea89b..65f9e6c850 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1552,7 +1552,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); } - tdbFree(tbname); + streamFreeVal(tbname); (*index)++; } } @@ -3266,7 +3266,7 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); - tdbFree(tbname); + streamFreeVal(tbname); } pBlock->info.rows += 1; } diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index d11ec74ee8..c63020e64d 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -13,7 +13,7 @@ if(${BUILD_WITH_ROCKSDB}) PUBLIC rocksdb tdb PRIVATE os util transport qcom executor ) - #add_definitions(-DUSE_ROCKSDB) + add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h new file mode 100644 index 0000000000..04d8093fa7 --- /dev/null +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _STREAM_BACKEDN_ROCKSDB_H_ +#define _STREAM_BACKEDN_ROCKSDB_H_ + +#include +#include +#include "executor.h" +#include "osMemory.h" +#include "rocksdb/c.h" +#include "streamInc.h" +#include "streamState.h" +#include "tcoding.h" +#include "tcommon.h" +#include "tcompare.h" +#include "ttimer.h" + +int streamInitBackend(SStreamState* pState, char* path); +void streamCleanBackend(SStreamState* pState); + +int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); +int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key); +int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key); +int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key); +int32_t streamStateClear_rocksdb(SStreamState* pState); + +int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); +SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key); +SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); +SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); +int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); +int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key); +int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, + int32_t* pVLen); + +int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, + int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); + +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); +int32_t streamStateSessionClear_rocksdb(SStreamState* pState); +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); + +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); + +int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); +int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); +int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); +int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); +int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); +void streamStateDestroy_rocksdb(SStreamState* pState); +#endif \ No newline at end of file diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 57a0077a79..721d72f7c4 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -19,6 +19,7 @@ #include "executor.h" #include "osMemory.h" #include "rocksdb/c.h" +#include "streamBackendRocksdb.h" #include "streamInc.h" #include "tcoding.h" #include "tcommon.h" @@ -102,6 +103,7 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { } SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { + qWarn("open stream state, %s", path); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -113,15 +115,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int streamStateDestroy(pState); return NULL; } -#ifdef USE_ROCKSDB - int code = streamInitBackend(pState, path); - if (code == -1) { - taosMemoryFree(pState); - pState = NULL; - } - return pState; - -#else char statePath[1024]; if (!specPath) { @@ -130,6 +123,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); } +#ifdef USE_ROCKSDB + qWarn("open stream state1"); + int code = streamInitBackend(pState, statePath); + if (code == -1) { + taosMemoryFree(pState); + pState = NULL; + } + qWarn("open stream state2, %s", statePath); + pState->pTdbState->pOwner = pTask; + return pState; + +#else char cfgPath[1030]; sprintf(cfgPath, "%s/cfg", statePath); @@ -211,7 +216,7 @@ _err: void streamStateClose(SStreamState* pState) { #ifdef USE_ROCKSDB - streamCleanBackend(pState); + // streamCleanBackend(pState); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -227,15 +232,22 @@ void streamStateClose(SStreamState* pState) { } int32_t streamStateBegin(SStreamState* pState) { +#ifdef USE_ROCKSDB + return 0; +#else if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { tdbAbort(pState->pTdbState->db, pState->pTdbState->txn); return -1; } return 0; +#endif } int32_t streamStateCommit(SStreamState* pState) { +#ifdef USE_ROCKSDB + return 0; +#else if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) { return -1; } @@ -248,9 +260,13 @@ int32_t streamStateCommit(SStreamState* pState) { return -1; } return 0; +#endif } int32_t streamStateAbort(SStreamState* pState) { +#ifdef USE_ROCKSDB + return 0; +#else if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) { return -1; } @@ -260,54 +276,90 @@ int32_t streamStateAbort(SStreamState* pState) { return -1; } return 0; +#endif } int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { +#ifdef USE_ROCKSDB + return streamStateFuncPut_rocksdb(pState, key, value, vLen); +#else return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn); +#endif } int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateFuncGet(pState, key, pVal, pVLen); +#else return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); +#endif } int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { +#ifdef USE_ROCKSDB + return streamStateFuncDel_rocksdb(pState, key); +#else return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn); +#endif } // todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { +#ifdef USE_ROCKSDB + return streamStatePut_rocksdb(pState, key, value, vLen); +#else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn); +#endif } // todo refactor int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { +#ifdef USE_ROCKSDB + return streamStateFillPut_rocksdb(pState, key, value, vLen); +#else return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn); +#endif } // todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateGet_rocksdb(pState, key, pVal, pVLen); +#else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); +#endif } // todo refactor int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateFillGet_rocksdb(pState, key, pVal, pVLen); +#else return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); +#endif } // todo refactor int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateDel_rocksdb(pState, key); +#else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn); +#endif } int32_t streamStateClear(SStreamState* pState) { +#ifdef USE_ROCKSDB + return streamStateClear_rocksdb(pState); +#else SWinKey key = {.ts = 0, .groupId = 0}; streamStatePut(pState, &key, NULL, 0); while (1) { SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key); - SWinKey delKey = {0}; - int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); + SWinKey delKey = {0}; + int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); streamStateFreeCur(pCur); if (code == 0) { streamStateDel(pState, &delKey); @@ -316,16 +368,24 @@ int32_t streamStateClear(SStreamState* pState) { } } return 0; +#endif } void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } // todo refactor int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateFillDel_rocksdb(pState, key); +#else return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn); +#endif } int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen); +#else // todo refactor int32_t size = *pVLen; if (streamStateGet(pState, key, pVal, pVLen) == 0) { @@ -334,6 +394,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* *pVal = tdbRealloc(NULL, size); memset(*pVal, 0, size); return 0; +#endif } int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { @@ -341,16 +402,23 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV if (!pVal) { return 0; } +#ifdef USE_ROCKSDB + taosMemoryFree(pVal); +#else streamFreeVal(pVal); +#endif return 0; } SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateGetCur_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL); - int32_t c = 0; + int32_t c = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c); if (c != 0) { @@ -359,9 +427,13 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { } pCur->number = pState->number; return pCur; +#endif } SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateFillGetCur_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL); @@ -373,9 +445,13 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) return NULL; } return pCur; +#endif } SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateGetAndCheckCur_rocksdb(pState, key); +#else SStreamStateCur* pCur = streamStateFillGetCur(pState, key); if (pCur) { int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0); @@ -385,14 +461,18 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { streamStateFreeCur(pCur); } return NULL; +#endif } int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); +#else if (!pCur) { return -1; } const SStateKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -401,44 +481,57 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** } *pKey = pKTmp->key; return 0; +#endif } int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); +#else if (!pCur) { return -1; } const SWinKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } *pKey = *pKTmp; return 0; +#endif } int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen); +#else if (!pCur) { return -1; } uint64_t groupId = pKey->groupId; - int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); + int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); if (code == 0) { if (pKey->groupId == groupId) { return 0; } } return -1; +#endif } int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateGetFirst_rocksdb(pState, key); +#else // todo refactor SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut(pState, &tmp, NULL, 0); SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp); - int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); + int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); streamStateFreeCur(pCur); streamStateDel(pState, &tmp); return code; +#endif } int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { @@ -452,6 +545,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { } SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateSeekKeyNext_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -463,7 +559,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key } SStateKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -476,9 +572,13 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key } return pCur; +#endif } SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateFillSeekKeyNext_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; @@ -501,9 +601,13 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* } return pCur; +#endif } SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateFillSeekKeyPrev_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -526,45 +630,67 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* } return pCur; +#endif } int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { +#ifdef USE_ROCKSDB + return streamStateCurNext_rocksdb(pState, pCur); +#else if (!pCur) { return -1; } // return tdbTbcMoveToNext(pCur->pCur); +#endif } int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { - // +#ifdef USE_ROCKSDB + return streamStateCurPrev_rocksdb(pState, pCur); +#else if (!pCur) { return -1; } return tdbTbcMoveToPrev(pCur->pCur); +#endif } void streamStateFreeCur(SStreamStateCur* pCur) { if (!pCur) { return; } - tdbTbcClose(pCur->pCur); rocksdb_iter_destroy(pCur->iter); + tdbTbcClose(pCur->pCur); taosMemoryFree(pCur); } -void streamFreeVal(void* val) { tdbFree(val); } +void streamFreeVal(void* val) { +#ifdef USE_ROCKSDB + taosMemoryFree(val); +#else + tdbFree(val); +#endif +} int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { +#ifdef USE_ROCKSDB + return streamStateSessionPut_rocksdb(pState, key, value, vLen); +#else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, pState->pTdbState->txn); +#endif } int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen); +#else + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); - SSessionKey resKey = *key; - void* tmp = NULL; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); + SSessionKey resKey = *key; + void* tmp = NULL; + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { if (key->win.skey != resKey.win.skey) { code = -1; @@ -576,14 +702,22 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa } streamStateFreeCur(pCur); return code; +#endif } int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { +#ifdef USE_ROCKSDB + return streamStateSessionDel_rocksdb(pState, key); +#else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn); +#endif } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { +#ifdef USE_ROCKSDB + return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -595,7 +729,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -608,9 +742,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons } return pCur; +#endif } SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) { +#ifdef USE_ROCKSDB + return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -622,7 +760,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -636,9 +774,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons } return pCur; +#endif } SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) { +#ifdef USE_ROCKSDB + return streamStateSessionSeekKeyNext_rocksdb(pState, key); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -650,7 +792,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -663,14 +805,18 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess } return pCur; +#endif } int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); +#else if (!pCur) { return -1; } SStateSessionKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) { return -1; } @@ -682,16 +828,20 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v } *pKey = pKTmp->key; return 0; +#endif } int32_t streamStateSessionClear(SStreamState* pState) { - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; +#ifdef USE_ROCKSDB + return streamStateSessionClear_rocksdb(pState); +#else + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); while (1) { SSessionKey delKey = {0}; - void* buf = NULL; - int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { memset(buf, 0, size); streamStateSessionPut(pState, &delKey, buf, size); @@ -702,9 +852,13 @@ int32_t streamStateSessionClear(SStreamState* pState) { } streamStateFreeCur(pCur); return 0; +#endif } int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { +#ifdef USE_ROCKSDB + return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey); +#else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; @@ -716,14 +870,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return -1; } SSessionKey resKey = *key; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -750,24 +904,28 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* streamStateFreeCur(pCur); return -1; +#endif } int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen); +#else // todo refactor - int32_t res = 0; + int32_t res = 0; SSessionKey originKey = *key; SSessionKey searchKey = *key; searchKey.win.skey = key->win.skey - gap; searchKey.win.ekey = key->win.ekey + gap; int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); @@ -799,21 +957,27 @@ _end: *pVal = tmp; streamStateFreeCur(pCur); return res; + +#endif } int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { // todo refactor - int32_t res = 0; + +#ifdef USE_ROCKSDB + return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen); +#else + int32_t res = 0; SSessionKey tmpKey = *key; - int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + int32_t valSize = *pVLen; + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); @@ -854,27 +1018,48 @@ _end: *pVal = tmp; streamStateFreeCur(pCur); return res; +#endif } int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { +#ifdef USE_ROCKSDB + return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen); +#else return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn); +#endif } int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { +#ifdef USE_ROCKSDB + return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen); +#else return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen); +#endif } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { +#ifdef USE_ROCKSDB + return streamStatePutParName_rocksdb(pState, groupId, tbname); +#else return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, pState->pTdbState->txn); +#endif } int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { +#ifdef USE_ROCKSDB + return streamStateGetParName_rocksdb(pState, groupId, pVal); +#else int32_t len; return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); +#endif } void streamStateDestroy(SStreamState* pState) { +#ifdef USE_ROCKSDB + streamStateDestroy_rocksdb(pState); + // do nothong +#endif taosMemoryFreeClear(pState->pTdbState); taosMemoryFreeClear(pState); } diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 6b96fcaa26..8c24a5e0b2 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -13,18 +13,10 @@ * along with this program. If not, see . */ -#include -#include -#include "executor.h" -#include "osMemory.h" +#include "query.h" #include "rocksdb/c.h" -#include "streamInc.h" -#include "streamState.h" -#include "tcoding.h" +#include "streamBackendRocksdb.h" #include "tcommon.h" -#include "tcompare.h" -#include "ttimer.h" - // // SStateKey // |--groupid--|---ts------|--opNum----| @@ -70,6 +62,15 @@ int stateKeyDecode(void* k, char* buf) { return p - buf; } +int stateKeyToString(void* k, char* buf) { + SStateKey* key = k; + int n = 0; + n += sprintf(buf + n, "groupId:%" PRId64 " ", key->key.groupId); + n += sprintf(buf + n, "ts:%" PRIi64 " ", key->key.ts); + n += sprintf(buf + n, "opNum:%" PRIi64 " ", key->opNum); + return n; +} + // // SStateSessionKey // |-----------SSessionKey----------| @@ -120,6 +121,15 @@ int stateSessionKeyDecode(void* ses, char* buf) { p = taosDecodeFixedI64(p, &sess->opNum); return p - buf; } +int stateSessionKeyToString(void* k, char* buf) { + SStateSessionKey* key = k; + int n = 0; + n += sprintf(buf + n, "skey:%" PRIi64 " ", key->key.win.skey); + n += sprintf(buf + n, "ekey:%" PRIi64 " ", key->key.win.ekey); + n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->key.groupId); + n += sprintf(buf + n, "opNum:%" PRIi64 " ", key->opNum); + return n; +} /** * SWinKey @@ -159,6 +169,14 @@ int winKeyDecode(void* k, char* buf) { p = taosDecodeFixedI64(p, &key->ts); return len; } + +int winKeyToString(void* k, char* buf) { + SWinKey* key = k; + int n = 0; + n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->groupId); + n += sprintf(buf + n, "ts:%" PRIi64 " ", key->ts); + return n; +} /* * STupleKey * |---groupId---|---ts---|---exprIdx---| @@ -201,6 +219,14 @@ int tupleKeyDecode(void* k, char* buf) { p = taosDecodeFixedI32(p, &key->exprIdx); return len; } +int tupleKeyToString(void* k, char* buf) { + int n = 0; + STupleKey* key = k; + n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->groupId); + n += sprintf(buf + n, "ts:%" PRIi64 " ", key->ts); + n += sprintf(buf + n, "exprIdx:%d ", key->exprIdx); + return n; +} int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int64_t w1, w2; @@ -229,31 +255,39 @@ int parKeyDecode(void* k, char* buf) { p = taosDecodeFixedI64(p, groupid); return p - buf; } +int parKeyToString(void* k, char* buf) { + int64_t* key = k; + int n = 0; + n = sprintf(buf + n, "groupId:%" PRIi64 " ", *key); + return n; +} const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); typedef int (*DecodeFunc)(void* key, char* buf); +typedef int (*ToStringFunc)(void* key, char* buf); ////typedef int32_t (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); ////typedef const char* (*BackendCmpNameFunc)(void* statue); typedef struct { - const char* key; - int idx; - EncodeFunc enFunc; - DecodeFunc deFunc; + const char* key; + int idx; + EncodeFunc enFunc; + DecodeFunc deFunc; + ToStringFunc toStrFunc; } SCfInit; SCfInit ginitDict[] = { - {"default", 0, stateKeyEncode, stateKeyDecode}, - {"fill", 1, winKeyEncode, winKeyDecode}, - {"sess", 2, stateSessionKeyEncode, stateSessionKeyDecode}, - {"func", 3, tupleKeyEncode, tupleKeyDecode}, - {"parname", 4, parKeyEncode, parKeyDecode}, - {"partag", 5, parKeyEncode, parKeyDecode}, + {"default", 0, stateKeyEncode, stateKeyDecode, stateKeyToString}, + {"fill", 1, winKeyEncode, winKeyDecode, winKeyToString}, + {"sess", 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString}, + {"func", 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString}, + {"parname", 4, parKeyEncode, parKeyDecode, parKeyToString}, + {"partag", 5, parKeyEncode, parKeyDecode, parKeyToString}, }; const char* compareStateName(void* name) { return cfName[0]; } -const char* compareWinKey(void* name) { return cfName[1]; } +const char* compareWinKeyName(void* name) { return cfName[1]; } const char* compareSessionKey(void* name) { return cfName[2]; } const char* compareFuncKey(void* name) { return cfName[3]; } const char* compareParKey(void* name) { return cfName[4]; } @@ -278,7 +312,7 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[0], stateCompare); - rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, winKeyDBComp, compareWinKey); + rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, winKeyDBComp, compareWinKeyName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare); rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateSessionKeyDBComp, compareSessionKey); @@ -294,7 +328,7 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare); rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); - rocksdb_t* db = rocksdb_open_column_families(opts, "rocksdb", cfLen, cfName, cfOpt, cfHandle, &err); + rocksdb_t* db = rocksdb_open_column_families(opts, path, cfLen, cfName, cfOpt, cfHandle, &err); pState->pTdbState->rocksdb = db; pState->pTdbState->pHandle = cfHandle; @@ -321,63 +355,86 @@ int streamGetInit(const char* funcName) { #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ do { \ + code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ + qWarn("failed to get cf name: %s", funcname); \ return -1; \ } \ + char toString[128] = {0}; \ ginitDict[i].enFunc((void*)key, buf); \ + ginitDict[i].toStrFunc((void*)key, toString); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ + qWarn("str: %s failed to write to %s, err: %s", toString, funcname, err); \ code = -1; \ + } else { \ + qWarn("str:%s succ to write to %s", toString, funcname); \ } \ - code = 0; \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ do { \ + code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ + qWarn("failed to get cf name: %s", funcname); \ return -1; \ } \ + char toString[128] = {0}; \ + ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)vLen, &err); \ - *pVal = val; \ + if (val == NULL) { \ + qWarn("str: %s failed to read from %s, err: not exist", toString, funcname); \ + code = -1; \ + } else { \ + *pVal = val; \ + } \ if (err != NULL) { \ taosMemoryFree(err); \ + qWarn("str: %s failed to read from %s, err: %s", toString, funcname, err); \ code = -1; \ + } else { \ + if (code == 0) qWarn("str: %s succ to read from %s", toString, funcname); \ } \ - code = 0; \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ do { \ + code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ + qWarn("failed to get cf name: %s", funcname); \ return -1; \ } \ + char toString[128] = {0}; \ + ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ if (err != NULL) { \ + qWarn("str: %s failed to del from %s, err: %s", toString, funcname, err); \ taosMemoryFree(err); \ code = -1; \ + } else { \ + qWarn("str: %s succ to del from %s", toString, funcname); \ } \ - code = 0; \ } while (0); int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { @@ -426,16 +483,41 @@ int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, con } // todo refactor + +int32_t streamStateClear_rocksdb(SStreamState* pState) { + SWinKey key = {.ts = 0, .groupId = 0}; + // batch clear later + streamStatePut_rocksdb(pState, &key, NULL, 0); + while (1) { + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &key); + SWinKey delKey = {0}; + int32_t code = streamStateGetKVByCur_rocksdb(pCur, &delKey, NULL, 0); + streamStateFreeCur(pCur); + if (code == 0) { + streamStateDel_rocksdb(pState, &delKey); + } else { + break; + } + } + return 0; +} int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); return code; } +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); + return code; +} int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_PUT_ROCKSDB(pState, "sess", key, value, vLen); + if (code == -1) { + } return code; } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { @@ -527,6 +609,126 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con return pCur; } +int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int32_t size = *pVLen; + if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { + return 0; + } + *pVal = taosMemoryMalloc(size); + memset(*pVal, 0, size); + return 0; +} +SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); + + int32_t c = 0; + SStateKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + stateKeyEncode((void*)&sKey, buf); + rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); + if (rocksdb_iter_valid(pCur->iter)) { + SStateKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { + pCur->number = pState->number; + return pCur; + } + } + streamStateFreeCur(pCur); + return NULL; +} +SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + char buf[128] = {0}; + winKeyDecode((void*)key, buf); + rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); + if (rocksdb_iter_valid(pCur->iter)) { + size_t kLen; + SWinKey curKey; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + winKeyDecode((void*)&curKey, keyStr); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { + return pCur; + } + } + + streamStateFreeCur(pCur); + return NULL; +} +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { + SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); + if (pCur) { + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); + if (code == 0) return pCur; + streamStateFreeCur(pCur); + } + return NULL; +} +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + if (!pCur) return -1; + SStateKey tkey; + SStateKey* pKtmp = &tkey; + + if (rocksdb_iter_valid(pCur->iter)) { + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + stateKeyDecode((void*)pKtmp, keyStr); + if (pKtmp->opNum != pCur->number) { + return -1; + } + *pKey = pKtmp->key; + return 0; + } + return -1; +} +int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + if (!pCur) { + return -1; + } + SWinKey winKey; + SWinKey* pKTmp = &winKey; + if (rocksdb_iter_valid(pCur->iter)) { + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + winKeyDecode(&winKey, keyStr); + } else { + return -1; + } + *pKey = *pKTmp; + return 0; +} +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + if (!pCur) { + return -1; + } + uint64_t groupId = pKey->groupId; + + int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + if (code == 0) { + if (pKey->groupId == groupId) { + return 0; + } + } + return -1; +} +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { + SWinKey tmp = {.ts = 0, .groupId = 0}; + streamStatePut_rocksdb(pState, &tmp, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + streamStateFreeCur(pCur); + streamStateDel_rocksdb(pState, &tmp); + return code; +} int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { if (!pCur) { return -1; @@ -555,11 +757,115 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return 0; } +SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); + // if (!rocksdb_iter_valid(pCur->iter)) { + // streamStateFreeCur(pCur); + // return NULL; + // } + SStateKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + stateKeyEncode((void*)&sKey, buf); + rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); + if (rocksdb_iter_valid(pCur->iter)) { + SStateKey curKey; + size_t kLen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { + return pCur; + } + rocksdb_iter_next(pCur->iter); + if (rocksdb_iter_valid(pCur->iter)) { + return pCur; + } + } + streamStateFreeCur(pCur); + return NULL; +} +SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (!pCur) { + return NULL; + } + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + + char buf[128] = {0}; + winKeyEncode((void*)key, buf); + rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + } + { + SWinKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + winKeyDecode((void*)&curKey, keyStr); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) { + return pCur; + } + rocksdb_iter_next(pCur->iter); + if (rocksdb_iter_valid(pCur->iter)) { + return pCur; + } + } + streamStateFreeCur(pCur); + return NULL; +} +SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + + char buf[128] = {0}; + winKeyEncode((void*)key, buf); + rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + } + + { + SWinKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + winKeyDecode((void*)&curKey, keyStr); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) { + return pCur; + } + rocksdb_iter_prev(pCur->iter); + if (rocksdb_iter_valid(pCur->iter)) { + return pCur; + } + } + streamStateFreeCur(pCur); + return NULL; +} +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + if (!pCur) return -1; + rocksdb_iter_prev(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + return -1; + } + return 0; +} int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { if (!pCur) { return -1; } rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + return -1; + } return 0; } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {