From 5dadb26070a2bfec9073cc5db3d553f99d4af150 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 May 2022 18:18:54 +0800 Subject: [PATCH 1/5] fix(query): set the correct state value key index. --- source/libs/executor/inc/executorimpl.h | 7 +- source/libs/executor/src/executorimpl.c | 48 ++++++------ source/libs/executor/src/groupoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 74 ++++++++++++++----- source/libs/parser/src/parTranslater.c | 4 +- source/libs/parser/src/parUtil.c | 2 +- 6 files changed, 91 insertions(+), 48 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 34b7fce33c..d0161067dc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -564,7 +564,7 @@ typedef struct SStateWindowOperatorInfo { SAggSupporter aggSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; - int32_t colIndex; // start row index + SColumn stateCol; // start row index bool hasKey; SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id @@ -636,7 +636,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); -void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); +void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, @@ -659,6 +659,7 @@ void cleanupAggSup(SAggSupporter* pAggSup); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); +SColumn extractColumnFromColumnNode(SColumnNode* pColNode); SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo); SSDataBlock* loadNextDataBlock(void* param); @@ -712,7 +713,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8db5a282d3..5cb0b812c0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -155,9 +155,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset, - SqlFunctionCtx* pCtx); +static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, + int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); @@ -2214,7 +2213,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p * @param result */ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { + int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -2248,13 +2247,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn pGroupResInfo->index += 1; - for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); if (pCtx[j].fpSet.finalize) { - int32_t code = TSDB_CODE_SUCCESS; - code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; @@ -2286,10 +2284,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn return 0; } -void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, - SDiskbasedBuf* pBuf) { +void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); + SExprInfo* pExprInfo = pOperator->pExpr; + int32_t numOfExprs = pOperator->numOfExprs; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; SSDataBlock* pBlock = pbInfo->pRes; SqlFunctionCtx* pCtx = pbInfo->pCtx; @@ -2300,7 +2301,7 @@ void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGr } int32_t orderType = TSDB_ORDER_ASC; - doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); + doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx, numOfExprs); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); @@ -3803,7 +3804,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -4974,7 +4975,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, pTaskInfo); + SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; + SColumn col = extractColumnFromColumnNode(pColNode); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -5039,6 +5042,17 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi return TSDB_CODE_SUCCESS; } +SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { + SColumn c = {0}; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.scale = pColNode->node.resType.scale; + c.precision = pColNode->node.resType.precision; + return c; +} + SArray* extractColumnInfo(SNodeList* pNodeList) { size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); @@ -5053,15 +5067,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - // todo extract method - SColumn c = {0}; - c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; - c.scale = pColNode->node.resType.scale; - c.precision = pColNode->node.resType.precision; - + SColumn c = extractColumnFromColumnNode(pColNode); taosArrayPush(pList, &c); } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { SValueNode* pValNode = (SValueNode*)pNode->pExpr; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ac6f0cf881..d8ccac8cea 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false); while(1) { - doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 332a116f76..10dc482462 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -806,10 +806,22 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +static bool compareVal(const char* v, const SStateKeys* pKey) { + if (IS_VAR_DATA_TYPE(pKey->type)) { + if (varDataLen(v) != varDataLen(pKey->pData)) { + return false; + } else { + return strncmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0; + } + } else { + return memcmp(pKey->pData, v, pKey->bytes) == 0; + } +} + static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); + SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); int64_t gid = pBlock->info.groupId; bool masterScan = true; @@ -822,20 +834,28 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SWindowRowsSup* pRowSup = &pInfo->winSup; pRowSup->numOfRows = 0; + struct SColumnDataAgg* pAgg = NULL; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg[pInfo->colIndex])) { + pAgg = (pBlock->pBlockAgg != NULL)? pBlock->pBlockAgg[pInfo->stateCol.slotId]: NULL; + if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { continue; } char* val = colDataGetData(pStateColInfoData, j); if (!pInfo->hasKey) { - memcpy(pInfo->stateKey.pData, val, bytes); + // todo extract method + if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { + varDataCopy(pInfo->stateKey.pData, val); + } else { + memcpy(pInfo->stateKey.pData, val, bytes); + } + pInfo->hasKey = true; doKeepNewWindowStartInfo(pRowSup, tsList, j); doKeepTuple(pRowSup, tsList[j]); - } else if (memcmp(pInfo->stateKey.pData, val, bytes) == 0) { + } else if (compareVal(val, &pInfo->stateKey)) { doKeepTuple(pRowSup, tsList[j]); if (j == 0 && pRowSup->startRowIndex != 0) { pRowSup->startRowIndex = 0; @@ -861,6 +881,13 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); doKeepTuple(pRowSup, tsList[j]); + + // todo extract method + if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { + varDataCopy(pInfo->stateKey.pData, val); + } else { + memcpy(pInfo->stateKey.pData, val, bytes); + } } } @@ -888,7 +915,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -921,7 +948,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -948,7 +975,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -1012,7 +1039,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -1053,7 +1080,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); // TODO: remove for stream /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ @@ -1283,7 +1310,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -1316,7 +1343,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -1406,14 +1433,21 @@ _error: SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, - SExecTaskInfo* pTaskInfo) { + SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->colIndex = -1; + pInfo->stateCol = *pStateKeyCol; + pInfo->stateKey.type = pInfo->stateCol.type; + pInfo->stateKey.bytes = pInfo->stateCol.bytes; + pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); + if (pInfo->stateKey.pData == NULL) { + goto _error; + } + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); @@ -1423,15 +1457,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pInfo->twAggSup = *pTwAggSup; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pOperator->name = "StateWindowOperator"; + pInfo->tsSlotId = tsSlotId; + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExpr; pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c86c1ac2e9..b7f007fce6 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1538,7 +1538,9 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { STranslateContext* pCxt = pContext; SColumnNode* pCol = (SColumnNode*)pNode; - if (!IS_INTEGER_TYPE(pCol->node.resType.type)) { + + int32_t type = pCol->node.resType.type; + if (!IS_INTEGER_TYPE(type) && type != TSDB_DATA_TYPE_BOOL && !IS_VAR_DATA_TYPE(type)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE); } if (COLUMN_TYPE_TAG == pCol->colType) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 43aea8de7c..7183d956a9 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -91,7 +91,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_AGG_FUNC_NESTING: return "Aggregate functions do not support nesting"; case TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE: - return "Only support STATE_WINDOW on integer column"; + return "Only support STATE_WINDOW on integer/bool/varchar column"; case TSDB_CODE_PAR_INVALID_STATE_WIN_COL: return "Not support STATE_WINDOW on tag column"; case TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE: From 4d2ac0af0bf92c7f31e5f4d963df1489ab2c3e46 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 May 2022 18:19:49 +0800 Subject: [PATCH 2/5] test: Print the executed sql statements in the script files. --- tests/system-test/2-query/join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index 289dd3d62d..8fc131e581 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -28,7 +28,7 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) def __query_condition(self,tbname): query_condition = [] From d54e345dc96b735803c3db05ff1c8dd8403e58ab Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 13 May 2022 21:00:46 +0800 Subject: [PATCH 3/5] fix: ensure capacity before column merge: select agg1(col/10), agg2(col/2) --- source/libs/executor/src/executorimpl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8db5a282d3..b5c6daa69c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -898,6 +898,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; + colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); numOfRows = dest.numOfRows; @@ -937,6 +938,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; + colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); numOfRows = dest.numOfRows; From a6900a7e961649a4e3e1f57502b204d8ac9a3d3b Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 13 May 2022 21:02:32 +0800 Subject: [PATCH 4/5] fix : udf2 overflow for large int multiply large int --- source/libs/function/test/udf2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index ba39b09f56..6410af2a4b 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -44,7 +44,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte case TSDB_DATA_TYPE_INT: { char* cell = udfColDataGetData(col, j); int32_t num = *(int32_t*)cell; - sumSquares += num * num; + sumSquares += (double)num * num; break; } case TSDB_DATA_TYPE_DOUBLE: { From 0815843e94baf90150b749841087db2a173a9507 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Fri, 13 May 2022 21:42:16 +0800 Subject: [PATCH 5/5] fix(os): make taosd.exe taos.exe run on windows. --- contrib/CMakeLists.txt | 2 +- include/util/tjson.h | 17 +-- source/client/inc/clientStmt.h | 2 +- source/client/src/clientSml.c | 8 +- source/dnode/vnode/src/vnd/vnodeCfg.c | 98 ++++++++++------ source/dnode/vnode/src/vnd/vnodeCommit.c | 7 +- source/libs/function/src/tudf.c | 24 ++++ source/libs/nodes/src/nodesCodeFuncs.c | 75 ++++++------ source/os/src/osAtomic.c | 30 ++--- source/os/src/osFile.c | 72 ++++++------ source/os/src/osMemory.c | 2 +- source/os/src/osProc.c | 4 + source/os/src/osShm.c | 4 + source/os/src/osSocket.c | 11 +- source/os/src/osSysinfo.c | 33 +++++- source/os/src/osSystem.c | 8 +- source/util/src/tconfig.c | 9 +- tools/shell/src/shellCommand.c | 143 +++++++---------------- tools/shell/src/shellEngine.c | 4 +- 19 files changed, 299 insertions(+), 254 deletions(-) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 926fbc8957..14a85ee4f6 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -365,7 +365,7 @@ if(${BUILD_ADDR2LINE}) if(HAVE_LIBELF_H OR HAVE_LIBELF_LIBELF_H) target_link_libraries(libdwarf PUBLIC libelf) endif() - target_include_directories(libdwarf SYSTEM PUBLIC "libdwarf/src/lib/libdwarf" ${CMAKE_BINARY_DIR}/contrib) + target_include_directories(libdwarf SYSTEM PUBLIC "libdwarf/src/lib/libdwarf" ${CMAKE_CURRENT_BINARY_DIR}) file(READ "addr2line/addr2line.c" ADDR2LINE_CONTENT) string(REPLACE "static int" "int" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}") string(REPLACE "static void" "void" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}") diff --git a/include/util/tjson.h b/include/util/tjson.h index d23f7b402e..a95efe56e7 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -22,17 +22,12 @@ extern "C" { #endif -#ifdef WINDOWS -#define tjsonGetNumberValue(pJson, pName, val) -1 -#else -#define tjsonGetNumberValue(pJson, pName, val) \ - ({ \ - uint64_t _tmp = 0; \ - int32_t _code = tjsonGetUBigIntValue(pJson, pName, &_tmp); \ - val = _tmp; \ - _code; \ - }) -#endif +#define tjsonGetNumberValue(pJson, pName, val, code) \ + do { \ + uint64_t _tmp = 0; \ + code = tjsonGetUBigIntValue(pJson, pName, &_tmp); \ + val = _tmp; \ + } while (0) typedef void SJson; diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index ae27e611cb..32625b4c51 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -63,7 +63,7 @@ typedef struct SStmtBindInfo { int8_t tbType; bool tagsCached; void* boundTags; - char tbName[TSDB_TABLE_FNAME_LEN];; + char tbName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN]; char stbFName[TSDB_TABLE_FNAME_LEN]; SName sname; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 3e71714f21..dfb1942824 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1985,8 +1985,8 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { (*oneTable)->sTableName = elements.measure; (*oneTable)->sTableNameLen = elements.measureLen; - RandTableName rName = {.tags=(*oneTable)->tags, .sTableName=(*oneTable)->sTableName, .sTableNameLen=(uint8_t)(*oneTable)->sTableNameLen, - .childTableName=(*oneTable)->childTableName}; + RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen, + (*oneTable)->childTableName, 0 }; buildChildTableName(&rName); (*oneTable)->uid = rName.uid; @@ -2045,8 +2045,8 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { } taosHashClear(info->dumplicateKey); - RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=(uint8_t)tinfo->sTableNameLen, - .childTableName=tinfo->childTableName}; + RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, + tinfo->childTableName, 0 }; buildChildTableName(&rName); tinfo->uid = rName.uid; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 32866d7469..a66ecc493d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -114,24 +114,42 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { int vnodeDecodeConfig(const SJson *pJson, void *pObj) { SVnodeCfg *pCfg = (SVnodeCfg *)pObj; - if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1; + int32_t code; + tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code); + if(code < 0) return -1; if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1; - if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1; - if (tjsonGetNumberValue(pJson, "szPage", pCfg->szPage) < 0) return -1; - if (tjsonGetNumberValue(pJson, "szCache", pCfg->szCache) < 0) return -1; - if (tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf) < 0) return -1; - if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap) < 0) return -1; - if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1; - if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; - if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; - if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; - if (tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1; - if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; - if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; - if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; - if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; - if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; - if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; + tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code); + if(code < 0) return -1; SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); int32_t nRetention = tjsonGetArraySize(pNodeRetentions); if (nRetention > TSDB_RETENTION_MAX) { @@ -140,24 +158,36 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { for (int32_t i = 0; i < nRetention; ++i) { SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); ASSERT(pNodeRetention != NULL); - tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq); - tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit); - tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep); - tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit); + tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); + tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); + tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code); + tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); } - if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; - if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; - if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; - if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; + tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code); + if(code < 0) return -1; - if (tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; - if (tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; + tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code); + if(code < 0) return -1; SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo"); int arraySize = tjsonGetArraySize(pNodeInfoArr); @@ -166,7 +196,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { for (int i = 0; i < arraySize; ++i) { SJson *pNodeInfo = tjsonGetArrayItem(pNodeInfoArr, i); assert(pNodeInfo != NULL); - tjsonGetNumberValue(pNodeInfo, "nodePort", (pCfg->syncCfg.nodeInfo)[i].nodePort); + tjsonGetNumberValue(pNodeInfo, "nodePort", (pCfg->syncCfg.nodeInfo)[i].nodePort, code); tjsonGetStringValue(pNodeInfo, "nodeFqdn", (pCfg->syncCfg.nodeInfo)[i].nodeFqdn); } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 6d8bcb35c8..e7bee3342a 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -310,8 +310,11 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeDecodeState(const SJson *pJson, void *pObj) { SVState *pState = (SVState *)pObj; - if (tjsonGetNumberValue(pJson, "commit version", pState->committed) < 0) return -1; - if (tjsonGetNumberValue(pJson, "applied version", pState->applied) < 0) return -1; + int32_t code; + tjsonGetNumberValue(pJson, "commit version", pState->committed, code); + if(code < 0) return -1; + tjsonGetNumberValue(pJson, "applied version", pState->applied, code); + if(code < 0) return -1; return 0; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 8e96a2a063..51829cfdd8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -34,6 +34,9 @@ typedef struct SUdfdData { uv_thread_t thread; uv_barrier_t barrier; uv_process_t process; +#ifdef WINDOWS + HANDLE jobHandle; +#endif int spawnErr; uv_pipe_t ctrlPipe; uv_async_t stopAsync; @@ -104,6 +107,24 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { int err = uv_spawn(&pData->loop, &pData->process, &options); pData->process.data = (void*)pData; +#ifdef WINDOWS + // End udfd.exe by Job. + if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle); + pData->jobHandle = CreateJobObject(NULL, NULL); + bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle); + if (!add_job_ok) { + fnError("Assign udfd to job failed."); + } else { + JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info; + memset(&limit_info, 0x0, sizeof(limit_info)); + limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + bool set_auto_kill_ok = SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info)); + if (!set_auto_kill_ok) { + fnError("Set job auto kill udfd failed."); + } + } +#endif + if (err != 0) { fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); } @@ -182,6 +203,9 @@ int32_t udfStopUdfd() { uv_barrier_destroy(&pData->barrier); uv_async_send(&pData->stopAsync); uv_thread_join(&pData->thread); +#ifdef WINDOWS + if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle); +#endif fnInfo("dnode-mgmt udfd cleaned up"); return 0; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 71b0774ca6..6e0775ff17 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -317,15 +317,16 @@ static int32_t tableComInfoToJson(const void* pObj, SJson* pJson) { static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) { STableComInfo* pNode = (STableComInfo*)pObj; - int32_t code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags); + int32_t code; + tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code);; if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision); + tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns); + tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize); + tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code);; } return code; @@ -356,12 +357,13 @@ static int32_t schemaToJson(const void* pObj, SJson* pJson) { static int32_t jsonToSchema(const SJson* pJson, void* pObj) { SSchema* pNode = (SSchema*)pObj; - int32_t code = tjsonGetNumberValue(pJson, jkSchemaType, pNode->type); + int32_t code; + tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code);; if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId); + tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes); + tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code);; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name); @@ -412,21 +414,22 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) { static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { STableMeta* pNode = (STableMeta*)pObj; - int32_t code = tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId); + int32_t code; + tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code);; if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType); + tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid); + tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid); + tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion); + tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion); + tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code);; } if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo); @@ -602,7 +605,7 @@ static int32_t jsonToLogicFillNode(const SJson* pJson, void* pObj) { int32_t code = jsonToLogicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode); + tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code);; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillLogicPlanWStartTs, &pNode->pWStartTs); @@ -878,7 +881,7 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkLogicSubplanRootNode, (SNode**)&pNode->pNode); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType); + tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);; } int32_t objSize = 0; if (TSDB_CODE_SUCCESS == code) { @@ -1118,25 +1121,25 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanRatio, &pNode->ratio); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired); + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval); + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset); + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding); + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit); + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit); + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);; } return code; @@ -1178,7 +1181,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkSysTableScanPhysiPlanShowRewrite, &pNode->showRewrite); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId); + tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId, code);; } return code; @@ -1262,7 +1265,7 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); + tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); @@ -1424,10 +1427,10 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType); + tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); + tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);; } return code; @@ -1523,7 +1526,7 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode); + tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode, code);; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillPhysiPlanWStartTs, &pNode->pWStartTs); @@ -1562,7 +1565,7 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysiWindowNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap); + tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code);; } return code; @@ -1724,7 +1727,7 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType); + tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code);; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType); @@ -1914,7 +1917,7 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) { code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType); + tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code);; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkColumnDbName, pNode->dbName); @@ -2168,7 +2171,7 @@ static int32_t jsonToOperatorNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType); + tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code);; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkOperatorLeft, &pNode->pLeft); @@ -2202,7 +2205,7 @@ static int32_t jsonToLogicConditionNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType); + tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code);; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkLogicCondParameters, &pNode->pParameterList); @@ -2384,10 +2387,10 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) { int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order); + tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code);; } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder); + tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code);; } return code; @@ -2493,7 +2496,8 @@ static int32_t fillNodeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToFillNode(const SJson* pJson, void* pObj) { SFillNode* pNode = (SFillNode*)pObj; - int32_t code = tjsonGetNumberValue(pJson, jkFillMode, pNode->mode); + int32_t code; + tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code);; if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillValues, &pNode->pValues); } @@ -3033,7 +3037,8 @@ static int32_t nodeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToNode(const SJson* pJson, void* pObj) { SNode* pNode = (SNode*)pObj; - int32_t code = tjsonGetNumberValue(pJson, jkNodeType, pNode->type); + int32_t code; + tjsonGetNumberValue(pJson, jkNodeType, pNode->type, code);; if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, nodesNodeName(pNode->type), jsonToSpecificNode, pNode); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/os/src/osAtomic.c b/source/os/src/osAtomic.c index 0fe946bf68..e4d880f40a 100644 --- a/source/os/src/osAtomic.c +++ b/source/os/src/osAtomic.c @@ -36,7 +36,7 @@ int64_t interlocked_add_fetch_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_add_fetch_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)(_InterlockedExchangeAdd((int32_t volatile*)(ptr), (int32_t)val) + (int32_t)val); #else return (void*)(InterlockedExchangeAdd64((int64_t volatile*)(ptr), (int64_t)val) + (int64_t)val); @@ -56,7 +56,7 @@ int32_t interlocked_and_fetch_32(int32_t volatile* ptr, int32_t val) { } int64_t interlocked_and_fetch_64(int64_t volatile* ptr, int64_t val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS int64_t old, res; do { old = *ptr; @@ -69,7 +69,7 @@ int64_t interlocked_and_fetch_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_and_fetch_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)interlocked_and_fetch_32((int32_t volatile*)ptr, (int32_t)val); #else return (void*)interlocked_and_fetch_64((int64_t volatile*)ptr, (int64_t)val); @@ -77,7 +77,7 @@ void* interlocked_and_fetch_ptr(void* volatile* ptr, void* val) { } int64_t interlocked_fetch_and_64(int64_t volatile* ptr, int64_t val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS int64_t old; do { old = *ptr; @@ -89,7 +89,7 @@ int64_t interlocked_fetch_and_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_fetch_and_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)_InterlockedAnd((int32_t volatile*)(ptr), (int32_t)(val)); #else return (void*)_InterlockedAnd64((int64_t volatile*)(ptr), (int64_t)(val)); @@ -109,7 +109,7 @@ int32_t interlocked_or_fetch_32(int32_t volatile* ptr, int32_t val) { } int64_t interlocked_or_fetch_64(int64_t volatile* ptr, int64_t val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS int64_t old, res; do { old = *ptr; @@ -122,7 +122,7 @@ int64_t interlocked_or_fetch_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_or_fetch_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)interlocked_or_fetch_32((int32_t volatile*)ptr, (int32_t)val); #else return (void*)interlocked_or_fetch_64((int64_t volatile*)ptr, (int64_t)val); @@ -130,7 +130,7 @@ void* interlocked_or_fetch_ptr(void* volatile* ptr, void* val) { } int64_t interlocked_fetch_or_64(int64_t volatile* ptr, int64_t val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS int64_t old; do { old = *ptr; @@ -142,7 +142,7 @@ int64_t interlocked_fetch_or_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_fetch_or_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)_InterlockedOr((int32_t volatile*)(ptr), (int32_t)(val)); #else return (void*)interlocked_fetch_or_64((int64_t volatile*)(ptr), (int64_t)(val)); @@ -162,7 +162,7 @@ int32_t interlocked_xor_fetch_32(int32_t volatile* ptr, int32_t val) { } int64_t interlocked_xor_fetch_64(int64_t volatile* ptr, int64_t val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS int64_t old, res; do { old = *ptr; @@ -175,7 +175,7 @@ int64_t interlocked_xor_fetch_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_xor_fetch_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)interlocked_xor_fetch_32((int32_t volatile*)(ptr), (int32_t)(val)); #else return (void*)interlocked_xor_fetch_64((int64_t volatile*)(ptr), (int64_t)(val)); @@ -183,7 +183,7 @@ void* interlocked_xor_fetch_ptr(void* volatile* ptr, void* val) { } int64_t interlocked_fetch_xor_64(int64_t volatile* ptr, int64_t val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS int64_t old; do { old = *ptr; @@ -195,7 +195,7 @@ int64_t interlocked_fetch_xor_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_fetch_xor_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)_InterlockedXor((int32_t volatile*)(ptr), (int32_t)(val)); #else return (void*)interlocked_fetch_xor_64((int64_t volatile*)(ptr), (int64_t)(val)); @@ -211,7 +211,7 @@ int64_t interlocked_sub_fetch_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_sub_fetch_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)interlocked_sub_fetch_32((int32_t volatile*)ptr, (int32_t)val); #else return (void*)interlocked_add_fetch_64((int64_t volatile*)ptr, (int64_t)val); @@ -226,7 +226,7 @@ int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) { } void* interlocked_fetch_sub_ptr(void* volatile* ptr, void* val) { -#ifdef _TD_WINDOWS_32 +#ifdef WINDOWS return (void*)interlocked_fetch_sub_32((int32_t volatile*)ptr, (int32_t)val); #else return (void*)interlocked_fetch_sub_64((int64_t volatile*)ptr, (int64_t)val); diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 425bf8b7ac..3cd05b65cd 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -109,6 +109,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha int64_t taosCopyFile(const char *from, const char *to) { #ifdef WINDOWS + assert(0); return 0; #else char buffer[4096]; @@ -152,16 +153,16 @@ int32_t taosRemoveFile(const char *path) { return remove(path); } int32_t taosRenameFile(const char *oldName, const char *newName) { #ifdef WINDOWS - int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED); - if (code < 0) { - // printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); + bool code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED); + if (!code) { + printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); } - return code; + return !code; #else int32_t code = rename(oldName, newName); if (code < 0) { - // printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); + printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); } return code; @@ -169,11 +170,12 @@ int32_t taosRenameFile(const char *oldName, const char *newName) { } int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) { -#ifdef WINDOWS - return 0; -#else struct stat fileStat; +#ifdef WINDOWS + int32_t code = _stat(path, &fileStat); +#else int32_t code = stat(path, &fileStat); +#endif if (code < 0) { return code; } @@ -187,14 +189,15 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) { } return 0; -#endif } int32_t taosDevInoFile(const char *path, int64_t *stDev, int64_t *stIno) { -#ifdef WINDOWS - return 0; -#else + struct stat fileStat; +#ifdef WINDOWS + int32_t code = _stat(path, &fileStat); +#else int32_t code = stat(path, &fileStat); +#endif if (code < 0) { return code; } @@ -208,7 +211,6 @@ int32_t taosDevInoFile(const char *path, int64_t *stDev, int64_t *stIno) { } return 0; -#endif } void autoDelFileListAdd(const char *path) { return; } @@ -276,9 +278,6 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { } int64_t taosCloseFile(TdFilePtr *ppFile) { -#ifdef WINDOWS - return 0; -#else if (ppFile == NULL || *ppFile == NULL) { return 0; } @@ -294,7 +293,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { (*ppFile)->fp = NULL; } if ((*ppFile)->fd >= 0) { + #ifdef WINDOWS + HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd); + !FlushFileBuffers(h); + #else fsync((*ppFile)->fd); + #endif close((*ppFile)->fd); (*ppFile)->fd = -1; } @@ -306,7 +310,6 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { taosMemoryFree(*ppFile); *ppFile = NULL; return 0; -#endif } int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { @@ -412,13 +415,17 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { } int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { -#ifdef WINDOWS - return 0; -#else + if (pFile == NULL) { + return 0; + } assert(pFile->fd >= 0); // Please check if you have closed the file. struct stat fileStat; +#ifdef WINDOWS + int32_t code = _fstat(pFile->fd, &fileStat); +#else int32_t code = fstat(pFile->fd, &fileStat); +#endif if (code < 0) { return code; } @@ -432,7 +439,6 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { } return 0; -#endif } int32_t taosLockFile(TdFilePtr pFile) { @@ -459,7 +465,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) { #ifdef WINDOWS if (pFile->fd < 0) { errno = EBADF; - printf("%s\n", "fd arg was negative"); + printf("Ftruncate file error, fd arg was negative\n"); return -1; } @@ -516,26 +522,20 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) { } int32_t taosFsyncFile(TdFilePtr pFile) { -#ifdef WINDOWS - if (pFile->fd < 0) { - errno = EBADF; - printf("%s\n", "fd arg was negative"); - return -1; - } - - HANDLE h = (HANDLE)_get_osfhandle(pFile->fd); - - return !FlushFileBuffers(h); -#else if (pFile == NULL) { return 0; } if (pFile->fp != NULL) return fflush(pFile->fp); - if (pFile->fd >= 0) return fsync(pFile->fd); - + if (pFile->fd >= 0) { + #ifdef WINDOWS + HANDLE h = (HANDLE)_get_osfhandle(pFile->fd); + return !FlushFileBuffers(h); + #else + return fsync(pFile->fd); + #endif + } return 0; -#endif } int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) { diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 7c877b463a..e3791af618 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -138,7 +138,7 @@ static void print_line(Dwarf_Debug dbg, Dwarf_Line line, Dwarf_Addr pc) { dwarf_linesrc(line, &linesrc, NULL); dwarf_lineno(line, &lineno, NULL); } - printf("%s:%" DW_PR_DUu "\n", linesrc, lineno); + printf("BackTrace %08" PRId64 " %s:%" DW_PR_DUu "\n", taosGetSelfPthreadId(), linesrc, lineno); if (line) dwarf_dealloc(dbg, linesrc, DW_DLA_STRING); } void taosPrintBackTrace() { diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index f92a3b3783..74f1356abf 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -19,6 +19,7 @@ int32_t taosNewProc(char **args) { #ifdef WINDOWS + assert(0); return 0; #else int32_t pid = fork(); @@ -36,6 +37,7 @@ int32_t taosNewProc(char **args) { void taosWaitProc(int32_t pid) { #ifdef WINDOWS + assert(0); #else int32_t status = -1; waitpid(pid, &status, 0); @@ -44,6 +46,7 @@ void taosWaitProc(int32_t pid) { void taosKillProc(int32_t pid) { #ifdef WINDOWS + assert(0); #else kill(pid, SIGINT); #endif @@ -51,6 +54,7 @@ void taosKillProc(int32_t pid) { bool taosProcExist(int32_t pid) { #ifdef WINDOWS + assert(0); return false; #else int32_t p = getpgid(pid); diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index 1cd51f94a0..cb09e2fb38 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -23,6 +23,7 @@ static int32_t shmids[MAX_SHMIDS] = {0}; static void taosDeleteCreatedShms() { #if defined(WINDOWS) + assert(0); #else for (int32_t i = 0; i < MAX_SHMIDS; ++i) { int32_t shmid = shmids[i] - 1; @@ -35,6 +36,7 @@ static void taosDeleteCreatedShms() { int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { #if defined(WINDOWS) + assert(0); #else pShm->id = -1; @@ -75,6 +77,7 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { void taosDropShm(SShm* pShm) { #if defined(WINDOWS) + assert(0); #else if (pShm->id >= 0) { if (pShm->ptr != NULL) { @@ -90,6 +93,7 @@ void taosDropShm(SShm* pShm) { int32_t taosAttachShm(SShm* pShm) { #if defined(WINDOWS) + assert(0); #else errno = 0; diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 2410586287..105acb188a 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -285,6 +285,7 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void return -1; } #ifdef WINDOWS + assert(0); return 0; #else return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen); @@ -642,6 +643,7 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { int taosGetLocalIp(const char *eth, char *ip) { #if defined(WINDOWS) // DO NOTHAING + assert(0); return 0; #else int fd; @@ -668,6 +670,7 @@ int taosGetLocalIp(const char *eth, char *ip) { int taosValidIp(uint32_t ip) { #if defined(WINDOWS) // DO NOTHAING + assert(0); return 0; #else int ret = -1; @@ -866,6 +869,7 @@ int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len void taosBlockSIGPIPE() { #ifdef WINDOWS + // assert(0); #else sigset_t signal_mask; sigemptyset(&signal_mask); @@ -976,14 +980,12 @@ void tinet_ntoa(char *ipstr, uint32_t ip) { } void taosIgnSIGPIPE() { -#ifdef WINDOWS -#else signal(SIGPIPE, SIG_IGN); -#endif } void taosSetMaskSIGPIPE() { #ifdef WINDOWS + // assert(0); #else sigset_t signal_mask; sigemptyset(&signal_mask); @@ -1005,6 +1007,7 @@ int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *a TdEpollPtr taosCreateEpoll(int32_t size) { EpollFd fd = -1; #ifdef WINDOWS + assert(0); #else fd = epoll_create(size); #endif @@ -1027,6 +1030,7 @@ int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocke return -1; } #ifdef WINDOWS + assert(0); #else code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event); #endif @@ -1038,6 +1042,7 @@ int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxE return -1; } #ifdef WINDOWS + assert(0); #else code = epoll_wait(pEpoll->fd, event, maxEvents, timeout); #endif diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 348424b372..fd6172e04f 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -129,7 +129,21 @@ static void taosGetProcIOnfos() { static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { #ifdef WINDOWS + FILETIME pre_idleTime = {0}; + FILETIME pre_kernelTime = {0}; + FILETIME pre_userTime = {0}; + FILETIME idleTime; + FILETIME kernelTime; + FILETIME userTime; + bool res = GetSystemTimes(&idleTime, &kernelTime, &userTime); + if (res) { + cpuInfo->idle = CompareFileTime(&pre_idleTime, &idleTime); + cpuInfo->system = CompareFileTime(&pre_kernelTime, &kernelTime); + cpuInfo->user = CompareFileTime(&pre_userTime, &userTime); + cpuInfo->nice = 0; + } #elif defined(_TD_DARWIN_64) + assert(0); #else TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { @@ -155,7 +169,18 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { #ifdef WINDOWS + FILETIME pre_krnlTm = {0}; + FILETIME pre_usrTm = {0}; + FILETIME creatTm, exitTm, krnlTm, usrTm; + + if (GetThreadTimes(GetCurrentThread(), &creatTm, &exitTm, &krnlTm, &usrTm)) { + cpuInfo->stime = CompareFileTime(&pre_krnlTm, &krnlTm); + cpuInfo->utime = CompareFileTime(&pre_usrTm, &usrTm); + cpuInfo->cutime = 0; + cpuInfo->cstime = 0; + } #elif defined(_TD_DARWIN_64) + assert(0); #else TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { @@ -219,6 +244,7 @@ void taosGetSystemInfo() { int32_t taosGetEmail(char *email, int32_t maxLen) { #ifdef WINDOWS + // assert(0); #elif defined(_TD_DARWIN_64) const char *filepath = "/usr/local/taos/email"; @@ -250,6 +276,7 @@ int32_t taosGetEmail(char *email, int32_t maxLen) { int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) { #ifdef WINDOWS + assert(0); #elif defined(_TD_DARWIN_64) char *line = NULL; size_t size = 0; @@ -305,6 +332,7 @@ int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) { int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) { #ifdef WINDOWS + assert(0); #elif defined(_TD_DARWIN_64) char *line = NULL; size_t size = 0; @@ -716,9 +744,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { #ifdef WINDOWS GUID guid; CoCreateGuid(&guid); - - sprintf(uid, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3, guid.Data4[0], - guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]); + memcpy(uid, &guid, uidlen); return 0; #elif defined(_TD_DARWIN_64) @@ -750,6 +776,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { char *taosGetCmdlineByPID(int pid) { #ifdef WINDOWS + assert(0); return ""; #elif defined(_TD_DARWIN_64) static char cmdline[1024]; diff --git a/source/os/src/osSystem.c b/source/os/src/osSystem.c index 62c1747619..ba07b6c3dd 100644 --- a/source/os/src/osSystem.c +++ b/source/os/src/osSystem.c @@ -33,6 +33,7 @@ typedef struct FILE TdCmd; void* taosLoadDll(const char* filename) { #if defined(WINDOWS) + assert(0); return NULL; #elif defined(_TD_DARWIN_64) return NULL; @@ -51,6 +52,7 @@ void* taosLoadDll(const char* filename) { void* taosLoadSym(void* handle, char* name) { #if defined(WINDOWS) + assert(0); return NULL; #elif defined(_TD_DARWIN_64) return NULL; @@ -71,6 +73,7 @@ void* taosLoadSym(void* handle, char* name) { void taosCloseDll(void* handle) { #if defined(WINDOWS) + assert(0); return; #elif defined(_TD_DARWIN_64) return; @@ -121,6 +124,7 @@ int taosSetConsoleEcho(bool on) { void taosSetTerminalMode() { #if defined(WINDOWS) + // assert(0); #else struct termios newtio; @@ -154,7 +158,7 @@ void taosSetTerminalMode() { int32_t taosGetOldTerminalMode() { #if defined(WINDOWS) - + // assert(0); #else /* Make sure stdin is a terminal. */ if (!isatty(STDIN_FILENO)) { @@ -172,7 +176,7 @@ int32_t taosGetOldTerminalMode() { void taosResetTerminalMode() { #if defined(WINDOWS) - + // assert(0); #else if (tcsetattr(0, TCSANOW, &oldtio) != 0) { fprintf(stderr, "Fail to reset the terminal properties!\n"); diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 8f48e0585e..11d7d9831a 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -640,13 +640,14 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) { } int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) { - char *buf, *name, *value, *value2, *value3; + char buf[1024], *name, *value, *value2, *value3; int32_t olen, vlen, vlen2, vlen3; int32_t index = 0; if (envCmd == NULL) return 0; while (envCmd[index]!=NULL) { - buf = taosMemoryMalloc(strlen(envCmd[index])); - taosEnvToCfg(envCmd[index], buf); + strncpy(buf, envCmd[index], sizeof(buf)-1); + buf[sizeof(buf)-1] = 0; + taosEnvToCfg(buf, buf); index++; name = value = value2 = value3 = NULL; @@ -671,8 +672,6 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) { if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_CMD); } - - taosMemoryFree(buf); } uInfo("load from env cmd cfg success"); diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index f4f7c893c4..ef71f3fce6 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -53,79 +53,6 @@ static void shellResetCommand(SShellCmd *cmd, const char s[]); static void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos); static void shellShowOnScreen(SShellCmd *cmd); -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) -// static void shellPrintContinuePrompt() { printf("%s", shell.args.promptContinue); } -// static void shellPrintPrompt() { printf("%s", shell.args.promptHeader); } - -void shellUpdateBuffer(SShellCmd *cmd) { - if (shellRegexMatch(cmd->buffer, "(\\s+$)|(^$)", REG_EXTENDED)) strcat(cmd->command, " "); - strcat(cmd->buffer, cmd->command); - - memset(cmd->command, 0, SHELL_MAX_COMMAND_SIZE); - cmd->cursorOffset = 0; -} - -int shellIsReadyGo(SShellCmd *cmd) { - char *total = taosMemoryMalloc(SHELL_MAX_COMMAND_SIZE); - memset(total, 0, SHELL_MAX_COMMAND_SIZE); - sprintf(total, "%s%s", cmd->buffer, cmd->command); - - char *reg_str = - "(^.*;\\s*$)|(^\\s*$)|(^\\s*exit\\s*$)|(^\\s*q\\s*$)|(^\\s*quit\\s*$)|(^" - "\\s*clear\\s*$)"; - if (shellRegexMatch(total, reg_str, REG_EXTENDED | REG_ICASE)) { - taosMemoryFree(total); - return 1; - } - - taosMemoryFree(total); - return 0; -} - -void shellInsertChar(SShellCmd *cmd, char c) { - if (cmd->cursorOffset >= SHELL_MAX_COMMAND_SIZE) { - fprintf(stdout, "sql is larger than %d bytes", SHELL_MAX_COMMAND_SIZE); - return; - } - cmd->command[cmd->cursorOffset++] = c; -} - -int32_t shellReadCommand(char command[]) { - SShellCmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.buffer = (char *)taosMemoryCalloc(1, SHELL_MAX_COMMAND_SIZE); - cmd.command = (char *)taosMemoryCalloc(1, SHELL_MAX_COMMAND_SIZE); - - // Read input. - char c; - while (1) { - c = getchar(); - - switch (c) { - case '\n': - case '\r': - if (shellIsReadyGo(&cmd)) { - sprintf(command, "%s%s", cmd.buffer, cmd.command); - taosMemoryFree(cmd.buffer); - cmd.buffer = NULL; - taosMemoryFree(cmd.command); - cmd.command = NULL; - return 0; - } else { - // shellPrintContinuePrompt(); - shellUpdateBuffer(&cmd); - } - break; - default: - shellInsertChar(&cmd, c); - } - } - - return 0; -} - -#else - int32_t shellCountPrefixOnes(uint8_t c) { uint8_t mask = 127; mask = ~mask; @@ -181,7 +108,10 @@ void shellInsertChar(SShellCmd *cmd, char *c, int32_t size) { cmd->cursorOffset += size; cmd->screenOffset += taosWcharWidth(wc); cmd->endOffset += taosWcharWidth(wc); +#ifdef WINDOWS +#else shellShowOnScreen(cmd); +#endif } void shellBackspaceChar(SShellCmd *cmd) { @@ -371,17 +301,33 @@ void shellResetCommand(SShellCmd *cmd, const char s[]) { shellShowOnScreen(cmd); } -void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) { + +void shellGetScreenSize(int32_t *ws_col, int32_t *ws_row) { +#ifdef WINDOWS + CONSOLE_SCREEN_BUFFER_INFO csbi; + GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi); + if (ws_col != NULL) *ws_col = csbi.srWindow.Right - csbi.srWindow.Left + 1; + if (ws_row != NULL) *ws_row = csbi.srWindow.Bottom - csbi.srWindow.Top + 1; +#else struct winsize w; if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { // fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n"); - w.ws_col = 120; - w.ws_row = 30; + if (ws_col != NULL) *ws_col = 120; + if (ws_row != NULL) *ws_row = 30; + } else { + if (ws_col != NULL) *ws_col = w.ws_col; + if (ws_row != NULL) *ws_row = w.ws_row; } +#endif +} - int32_t cursor_x = cursor_pos / w.ws_col; - int32_t cursor_y = cursor_pos % w.ws_col; - int32_t command_x = ecmd_pos / w.ws_col; +void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) { + int32_t ws_col; + shellGetScreenSize(&ws_col, NULL); + + int32_t cursor_x = cursor_pos / ws_col; + int32_t cursor_y = cursor_pos % ws_col; + int32_t command_x = ecmd_pos / ws_col; shellPositionCursor(cursor_y, LEFT); shellPositionCursor(command_x - cursor_x, DOWN); fprintf(stdout, "\033[2K"); @@ -393,12 +339,8 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) { } void shellShowOnScreen(SShellCmd *cmd) { - struct winsize w; - if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - // fprintf(stderr, "No stream device\n"); - w.ws_col = 120; - w.ws_row = 30; - } + int32_t ws_col; + shellGetScreenSize(&ws_col, NULL); TdWchar wc; int32_t size = 0; @@ -411,8 +353,7 @@ void shellShowOnScreen(SShellCmd *cmd) { } else { sprintf(total_string, "%s%s", shell.info.promptContinue, cmd->command); } - - int32_t remain_column = w.ws_col; + int32_t remain_column = ws_col; for (char *str = total_string; size < cmd->commandSize + PSIZE;) { int32_t ret = taosMbToWchar(&wc, str, MB_CUR_MAX); if (ret < 0) break; @@ -425,10 +366,10 @@ void shellShowOnScreen(SShellCmd *cmd) { } else { if (remain_column == width) { printf("%lc\n\r", wc); - remain_column = w.ws_col; + remain_column = ws_col; } else { printf("\n\r%lc", wc); - remain_column = w.ws_col - width; + remain_column = ws_col - width; } } @@ -436,17 +377,16 @@ void shellShowOnScreen(SShellCmd *cmd) { } taosMemoryFree(total_string); - // Position the cursor int32_t cursor_pos = cmd->screenOffset + PSIZE; int32_t ecmd_pos = cmd->endOffset + PSIZE; - int32_t cursor_x = cursor_pos / w.ws_col; - int32_t cursor_y = cursor_pos % w.ws_col; - // int32_t cursor_y = cursor % w.ws_col; - int32_t command_x = ecmd_pos / w.ws_col; - int32_t command_y = ecmd_pos % w.ws_col; - // int32_t command_y = (command.size() + PSIZE) % w.ws_col; + int32_t cursor_x = cursor_pos / ws_col; + int32_t cursor_y = cursor_pos % ws_col; + // int32_t cursor_y = cursor % ws_col; + int32_t command_x = ecmd_pos / ws_col; + int32_t command_y = ecmd_pos % ws_col; + // int32_t command_y = (command.size() + PSIZE) % ws_col; shellPositionCursor(command_y, LEFT); shellPositionCursor(command_x, UP); shellPositionCursor(cursor_x, DOWN); @@ -490,7 +430,11 @@ int32_t shellReadCommand(char *command) { case 3: printf("\n"); shellResetCommand(&cmd, ""); - kill(0, SIGINT); + #ifdef WINDOWS + raise(SIGINT); + #else + kill(0, SIGINT); + #endif break; case 4: // EOF or Ctrl+D printf("\n"); @@ -503,7 +447,10 @@ int32_t shellReadCommand(char *command) { break; case '\n': case '\r': + #ifdef WINDOWS + #else printf("\n"); + #endif if (shellIsReadyGo(&cmd)) { sprintf(command, "%s%s", cmd.buffer, cmd.command); taosMemoryFreeClear(cmd.buffer); @@ -608,5 +555,3 @@ int32_t shellReadCommand(char *command) { return 0; } - -#endif \ No newline at end of file diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 8b126cf90e..1e832c0c46 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -307,7 +307,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i break; case TSDB_DATA_TYPE_DOUBLE: n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val)); - if (n > MAX(25, length)) { + if (n > TMAX(25, length)) { taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val)); } else { taosFprintfFile(pFile, "%s", buf); @@ -488,7 +488,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t break; case TSDB_DATA_TYPE_DOUBLE: n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); - if (n > MAX(25, width)) { + if (n > TMAX(25, width)) { printf("%*.15e", width, GET_DOUBLE_VAL(val)); } else { printf("%s", buf);