From 2cc2e5d5225c8fe008e74810be11967cf1d290cd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Mar 2023 01:39:00 +0000 Subject: [PATCH] add backend --- source/libs/executor/src/executorimpl.c | 3 - source/libs/executor/src/timewindowoperator.c | 15 --- source/libs/stream/src/streamStateRocksdb.c | 118 +++++++++--------- 3 files changed, 60 insertions(+), 76 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7b2378e273..67e68160d7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1065,12 +1065,9 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset); - qWarn("offset: idx: %d, val: %d", j, rowEntryOffset[j]); if (!isRowEntryInitialized(pResInfo)) { - qWarn("no result"); continue; } else { - qWarn("has result"); } if (pRow->numOfRows < pResInfo->numOfRes) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6825192477..0fea8cf28d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2352,8 +2352,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p SResultRow* pResult = NULL; int32_t forwardRows = 0; - int stepTrace = 0; - qWarn("step1 %d", stepTrace++); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; @@ -2366,17 +2364,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); } while (1) { - qWarn("step1 %d", stepTrace++); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { - qWarn("step1 %d", stepTrace++); break; } continue; } - qWarn("step1 %d", stepTrace++); if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { bool ignore = true; @@ -2407,7 +2402,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ignore = false; } } - qWarn("step1 %d", stepTrace++); if (ignore) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); @@ -2417,27 +2411,22 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p continue; } } - qWarn("step1 %d", stepTrace++); int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - qWarn("step1 %d", stepTrace++); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - qWarn("step1 %d", stepTrace++); if (IS_FINAL_OP(pInfo)) { forwardRows = 1; } else { forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } - qWarn("step1 %d", stepTrace++); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap); } - qWarn("step1 %d", stepTrace++); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { SWinKey key = { .ts = pResult->win.skey, @@ -2446,7 +2435,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); } - qWarn("step1 %d", stepTrace++); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); @@ -2455,7 +2443,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .groupId = groupId, }; - qWarn("step1 %d", stepTrace++); saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize); releaseOutputBuf(pInfo->pState, &key, pResult); if (pInfo->delKey.ts > key.ts) { @@ -2474,7 +2461,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); } } - qWarn("step1 %d", stepTrace++); if (IS_FINAL_OP(pInfo)) { startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); @@ -2483,7 +2469,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); } if (startPos < 0) { - qWarn("step1 %d", stepTrace++); break; } } diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 1112dda1d7..6cddf6649c 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -333,6 +333,8 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->rocksdb = db; pState->pTdbState->pHandle = cfHandle; pState->pTdbState->wopts = rocksdb_writeoptions_create(); + // rocksdb_writeoptions_ + rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->wopts, 1); pState->pTdbState->ropts = rocksdb_readoptions_create(); return 0; } @@ -341,6 +343,8 @@ void streamCleanBackend(SStreamState* pState) { for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); } + rocksdb_writeoptions_destroy(pState->pTdbState->wopts); + rocksdb_readoptions_destroy(pState->pTdbState->ropts); rocksdb_close(pState->pTdbState->rocksdb); } @@ -352,7 +356,18 @@ int streamGetInit(const char* funcName) { } return -1; } - +bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) { + bool valid = false; + rocksdb_iter_seek(iter, buf, len); + if (!rocksdb_iter_valid(iter)) { + rocksdb_iter_seek_for_prev(iter, buf, len); + if (!rocksdb_iter_valid(iter)) { + return valid; + } + } + valid = true; + return valid; +} #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ do { \ code = 0; \ @@ -542,14 +557,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta // char toString[128] = {0}; // stateSessionKeyToString(&sKey, toString); // qWarn("streamState seek key %s", toString); - - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); + if (valid == false) { + streamStateFreeCur(pCur); + return NULL; } int32_t c = 0; @@ -582,13 +593,18 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); - rocksdb_iter_seek(pCur->iter, (const char*)buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + // rocksdb_iter_seek(pCur->iter, (const char*)buf, len); + // if (!rocksdb_iter_valid(pCur->iter)) { + // rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + // if (!rocksdb_iter_valid(pCur->iter)) { + // streamStateFreeCur(pCur); + // return NULL; + // } + // } + bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); + if (valid == false) { + streamStateFreeCur(pCur); + return NULL; } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); @@ -617,14 +633,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con char buf[128] = {0}; int len = stateSessionKeyEncode(&sKey, buf); - rocksdb_iter_seek(pCur->iter, (const char*)buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + + bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); + if (valid == false) { + streamStateFreeCur(pCur); + return NULL; } + size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -721,14 +736,13 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + + bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); + if (valid == false) { + streamStateFreeCur(pCur); + return NULL; } + if (rocksdb_iter_valid(pCur->iter)) { size_t kLen; SWinKey curKey; @@ -837,13 +851,10 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; int len = stateKeyEncode((void*)&sKey, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; } if (rocksdb_iter_valid(pCur->iter)) { @@ -871,14 +882,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; } + { SWinKey curKey; size_t kLen = 0; @@ -905,13 +914,9 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; } { @@ -956,13 +961,10 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes int32_t c = 0; char buf[128] = {0}; int len = stateSessionKeyEncode(&sKey, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return -1; - } + + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return -1; } int32_t kLen;