From 8e409f81ae79a56618e68bf6127b2ae64c61a989 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Mar 2023 06:43:17 +0000 Subject: [PATCH] add backend --- source/libs/executor/inc/executil.h | 11 +- source/libs/executor/src/executorimpl.c | 188 +++++++++++++++++- source/libs/executor/src/timewindowoperator.c | 22 +- source/libs/function/src/builtinsimpl.c | 2 +- source/libs/stream/src/streamState.c | 4 +- source/libs/stream/src/streamStateRocksdb.c | 14 +- tests/script/tsim/stream/basic0.sim | 2 +- tests/script/tsim/stream/basic1.sim | 2 +- 8 files changed, 219 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f99c7de93d..119ae1eed7 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -15,6 +15,7 @@ #ifndef TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H +#include "executor.h" #include "function.h" #include "nodes.h" #include "plannodes.h" @@ -22,7 +23,6 @@ #include "tpagedbuf.h" #include "tsimplehash.h" #include "vnode.h" -#include "executor.h" #define T_LONG_JMP(_obj, _c) \ do { \ @@ -37,7 +37,7 @@ memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) -#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) +#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) @@ -67,7 +67,7 @@ typedef struct SResultRowPosition { typedef struct SResKeyPos { SResultRowPosition pos; uint64_t groupId; - char key[]; + char key[]; } SResKeyPos; typedef struct SResultRowInfo { @@ -88,12 +88,13 @@ typedef struct SColMatchInfo { int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; -typedef struct SExecTaskInfo SExecTaskInfo; +typedef struct SExecTaskInfo SExecTaskInfo; typedef struct STableListInfo STableListInfo; struct SqlFunctionCtx; int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo); + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + SExecTaskInfo* pTaskInfo); STableListInfo* tableListCreate(); void* tableListDestroy(STableListInfo* pTableListInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 29dad17b31..7b2378e273 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1065,8 +1065,12 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + qWarn("offset: idx: %d, val: %d", j, rowEntryOffset[j]); if (!isRowEntryInitialized(pResInfo)) { + qWarn("no result"); continue; + } else { + qWarn("has result"); } if (pRow->numOfRows < pResInfo->numOfRes) { @@ -2569,6 +2573,158 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf return TSDB_CODE_SUCCESS; } +void resultRowToString(void* row, int32_t size, char* buf) { + SResultRow* p = row; + int32_t len = 0; + len += sprintf(buf + len, + "pageId:%d, offset:%d, startInterp:%d, endInterp:%d, closed:%d, numOfRows:%d, skey:%" PRId64 + ", ekey:%" PRId64, + p->pageId, p->offset, p->startInterp, p->endInterp, p->closed, p->numOfRows, p->win.skey, p->win.ekey); + + int32_t numOfEntryInfo = (size - sizeof(SResultRow)) / sizeof(struct SResultRowEntryInfo); + len += sprintf(buf + len, ", entryInfo size:%d", numOfEntryInfo); + for (int i = 0; i < numOfEntryInfo; i++) { + SResultRowEntryInfo* pInfo = &p->pEntryInfo[i]; + if (len >= 200 * size - 64 || i >= 5) { + break; + } + len += sprintf(buf + len, "[inited:%d, complete:%d, nullRes:%d, numOfRes:%d]", pInfo->initialized, pInfo->complete, + pInfo->isNullRes, pInfo->numOfRes); + } +} +/* + * + */ + +int32_t resultRowEncode(void* k, int32_t* size, char* buf) { + // SResultRow* key = k; + // int len = 0; + // int struLen = *size; + // len += taosEncodeFixedI32((void**)&buf, key->pageId); + + // uint32_t offset = key->offset; + // len += taosEncodeFixedU32((void**)&buf, offset); + + // len += taosEncodeFixedI8((void**)&buf, key->startInterp); + // len += taosEncodeFixedI8((void**)&buf, key->endInterp); + // len += taosEncodeFixedI8((void**)&buf, key->closed); + // len += taosEncodeFixedU32((void**)&buf, key->numOfRows); + + // len += taosEncodeFixedI64((void**)&buf, key->win.skey); + // len += taosEncodeFixedI64((void**)&buf, key->win.ekey); + + // int32_t numOfEntryInfo = (struLen - sizeof(SResultRow)) / sizeof(struct SResultRowEntryInfo); + // len += taosEncodeFixedI32((void**)&buf, numOfEntryInfo); + // for (int i = 0; i < numOfEntryInfo; i++) { + // SResultRowEntryInfo* p = &key->pEntryInfo[i]; + + // uint8_t value = p->initialized ? 1 : 0; + // len += taosEncodeFixedU8((void**)&buf, value); + + // value = p->complete ? 1 : 0; + // len += taosEncodeFixedU8((void**)&buf, value); + + // value = p->isNullRes; + // len += taosEncodeFixedU8((void**)&buf, value); + + // len += taosEncodeFixedU16((void**)&buf, p->numOfRes); + // } + // { + // char* strBuf = taosMemoryCalloc(1, *size * 100); + // resultRowToString(key, *size, strBuf); + // qWarn("encode result row:%s", strBuf); + // } + + // return len; + return 0; +} + +int32_t resultRowDecode(void** k, size_t size, char* buf) { + // char* p1 = buf; + // int32_t numOfEntryInfo = 0; + // uint32_t entryOffset = sizeof(int32_t) + sizeof(uint32_t) + sizeof(int8_t) + sizeof(int8_t) + sizeof(int8_t) + + // sizeof(uint32_t) + sizeof(int64_t) + sizeof(int64_t); + // taosDecodeFixedI32(p1 + entryOffset, &numOfEntryInfo); + + // char* p = buf; + // size = sizeof(SResultRow) + numOfEntryInfo * sizeof(SResultRowEntryInfo); + // SResultRow* key = taosMemoryCalloc(1, size); + + // p = taosDecodeFixedI32(p, (int32_t*)&key->pageId); + // uint32_t offset = 0; + // p = taosDecodeFixedU32(p, &offset); + // key->offset = offset; + + // p = taosDecodeFixedI8(p, (int8_t*)(&key->startInterp)); + // p = taosDecodeFixedI8(p, (int8_t*)(&key->endInterp)); + // p = taosDecodeFixedI8(p, (int8_t*)&key->closed); + // p = taosDecodeFixedU32(p, &key->numOfRows); + + // p = taosDecodeFixedI64(p, &key->win.skey); + // p = taosDecodeFixedI64(p, &key->win.ekey); + // p = taosDecodeFixedI32(p, &numOfEntryInfo); + // for (int i = 0; i < numOfEntryInfo; i++) { + // SResultRowEntryInfo* pInfo = &key->pEntryInfo[i]; + // uint8_t value = 0; + // p = taosDecodeFixedU8(p, &value); + // pInfo->initialized = (value == 1) ? true : false; + + // p = taosDecodeFixedU8(p, &value); + // pInfo->complete = (value == 1) ? true : false; + + // p = taosDecodeFixedU8(p, &value); + // pInfo->isNullRes = value; + + // p = taosDecodeFixedU16(p, &pInfo->numOfRes); + // } + // *k = key; + + // { + // char* strBuf = taosMemoryCalloc(1, size * 100); + // resultRowToString(key, size, strBuf); + // qWarn("decode result row:%s", strBuf); + // } + // return size; + return 0; +} + +int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { + // char* buf = taosMemoryCalloc(1, resSize * 10); + // int len = resultRowEncode((void*)pResult, &resSize, buf); + // char* buf = taosMemoryCalloc(1, resSize); + // memcpy(buf, pResult, resSize); + streamStatePut(pState, pKey, (char*)pResult, resSize); + // taosMemoryFree(buf); + return TSDB_CODE_SUCCESS; +} +int32_t getOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow** pResult, int32_t* resSize) { + char* pVal = NULL; + int32_t size = 0; + int32_t code = streamStateGet(pState, pKey, (void**)&pVal, &size); + if (code != 0) { + return 0; + } + *pResult = (SResultRow*)pVal; + // memcpy((char*)*pResult, (char*)pVal, size); + // int tlen = resultRowDecode((void**)pResult, size, pVal); + *resSize = size; + return code; +} + +int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + qWarn("streamStateAddIfNotExist"); + char* tVal = NULL; + int32_t size = 0; + int32_t code = streamStateGet(pState, key, (void**)&tVal, &size); + if (code != 0) { + *pVal = taosMemoryCalloc(1, *pVLen); + } else { + *pVal = (void*)tVal; + // resultRowDecode((void**)pVal, size, tVal); + *pVLen = size; + } + return 0; +} int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { @@ -2579,8 +2735,10 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul char* value = NULL; int32_t size = pAggSup->resultRowSize; - if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) { + if (streamStateAddIfNotExist2(pState, &key, (void**)&value, &size) < 0) { return TSDB_CODE_OUT_OF_MEMORY; + } else { + // getOutputBuf(pState, &key, (SResultRow**)&value, &size); } *pResult = (SResultRow*)value; // set time window for current result @@ -2594,12 +2752,6 @@ int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResul return TSDB_CODE_SUCCESS; } -int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { - qWarn("write to stream state"); - streamStatePut(pState, pKey, pResult, resSize); - return TSDB_CODE_SUCCESS; -} - int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2614,30 +2766,38 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i); int32_t size = 0; void* pVal = NULL; - int32_t code = streamStateGet(pState, pKey, &pVal, &size); + int32_t code = getOutputBuf(pState, pKey, (SResultRow**)&pVal, &size); + // streamStateGet(pState, pKey, &pVal, &size); ASSERT(code == 0); SResultRow* pRow = (SResultRow*)pVal; doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one + qWarn("indx 1"); if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; + qWarn("indx 2"); releaseOutputBuf(pState, pKey, pRow); continue; } - + qWarn("indx 3"); if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; + qWarn("indx 4"); if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { + qWarn("indx 5"); pBlock->info.parTbName[0] = 0; } else { + qWarn("indx 6"); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } + qWarn("indx 7"); streamFreeVal(tbname); } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.id.groupId != pKey->groupId) { releaseOutputBuf(pState, pKey, pRow); + qWarn("indx 8"); break; } } @@ -2647,28 +2807,36 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat releaseOutputBuf(pState, pKey, pRow); break; } - + qWarn("indx 10"); pGroupResInfo->index += 1; for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; + qWarn("indx 10"); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; + qWarn("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, pEnryInfo->isNullRes, + pEnryInfo->numOfRes); if (pCtx[j].fpSet.finalize) { + qWarn("indx 14"); int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code1)) { qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); T_LONG_JMP(pTaskInfo->env, code1); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + qWarn("indx 11"); // do nothing, todo refactor } else { // expand the result into multiple rows. E.g., _wstart, top(k, 20) // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + qWarn("indx 12"); for (int32_t k = 0; k < pRow->numOfRows; ++k) { colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + qWarn("indx 13"); } } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 65f9e6c850..71f9c4b581 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -16,10 +16,12 @@ #include "filter.h" #include "function.h" #include "functionMgt.h" +#include "query.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" +#include "tlog.h" #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) @@ -2152,6 +2154,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S if (num == 0) { int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); + ASSERT(pCurResult != NULL); if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -2183,6 +2186,7 @@ bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pS if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; if (streamStateGet(pState, &key, NULL, 0) == TSDB_CODE_SUCCESS) { + qWarn("get from dele"); return false; } return true; @@ -2349,6 +2353,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p SResultRow* pResult = NULL; int32_t forwardRows = 0; + int stepTrace = 0; + qWarn("step1 %d", stepTrace++); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; @@ -2361,14 +2367,17 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); } while (1) { + qWarn("step1 %d", stepTrace++); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { + qWarn("step1 %d", stepTrace++); break; } continue; } + qWarn("step1 %d", stepTrace++); if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { bool ignore = true; @@ -2399,6 +2408,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ignore = false; } } + qWarn("step1 %d", stepTrace++); if (ignore) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); @@ -2408,23 +2418,27 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p continue; } } + qWarn("step1 %d", stepTrace++); int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + qWarn("step1 %d", stepTrace++); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - + qWarn("step1 %d", stepTrace++); if (IS_FINAL_OP(pInfo)) { forwardRows = 1; } else { forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } + qWarn("step1 %d", stepTrace++); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap); } + qWarn("step1 %d", stepTrace++); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { SWinKey key = { .ts = pResult->win.skey, @@ -2432,6 +2446,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p }; tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); } + + qWarn("step1 %d", stepTrace++); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); @@ -2439,6 +2455,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .ts = nextWin.skey, .groupId = groupId, }; + + qWarn("step1 %d", stepTrace++); saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize); releaseOutputBuf(pInfo->pState, &key, pResult); if (pInfo->delKey.ts > key.ts) { @@ -2457,6 +2475,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); } } + qWarn("step1 %d", stepTrace++); if (IS_FINAL_OP(pInfo)) { startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); @@ -2465,6 +2484,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); } if (startPos < 0) { + qWarn("step1 %d", stepTrace++); break; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fbc60041b2..ba8acef786 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -881,7 +881,7 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu } if (pCtx->saveHandle.pState) { - tdbFree((void*)p); + streamFreeVal((void*)p); } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5efcb483bf..6453ff383d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -18,6 +18,7 @@ #include #include "executor.h" #include "osMemory.h" +#include "query.h" #include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "streamInc.h" @@ -288,7 +289,7 @@ int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const voi } int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateFuncGet(pState, key, pVal, pVLen); + return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen); #else return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); #endif @@ -398,6 +399,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { // todo refactor + qWarn("streamStateReleaseBuf"); if (!pVal) { return 0; } diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index a161b1b612..87cb3676be 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -395,12 +395,14 @@ int streamGetInit(const char* funcName) { rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)vLen, &err); \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \ if (val == NULL) { \ qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ code = -1; \ } else { \ - *pVal = val; \ + if (pVal != NULL) *pVal = val; \ + if (vLen != NULL) *vLen = len; \ } \ if (err != NULL) { \ taosMemoryFree(err); \ @@ -754,9 +756,8 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* if (!pCur) { return -1; } - SStateSessionKey ktmp = {0}; - SStateSessionKey* pKTmp = &ktmp; - int32_t kLen, vLen; + SStateSessionKey ktmp = {0}; + int32_t kLen, vLen; if (!rocksdb_iter_valid(pCur->iter)) { return -1; @@ -764,7 +765,8 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); stateSessionKeyDecode((void*)&ktmp, (char*)curKey); - const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); + SStateSessionKey* pKTmp = &ktmp; + const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); if (pVal != NULL) *pVal = (char*)val; if (pVLen != NULL) *pVLen = vLen; diff --git a/tests/script/tsim/stream/basic0.sim b/tests/script/tsim/stream/basic0.sim index 68c5894cbf..f560a5df61 100644 --- a/tests/script/tsim/stream/basic0.sim +++ b/tests/script/tsim/stream/basic0.sim @@ -1,6 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c debugflag -v 131 +system sh/cfg.sh -n dnode1 -c qDebugflag -v 143 system sh/exec.sh -n dnode1 -s start sql connect diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index e69875d69f..86e8ff1f26 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -1,6 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode1 -s start sleep 50 sql connect