Merge branch '3.0' into fix/syntax
This commit is contained in:
commit
6e1d061ce4
|
@ -65,16 +65,10 @@ interp_clause:
|
|||
RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)
|
||||
|
||||
partition_by_clause:
|
||||
PARTITION BY partition_by_expr [, partition_by_expr] ...
|
||||
|
||||
partition_by_expr:
|
||||
{expr | position | c_alias}
|
||||
PARTITION BY expr [, expr] ...
|
||||
|
||||
group_by_clause:
|
||||
GROUP BY group_by_expr [, group_by_expr] ... HAVING condition
|
||||
|
||||
group_by_expr:
|
||||
{expr | position | c_alias}
|
||||
GROUP BY expr [, expr] ... HAVING condition
|
||||
|
||||
order_by_clasue:
|
||||
ORDER BY order_expr [, order_expr] ...
|
||||
|
@ -280,13 +274,7 @@ If you use a GROUP BY clause, the SELECT list can only include the following ite
|
|||
|
||||
The GROUP BY clause groups each row of data by the value of the expression following the clause and returns a combined result for each group.
|
||||
|
||||
In the GROUP BY clause, columns from a table or view can be grouped by specifying the column name. These columns do not need to be included in the SELECT list.
|
||||
|
||||
You can specify integers in GROUP BY expression to indicate the expressions in the select list used for grouping. For example, 1 indicates the first item in the select list.
|
||||
|
||||
You can specify column names in result set to indicate the expressions in the select list used for grouping.
|
||||
|
||||
When using position and result set column names for grouping in the GROUP BY clause, the corresponding expressions in the select list must not be aggregate functions.
|
||||
The expressions in a GROUP BY clause can include any column in any table or view. It is not necessary that the expressions appear in the SELECT list.
|
||||
|
||||
The GROUP BY clause does not guarantee that the results are ordered. If you want to ensure that grouped data is ordered, use the ORDER BY clause.
|
||||
|
||||
|
|
|
@ -102,7 +102,6 @@ The detailed beaviors of `NULL`, `NULL_F`, `VALUE`, and VALUE_F are described be
|
|||
1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000.
|
||||
2. The result set is in ascending order of timestamp when you aggregate by time window.
|
||||
3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set will be returned in the order of ascending timestamp in each group.
|
||||
4. The output windows of Fill are related with time range of WHERE Clause. For asc fill, the first output window is the first window that conains the start time of WHERE clause. The last output window is the last window that contains the end time of WHERE clause.
|
||||
|
||||
:::
|
||||
|
||||
|
|
|
@ -65,16 +65,10 @@ interp_clause:
|
|||
RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)
|
||||
|
||||
partition_by_clause:
|
||||
PARTITION BY partition_by_expr [, partition_by_expr] ...
|
||||
|
||||
partition_by_expr:
|
||||
{expr | position | c_alias}
|
||||
PARTITION BY expr [, expr] ...
|
||||
|
||||
group_by_clause:
|
||||
GROUP BY group_by_expr [, group_by_expr] ... HAVING condition
|
||||
|
||||
group_by_expr:
|
||||
{expr | position | c_alias}
|
||||
GROUP BY expr [, expr] ... HAVING condition
|
||||
|
||||
order_by_clasue:
|
||||
ORDER BY order_expr [, order_expr] ...
|
||||
|
@ -280,13 +274,7 @@ TDengine 支持基于时间戳主键的 INNER JOIN,规则如下:
|
|||
|
||||
GROUP BY 子句对每行数据按 GROUP BY 后的表达式的值进行分组,并为每个组返回一行汇总信息。
|
||||
|
||||
GROUP BY 子句中可以通过指定表或视图的列名来按照表或视图中的任何列分组,这些列不需要出现在 SELECT 列表中。
|
||||
|
||||
GROUP BY 子句中可以使用位置语法,位置标识为正整数,从 1 开始,表示使用 SELECT 列表的第几个表达式进行分组。
|
||||
|
||||
GROUP BY 子句中可以使用结果集列名,表示使用 SELECT 列表的指定表达式进行分组。
|
||||
|
||||
GROUP BY 子句中在使用位置语法和结果集列名进行分组时,其对应的 SELECT 列表中的表达式不能是聚集函数。
|
||||
GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些列不需要出现在 SELECT 列表中。
|
||||
|
||||
该子句对行进行分组,但不保证结果集的顺序。若要对分组进行排序,请使用 ORDER BY 子句
|
||||
|
||||
|
|
|
@ -97,7 +97,6 @@ NULL, NULL_F, VALUE, VALUE_F 这几种填充模式针对不同场景区别如下
|
|||
1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。
|
||||
2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。
|
||||
3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 PARTITION BY 语句分组,则返回结果中每个 PARTITION 内按照时间序列严格单调递增。
|
||||
4. Fill输出的起始和结束窗口与WHERE条件的时间范围有关, 如增序Fill时, 第一个窗口是包含WHERE条件开始时间的第一个窗口, 最后一个窗口是包含WHERE条件结束时间的最后一个窗口。
|
||||
|
||||
:::
|
||||
|
||||
|
|
|
@ -90,6 +90,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
pOperator->exprSupp.hasWindowOrGroup = false;
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
@ -140,6 +141,9 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
return code;
|
||||
|
||||
_error:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
if (pInfo != NULL) {
|
||||
destroyAggOperatorInfo(pInfo);
|
||||
}
|
||||
|
@ -480,6 +484,10 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui
|
|||
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
SPageInfo* pi = getLastPageInfo(list);
|
||||
|
@ -496,9 +504,11 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui
|
|||
releaseBufPageInfo(pResultBuf, pi);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
|
||||
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
|
||||
code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -165,6 +166,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
int32_t capacity = 0;
|
||||
|
||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||
QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno);
|
||||
|
||||
// partition by tbname
|
||||
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
|
||||
|
@ -205,6 +207,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
|
||||
p->pCtx =
|
||||
createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
|
||||
|
|
|
@ -276,6 +276,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -244,6 +244,9 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi
|
|||
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
|
||||
if (*pResult == NULL) {
|
||||
SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
|
||||
if (!p) {
|
||||
return terrno;
|
||||
}
|
||||
pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
|
||||
*pResult = p;
|
||||
}
|
||||
|
|
|
@ -307,6 +307,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
|
|||
|
||||
int32_t len = strlen(id) + 1;
|
||||
pInfo->pTaskId = taosMemoryCalloc(1, len);
|
||||
if (!pInfo->pTaskId) {
|
||||
return terrno;
|
||||
}
|
||||
strncpy(pInfo->pTaskId, id, len);
|
||||
for (int32_t i = 0; i < numOfSources; ++i) {
|
||||
SSourceDataInfo dataInfo = {0};
|
||||
|
@ -389,7 +392,9 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
|
|||
|
||||
pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
|
||||
pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
|
||||
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
|
||||
|
||||
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
|
||||
code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
|
||||
|
|
|
@ -144,6 +144,7 @@ int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap,
|
|||
|
||||
void* pData = NULL;
|
||||
pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
|
||||
|
||||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
|
@ -353,9 +354,15 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
} else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t len = ((const STag*)p)->len;
|
||||
res->datum.p = taosMemoryCalloc(len + 1, 1);
|
||||
if (NULL == res->datum.p) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
memcpy(res->datum.p, p, len);
|
||||
} else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
|
||||
res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
|
||||
if (NULL == res->datum.p) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
|
||||
varDataSetLen(res->datum.p, tagVal.nData);
|
||||
} else {
|
||||
|
@ -378,6 +385,9 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
|
||||
int32_t len = strlen(mr->me.name);
|
||||
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
|
||||
if (NULL == res->datum.p) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
memcpy(varDataVal(res->datum.p), mr->me.name, len);
|
||||
varDataSetLen(res->datum.p, len);
|
||||
nodesDestroyNode(*pNode);
|
||||
|
@ -856,6 +866,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
|||
|
||||
// remove the duplicates
|
||||
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||
QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
||||
|
@ -1739,6 +1750,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = numOfParam;
|
||||
|
||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
|
@ -1760,6 +1772,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
SDataType* pType = &pOpNode->node.resType;
|
||||
|
@ -1771,6 +1784,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
SDataType* pType = &pCaseNode->node.resType;
|
||||
|
@ -1781,9 +1795,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||
SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
if (!pExp->base.pParam) {
|
||||
code = terrno;
|
||||
}
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pExp->base.numOfParams = 1;
|
||||
SDataType* pType = &pCond->node.resType;
|
||||
|
@ -1808,6 +1821,9 @@ int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
|
|||
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
|
||||
*numOfExprs = LIST_LENGTH(pNodeList);
|
||||
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
|
||||
if (!pExprs) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < (*numOfExprs); ++i) {
|
||||
SExprInfo* pExp = &pExprs[i];
|
||||
|
@ -1944,6 +1960,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
} else {
|
||||
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
|
||||
pCtx->udfName = taosStrdup(udfName);
|
||||
QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
|
||||
|
||||
code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
@ -2075,6 +2093,9 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
||||
|
||||
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
|
||||
if (!pCond->colList) {
|
||||
return terrno;
|
||||
}
|
||||
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
|
||||
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -2455,6 +2476,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
|
||||
SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (!pList) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||
uint64_t gid = pInfo->groupId;
|
||||
|
|
|
@ -373,6 +373,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
QUERY_CHECK_NULL(qa, code, lino, _end, terrno);
|
||||
int32_t numOfUids = taosArrayGetSize(tableIdList);
|
||||
if (numOfUids == 0) {
|
||||
(*ppArrayRes) = qa;
|
||||
|
|
|
@ -89,6 +89,10 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
|
|||
int32_t pageId = -1;
|
||||
if (*currentPageId == -1) {
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return NULL;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
pData = getBufPage(pResultBuf, *currentPageId);
|
||||
|
@ -104,9 +108,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
|
|||
releaseBufPage(pResultBuf, pData);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return NULL;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -409,6 +415,9 @@ static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SF
|
|||
SColumnDataAgg* da = NULL;
|
||||
if (pInput->pColumnDataAgg[paramIndex] == NULL) {
|
||||
da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
|
||||
if (!da) {
|
||||
return terrno;
|
||||
}
|
||||
pInput->pColumnDataAgg[paramIndex] = da;
|
||||
if (da == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -397,6 +397,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
pInfo->win.ekey = win.skey;
|
||||
}
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
if (!pInfo->p) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
||||
taosMemoryFree(pInfo->pFillInfo);
|
||||
|
@ -465,6 +468,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
}
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
|
||||
|
|
|
@ -859,6 +859,9 @@ _end:
|
|||
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
|
||||
if (!offset) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
offset[0] = sizeof(int32_t) +
|
||||
sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
||||
|
@ -1173,6 +1176,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
uint32_t defaultBufsz = 0;
|
||||
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
|
||||
code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -1193,6 +1197,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf),
|
||||
blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock)));
|
||||
pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
|
||||
QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno);
|
||||
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -1427,6 +1433,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
|
|||
SPartitionDataInfo newParData = {0};
|
||||
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
|
||||
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
|
||||
QUERY_CHECK_NULL(newParData.rowIds, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(newParData.rowIds, &i);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
||||
|
@ -1590,6 +1597,9 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (!pBlock) {
|
||||
return NULL;
|
||||
}
|
||||
pBlock->info.hasVarCol = false;
|
||||
pBlock->info.id.groupId = 0;
|
||||
pBlock->info.rows = 0;
|
||||
|
@ -1597,6 +1607,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
|||
pBlock->info.watermark = INT64_MIN;
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||
QUERY_CHECK_NULL(pBlock->pDataBlock, code, lino, _end, terrno);
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||
if (tbName->numOfExprs > 0) {
|
||||
|
|
|
@ -476,17 +476,27 @@ static void freeTableCachedVal(void* param) {
|
|||
}
|
||||
|
||||
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
|
||||
QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
|
||||
pVal->pName = taosStrdup(pMetaReader->me.name);
|
||||
QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno);
|
||||
pVal->pTags = NULL;
|
||||
|
||||
// only child table has tag value
|
||||
if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
|
||||
STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
|
||||
pVal->pTags = taosMemoryMalloc(pTag->len);
|
||||
QUERY_CHECK_NULL(pVal->pTags, code, lino, _end, terrno);
|
||||
memcpy(pVal->pTags, pTag, pTag->len);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
return pVal;
|
||||
}
|
||||
|
||||
|
@ -581,6 +591,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
pHandle->api.metaReaderFn.readerReleaseLock(&mr);
|
||||
|
||||
STableCachedVal* pVal = createTableCacheVal(&mr);
|
||||
if(!pVal) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
val = *pVal;
|
||||
freeReader = true;
|
||||
|
@ -1280,6 +1293,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
|
||||
if (!pRecorder) {
|
||||
return terrno;
|
||||
}
|
||||
STableScanInfo* pTableScanInfo = pOptr->info;
|
||||
*pRecorder = pTableScanInfo->base.readRecorder;
|
||||
*pOptrExplain = pRecorder;
|
||||
|
@ -1290,7 +1306,9 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
|
|||
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
|
||||
cleanupQueryTableDataCond(&pBase->cond);
|
||||
|
||||
pAPI->tsdReaderClose(pBase->dataReader);
|
||||
if (pAPI->tsdReaderClose) {
|
||||
pAPI->tsdReaderClose(pBase->dataReader);
|
||||
}
|
||||
pBase->dataReader = NULL;
|
||||
|
||||
if (pBase->matchInfo.pList != NULL) {
|
||||
|
@ -1344,6 +1362,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
|||
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||
|
@ -1359,6 +1378,8 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
|||
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
||||
|
||||
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -2488,6 +2509,7 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of
|
|||
void* data = colDataGetData(pColPk, i);
|
||||
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
|
||||
void* tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE);
|
||||
QUERY_CHECK_NULL(tmq, code, lino, _end, terrno);
|
||||
memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData);
|
||||
varDataLen(tmq) = offset->primaryKey.nData;
|
||||
p[i] = (*ts > offset->ts) || (func(data, tmq) > 0);
|
||||
|
@ -2712,6 +2734,7 @@ _end:
|
|||
|
||||
static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SValue val = {0};
|
||||
if (hasPrimaryKey) {
|
||||
code = doBlockDataPrimaryKeyFilter(pBlock, offset);
|
||||
|
@ -2728,6 +2751,7 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff
|
|||
val.type = pColPk->info.type;
|
||||
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
|
||||
val.pData = taosMemoryMalloc(varDataLen(tmp));
|
||||
QUERY_CHECK_NULL(val.pData, code, lino, _end, terrno);
|
||||
val.nData = varDataLen(tmp);
|
||||
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
|
||||
} else {
|
||||
|
@ -2735,6 +2759,11 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff
|
|||
}
|
||||
}
|
||||
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3010,6 +3039,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
}
|
||||
|
||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (!pUpInfo) {
|
||||
return;
|
||||
}
|
||||
int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||
|
@ -3469,6 +3501,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
|
||||
QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno);
|
||||
|
||||
// Transfer the Array of STableKeyInfo into uid list.
|
||||
size_t size = tableListGetSize(pTableListInfo);
|
||||
|
@ -3864,6 +3897,8 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
pInfo->primaryKeyIndex = -1;
|
||||
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
||||
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
||||
QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
|
||||
|
@ -3992,6 +4027,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -4386,6 +4422,8 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
}
|
||||
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
|
||||
info.pTagVal = taosMemoryMalloc(pCur->vLen);
|
||||
QUERY_CHECK_NULL(info.pTagVal, code, lino, _end, terrno);
|
||||
|
||||
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
|
||||
void* tmp = taosArrayPush(aUidTags, &info);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
@ -5292,6 +5330,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno);
|
||||
SBlockOrderInfo biTs = {0};
|
||||
SBlockOrderInfo biPk = {0};
|
||||
|
||||
|
@ -5380,9 +5419,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
|
||||
QUERY_CHECK_NULL(param, code, lino, _end, terrno);
|
||||
param->pOperator = pOperator;
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
QUERY_CHECK_NULL(ps, code, lino, _end, terrno);
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
code = tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
@ -5659,6 +5700,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
|||
ASSERT(pOptr != NULL);
|
||||
// TODO: merge these two info into one struct
|
||||
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
|
||||
if (!execInfo) {
|
||||
return terrno;
|
||||
}
|
||||
STableMergeScanInfo* pInfo = pOptr->info;
|
||||
execInfo->blockRecorder = pInfo->base.readRecorder;
|
||||
execInfo->sortExecInfo = pInfo->sortExecInfo;
|
||||
|
@ -5705,6 +5749,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
|
|||
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||
|
|
|
@ -93,6 +93,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
|
||||
pOperator->exprSupp.pCtx =
|
||||
createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -784,6 +785,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
|
|||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
|
||||
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
|
|
@ -651,10 +651,12 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -837,6 +839,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -863,6 +866,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
pInfo->binfo.pRes = pResBlock;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
|
@ -884,6 +888,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
||||
|
|
|
@ -619,10 +619,12 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -738,6 +740,9 @@ void streamEventReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
int32_t resSize = winSize + sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
if (!pBuff) {
|
||||
return ;
|
||||
}
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
qDebug("===stream=== event window operator relase state. save result count:%d",
|
||||
|
@ -780,10 +785,12 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
|
|||
if (!pInfo->pSeUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeDeleted && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SEventWindowInfo curInfo = {0};
|
||||
|
@ -892,6 +899,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -903,6 +911,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -604,6 +604,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
|||
SPoint cur = {0};
|
||||
cur.key = pFillInfo->current;
|
||||
cur.val = taosMemoryCalloc(1, pCell->bytes);
|
||||
QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
|
||||
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
|
||||
code = colDataSetVal(pColData, index, (const char*)cur.val, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -683,15 +684,24 @@ _end:
|
|||
}
|
||||
}
|
||||
|
||||
void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
|
||||
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
|
||||
int32_t rowId, uint64_t groupId, int32_t rowSize) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
TSKEY ts = tsCol[rowId];
|
||||
pFillInfo->nextRowKey = ts;
|
||||
SResultRowData tmpNextRow = {.key = ts};
|
||||
tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize);
|
||||
QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno);
|
||||
transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
|
||||
keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize);
|
||||
taosMemoryFreeClear(tmpNextRow.pRowVal);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
|
||||
|
@ -721,13 +731,15 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
pInfo->srcRowIndex++;
|
||||
|
||||
if (pInfo->srcRowIndex == 0) {
|
||||
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pInfo->srcRowIndex++;
|
||||
}
|
||||
|
||||
while (pInfo->srcRowIndex < pBlock->info.rows) {
|
||||
TSKEY ts = tsCol[pInfo->srcRowIndex];
|
||||
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
|
||||
if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) {
|
||||
code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
|
||||
|
@ -1214,6 +1226,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pFillSup->pResMap = tSimpleHashInit(16, hashFn);
|
||||
QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
|
||||
pFillSup->hasDelete = false;
|
||||
|
||||
_end:
|
||||
|
@ -1363,7 +1376,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
|
||||
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -647,6 +647,7 @@ int32_t addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* childIds = taosArrayInit(8, sizeof(int32_t));
|
||||
QUERY_CHECK_NULL(childIds, code, lino, _end, terrno);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void* tmp = taosArrayPush(childIds, &i);
|
||||
if (!tmp) {
|
||||
|
@ -1579,10 +1580,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -1609,6 +1612,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(delWins, code, lino, _end, terrno);
|
||||
SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL;
|
||||
code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1895,9 +1899,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno);
|
||||
qInfo("open state %p", pInfo->pState);
|
||||
pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
//*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
|
@ -1921,6 +1927,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
}
|
||||
|
||||
pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pPullWins, code, lino, _error, terrno);
|
||||
pInfo->pullIndex = 0;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
|
@ -1936,6 +1943,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
|
||||
pInfo->delIndex = 0;
|
||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
|
||||
pInfo->delKey.ts = INT64_MAX;
|
||||
pInfo->delKey.groupId = 0;
|
||||
pInfo->numOfDatapack = 0;
|
||||
|
@ -1960,7 +1968,9 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
|
||||
pInfo->clearState = false;
|
||||
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(pInfo->pMidPullDatas, code, lino, _error, terrno);
|
||||
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
|
@ -2136,6 +2146,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
|
||||
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
||||
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (!pSup->pState) {
|
||||
return terrno;
|
||||
}
|
||||
*(pSup->pState) = *pState;
|
||||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||
|
@ -2145,6 +2158,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||
if (!pSup->pResultRows) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||
|
@ -3347,10 +3363,12 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -3363,6 +3381,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
QUERY_CHECK_NULL(pWins, code, lino, _end, terrno);
|
||||
// gap must be 0
|
||||
code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -3503,6 +3522,9 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
int32_t resSize = winSize + sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
if (!pBuff) {
|
||||
return;
|
||||
}
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
|
||||
|
@ -3630,6 +3652,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
|||
if (!pInfo->pStUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SResultWindowInfo winInfo = {0};
|
||||
|
@ -3713,6 +3736,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -3744,6 +3768,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pInfo->order = TSDB_ORDER_ASC;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -3770,6 +3795,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pInfo->recvGetAll = false;
|
||||
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
|
||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||
|
@ -3899,10 +3925,12 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -4021,6 +4049,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
|
||||
if (numOfChild > 0) {
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
||||
QUERY_CHECK_NULL(pInfo->pChildren, code, lino, _error, terrno);
|
||||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SOperatorInfo* pChildOp = NULL;
|
||||
code = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle, &pChildOp);
|
||||
|
@ -4611,10 +4640,12 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -4715,6 +4746,9 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
int32_t resSize = winSize + sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
if (!pBuff) {
|
||||
return ;
|
||||
}
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||
|
@ -4765,10 +4799,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
|||
if (!pInfo->pSeUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeDeleted && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SStateWindowInfo curInfo = {0};
|
||||
|
@ -4870,6 +4906,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -4884,6 +4921,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
|
@ -4910,6 +4948,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||
|
@ -5004,11 +5043,13 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -5151,6 +5192,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->interval = (SInterval){
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
|
@ -5186,6 +5228,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno);
|
||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||
|
||||
|
@ -5207,6 +5250,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->invertible = false;
|
||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
|
||||
pInfo->delIndex = 0;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
|
@ -5247,6 +5291,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
|
||||
|
||||
// for stream
|
||||
|
@ -5479,10 +5524,12 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
|
|
@ -1772,8 +1772,10 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
|||
.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI};
|
||||
|
||||
SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex));
|
||||
QUERY_CHECK_NULL(idx, code, lino, _end, terrno);
|
||||
idx->init = 0;
|
||||
idx->uids = taosArrayInit(128, sizeof(int64_t));
|
||||
QUERY_CHECK_NULL(idx->uids, code, lino, _end, terrno);
|
||||
idx->lastIdx = 0;
|
||||
|
||||
pInfo->pIdx = idx; // set idx arg
|
||||
|
@ -1992,6 +1994,9 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
|||
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||
char* buf1 = taosMemoryCalloc(1, contLen);
|
||||
if (!buf1) {
|
||||
return NULL;
|
||||
}
|
||||
int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||
if (tempRes < 0) {
|
||||
code = terrno;
|
||||
|
@ -2083,6 +2088,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
lino = __LINE__;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -2091,9 +2097,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
|
||||
int32_t num = 0;
|
||||
code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
extractTbnameSlotId(pInfo, pScanNode);
|
||||
|
||||
|
@ -2101,9 +2105,11 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
|
||||
pInfo->accountId = pScanPhyNode->accountId;
|
||||
pInfo->pUser = taosStrdup((void*)pUser);
|
||||
QUERY_CHECK_NULL(pInfo->pUser, code, lino, _error, terrno);
|
||||
pInfo->sysInfo = pScanPhyNode->sysInfo;
|
||||
pInfo->showRewrite = pScanPhyNode->showRewrite;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->pCondition = pScanNode->node.pConditions;
|
||||
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -2143,7 +2149,7 @@ _error:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosMemoryFreeClear(pOperator);
|
||||
destroyOperator(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
|
@ -2173,15 +2179,19 @@ void destroySysScanOperator(void* param) {
|
|||
(void)tsem_destroy(&pInfo->ready);
|
||||
blockDataDestroy(pInfo->pRes);
|
||||
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
|
||||
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
}
|
||||
if (pInfo->name.type == TSDB_TABLE_NAME_T) {
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
|
||||
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
}
|
||||
|
||||
pInfo->pCur = NULL;
|
||||
pInfo->pCur = NULL;
|
||||
}
|
||||
} else {
|
||||
qError("pInfo->name is not initialized");
|
||||
}
|
||||
|
||||
if (pInfo->pIdx) {
|
||||
|
@ -2470,12 +2480,18 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
|||
bool hasIdx = false;
|
||||
bool hasRslt = true;
|
||||
SArray* mRslt = taosArrayInit(len, POINTER_BYTES);
|
||||
if (!mRslt) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SListCell* cell = pList->pHead;
|
||||
for (int i = 0; i < len; i++) {
|
||||
if (cell == NULL) break;
|
||||
|
||||
SArray* aRslt = taosArrayInit(16, sizeof(int64_t));
|
||||
if (!aRslt) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
ret = optSysTabFilteImpl(arg, cell->pNode, aRslt);
|
||||
if (ret == 0) {
|
||||
|
@ -2694,6 +2710,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
|||
}
|
||||
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
||||
code = blockDataEnsureCapacity(pInfo->pResBlock, 1);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -666,6 +666,9 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp
|
|||
pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
|
||||
pInfo->pPrevGroupKey->isNull = false;
|
||||
pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
|
||||
if (!pInfo->pPrevGroupKey->pData) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1156,6 +1159,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
|||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
pInfo->pLinearInfo = NULL;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->win = pInterpPhyNode->timeRange;
|
||||
pInfo->interval.interval = pInterpPhyNode->interval;
|
||||
pInfo->current = pInfo->win.skey;
|
||||
|
@ -1174,6 +1178,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
|||
|
||||
if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
|
||||
pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes);
|
||||
QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1285,6 +1285,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
}
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
@ -1625,6 +1626,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
|
|||
}
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
|
@ -1699,6 +1701,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
|
@ -2039,6 +2042,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&iaInfo->binfo, pResBlock);
|
||||
code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -2366,6 +2370,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
}
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
|
||||
code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -198,6 +198,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
|
|||
SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type,
|
||||
int32_t numOfCols) {
|
||||
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(taosMemoryCalloc(1, sizeof(SOperatorInfo)));
|
||||
ASSERT(!pOperator);
|
||||
pOperator->name = "dummyInputOpertor4Test";
|
||||
|
||||
if (numOfCols == 1) {
|
||||
|
@ -207,6 +208,7 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
|
|||
}
|
||||
|
||||
SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo));
|
||||
ASSERT(!pInfo);
|
||||
pInfo->totalPages = numOfBlocks;
|
||||
pInfo->startVal = startVal;
|
||||
pInfo->numOfRowsPerPage = rowsPerPage;
|
||||
|
|
|
@ -16,6 +16,9 @@ from util.sql import *
|
|||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
|
||||
import subprocess
|
||||
import threading
|
||||
|
||||
def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key1='', value1=''):
|
||||
if len(key) == 0:
|
||||
tdLog.exit("taos test key is null!")
|
||||
|
@ -108,7 +111,7 @@ class TDTestCase:
|
|||
updatecfgDict["fqdn"] = hostname
|
||||
|
||||
print ("===================: ", updatecfgDict)
|
||||
|
||||
taos_output = []
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
@ -116,7 +119,6 @@ class TDTestCase:
|
|||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
|
@ -129,6 +131,29 @@ class TDTestCase:
|
|||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
def run_command(self, commands):
|
||||
|
||||
count = 0
|
||||
while count < 2:
|
||||
# print(f"count: {count}")
|
||||
value = subprocess.getoutput(f"nohup {commands} &")
|
||||
# print(f"value: {value}")
|
||||
self.taos_output.append(value)
|
||||
count += 1
|
||||
|
||||
def taos_thread_repeat_k(self, run_command, commands, threads_num=10, output=[]):
|
||||
threads = []
|
||||
taos_output = self.taos_output
|
||||
for id in range(threads_num):
|
||||
#threads.append(Process(target=cloud_consumer, args=(id,)))
|
||||
threads.append(threading.Thread(target=run_command, args=(commands,)))
|
||||
for tr in threads:
|
||||
tr.start()
|
||||
for tr in threads:
|
||||
tr.join()
|
||||
for value in taos_output:
|
||||
if "crash" in value:
|
||||
tdLog.exit(f"command: {commands} crash")
|
||||
|
||||
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
|
||||
tdSql.prepare()
|
||||
|
@ -445,6 +470,12 @@ class TDTestCase:
|
|||
|
||||
tdSql.query('drop database %s'%newDbName)
|
||||
|
||||
commands = f"{buildPath}/taos -k -c {cfgPath}"
|
||||
output = self.run_command(commands)
|
||||
os.sys
|
||||
self.taos_thread_repeat_k(self.run_command, commands, 100, output)
|
||||
# os.system("python 0-others/repeat_taos_k.py")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
|
|
@ -88,24 +88,29 @@ class TDTestCase:
|
|||
pid = 0
|
||||
return pid
|
||||
|
||||
def checkAndstopPro(self,processName,startAction):
|
||||
i = 1
|
||||
count = 10
|
||||
def checkAndstopPro(self, processName, startAction, count = 10):
|
||||
for i in range(count):
|
||||
taosdPid=self.get_process_pid(processName)
|
||||
print("taosdPid:",taosdPid)
|
||||
if taosdPid != 0 and taosdPid != "" :
|
||||
tdLog.info("stop taosd %s ,kill pid :%s "%(startAction,taosdPid))
|
||||
os.system("kill -9 %d"%taosdPid)
|
||||
break
|
||||
for j in range(count):
|
||||
os.system("kill -9 %d"%taosdPid)
|
||||
taosdPid=self.get_process_pid(processName)
|
||||
print("taosdPid2:",taosdPid)
|
||||
if taosdPid == 0 or taosdPid == "" :
|
||||
tdLog.info("taosd %s is stoped "%startAction)
|
||||
return
|
||||
else:
|
||||
tdLog.info( "wait start taosd ,times: %d "%i)
|
||||
time.sleep(1)
|
||||
i+= 1
|
||||
else :
|
||||
tdLog.exit("taosd %s is not running "%startAction)
|
||||
|
||||
|
||||
def taosdCommandStop(self,startAction,taosdCmdRun):
|
||||
processName="taosd"
|
||||
processName= "taosd"
|
||||
count = 10
|
||||
if platform.system().lower() == 'windows':
|
||||
processName="taosd.exe"
|
||||
taosdCmd = taosdCmdRun + startAction
|
||||
|
@ -116,7 +121,7 @@ class TDTestCase:
|
|||
else:
|
||||
logTime=datetime.now().strftime('%Y%m%d_%H%M%S_%f')
|
||||
os.system(f"nohup {taosdCmd} > {logTime}.log 2>&1 & ")
|
||||
self.checkAndstopPro(processName,startAction)
|
||||
self.checkAndstopPro(processName,startAction,count)
|
||||
os.system(f"rm -rf {logTime}.log")
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue