diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 15f3246013..043f2d1d12 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -196,7 +196,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 14b7fe5d0e..975d097205 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -203,6 +203,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co if (pSource->hasNull) { pColumnInfoData->hasNull = pSource->hasNull; } + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { // Handle the bitmap char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); @@ -1075,8 +1076,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } } -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { - if (0 == numOfRows) { +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) { + if (0 == numOfRows || numOfRows <= existRows) { return TSDB_CODE_SUCCESS; } @@ -1087,19 +1088,16 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) } pColumn->varmeta.offset = (int32_t*)tmp; - memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); - - pColumn->varmeta.length = 0; - pColumn->varmeta.allocLen = 0; - taosMemoryFreeClear(pColumn->pData); + memset(&pColumn->varmeta.offset[existRows], 0, sizeof(int32_t) * (numOfRows - existRows)); } else { char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + int32_t oldLen = BitmapLen(existRows); pColumn->nullbitmap = tmp; - memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); if (pColumn->info.type == TSDB_DATA_TYPE_NULL) { return TSDB_CODE_SUCCESS; @@ -1135,7 +1133,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - code = colInfoDataEnsureCapacity(p, numOfRows); + code = colInfoDataEnsureCapacity(p, pDataBlock->info.rows, numOfRows); if (code) { return code; } @@ -1180,7 +1178,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows); + int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index bd8d54c437..12ba9026de 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -52,6 +52,10 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { if (pIter->len == 0) { pIter->len += sizeof(SSubmitReq); } else { + if (pIter->len >= pIter->totalLen) { + ASSERT(0); + } + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); ASSERT(pIter->len > 0); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9282f7197e..cf62ea8714 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -141,7 +141,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { colInfo.info.colId = pColSchema->colId; colInfo.info.type = pColSchema->type; - if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) { + if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ab933f840a..2ad34d1561 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -392,7 +392,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, SColumnInfoData colInfo = {{0}, 0}; colInfo.info = pCond->colList[i]; - int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity); + int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5ef1754913..230ccf6ace 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -342,7 +342,7 @@ typedef struct STableScanInfo { int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - double sampleRatio; // data block sample ratio + double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. } STableScanInfo; @@ -395,7 +395,6 @@ typedef struct SOptrBasicInfo { int32_t* rowCellInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; - int32_t capacity; // TODO remove it } SOptrBasicInfo; // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset @@ -405,23 +404,29 @@ typedef struct SAggSupporter { SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; -typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindow win; // query time range - bool timeWindowInterpo; // interpolation needed or not - char** pRow; // previous row/tuple of already processed datablock - SAggSupporter aggSup; // aggregate supporter - STableQueryInfo* pCurrent; // current tableQueryInfo struct - int32_t order; // current SSDataBlock scan order - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. +typedef struct STimeWindowSupp { + int8_t calTrigger; + int64_t waterMark; SColumnInfoData timeWindowData; // query time window info for scalar function execution. +} STimeWindowAggSupp; + +typedef struct STableIntervalOperatorInfo { + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char** pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo* pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + STimeWindowAggSupp twAggSup; } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -439,19 +444,19 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; typedef struct SProjectOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SSDataBlock* existDataBlock; - SArray* pPseudoColInfo; - SLimit limit; - SLimit slimit; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SSDataBlock* existDataBlock; + SArray* pPseudoColInfo; + SLimit limit; + SLimit slimit; - uint64_t groupId; - int64_t curSOffset; - int64_t curGroupOutput; + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; - int64_t curOffset; - int64_t curOutput; + int64_t curOffset; + int64_t curOutput; } SProjectOperatorInfo; typedef struct SFillOperatorInfo { @@ -466,10 +471,10 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char* pData; - bool isNull; - int16_t type; - int32_t bytes; + char* pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { @@ -488,9 +493,9 @@ typedef struct SGroupbyOperatorInfo { } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { - uint64_t groupId; - int64_t numOfRows; - SArray* pPageList; + uint64_t groupId; + int64_t numOfRows; + SArray* pPageList; } SDataGroupInfo; // The sort in partition may be needed later. @@ -505,9 +510,8 @@ typedef struct SPartitionOperatorInfo { SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -518,13 +522,13 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + STimeWindowAggSupp twAggSup; } SSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -534,14 +538,14 @@ typedef struct STimeSliceOperatorInfo { } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - int32_t colIndex; // start row index - bool hasKey; - SStateKeys stateKey; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + int32_t colIndex; // start row index + bool hasKey; + SStateKeys stateKey; + STimeWindowAggSupp twAggSup; // bool reptScan; } SStateWindowOperatorInfo; @@ -602,7 +606,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); + SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); +void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, @@ -638,10 +643,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, - const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, @@ -654,7 +660,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4af7e563e6..4863b03fb9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -34,6 +34,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { + pOperator->status = OP_NOT_OPENED; + SStreamBlockScanInfo* pInfo = pOperator->info; // the block type can not be changed in the streamscan operators diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ed2454faae..87ecdde3ab 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -20,23 +20,19 @@ #include "tname.h" #include "os.h" -#include "parser.h" #include "tdatablock.h" -#include "texception.h" #include "tglobal.h" #include "tmsg.h" #include "tsort.h" #include "ttime.h" #include "executorimpl.h" -#include "function.h" #include "query.h" #include "tcompare.h" #include "tcompression.h" #include "thash.h" #include "vnode.h" #include "ttypes.h" -#include "vnode.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -836,7 +832,7 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; pColData->info.bytes = sizeof(int64_t); - colInfoDataEnsureCapacity(pColData, 5); + colInfoDataEnsureCapacity(pColData, 0, 5); colDataAppendInt64(pColData, 0, &pQueryWindow->skey); colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); @@ -1065,7 +1061,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } ASSERT(!IS_VAR_DATA_TYPE(type)); - colInfoDataEnsureCapacity(pColInfo, numOfRows); + colInfoDataEnsureCapacity(pColInfo, 0, numOfRows); if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { int64_t v = pFuncParam->param.i; @@ -1101,6 +1097,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->totalRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows; pInput->startRowIndex = 0; + + pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { if (createDummyCol) { @@ -1175,31 +1173,43 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; + int32_t numOfRows = 0; + for (int32_t k = 0; k < numOfOutput; ++k) { int32_t outputSlotId = pExpr[k].base.resSchema.slotId; SqlFunctionCtx* pfCtx = &pCtx[k]; if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); + if (pResult->info.rows > 0) { + colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); + } else { + colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); + } - pResult->info.rows = pSrcBlock->info.rows; + numOfRows = pfCtx->input.numOfRows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t offset = pResult->info.rows; for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); + colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } - pResult->info.rows = pSrcBlock->info.rows; + + numOfRows = pSrcBlock->info.rows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info}; + SScalarParam dest = {.columnData = &idata}; scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { ASSERT(!fmIsAggFunc(pfCtx->functionId)); @@ -1216,28 +1226,33 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); pfCtx->offset = pResult->info.rows; // set the start offset + // set the timestamp(_rowts) output buffer if (taosArrayGetSize(pPseudoList) > 0) { int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; } - int32_t numOfRows = pfCtx->fpSet.process(pfCtx); - pResult->info.rows += numOfRows; + numOfRows = pfCtx->fpSet.process(pfCtx); } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info}; + SScalarParam dest = {.columnData = &idata}; scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } } else { ASSERT(0); } } + + pResult->info.rows += numOfRows; } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, @@ -1503,8 +1518,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); - doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { @@ -1535,8 +1550,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } if (pInfo->timeWindowInterpo) { @@ -1607,8 +1622,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); @@ -1624,8 +1639,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -4453,7 +4468,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { SSortHandle* pHandle = pInfo->pSortHandle; SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); - blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); + blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity); while (1) { blockDataCleanup(pDataBlock); @@ -4465,7 +4480,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { // build datablock for merge for one group appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= pInfo->binfo.capacity) { + if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) { break; } } @@ -4500,7 +4515,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -4607,7 +4622,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pInfo->bufPageSize = 1024; pInfo->pSortInfo = pSortInfo; - pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); + pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; @@ -4879,6 +4894,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi if (!resultRow) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } + // add a new result set for a new group SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); @@ -4915,6 +4931,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } #if 0 if (pProjectInfo->existDataBlock) { // TODO refactor @@ -5106,8 +5126,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return NULL; } - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5127,7 +5147,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup } if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5161,8 +5181,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); ASSERT(pInfo->binfo.pRes->info.rows > 0); pOperator->status = OP_RES_TO_RETURN; @@ -5263,8 +5283,8 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5324,8 +5344,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); @@ -5341,8 +5361,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { @@ -5355,7 +5375,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5386,8 +5406,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5404,7 +5424,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5435,8 +5455,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5614,15 +5634,23 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { } int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { + SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; - pBasicInfo->capacity = numOfRows; doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); return TSDB_CODE_SUCCESS; } +void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { + pOperator->resultInfo.capacity = numOfRows; + pOperator->resultInfo.threshold = numOfRows * 0.75; + + if (pOperator->resultInfo.threshold == 0) { + pOperator->resultInfo.capacity = numOfRows; + } +} + static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pTableQueryInfo == NULL) { @@ -5658,7 +5686,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -5808,7 +5838,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p int32_t numOfCols = num; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); @@ -5837,28 +5869,30 @@ _error: } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, - const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->execModel = pTaskInfo->execModel; - - pInfo->win = pTaskInfo->window; - pInfo->win.skey = 0; - pInfo->win.ekey = INT64_MAX; - - pInfo->primaryTsIndex = primaryTsSlot; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = pTaskInfo->execModel; + pInfo->win = pTaskInfo->window; + pInfo->win.skey = 0; + pInfo->win.ekey = INT64_MAX; + pInfo->primaryTsIndex = primaryTsSlotId; + pInfo->twAggSup = *pTwAggSupp; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { @@ -5927,7 +5961,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* return NULL; } -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSup, + SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5936,9 +5971,14 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pInfo->colIndex = -1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, 4096); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->twAggSup = *pTwAggSup; + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blockingOptr = true; @@ -5962,7 +6002,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf } SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5971,13 +6011,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } + pInfo->twAggSup = *pTwAggSupp; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - initExecTimeWindowInfo(&pInfo->timeWindowData, &pTaskInfo->window); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->gap = gap; pInfo->binfo.pRes = pResBlock; @@ -6527,8 +6570,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision }; + STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType}; + int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -6539,9 +6584,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType}; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); @@ -6552,9 +6599,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; + STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8739371dd9..8018a8dd31 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -307,11 +307,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // pInfo->binfo.rowCellInfoOffset); // } - blockDataEnsureCapacity(pRes, pInfo->binfo.capacity); + blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); while(1) { - toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); @@ -348,7 +348,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx goto _error; } - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); + initResultSizeInfo(pOperator, 4096); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pOperator->name = "GroupbyAggOperator"; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3050006b7..6b06f3e89b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -539,7 +539,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SStreamBlockScanInfo* pInfo = pOperator->info; pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -547,6 +547,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -560,11 +561,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { terrno = pTaskInfo->code; + pOperator->status = OP_EXEC_DONE; return NULL; } if (pBlockInfo->rows == 0) { - return NULL; + break; } SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); @@ -583,6 +585,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) if (pInfo->pRes->pDataBlock == NULL) { // TODO add log + pOperator->status = OP_EXEC_DONE; pTaskInfo->code = terrno; return NULL; } @@ -594,6 +597,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) pInfo->numOfExec++; pInfo->numOfRows += pBlockInfo->rows; + if (pBlockInfo->rows == 0) { + pOperator->status = OP_EXEC_DONE; + } + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0d7e984ae7..2b1e4b9406 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -767,17 +767,105 @@ void percentileFinalize(SqlFunctionCtx* pCtx) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes; + pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); return true; } -// TODO fix this -// This ordinary first function only handle the data block in ascending order -int32_t firstFunction(SqlFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_DESC) { +static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { + if (pTsColInfo == NULL) { return 0; } + return *(TSKEY*) colDataGetData(pTsColInfo, rowIndex); +} + +// This ordinary first function does not care if current scan is ascending order or descending order scan +// the OPTIMIZED version of first function will only handle the ascending order scan +int32_t firstFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElems = 0; + + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + int32_t bytes = pInputCol->info.bytes; + + // All null data column, return directly. + if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { + ASSERT(pInputCol->hasNull == true); + return 0; + } + + SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL; + + TSKEY startKey = getRowPTs(pInput->pPTS, 0); + TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); + + int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + + if (blockDataOrder == TSDB_ORDER_ASC) { + // filter according to current result firstly + if (pResInfo->numOfRes > 0) { + TSKEY ts = *(TSKEY*)(buf + bytes); + if (ts < startKey) { + return TSDB_CODE_SUCCESS; + } + } + + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + memcpy(buf, data, bytes); + *(TSKEY*)(buf + bytes) = cts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->numOfRes = 1; + } + + numOfElems++; + } + } else { + // in case of descending order time stamp serial, which usually happens as the results of the nest query, + // all data needs to be check. + if (pResInfo->numOfRes > 0) { + TSKEY ts = *(TSKEY*)(buf + bytes); + if (ts < endKey) { + return TSDB_CODE_SUCCESS; + } + } + + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + memcpy(buf, data, bytes); + *(TSKEY*)(buf + bytes) = cts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + pResInfo->numOfRes = 1; + } + + numOfElems++; + } + } + + SET_VAL(pResInfo, numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t lastFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); @@ -792,48 +880,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { return 0; } - // Check for the first not null data - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { - continue; - } - - char* data = colDataGetData(pInputCol, i); - memcpy(buf, data, pInputCol->info.bytes); - // TODO handle the subsidary value -// if (pCtx->ptsList != NULL) { -// TSKEY k = GET_TS_DATA(pCtx, i); -// DO_UPDATE_TAG_COLUMNS(pCtx, k); -// } - - pResInfo->complete = true; - numOfElems++; - break; - } - - SET_VAL(pResInfo, numOfElems, 1); - return TSDB_CODE_SUCCESS; -} - -int32_t lastFunction(SqlFunctionCtx *pCtx) { - if (pCtx->order != TSDB_ORDER_DESC) { - return 0; - } - - int32_t numOfElems = 0; - - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); - - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; - - // All null data column, return directly. - if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { - ASSERT(pInputCol->hasNull == true); - return 0; - } - if (pCtx->order == TSDB_ORDER_DESC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 85a42387c4..1e8b732b6a 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -461,8 +461,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int if (isNullStr(pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - int64_t tmpVal = 0; - return func(pMsgBuf, &tmpVal, pSchema->bytes, param); + return buildSyntaxErrMsg(pMsgBuf, "primary timestamp can not be null", pToken->z); } return func(pMsgBuf, NULL, 0, param); diff --git a/source/libs/planner/CMakeLists.txt b/source/libs/planner/CMakeLists.txt index a095a6205f..f0bf32bf17 100644 --- a/source/libs/planner/CMakeLists.txt +++ b/source/libs/planner/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( planner - PRIVATE os util nodes catalog cjson parser function qcom + PRIVATE os util nodes catalog cjson parser function qcom scalar PUBLIC transport ) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e796d126eb..34bee3fd0b 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -15,6 +15,7 @@ #include "planInt.h" #include "functionMgt.h" +#include "filter.h" #define OPTIMIZE_FLAG_MASK(n) (1 << n) @@ -195,11 +196,175 @@ static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { return code; } -static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) { - // todo +static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) { + SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->condType = LOGIC_COND_TYPE_AND; + int32_t code = nodesListMakeAppend(&pLogicCond->pParameterList, *pSrc); + if (TSDB_CODE_SUCCESS == code) { + *pSrc = NULL; + code = nodesListMakeAppend(&pLogicCond->pParameterList, *pDst); + } + if (TSDB_CODE_SUCCESS == code) { + *pDst = (SNode*)pLogicCond; + } else { + nodesDestroyNode(pLogicCond); + } + return code; +} + +static int32_t cpdMergeConds(SNode** pDst, SNodeList** pSrc) { + if (NULL == *pSrc) { + return TSDB_CODE_SUCCESS; + } + + if (1 == LIST_LENGTH(*pSrc)) { + *pDst = nodesListGetNode(*pSrc, 0); + nodesClearList(*pSrc); + } else { + SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->condType = LOGIC_COND_TYPE_AND; + pLogicCond->pParameterList = *pSrc; + *pDst = (SNode*)pLogicCond; + } + *pSrc = NULL; + return TSDB_CODE_SUCCESS; } +static int32_t cpdCondAppend(SNode** pCond, SNode** pAdditionalCond) { + if (NULL == *pCond) { + TSWAP(*pCond, *pAdditionalCond, SNode*); + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCond)) { + code = nodesListAppend(((SLogicConditionNode*)*pCond)->pParameterList, *pAdditionalCond); + if (TSDB_CODE_SUCCESS == code) { + *pAdditionalCond = NULL; + } + } else { + code = cpdMergeCond(pCond, pAdditionalCond); + } + return code; +} + +static EDealRes cpdIsPrimaryKeyCondImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + *((bool*)pContext) = ((PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) ? true : false); + return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + +static bool cpdIsPrimaryKeyCond(SNode* pNode) { + bool isPrimaryKeyCond = false; + nodesWalkExpr(pNode, cpdIsPrimaryKeyCondImpl, &isPrimaryKeyCond); + return isPrimaryKeyCond; +} + +static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pScan->node.pConditions; + + int32_t code = TSDB_CODE_SUCCESS; + + SNodeList* pPrimaryKeyConds = NULL; + SNodeList* pOtherConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (cpdIsPrimaryKeyCond(pCond)) { + code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); + } else { + code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond)); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + SNode* pTempPrimaryKeyCond = NULL; + SNode* pTempOtherCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = cpdMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMergeConds(&pTempOtherCond, &pOtherConds); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPrimaryKeyCond = pTempPrimaryKeyCond; + *pOtherCond = pTempOtherCond; + nodesDestroyNode(pScan->node.pConditions); + pScan->node.pConditions = NULL; + } else { + nodesDestroyList(pPrimaryKeyConds); + nodesDestroyList(pOtherConds); + nodesDestroyNode(pTempPrimaryKeyCond); + nodesDestroyNode(pTempOtherCond); + } + + return code; +} + +static int32_t cpdPartitionScanCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pScan->node.pConditions) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)pScan->node.pConditions)->condType) { + return cpdPartitionScanLogicCond(pScan, pPrimaryKeyCond, pOtherCond); + } + + if (cpdIsPrimaryKeyCond(pScan->node.pConditions)) { + *pPrimaryKeyCond = pScan->node.pConditions; + } else { + *pOtherCond = pScan->node.pConditions; + } + pScan->node.pConditions = NULL; + + return TSDB_CODE_SUCCESS; +} + +static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) { + bool isStrict = false; + int32_t code = filterGetTimeRange(*pPrimaryKeyCond, &pScan->scanRange, &isStrict); + if (TSDB_CODE_SUCCESS == code) { + if (isStrict) { + nodesDestroyNode(*pPrimaryKeyCond); + } else { + code = cpdCondAppend(pOtherCond, pPrimaryKeyCond); + } + *pPrimaryKeyCond = NULL; + } + return code; +} + +static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) { + if (NULL == pScan->node.pConditions) { + return TSDB_CODE_SUCCESS; + } + + SNode* pPrimaryKeyCond = NULL; + SNode* pOtherCond = NULL; + int32_t code = cpdPartitionScanCond(pScan, &pPrimaryKeyCond, &pOtherCond); + if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) { + code = cpdCalcTimeRange(pScan, &pPrimaryKeyCond, &pOtherCond); + } + if (TSDB_CODE_SUCCESS == code) { + pScan->node.pConditions = pOtherCond; + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pPrimaryKeyCond); + nodesDestroyNode(pOtherCond); + } + + return code; +} + static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) { SNode* pTableCol = NULL; FOREACH(pTableCol, pTableCols) { @@ -230,28 +395,6 @@ static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNode (cxt.havaLeftCol && cxt.haveRightCol ? COND_ACTION_PUSH_JOIN : (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD))); } -static int32_t cpdMakeCond(SNodeList** pConds, SNode** pCond) { - if (NULL == *pConds) { - return TSDB_CODE_SUCCESS; - } - - if (1 == LIST_LENGTH(*pConds)) { - *pCond = nodesListGetNode(*pConds, 0); - nodesClearList(*pConds); - } else { - SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (NULL == pLogicCond) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pLogicCond->condType = LOGIC_COND_TYPE_AND; - pLogicCond->pParameterList = *pConds; - *pCond = (SNode*)pLogicCond; - } - *pConds = NULL; - - return TSDB_CODE_SUCCESS; -} - static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions; if (LOGIC_COND_TYPE_AND != pLogicCond->condType) { @@ -288,16 +431,16 @@ static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNo SNode* pTempRightChildCond = NULL; SNode* pTempRemainCond = NULL; if (TSDB_CODE_SUCCESS == code) { - code = cpdMakeCond(&pOnConds, &pTempOnCond); + code = cpdMergeConds(&pTempOnCond, &pOnConds); } if (TSDB_CODE_SUCCESS == code) { - code = cpdMakeCond(&pLeftChildConds, &pTempLeftChildCond); + code = cpdMergeConds(&pTempLeftChildCond, &pLeftChildConds); } if (TSDB_CODE_SUCCESS == code) { - code = cpdMakeCond(&pRightChildConds, &pTempRightChildCond); + code = cpdMergeConds(&pTempRightChildCond, &pRightChildConds); } if (TSDB_CODE_SUCCESS == code) { - code = cpdMakeCond(&pRemainConds, &pTempRemainCond); + code = cpdMergeConds(&pTempRemainCond, &pRemainConds); } if (TSDB_CODE_SUCCESS == code) { @@ -345,44 +488,12 @@ static int32_t cpdPartitionCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** } } -static int32_t cpdCondAppend(SOptimizeContext* pCxt, SNode** pCond, SNode** pAdditionalCond) { - if (NULL == *pCond) { - TSWAP(*pCond, *pAdditionalCond, SNode*); - return TSDB_CODE_SUCCESS; - } - - int32_t code = TSDB_CODE_SUCCESS; - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCond)) { - code = nodesListAppend(((SLogicConditionNode*)*pCond)->pParameterList, *pAdditionalCond); - if (TSDB_CODE_SUCCESS == code) { - *pAdditionalCond = NULL; - } - } else { - SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (NULL == pLogicCond) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pLogicCond->condType = LOGIC_COND_TYPE_AND; - code = nodesListMakeAppend(&pLogicCond->pParameterList, *pAdditionalCond); - if (TSDB_CODE_SUCCESS == code) { - *pAdditionalCond = NULL; - code = nodesListMakeAppend(&pLogicCond->pParameterList, *pCond); - } - if (TSDB_CODE_SUCCESS == code) { - *pCond = (SNode*)pLogicCond; - } else { - nodesDestroyNode(pLogicCond); - } - } - return code; -} - static int32_t cpdPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { - return cpdCondAppend(pCxt, &pJoin->pOnConditions, pCond); + return cpdCondAppend(&pJoin->pOnConditions, pCond); } static int32_t cpdPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) { - return cpdCondAppend(pCxt, &pScan->node.pConditions, pCond); + return cpdCondAppend(&pScan->node.pConditions, pCond); } static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0432ae1df8..337a773d5c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -30,7 +30,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) { pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pColumnData); @@ -45,7 +45,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) { in.columnData = createColumnInfoData(&pValueNode->node.resType, 1); colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); - colInfoDataEnsureCapacity(out->columnData, 1); + colInfoDataEnsureCapacity(out->columnData, 0, 1); int32_t code = vectorConvertImpl(&in, out); sclFreeParam(&in); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e7ff0bde91..02c6006f60 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -296,7 +296,6 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *in = pInputData->pData + pInputData->varmeta.offset[0]; int16_t *out = (int16_t *)pOutputData->pData; for (int32_t i = 0; i < pInput->numOfRows; ++i) { @@ -305,8 +304,8 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP continue; } + char *in = pInputData->pData + pInputData->varmeta.offset[i]; out[i] = lenFn(in, type); - in += varDataTLen(in); } pOutput->numOfRows = pInput->numOfRows; @@ -316,7 +315,8 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP static int32_t concatCopyHelper(const char *input, char *output, bool hasNchar, int32_t type, int16_t *dataLen) { if (hasNchar && type == TSDB_DATA_TYPE_VARCHAR) { TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1); - bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL); + int32_t len = varDataLen(input); + bool ret = taosMbsToUcs4(varDataVal(input), len, newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, &len); if (!ret) { taosMemoryFree(newBuf); return TSDB_CODE_FAILED; @@ -345,6 +345,7 @@ static int32_t getNumOfNullEntries(SColumnInfoData *pColumnInfoData, int32_t num } int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + int32_t ret = TSDB_CODE_SUCCESS; SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData *pOutputData = pOutput->columnData; char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); @@ -360,7 +361,6 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu } for (int32_t i = 0; i < inputNum; ++i) { pInputData[i] = pInput[i].columnData; - input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; int32_t factor = 1; if (hasNchar && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { factor = TSDB_NCHAR_SIZE; @@ -378,8 +378,8 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu outputBuf = taosMemoryCalloc(outputLen, 1); char *output = outputBuf; - bool hasNull = false; for (int32_t k = 0; k < numOfRows; ++k) { + bool hasNull = false; for (int32_t i = 0; i < inputNum; ++i) { if (colDataIsNull_s(pInputData[i], k)) { colDataAppendNULL(pOutputData, k); @@ -392,14 +392,18 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu continue; } + int16_t dataLen = 0; for (int32_t i = 0; i < inputNum; ++i) { - int32_t ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); - if (ret != TSDB_CODE_SUCCESS) { - return ret; + if (pInput[i].numOfRows == 1) { + input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; + } else { + input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[k]; } - if (pInput[i].numOfRows != 1) { - input[i] += varDataTLen(input[i]); + + ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); + if (ret != TSDB_CODE_SUCCESS) { + goto DONE; } } varDataSetLen(output, dataLen); @@ -408,15 +412,18 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu } pOutput->numOfRows = numOfRows; + +DONE: taosMemoryFree(input); taosMemoryFree(outputBuf); taosMemoryFree(pInputData); - return TSDB_CODE_SUCCESS; + return ret; } int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + int32_t ret = TSDB_CODE_SUCCESS; SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData *pOutputData = pOutput->columnData; char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); @@ -432,7 +439,6 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } for (int32_t i = 0; i < inputNum; ++i) { pInputData[i] = pInput[i].columnData; - input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; int32_t factor = 1; if (hasNchar && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { factor = TSDB_NCHAR_SIZE; @@ -460,40 +466,53 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } int16_t dataLen = 0; + bool hasNull = false; for (int32_t i = 1; i < inputNum; ++i) { if (colDataIsNull_s(pInputData[i], k)) { - continue; + hasNull = true; + break; } - int32_t ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); + if (pInput[i].numOfRows == 1) { + input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; + } else { + input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[k]; + } + + ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); if (ret != TSDB_CODE_SUCCESS) { - return ret; + goto DONE; } - if (pInput[i].numOfRows != 1) { - input[i] += varDataTLen(input[i]); - } if (i < inputNum - 1) { //insert the separator - char *sep = pInputData[0]->pData; - int32_t ret = concatCopyHelper(sep, output, hasNchar, GET_PARAM_TYPE(&pInput[0]), &dataLen); + char *sep = (pInput[0].numOfRows == 1) ? colDataGetData(pInputData[0], 0) : colDataGetData(pInputData[0], k); + ret = concatCopyHelper(sep, output, hasNchar, GET_PARAM_TYPE(&pInput[0]), &dataLen); if (ret != TSDB_CODE_SUCCESS) { - return ret; + goto DONE; } } } - varDataSetLen(output, dataLen); - colDataAppend(pOutputData, k, output, false); - output += varDataTLen(output); + + if (hasNull) { + colDataAppendNULL(pOutputData, k); + memset(output, 0, dataLen); + } else { + varDataSetLen(output, dataLen); + colDataAppend(pOutputData, k, output, false); + output += varDataTLen(output); + } } pOutput->numOfRows = numOfRows; + +DONE: taosMemoryFree(input); taosMemoryFree(outputBuf); taosMemoryFree(pInputData); - return TSDB_CODE_SUCCESS; + return ret; } static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) { @@ -505,12 +524,9 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *input = pInputData->pData + pInputData->varmeta.offset[0]; - char *output = NULL; - int32_t outputLen = pInputData->varmeta.length; char *outputBuf = taosMemoryCalloc(outputLen, 1); - output = outputBuf; + char *output = outputBuf; for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_s(pInputData, i)) { @@ -518,6 +534,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala continue; } + char *input = pInputData->pData + pInputData->varmeta.offset[i]; int32_t len = varDataLen(input); if (type == TSDB_DATA_TYPE_VARCHAR) { for (int32_t j = 0; j < len; ++j) { @@ -530,7 +547,6 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala } varDataSetLen(output, len); colDataAppend(pOutputData, i, output, false); - input += varDataTLen(input); output += varDataTLen(output); } @@ -550,18 +566,16 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *input = pInputData->pData + pInputData->varmeta.offset[0]; - char *output = NULL; - int32_t outputLen = pInputData->varmeta.length; char *outputBuf = taosMemoryCalloc(outputLen, 1); - output = outputBuf; + char *output = outputBuf; for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_s(pInputData, i)) { colDataAppendNULL(pOutputData, i); continue; } + char *input = pInputData->pData + pInputData->varmeta.offset[i]; int32_t len = varDataLen(input); int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; @@ -569,7 +583,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar varDataSetLen(output, len); colDataAppend(pOutputData, i, output, false); - input += varDataTLen(input); output += varDataTLen(output); } diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index 26ef5dbd44..42998aba00 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in res->info.numOfCols++; SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock); - colInfoDataEnsureCapacity(pColumn, rowNum); + colInfoDataEnsureCapacity(pColumn, 0, rowNum); for (int32_t i = 0; i < rowNum; ++i) { colDataAppend(pColumn, i, (const char *)value, false); diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 61ef2fdce2..09d6528dd8 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s SColumnInfoData idata = {0}; idata.info = *colInfo; - colInfoDataEnsureCapacity(&idata, rows); + colInfoDataEnsureCapacity(&idata, 0, rows); taosArrayPush(res->pDataBlock, &idata); @@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in res->info.numOfCols++; SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock); - colInfoDataEnsureCapacity(pColumn, rowNum); + colInfoDataEnsureCapacity(pColumn, 0, rowNum); for (int32_t i = 0; i < rowNum; ++i) { colDataAppend(pColumn, i, (const char *)value, false); @@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t input->numOfRows = num; input->columnData->info = createColumnInfo(0, type, bytes); - colInfoDataEnsureCapacity(input->columnData, num); + colInfoDataEnsureCapacity(input->columnData, 0, num); if (setVal) { for (int32_t i = 0; i < num; ++i) { diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 809d008aa7..6d3a259079 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -31,9 +31,12 @@ #define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) #define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode))) -#define FREE_HASH_NODE(_n) \ - do { \ - taosMemoryFreeClear(_n); \ +#define FREE_HASH_NODE(_fp, _n) \ + do { \ +/* if (_fp != NULL) { \ + (_fp)(_n); \ + }*/ \ + taosMemoryFreeClear(_n); \ } while (0); struct SHashNode { @@ -195,7 +198,7 @@ static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SH if (pNode->refCount <= 0) { pNewNode->next = pNode->next; - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pHashObj->freeFp, pNode); } else { pNewNode->next = pNode; pe->num++; @@ -310,7 +313,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo return -1; } - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // need the resize process, write lock applied if (HASH_NEED_RESIZE(pHashObj)) { @@ -523,7 +526,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { pe->num--; atomic_sub_fetch_64(&pHashObj->size, 1); - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pHashObj->freeFp, pNode); } } else { prevNode = pNode; @@ -558,7 +561,7 @@ void taosHashClear(SHashObj *pHashObj) { while (pNode) { pNext = pNode->next; - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pHashObj->freeFp, pNode); pNode = pNext; } @@ -769,7 +772,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { pe->num--; atomic_sub_fetch_64(&pHashObj->size, 1); - FREE_HASH_NODE(pOld); + FREE_HASH_NODE(pHashObj->freeFp, pOld); } } else { // uError("pNode:%p data:%p is not there!!!", pNode, p); diff --git a/tests/script/tsim/sync/insertDataByRunBack.sim b/tests/script/tsim/sync/insertDataByRunBack.sim new file mode 100644 index 0000000000..c86cd3844b --- /dev/null +++ b/tests/script/tsim/sync/insertDataByRunBack.sim @@ -0,0 +1,63 @@ + +sql connect + +print ================ insert data +$dbNamme = db +$ctbPrefix = ctb +$ntbPrefix = ntb +$tbNum = 10 +$rowNum = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +sql use $dbNamme + +$loop_cnt = 0 +loop_insert: +print ====> loop insert data, but once check if there is stop insert flag in interaction talbe from main thread +sql select * from interaction +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] +print $data[1][0] $data[1][1] $data[1][2] $data[1][3] +if $rows == 2 then + if $data[1][1] == stop then + goto end_insert + endi +endi + +$i = 0 +while $i < $tbNum + $ctb = $ctbPrefix . $i + $ntb = $ntbPrefix . $i + + $x = 0 + while $x < $rowNum + $binary = ' . binary + $binary = $binary . $i + $binary = $binary . ' + + sql insert into $ctb values ($tstart , $i , $x , $binary ) + sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw + +if $loop_cnt == 0 then + print ====> notify main to working for insert data + sql insert into interaction values (now, 'working', 0, 0); +endi +$loop_cnt = $loop_cnt + 1 +goto loop_insert + +end_insert: +print ====> save insert data rows to main thread + +$totalRowsOfCtb = $rowNum * $loop_cnt +$totalRowsOfStb = $totalRowsOfCtb * $tbNum +sql insert into interaction values (now, 'end', $totalRowsOfCtb, $totalRowsOfStb ); +print ====> totalRowsOfCtb: $totalRowsOfCtb , totalRowsOfStb: $totalRowsOfStb + + + diff --git a/tests/script/tsim/sync/oneReplica1VgElect.sim b/tests/script/tsim/sync/oneReplica1VgElect.sim new file mode 100644 index 0000000000..9726c0fdf5 --- /dev/null +++ b/tests/script/tsim/sync/oneReplica1VgElect.sim @@ -0,0 +1,363 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][0] != 1 then + return -1 +endi +if $data[0][4] != ready then + goto check_dnode_ready +endi + +#sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$loop_cnt = 0 +check_dnode_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> dnodes not ready! + return -1 +endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][4] != ready then + goto check_dnode_ready_1 +endi +if $data[1][4] != ready then + goto check_dnode_ready_1 +endi +if $data[2][4] != ready then + goto check_dnode_ready_1 +endi +if $data[3][4] != ready then + goto check_dnode_ready_1 +endi + +$vgroups = 1 +$replica = 1 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] +print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] +if $rows != 3 then + return -1 +endi +if $data(db)[20] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != NULL then + goto check_vg_ready + endi + if $data[0][8] != NULL then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + goto vg_ready +endi +if $data[0][6] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready + endi + if $data[0][8] != NULL then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + goto vg_ready +endi +if $data[0][8] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready + endi + if $data[0][6] != NULL then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + goto vg_ready +endi +vg_ready: + +print ====> create stable/child table, insert data, and select +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +$ctbPrefix = ctb +$ntbPrefix = ntb +$tbNum = 10 +$rowNum = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$i = 0 +while $i < $tbNum + $ctb = $ctbPrefix . $i + sql create table $ctb using stb tags( $i ) + $ntb = $ntbPrefix . $i + sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + + $x = 0 + while $x < $rowNum + $binary = ' . binary + $binary = $binary . $i + $binary = $binary . ' + + sql insert into $ctb values ($tstart , $i , $x , $binary ) + sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + print ====> insert rows: $rowNum into $ctb and $ntb + + $i = $i + 1 + $tstart = 1640966400000 +endw + +$totalTblNum = $tbNum * 2 +sql show tables +if $rows != $totalTblNum then + return -1 +endi + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +$totalRowsOfStb = $rowNum * $tbNum +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> finde vnode of leader, and stop the dnode where the vnode is located, and query stb/ntb count(*) +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + $dnodeId = $data[0][3] +elif $data[0][6] == LEADER then + $dnodeId = $data[0][5] +endi +elif $data[0][8] == LEADER then + $dnodeId = $data[0][7] +else + print ====> no leader vnode!!! + return -1 +endi + +$dnodeId = dnode . $dnodeId +print ====> stop $dnodeId +system sh/exec.sh -n $dnodeId -s stop -x SIGINT + +$loop_cnt = 0 +check_vg_ready_2: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups switch fail!!! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != NULL then + goto check_vg_ready_2 + endi + if $data[0][8] != NULL then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] + goto vg_ready_2 +endi +if $data[0][6] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_2 + endi + if $data[0][8] != NULL then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] + goto vg_ready_2 +endi +if $data[0][8] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_2 + endi + if $data[0][6] != NULL then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] + goto vg_ready_2 +endi +vg_ready_2: + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> stop and start all dnode(not include the dnode where mnode is located), then query +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode2 -s start + +$loop_cnt = 0 +check_vg_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> after restart dnode, vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != NULL then + goto check_vg_ready_1 + endi + if $data[0][8] != NULL then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][6] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_1 + endi + if $data[0][8] != NULL then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][8] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_1 + endi + if $data[0][6] != NULL then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +vg_ready_1: + +print ====> after restart dnode2/dnode3/dnode4, query stb/ntb count(*) +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT diff --git a/tests/script/tsim/sync/oneReplica1VgElectWithInsert.sim b/tests/script/tsim/sync/oneReplica1VgElectWithInsert.sim new file mode 100644 index 0000000000..972447ba9b --- /dev/null +++ b/tests/script/tsim/sync/oneReplica1VgElectWithInsert.sim @@ -0,0 +1,401 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][0] != 1 then + return -1 +endi +if $data[0][4] != ready then + goto check_dnode_ready +endi + +#sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$loop_cnt = 0 +check_dnode_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> dnodes not ready! + return -1 +endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][4] != ready then + goto check_dnode_ready_1 +endi +if $data[1][4] != ready then + goto check_dnode_ready_1 +endi +if $data[2][4] != ready then + goto check_dnode_ready_1 +endi +if $data[3][4] != ready then + goto check_dnode_ready_1 +endi + +$vgroups = 1 +$replica = 1 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] +print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] +if $rows != 3 then + return -1 +endi +if $data(db)[20] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != NULL then + goto check_vg_ready + endi + if $data[0][8] != NULL then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + goto vg_ready +endi +if $data[0][6] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready + endi + if $data[0][8] != NULL then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + goto vg_ready +endi +if $data[0][8] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready + endi + if $data[0][6] != NULL then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + goto vg_ready +endi +vg_ready: + +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +$ctbPrefix = ctb +$ntbPrefix = ntb +$tbNum = 10 +#$rowNum = 10 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$i = 0 +while $i < $tbNum + $ctb = $ctbPrefix . $i + sql create table $ctb using stb tags( $i ) + $ntb = $ntbPrefix . $i + sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +# $x = 0 +# while $x < $rowNum +# $binary = ' . binary +# $binary = $binary . $i +# $binary = $binary . ' +# +# sql insert into $ctb values ($tstart , $i , $x , $binary ) +# sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw + +# print ====> insert rows: $rowNum into $ctb and $ntb + + $i = $i + 1 +# $tstart = 1640966400000 +endw + +$totalTblNum = $tbNum * 2 +sql show tables +if $rows != $totalTblNum then + return -1 +endi + +print ====> create a normal table for interaction between main and back threads +sql create table interaction (ts timestamp, flag binary(10), childrows int, stbrows int) + +print ====> start to run_back to insert data +run_back tsim/tmq/insertDataByRunBack.sim + + +print ====> waiting insert thread starting insert data +waiting_insert_data: +sql select * from interaction +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] +print $data[1][0] $data[1][1] $data[1][2] $data[1][3] +if $rows == 1 then + if $data[0][1] == working then + goto start_switch_leader + endi +endi +goto waiting_insert_data + +start_switch_leader: + +$switch_loop_cnt = 0 +switch_leader_loop: + +print ====> finde vnode of leader, and stop the dnode where the vnode is located, and query stb/ntb count(*) +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + $dnodeId = $data[0][3] +elif $data[0][6] == LEADER then + $dnodeId = $data[0][5] +endi +elif $data[0][8] == LEADER then + $dnodeId = $data[0][7] +else + print ====> no leader vnode!!! + return -1 +endi + +$dnodeId = dnode . $dnodeId +print ====> stop $dnodeId +system sh/exec.sh -n $dnodeId -s stop -x SIGINT + +$loop_cnt = 0 +check_vg_ready_2: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups switch fail!!! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != NULL then + goto check_vg_ready_2 + endi + if $data[0][8] != NULL then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] + goto vg_ready_2 +endi +if $data[0][6] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_2 + endi + if $data[0][8] != NULL then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] + goto vg_ready_2 +endi +if $data[0][8] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_2 + endi + if $data[0][6] != NULL then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] + goto vg_ready_2 +endi +vg_ready_2: + +$switch_loop_cnt = $switch_loop_cnt + 1 +if $switch_loop_cnt < 3 then + print ====> start $dnodeId + system sh/exec.sh -n $dnodeId -s start + goto switch_leader_loop +endi + +loop_switch_end: + +print ====> notify insert thread to stop insert data +sql insert into interaction values (now, 'stop', 0, 0); + +print ====> waiting insert thread to stop insert data +waiting_stop_data: +sql select * from interaction +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] +print $data[1][0] $data[1][1] $data[1][2] $data[1][3] +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] +if $rows == 3 then + if $data[2][1] == end then + $totalRowsOfCtb = $data[2][2] + $totalRowsOfStb = $data[2][3] + goto check_affected_rows + endi +endi +goto waiting_stop_data + +check_affected_rows: + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> stop and start all dnode(not include the dnode where mnode is located), then query +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode2 -s start + +$loop_cnt = 0 +check_vg_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> after restart dnode, vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != NULL then + goto check_vg_ready_1 + endi + if $data[0][8] != NULL then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][6] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_1 + endi + if $data[0][8] != NULL then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][8] == LEADER then + if $data[0][4] != NULL then + goto check_vg_ready_1 + endi + if $data[0][6] != NULL then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +vg_ready_1: + +print ====> after restart dnode2/dnode3/dnode4, query stb/ntb count(*) +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT diff --git a/tests/script/tsim/sync/threeReplica1VgElect.sim b/tests/script/tsim/sync/threeReplica1VgElect.sim new file mode 100644 index 0000000000..f709c72dd7 --- /dev/null +++ b/tests/script/tsim/sync/threeReplica1VgElect.sim @@ -0,0 +1,391 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][0] != 1 then + return -1 +endi +if $data[0][4] != ready then + goto check_dnode_ready +endi + +#sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$loop_cnt = 0 +check_dnode_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> dnodes not ready! + return -1 +endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][4] != ready then + goto check_dnode_ready_1 +endi +if $data[1][4] != ready then + goto check_dnode_ready_1 +endi +if $data[2][4] != ready then + goto check_dnode_ready_1 +endi +if $data[3][4] != ready then + goto check_dnode_ready_1 +endi + +$vgroups = 1 +$replica = 3 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] +print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] +if $rows != 3 then + return -1 +endi +if $data(db)[20] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != FLLOWER then + goto check_vg_ready + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + goto vg_ready +endi +if $data[0][6] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + goto vg_ready +endi +if $data[0][8] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready + endi + if $data[0][6] != FLLOWER then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + goto vg_ready +endi +vg_ready: + +print ====> create stable/child table, insert data, and select +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +$ctbPrefix = ctb +$ntbPrefix = ntb +$tbNum = 10 +$rowNum = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$i = 0 +while $i < $tbNum + $ctb = $ctbPrefix . $i + sql create table $ctb using stb tags( $i ) + $ntb = $ntbPrefix . $i + sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + + $x = 0 + while $x < $rowNum + $binary = ' . binary + $binary = $binary . $i + $binary = $binary . ' + + sql insert into $ctb values ($tstart , $i , $x , $binary ) + sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + print ====> insert rows: $rowNum into $ctb and $ntb + + $i = $i + 1 + $tstart = 1640966400000 +endw + +$totalTblNum = $tbNum * 2 +sql show tables +if $rows != $totalTblNum then + return -1 +endi + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +$totalRowsOfStb = $rowNum * $tbNum +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> finde vnode of leader, and stop the dnode where the vnode is located, and query stb/ntb count(*) +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + $dnodeId = $data[0][3] +elif $data[0][6] == LEADER then + $dnodeId = $data[0][5] +endi +elif $data[0][8] == LEADER then + $dnodeId = $data[0][7] +else + print ====> no leader vnode!!! + return -1 +endi + +$dnodeId = dnode . $dnodeId +print ====> stop $dnodeId +system sh/exec.sh -n $dnodeId -s stop -x SIGINT + +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] +elif $data[0][6] == LEADER then + print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] +endi +elif $data[0][8] == LEADER then + print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] +else + print ====> no leader vnode!!! + return -1 +endi + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> stop and start all dnode(not include the dnode where mnode is located), then query +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode2 -s start + +$loop_cnt = 0 +check_vg_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> after restart dnode, vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != FLLOWER then + goto check_vg_ready_1 + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][6] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready_1 + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][8] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready_1 + endi + if $data[0][6] != FLLOWER then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +vg_ready_1: + +print ====> after restart dnode2/dnode3/dnode4, query stb/ntb count(*) +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> once stop one dnode by loop, and do query every time +$i = 2 +loop_stop_dnode: +$dnodeId = dnode . $i +print ====> stop $dnodeId +system sh/exec.sh -n $dnodeId -s stop -x SIGINT + +check_vg_ready_3: +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + if $data[0][6] == LEADER then + goto check_vg_ready_3 + endi + if $data[0][8] == LEADER then + goto check_vg_ready_3 + endi + print ---- vgroup $data[0][0] leader locating dnode $data[0][5] +elif $data[0][6] == LEADER then + if $data[0][4] == LEADER then + goto check_vg_ready_3 + endi + if $data[0][8] == LEADER then + goto check_vg_ready_3 + endi + print ---- vgroup $data[0][0] leader locating dnode $data[0][7] +endi +elif $data[0][8] == LEADER then + if $data[0][4] == LEADER then + goto check_vg_ready_3 + endi + if $data[0][6] == LEADER then + goto check_vg_ready_3 + endi + print ---- vgroup $data[0][0] leader locating dnode $data[0][9] +else + print ====> no leader vnode!!! + return -1 +endi + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $rowNum then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +$i = $i + 1 +if $i <= 4 then + print ====> start $dnodeId + system sh/exec.sh -n $dnodeId -s start + goto loop_stop_dnode +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT diff --git a/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim b/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim new file mode 100644 index 0000000000..d934e82b98 --- /dev/null +++ b/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim @@ -0,0 +1,469 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][0] != 1 then + return -1 +endi +if $data[0][4] != ready then + goto check_dnode_ready +endi + +#sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$loop_cnt = 0 +check_dnode_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> dnodes not ready! + return -1 +endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][4] != ready then + goto check_dnode_ready_1 +endi +if $data[1][4] != ready then + goto check_dnode_ready_1 +endi +if $data[2][4] != ready then + goto check_dnode_ready_1 +endi +if $data[3][4] != ready then + goto check_dnode_ready_1 +endi + +$vgroups = 1 +$replica = 3 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] +print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] +if $rows != 3 then + return -1 +endi +if $data(db)[20] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != FLLOWER then + goto check_vg_ready + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + goto vg_ready +endi +if $data[0][6] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + goto vg_ready +endi +if $data[0][8] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready + endi + if $data[0][6] != FLLOWER then + goto check_vg_ready + endi + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + goto vg_ready +endi +vg_ready: + +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +$ctbPrefix = ctb +$ntbPrefix = ntb +$tbNum = 10 +#$rowNum = 10 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$i = 0 +while $i < $tbNum + $ctb = $ctbPrefix . $i + sql create table $ctb using stb tags( $i ) + $ntb = $ntbPrefix . $i + sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +# $x = 0 +# while $x < $rowNum +# $binary = ' . binary +# $binary = $binary . $i +# $binary = $binary . ' +# +# sql insert into $ctb values ($tstart , $i , $x , $binary ) +# sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw + +# print ====> insert rows: $rowNum into $ctb and $ntb + + $i = $i + 1 +# $tstart = 1640966400000 +endw + +$totalTblNum = $tbNum * 2 +sql show tables +if $rows != $totalTblNum then + return -1 +endi + +print ====> create a normal table for interaction between main and back threads +sql create table interaction (ts timestamp, flag binary(10), childrows int, stbrows int) + +print ====> start to run_back to insert data +run_back tsim/tmq/insertDataByRunBack.sim + + +print ====> waiting insert thread starting insert data +waiting_insert_data: +sql select * from interaction +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] +print $data[1][0] $data[1][1] $data[1][2] $data[1][3] +if $rows == 1 then + if $data[0][1] == working then + goto start_switch_leader + endi +endi +goto waiting_insert_data + +start_switch_leader: + +$switch_loop_cnt = 0 +switch_leader_loop: + +print ====> finde vnode of leader, and stop the dnode where the vnode is located, and query stb/ntb count(*) +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + $dnodeId = $data[0][3] +elif $data[0][6] == LEADER then + $dnodeId = $data[0][5] +endi +elif $data[0][8] == LEADER then + $dnodeId = $data[0][7] +else + print ====> no leader vnode!!! + return -1 +endi + +$dnodeId = dnode . $dnodeId +print ====> stop $dnodeId +system sh/exec.sh -n $dnodeId -s stop -x SIGINT + +$loop_cnt = 0 +check_vg_ready_2: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> vgroups switch fail!!! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != FLLOWER then + goto check_vg_ready_2 + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] + goto vg_ready_2 +endi +if $data[0][6] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready_2 + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] + goto vg_ready_2 +endi +if $data[0][8] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready_2 + endi + if $data[0][6] != FLLOWER then + goto check_vg_ready_2 + endi + print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] + goto vg_ready_2 +endi +vg_ready_2: + +$switch_loop_cnt = $switch_loop_cnt + 1 +if $switch_loop_cnt < 3 then + print ====> start $dnodeId + system sh/exec.sh -n $dnodeId -s start + goto switch_leader_loop +endi + +loop_switch_end: + +print ====> notify insert thread to stop insert data +sql insert into interaction values (now, 'stop', 0, 0); + +print ====> waiting insert thread to stop insert data +waiting_stop_data: +sql select * from interaction +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] +print $data[1][0] $data[1][1] $data[1][2] $data[1][3] +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] +if $rows == 3 then + if $data[2][1] == end then + $totalRowsOfCtb = $data[2][2] + $totalRowsOfStb = $data[2][3] + goto check_affected_rows + endi +endi +goto waiting_stop_data + +check_affected_rows: + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> stop and start all dnode(not include the dnode where mnode is located), then query +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode2 -s start + +$loop_cnt = 0 +check_vg_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> after restart dnode, vgroups not ready! + return -1 +endi +sql show vgroups +print ===> rows: $rows +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] +if $rows != $vgroups then + return -1 +endi +if $data[0][4] == LEADER then + if $data[0][6] != FLLOWER then + goto check_vg_ready_1 + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][6] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready_1 + endi + if $data[0][8] != FLLOWER then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +if $data[0][8] == LEADER then + if $data[0][4] != FLLOWER then + goto check_vg_ready_1 + endi + if $data[0][6] != FLLOWER then + goto check_vg_ready_1 + endi + goto vg_ready_1 +endi +vg_ready_1: + +print ====> after restart dnode2/dnode3/dnode4, query stb/ntb count(*) +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +print ====> once stop one dnode by loop, and do query every time +$i = 2 +loop_stop_dnode: +$dnodeId = dnode . $i +print ====> stop $dnodeId +system sh/exec.sh -n $dnodeId -s stop -x SIGINT + +check_vg_ready_3: +sql show vgroups +print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] +if $data[0][4] == LEADER then + if $data[0][6] == LEADER then + goto check_vg_ready_3 + endi + if $data[0][8] == LEADER then + goto check_vg_ready_3 + endi + print ---- vgroup $data[0][0] leader locating dnode $data[0][5] +elif $data[0][6] == LEADER then + if $data[0][4] == LEADER then + goto check_vg_ready_3 + endi + if $data[0][8] == LEADER then + goto check_vg_ready_3 + endi + print ---- vgroup $data[0][0] leader locating dnode $data[0][7] +endi +elif $data[0][8] == LEADER then + if $data[0][4] == LEADER then + goto check_vg_ready_3 + endi + if $data[0][6] == LEADER then + goto check_vg_ready_3 + endi + print ---- vgroup $data[0][0] leader locating dnode $data[0][9] +else + print ====> no leader vnode!!! + return -1 +endi + +sql select count(*) from ntb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from ctb0 +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfCtb then + return -1 +endi + +sql select count(*) from stb +print rows: $rows +print $data[0][0] $data[0][1] +if $data[0][0] != $totalRowsOfStb then + return -1 +endi + +$i = $i + 1 +if $i <= 4 then + print ====> start $dnodeId + system sh/exec.sh -n $dnodeId -s start + goto loop_stop_dnode +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT