diff --git a/include/common/tglobal.h b/include/common/tglobal.h index a891b1db67..c5200fb21d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -167,6 +167,7 @@ extern int32_t tsRpcRetryInterval; extern bool tsDisableStream; extern int64_t tsStreamBufferSize; +extern int64_t tsCheckpointInterval; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 5caba81fde..b8d6e7aa8e 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -81,6 +81,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +bool streamStateCheck(SStreamState* pState, const SWinKey* key); +int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); void streamStateSetNumber(SStreamState* pState, int32_t number); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 1320dcd4ec..9d8a4ae186 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -35,13 +35,16 @@ typedef struct SRowBuffPos { typedef SList SStreamSnapshot; -typedef bool (*ExpiredFun)(void*, TSKEY); +typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile); -void destroyStreamFileState(SStreamFileState* pFileState); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark); +void streamFileStateDestroy(SStreamFileState* pFileState); +void streamFileStateClear(SStreamFileState* pFileState); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); -void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos); +int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); +int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); +bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4917ebe3d3..17b9ebb65b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -197,6 +197,7 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf char tsUdfdLdLibPath[512] = ""; bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; +int64_t tsCheckpointInterval = 24 * 60 * 60 * 1000; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -497,7 +498,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; - if (cfgAddBool(pCfg, "streamBufferSize", tsStreamBufferSize, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, 0) != 0) return -1; GRANT_CFG_ADD; return 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6d861e2b03..b06eaca6b2 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -860,13 +860,9 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); bool groupbyTbname(SNodeList* pGroupList); -int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, - SGroupResInfo* pGroupResInfo); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); -int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, - SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4df52982ad..606816d853 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2694,144 +2694,12 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, // taosMemoryFree(buf); return TSDB_CODE_SUCCESS; } -int32_t getOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow** pResult, int32_t* resSize) { - char* pVal = NULL; - int32_t size = 0; - int32_t code = streamStateGet(pState, pKey, (void**)&pVal, &size); - if (code != 0) { - return 0; - } - *pResult = (SResultRow*)pVal; - // memcpy((char*)*pResult, (char*)pVal, size); - // int tlen = resultRowDecode((void**)pResult, size, pVal); - *resSize = size; - return code; -} - -int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - // qWarn("streamStateAddIfNotExist"); - char* tVal = NULL; - int32_t size = 0; - int32_t code = streamStateGet(pState, key, (void**)&tVal, &size); - if (code != 0) { - *pVal = taosMemoryCalloc(1, *pVLen); - } else { - *pVal = (void*)tVal; - // resultRowDecode((void**)pVal, size, tVal); - *pVLen = size; - } - return 0; -} - -int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, - SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { - SWinKey key = { - .ts = win->skey, - .groupId = tableGroupId, - }; - char* value = NULL; - int32_t size = pAggSup->resultRowSize; - - if (streamStateAddIfNotExist2(pState, &key, (void**)&value, &size) < 0) { - return TSDB_CODE_OUT_OF_MEMORY; - } else { - // getOutputBuf(pState, &key, (SResultRow**)&value, &size); - } - *pResult = (SResultRow*)value; - // set time window for current result - (*pResult)->win = (*win); - setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset); - return TSDB_CODE_SUCCESS; -} int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) { streamStateReleaseBuf(pState, pKey, pResult); return TSDB_CODE_SUCCESS; } -int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, - SGroupResInfo* pGroupResInfo) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprInfo* pExprInfo = pSup->pExprInfo; - int32_t numOfExprs = pSup->numOfExprs; - int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; - SqlFunctionCtx* pCtx = pSup->pCtx; - - int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); - - for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { - SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i); - int32_t size = 0; - void* pVal = NULL; - int32_t code = getOutputBuf(pState, pKey, (SResultRow**)&pVal, &size); - // streamStateGet(pState, pKey, &pVal, &size); - ASSERT(code == 0); - SResultRow* pRow = (SResultRow*)pVal; - doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); - // no results, continue to check the next one - if (pRow->numOfRows == 0) { - pGroupResInfo->index += 1; - releaseOutputBuf(pState, pKey, pRow); - continue; - } - if (pBlock->info.id.groupId == 0) { - pBlock->info.id.groupId = pKey->groupId; - void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { - pBlock->info.parTbName[0] = 0; - } else { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } - streamFreeVal(tbname); - } else { - // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.id.groupId != pKey->groupId) { - releaseOutputBuf(pState, pKey, pRow); - break; - } - } - - if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); - releaseOutputBuf(pState, pKey, pRow); - break; - } - pGroupResInfo->index += 1; - - for (int32_t j = 0; j < numOfExprs; ++j) { - int32_t slotId = pExprInfo[j].base.resSchema.slotId; - - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); - SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; - qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, - pEnryInfo->isNullRes, pEnryInfo->numOfRes); - if (pCtx[j].fpSet.finalize) { - int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (TAOS_FAILED(code1)) { - qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); - T_LONG_JMP(pTaskInfo->env, code1); - } - } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - // do nothing, todo refactor - } else { - // expand the result into multiple rows. E.g., _wstart, top(k, 20) - // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } - } - } - - pBlock->info.rows += pRow->numOfRows; - releaseOutputBuf(pState, pKey, pRow); - } - pBlock->info.dataLoad = 1; - blockDataUpdateTsWindow(pBlock, 0); - return TSDB_CODE_SUCCESS; -} - int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) { streamStateSessionPut(pState, key, (const void*)buf, size); releaseOutputBuf(pState, NULL, (SResultRow*)buf); @@ -2919,7 +2787,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta pBlock->info.dataLoad = 1; pBlock->info.rows += pRow->numOfRows; - // saveSessionDiscBuf(pState, pKey, pVal, size); releaseOutputBuf(pState, NULL, pRow); } blockDataUpdateTsWindow(pBlock, 0); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8ba152ef1b..14521df69f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -844,14 +844,15 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } -static int32_t saveWinResult(int64_t ts, uint64_t groupId, SSHashObj* pUpdatedMap) { - SWinKey key = {.ts = ts, .groupId = groupId}; - tSimpleHashPut(pUpdatedMap, &key, sizeof(SWinKey), NULL, 0); +static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { + tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES); return TSDB_CODE_SUCCESS; } -static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SSHashObj* pUpdatedMap) { - return saveWinResult(ts, groupId, pUpdatedMap); +static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { + SWinKey key = {.ts = ts, .groupId = groupId}; + saveWinResult(&key, pPos, pUpdatedMap); + return TSDB_CODE_SUCCESS; } static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) { @@ -1397,7 +1398,7 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) { SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); uint64_t groupId = pKey->groupId; TSKEY ts = pKey->ts; - int32_t code = saveWinResult(ts, groupId, resWins); + int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1443,7 +1444,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp } if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins); + int32_t code = saveWinResult(pWinKey, *(SRowBuffPos**)pIte, closeWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1492,6 +1493,8 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, if (code == TSDB_CODE_SUCCESS) { *key = next; tw = getFinalTimeWindow(key->ts, pInterval); + } else { + break; } } @@ -1604,6 +1607,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); + streamFileStateDestroy(pInfo->pState->pFileState); taosMemoryFreeClear(pInfo->pState); if (pInfo->pChildren) { @@ -2122,7 +2126,27 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { - return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0); + return streamStateCheck(pState, pKey); +} + +int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId, + SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { + SWinKey key = { + .ts = win->skey, + .groupId = groupId, + }; + char* value = NULL; + int32_t size = pAggSup->resultRowSize; + + if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *pResult = (SRowBuffPos*)value; + SResultRow* res = (SResultRow*)((*pResult)->pRowBuff); + // set time window for current result + res-> win = (*win); + setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); + return TSDB_CODE_SUCCESS; } static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) { @@ -2135,9 +2159,10 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S return; } for (int32_t i = 0; i < size; i++) { - SWinKey* pWinRes = taosArrayGet(pWinArray, i); - SResultRow* pCurResult = NULL; - STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); + SWinKey* pWinRes = taosArrayGet(pWinArray, i); + SRowBuffPos* pCurResPos = NULL; + SResultRow* pCurResult = NULL; + STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) { continue; } @@ -2152,25 +2177,27 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S continue; } if (num == 0) { - int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, + int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); - ASSERT(pCurResult != NULL); + ASSERT(pCurResPos != NULL); + pCurResult = (SResultRow*) pCurResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } + } num++; - SResultRow* pChResult = NULL; - setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, + SRowBuffPos* pChResPos = NULL; + SResultRow* pChResult = NULL; + setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); + pChResult = (SResultRow*) pChResPos->pRowBuff; updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); - releaseOutputBuf(pChInfo->pState, pWinRes, pChResult); } if (num > 0 && pUpdatedMap) { - saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap); + saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pCurResPos, pUpdatedMap); saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize); - releaseOutputBuf(pInfo->pState, pWinRes, pCurResult); } } } @@ -2185,11 +2212,10 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; - if (streamStateGet(pState, &key, NULL, 0) == TSDB_CODE_SUCCESS) { - qWarn("get from dele"); - return false; + if (streamStateCheck(pState, &key)) { + return true; } - return true; + return false; } return false; } @@ -2313,6 +2339,87 @@ static void clearFunctionContext(SExprSupp* pSup) { } } +int32_t getOutputBuf(SStreamState* pState, SRowBuffPos* pPos, SResultRow** pResult) { + return streamStateGetByPos(pState, pPos, (void**)pResult); +} + +int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, + SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprInfo* pExprInfo = pSup->pExprInfo; + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { + SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, i); + SResultRow* pRow = NULL; + int32_t code = getOutputBuf(pState, pPos, &pRow); + uint64_t groupId = ((SWinKey*)pPos->pKey)->groupId; + ASSERT(code == 0); + doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); + // no results, continue to check the next one + if (pRow->numOfRows == 0) { + pGroupResInfo->index += 1; + continue; + } + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = groupId; + void* tbname = NULL; + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } + streamFreeVal(tbname); + } else { + // current value belongs to different group, it can't be packed into one datablock + if (pBlock->info.id.groupId != groupId) { + break; + } + } + + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + ASSERT(pBlock->info.rows > 0); + break; + } + pGroupResInfo->index += 1; + + for (int32_t j = 0; j < numOfExprs; ++j) { + int32_t slotId = pExprInfo[j].base.resSchema.slotId; + + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; + qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, + pEnryInfo->isNullRes, pEnryInfo->numOfRes); + if (pCtx[j].fpSet.finalize) { + int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code1)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); + T_LONG_JMP(pTaskInfo->env, code1); + } + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor + } else { + // expand the result into multiple rows. E.g., _wstart, top(k, 20) + // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } + } + } + + pBlock->info.rows += pRow->numOfRows; + } + pBlock->info.dataLoad = 1; + blockDataUpdateTsWindow(pBlock, 0); + return TSDB_CODE_SUCCESS; +} + void doBuildStreamIntervalResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2350,6 +2457,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p int32_t numOfOutput = pSup->numOfExprs; int32_t step = 1; TSKEY* tsCols = NULL; + SRowBuffPos* pResPos = NULL; SResultRow* pResult = NULL; int32_t forwardRows = 0; @@ -2413,8 +2521,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } } - int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput, + int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); + pResult = (SResultRow*) pResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -2424,28 +2533,24 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } + + SWinKey key = { + .ts = pResult->win.skey, + .groupId = groupId, + }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { - saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap); + saveWinResult(&key, pResPos, pUpdatedMap); } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SWinKey key = { - .ts = pResult->win.skey, - .groupId = groupId, - }; - tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); + tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); - SWinKey key = { - .ts = nextWin.skey, - .groupId = groupId, - }; + key.ts = nextWin.skey; - saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize); - releaseOutputBuf(pInfo->pState, &key, pResult); if (pInfo->delKey.ts > key.ts) { pInfo->delKey = key; } @@ -2475,6 +2580,27 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } } +static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { + SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1; + SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2; + SWinKey* pWin1 = (SWinKey*)pos1->pKey; + SWinKey* pWin2 = (SWinKey*)pos2->pKey; + + if (pWin1->groupId > pWin2->groupId) { + return 1; + } else if (pWin1->groupId < pWin2->groupId) { + return -1; + } + + if (pWin1->ts > pWin2->ts) { + return 1; + } else if (pWin1->ts < pWin2->ts) { + return -1; + } + + return 0; +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2536,7 +2662,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey)); + pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -2640,7 +2766,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - taosArraySort(pInfo->pUpdated, winKeyCmprImpl); + taosArraySort(pInfo->pUpdated, winPosCmprImpl); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; @@ -2678,6 +2804,11 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { return deleteMark; } +TSKEY compareTs(void* pKey) { + SWinKey* pWinKey = (SWinKey*) pKey; + return pWinKey->ts; +} + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -2783,6 +2914,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, + pInfo->pState, pInfo->twAggSup.deleteMark); pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -4761,7 +4894,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey)); + pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -4822,10 +4955,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { - SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); - taosArrayPush(pInfo->pUpdated, pKey); + taosArrayPush(pInfo->pUpdated, pIte); } - taosArraySort(pInfo->pUpdated, winKeyCmprImpl); + taosArraySort(pInfo->pUpdated, winPosCmprImpl); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; @@ -4854,11 +4986,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return NULL; } -bool compareTs(void* pKey, TSKEY mark) { - SWinKey* pWinKey = (SWinKey*) pKey; - return pWinKey->ts < mark; -} - SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); @@ -4945,7 +5072,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, pInfo->pState); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, + pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 557c000a87..47c91ae090 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -132,6 +132,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int } qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; + pState->pFileState = NULL; return pState; #else @@ -297,20 +298,32 @@ int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* val #endif } -// todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen); - // return streamStateGet_rocksdb(pState, key, pVal, pVLen); #else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); #endif } + +bool streamStateCheck(SStreamState* pState, const SWinKey* key) { + #ifdef USE_ROCKSDB + return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey)); +#else + SStateKey sKey = {.key = *key, .opNum = pState->number}; + return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); +#endif +} + +int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) { + return getRowBuffByPos(pState->pFileState, pos, pVal); +} + // todo refactor int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { #ifdef USE_ROCKSDB - return streamStateDel_rocksdb(pState, key); + return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey)); #else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn); @@ -346,6 +359,7 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear(SStreamState* pState) { #ifdef USE_ROCKSDB + streamFileStateClear(pState->pFileState); return streamStateClear_rocksdb(pState); #else SWinKey key = {.ts = 0, .groupId = 0}; @@ -369,7 +383,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen); + return streamStateGet(pState, key, pVal, pVLen); #else // todo refactor int32_t size = *pVLen; @@ -1040,6 +1054,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal void streamStateDestroy(SStreamState* pState) { #ifdef USE_ROCKSDB + streamFileStateDestroy(pState->pFileState); streamStateDestroy_rocksdb(pState); // do nothong #endif diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 81dc2f78c3..85b674debe 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -35,14 +35,15 @@ struct SStreamFileState { uint64_t checkPointVersion; TSKEY maxTs; TSKEY deleteMark; + TSKEY flushMark; uint64_t maxRowCount; uint64_t curRowCount; - ExpiredFun expFunc; + GetTsFun getTs; }; typedef SRowBuffPos SRowBuffInfo; -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile) { +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -65,17 +66,20 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, Expired pFileState->preCheckPointVersion = 0; pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; - pFileState->expFunc = fp; + pFileState->getTs = fp; pFileState->maxRowCount = memSize / rowSize; pFileState->curRowCount = 0; + pFileState->deleteMark = delMark; + pFileState->flushMark = -1; return pFileState; _error: - destroyStreamFileState(pFileState); + streamFileStateDestroy(pFileState); return NULL; } void destroyRowBuffPos(SRowBuffPos* pPos) { + taosMemoryFreeClear(pPos->pKey); taosMemoryFreeClear(pPos->pRowBuff); taosMemoryFree(pPos); } @@ -84,32 +88,50 @@ void destroyRowBuffPosPtr(void* ptr) { if (!ptr) { return; } - void* tmp = *(void**)ptr; - SRowBuffPos* pPos = (SRowBuffPos*)tmp; + SRowBuffPos* pPos = *(SRowBuffPos**)ptr; destroyRowBuffPos(pPos); } -void destroyStreamFileState(SStreamFileState* pFileState) { - tdListFreeP(pFileState->usedBuffs, destroyRowBuffPosPtr); - tdListFreeP(pFileState->freeBuffs, taosMemoryFree); - tSimpleHashCleanup(pFileState->rowBuffMap); +void destroyRowBuff(void* ptr) { + if (!ptr) { + return; + } + taosMemoryFree(*(void**)ptr); } -void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts) { +void streamFileStateDestroy(SStreamFileState* pFileState) { + if (!pFileState) { + return; + } + tdListFreeP(pFileState->usedBuffs, destroyRowBuffPosPtr); + tdListFreeP(pFileState->freeBuffs, destroyRowBuff); + tSimpleHashCleanup(pFileState->rowBuffMap); + taosMemoryFree(pFileState); +} + +void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { - SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - if (pFileState->expFunc(pPos->pKey, ts)) { - tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff); + SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); + if (all || (pFileState->getTs(pPos->pKey) usedBuffs, pNode); + taosMemoryFreeClear(pNode); + tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); pPos->pRowBuff = NULL; + tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); destroyRowBuffPos(pPos); } } } +void streamFileStateClear(SStreamFileState* pFileState) { + tSimpleHashClear(pFileState->rowBuffMap); + clearExpiredRowBuff(pFileState, 0, true); +} + int32_t flushRowBuff(SStreamFileState* pFileState) { SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES); if (!pFlushList) { @@ -125,6 +147,8 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (!pPos->beUsed) { tdListAppend(pFlushList, &pPos); + pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); + tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); i++; } } @@ -133,19 +157,20 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { } int32_t clearRowBuff(SStreamFileState* pFileState) { - clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark); + clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); if (isListEmpty(pFileState->freeBuffs)) { return flushRowBuff(pFileState); } return TSDB_CODE_SUCCESS; } -void* getFreeBuff(SList* lists) { +void* getFreeBuff(SList* lists, int32_t buffSize) { SListNode* pNode = tdListPopHead(lists); if (!pNode) { return NULL; } void* ptr = *(void**)pNode->data; + memset(ptr, 0, buffSize); taosMemoryFree(pNode); return ptr; } @@ -153,7 +178,7 @@ void* getFreeBuff(SList* lists) { SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); tdListAppend(pFileState->usedBuffs, &pPos); - void* pBuff = getFreeBuff(pFileState->freeBuffs); + void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); if (pBuff) { pPos->pRowBuff = pBuff; return pPos; @@ -170,40 +195,72 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { int32_t code = clearRowBuff(pFileState); ASSERT(code == 0); - pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs); + pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); return pPos; } int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey)); SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen); if (pos) { - *pVLen = pFileState->rowSize; - *pVal = *pos; + if (pVal) { + *pVLen = pFileState->rowSize; + *pVal = *pos; + } return TSDB_CODE_SUCCESS; } SRowBuffPos* pNewPos = getNewRowPos(pFileState); ASSERT(pNewPos);// todo(liuyao) delete + pNewPos->pKey = taosMemoryCalloc(1, keyLen); + memcpy(pNewPos->pKey, pKey, keyLen); + + TSKEY ts = pFileState->getTs(pKey); + if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) { + int32_t len = 0; + void *pVal = NULL; + streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len); + memcpy(pNewPos->pRowBuff, pVal, len); + taosMemoryFree(pVal); + } + tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES); - *pVLen = pFileState->rowSize; - *pVal = pNewPos; + if (pVal) { + *pVLen = pFileState->rowSize; + *pVal = pNewPos; + } return TSDB_CODE_SUCCESS; } -void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos) { +int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { + int32_t code_buff = tSimpleHashRemove(pFileState->rowBuffMap, pKey, keyLen); + int32_t code_rocks = streamStateDel_rocksdb(pFileState->pFileStore, pKey); + return code_buff == TSDB_CODE_SUCCESS ? code_buff : code_rocks; +} + +int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) { if (pPos->pRowBuff) { - return pPos->pRowBuff; + (*pVal) = pPos->pRowBuff; + return TSDB_CODE_SUCCESS; } int32_t code = clearRowBuff(pFileState); ASSERT(code == 0); - pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs); + pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); - void* pVal = NULL; int32_t len = 0; - streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pVal, &len); + streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, pVal, &len); memcpy(pPos->pRowBuff, pVal, len); taosMemoryFree(pVal); - return pPos->pRowBuff; + (*pVal) = pPos->pRowBuff; + return TSDB_CODE_SUCCESS; +} + +bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { + SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen); + if (pos) { + return true; + } + return false; } void releaseRowBuffPos(SRowBuffPos* pBuff) { @@ -211,7 +268,7 @@ void releaseRowBuffPos(SRowBuffPos* pBuff) { } SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { - clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark); + clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); return pFileState->usedBuffs; } @@ -229,7 +286,5 @@ int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize) } int32_t recoverSnapshot(SStreamFileState* pFileState) { - // 设置一个时间戳标记,小于这个时间戳的,如果缓存里没有,需要从rocks db里读取状态,大于这个时间戳的,不需要 - // 这个还需要考虑一下,如果rocks db中也没有,说明真的是新的,那么这次读取是冗余的。 return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/tests/script/tsim/stream/distributeIntervalRetrive0.sim b/tests/script/tsim/stream/distributeIntervalRetrive0.sim index 529a2a1b30..5b8f4e50e8 100644 --- a/tests/script/tsim/stream/distributeIntervalRetrive0.sim +++ b/tests/script/tsim/stream/distributeIntervalRetrive0.sim @@ -2,7 +2,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 -system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode1 -s start #==system sh/exec.sh -n dnode1 -s start -v sleep 50 @@ -275,37 +275,4 @@ endi print loop4 over -#==system sh/exec.sh -n dnode1 -s stop -x SIGINT -#==print =============== check -#==$null= - -#==system_content sh/checkValgrind.sh -n dnode1 -#==print cmd return result ----> [ $system_content ] -#==if $system_content > 0 then -#== return -1 -#==endi - -#==if $system_content == $null then -#== return -1 -#==endi - - - -#==system sh/exec.sh -n dnode2 -s stop -x SIGINT -#==print =============== check -#==$null= - -#==system_content sh/checkValgrind.sh -n dnode2 -#==print cmd return result ----> [ $system_content ] -#==if $system_content > 0 then -#== return -1 -#==endi - -#==if $system_content == $null then -#== return -1 -#==endi -#==return 1 - - - system sh/stop_dnodes.sh