diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index 000a82b6b4..3e23e29a40 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -80,8 +80,8 @@ if [ -f %{_compiledir}/../../../explorer/target/taos-explorer.service ]; then cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||: fi -if [ -f %{_compiledir}/../../../explorer/server/example/explorer.toml ]; then - cp %{_compiledir}/../../../explorer/server/example/explorer.toml %{buildroot}%{homepath}/cfg ||: +if [ -f %{_compiledir}/../../../explorer/server/examples/explorer.toml ]; then + cp %{_compiledir}/../../../explorer/server/examples/explorer.toml %{buildroot}%{homepath}/cfg ||: fi #cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index cfcb68598f..0a3c365c59 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -247,7 +247,7 @@ typedef struct { SMqCommitCbParamSet* params; char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; - tmq_t* pTmq; + int64_t consumerId; } SMqCommitCbParam; typedef struct SSyncCommitInfo { @@ -485,7 +485,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); - return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); + return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId); } static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, @@ -529,7 +529,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pParam->params = pParamSet; pParam->vgId = vgId; - pParam->pTmq = tmq; + pParam->consumerId = tmq->consumerId; tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); @@ -1505,22 +1505,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; SMqPollCbParam* pParam = (SMqPollCbParam*)param; if (pParam == NULL || pMsg == NULL) { - goto FAIL2; + return TSDB_CODE_TSC_INTERNAL_ERROR; } int64_t refId = pParam->refId; int32_t vgId = pParam->vgId; uint64_t requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - code = TSDB_CODE_TMQ_CONSUMER_CLOSED; - goto FAIL2; + return TSDB_CODE_TMQ_CONSUMER_CLOSED; } SMqPollRspWrapper* pRspWrapper = NULL; - code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); - if (code) { + int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); + if (ret) { + code = ret; tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - goto FAIL1; + goto END; } if (code != 0) { @@ -1603,25 +1603,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } END: - pRspWrapper->code = code; - pRspWrapper->vgId = vgId; - (void)strcpy(pRspWrapper->topicName, pParam->topicName); - code = taosWriteQitem(tmq->mqueue, pRspWrapper); - if(code != 0){ - tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + if (pRspWrapper){ + pRspWrapper->code = code; + pRspWrapper->vgId = vgId; + (void)strcpy(pRspWrapper->topicName, pParam->topicName); + code = taosWriteQitem(tmq->mqueue, pRspWrapper); + if(code != 0){ + tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + } } - int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); -FAIL1: - (void)taosReleaseRef(tmqMgmt.rsetId, refId); - -FAIL2: if (tmq) (void)tsem2_post(&tmq->rspSem); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); return code; } diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 96309ece86..335c654acd 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1178,6 +1178,13 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, &getObjectDataCallback}; TS3SizeCBD cbd = {0}; + int retryCount = 0; + static int maxRetryCount = 5; + static int minRetryInterval = 1000; // ms + static int maxRetryInterval = 3000; // ms + +_retry: + (void)memset(&cbd, 0, sizeof(cbd)); cbd.content_length = size; cbd.buf_pos = 0; do { @@ -1185,6 +1192,11 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, } while (S3_status_is_retryable(cbd.status) && should_retry()); if (cbd.status != S3StatusOK) { + if (S3StatusErrorSlowDown == cbd.status && retryCount++ < maxRetryCount) { + taosMsleep(taosRand() % (maxRetryInterval - minRetryInterval + 1) + minRetryInterval); + uInfo("%s: %d/%s(%s) retry get object", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); + goto _retry; + } uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO)); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a1fd75c774..6847e5c4c2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -290,7 +290,7 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap pWrapper->nCols = taosArrayGetSize(pFields); pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema)); if (NULL == pWrapper->pSchema) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } SNode *pNode; @@ -328,15 +328,18 @@ static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) { static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { SNode *pAst = NULL; SQueryPlan *pPlan = NULL; + int32_t code = 0; mInfo("stream:%s to create", pCreate->name); memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); pObj->createTime = taosGetTimestampMs(); pObj->updateTime = pObj->createTime; pObj->version = 1; + if (pCreate->smaId > 0) { pObj->subTableWithoutMd5 = 1; } + pObj->smaId = pCreate->smaId; pObj->indexForMultiAggBalance = -1; @@ -360,8 +363,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); if (pSourceDb == NULL) { mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); - return terrno; + code = terrno; + goto FAIL; } + pObj->sourceDbUid = pSourceDb->uid; mndReleaseDb(pMnode, pSourceDb); @@ -369,9 +374,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); if (pTargetDb == NULL) { - mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); - return terrno; + mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); + code = terrno; + goto FAIL; } + tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { @@ -389,12 +396,12 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pCreate->ast = NULL; // deserialize ast - if (nodesStringToNode(pObj->ast, &pAst) < 0) { + if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) { goto FAIL; } // create output schema - if (createSchemaByFields(pCreate->pCols, &pObj->outputSchema) != TSDB_CODE_SUCCESS) { + if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) { goto FAIL; } @@ -403,6 +410,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->outputSchema.nCols += numOfNULL; SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); if (!pFullSchema) { + code = terrno; goto FAIL; } @@ -410,6 +418,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, int32_t dataIndex = 0; for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); + if (pos == NULL) { + continue; + } + if (nullIndex >= numOfNULL || i < pos->slotId) { pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; @@ -444,22 +456,31 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, }; // using ast and param to build physical plan - if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) { goto FAIL; } // save physcial plan - if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { + if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) { goto FAIL; } pObj->tagSchema.nCols = pCreate->numOfTags; if (pCreate->numOfTags) { pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); + if (pObj->tagSchema.pSchema == NULL) { + code = terrno; + goto FAIL; + } } + /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ for (int32_t i = 0; i < pCreate->numOfTags; i++) { SField *pField = taosArrayGet(pCreate->pTags, i); + if (pField == NULL) { + continue; + } + pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; pObj->tagSchema.pSchema[i].bytes = pField->bytes; pObj->tagSchema.pSchema[i].flags = pField->flags; @@ -470,7 +491,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, FAIL: if (pAst != NULL) nodesDestroyNode(pAst); if (pPlan != NULL) qDestroyQueryPlan(pPlan); - return 0; + return code; } int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { @@ -575,12 +596,15 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; + int32_t code = 0; + int32_t lino = 0; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfTags = 1; // group id createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns); + TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno); // build fields for (int32_t i = 0; i < createReq.numOfColumns; i++) { @@ -595,6 +619,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre if (pStream->tagSchema.nCols == 0) { createReq.numOfTags = 1; createReq.pTags = taosArrayInit_s(sizeof(SField), 1); + TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno); + // build tags SField *pField = taosArrayGet(createReq.pTags, 0); strcpy(pField->name, "group_id"); @@ -604,6 +630,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre } else { createReq.numOfTags = pStream->tagSchema.nCols; createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags); + TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno); + for (int32_t i = 0; i < createReq.numOfTags; i++) { SField *pField = taosArrayGet(createReq.pTags, i); pField->bytes = pStream->tagSchema.pSchema[i].bytes; @@ -657,7 +685,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols); - return 0; + return code; _OVER: tFreeSMCreateStbReq(&createReq); @@ -665,7 +693,7 @@ _OVER: mndReleaseDb(pMnode, pDb); mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); - return -1; + return code; } // 1. stream number check @@ -709,9 +737,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char *sql = NULL; int32_t sqlLen = 0; const char *pMsg = "create stream tasks on dnodes"; - int32_t code = 0; - terrno = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; + terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { code = TSDB_CODE_INVALID_MSG; @@ -749,6 +777,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (createReq.sql != NULL) { sqlLen = strlen(createReq.sql); sql = taosMemoryMalloc(sqlLen + 1); + if (sql == NULL) { + code = terrno; + goto _OVER; + } + memset(sql, 0, sqlLen + 1); memcpy(sql, createReq.sql, sqlLen); } @@ -942,8 +975,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -1150,7 +1182,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + if (p == NULL) { + continue; + } + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; @@ -1159,8 +1195,12 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { if (pEntry->status == TASK_STATUS__STOP) { for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { STaskId *pId = taosArrayGet(pInvalidList, j); + if (pId == NULL) { + continue; + } + if (pEntry->id.streamId == pId->streamId) { - void* px = taosArrayPush(pInvalidList, &pEntry->id); + void *px = taosArrayPush(pInvalidList, &pEntry->id); if (px == NULL) { mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); } @@ -1243,6 +1283,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); + if (pList == NULL) { + return -1; + } + int64_t now = taosGetTimestampMs(); while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { @@ -2472,14 +2516,15 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); - doAddReportStreamTask(pList, &req); + if (pList != NULL) { + doAddReportStreamTask(pList, &req); + code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + if (code) { + mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId); + } - code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); - if (code) { - mError("stream:0x%"PRIx64 " failed to put into checkpoint stream", req.streamId); + pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } - - pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } else { doAddReportStreamTask(*pReqTaskList, &req); } @@ -2545,6 +2590,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; int64_t now = taosGetTimestampMs(); SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); + if (pStreamList == NULL) { + return terrno; + } mDebug("start to process consensus-checkpointId in tmr"); @@ -2572,6 +2620,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { int64_t streamId = -1; int32_t num = taosArrayGetSize(pInfo->pTaskList); SArray *pList = taosArrayInit(4, sizeof(int32_t)); + if (pList == NULL) { + continue; + } SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index d9e39ad6f5..3b375f7f82 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -957,6 +957,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId); + metaReaderClear(&mer1); + return; + } + pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); metaReaderClear(&mer1); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d810cf2428..853b2865bb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1777,6 +1777,19 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName); pExp->pExpr->_optrRoot.pRootNode = pNode; + } else if (type == QUERY_NODE_LOGIC_CONDITION) { + pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; + SLogicConditionNode* pCond = (SLogicConditionNode*)pNode; + pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + if (!pExp->base.pParam) { + code = terrno; + } + if (TSDB_CODE_SUCCESS == code) { + pExp->base.numOfParams = 1; + SDataType* pType = &pCond->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName); + pExp->pExpr->_optrRoot.pRootNode = pNode; + } } else { ASSERT(0); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7dfe88fe85..a0c56df49c 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -365,6 +365,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { ps->onlyRef = true; code = tsortAddSource(pInfo->pSortHandle, ps); if (code) { + taosMemoryFree(ps); return code; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 66e503fd89..ad9e5ce7d4 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -229,6 +229,7 @@ static void udfWatchUdfd(void *args) { if(uv_loop_close(&pData->loop) != 0) { fnError("udfd loop close failed, lino:%d", __LINE__); } + return; _exit: if (terrno != 0) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 879f527a85..85d9ddde85 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2608,7 +2608,8 @@ static int32_t calcSelectFuncNum(SFunctionNode* pFunc, int32_t currSelectFuncNum : 1); } -static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { +static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) { + SNode* pCurrStmt = pCxt->pCurrStmt; if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) { SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt; pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId); @@ -2641,7 +2642,9 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId); - pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; + if (SQL_CLAUSE_SELECT == pCxt->currClause) { + pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; + } } } @@ -2903,7 +2906,7 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode) code = translateBlockDistFunc(pCxt, pFunc); } if (TSDB_CODE_SUCCESS == code) { - setFuncClassification(pCxt->pCurrStmt, pFunc); + setFuncClassification(pCxt, pFunc); } return code; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 3e5471700c..23cc7324f0 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -262,7 +262,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP colDataSetNULL(pOutputData, i); continue; } - out[i] = f1(in[i]); + out[i] = f1(in[i]) + 0; } break; } @@ -276,7 +276,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP colDataSetNULL(pOutputData, i); continue; } - out[i] = d1(in[i]); + out[i] = d1(in[i]) + 0; } break; } diff --git a/tests/system-test/2-query/ceil.py b/tests/system-test/2-query/ceil.py index aabc716a74..e719d819d8 100644 --- a/tests/system-test/2-query/ceil.py +++ b/tests/system-test/2-query/ceil.py @@ -57,7 +57,7 @@ class TDTestCase: ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) - ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) @@ -223,6 +223,9 @@ class TDTestCase: tdSql.checkData(3, 4, 33) tdSql.checkData(5, 5, None) + tdSql.query(f"select ceil(c5) from {dbname}.t1") + tdSql.checkData(4 , 0, 0) + self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), ceil(c2) ,ceil(c3), ceil(c4), ceil(c5) from {dbname}.t1") # used for sub table diff --git a/tests/system-test/2-query/project_group.py b/tests/system-test/2-query/project_group.py index 44943e5088..19fe8b1cf0 100644 --- a/tests/system-test/2-query/project_group.py +++ b/tests/system-test/2-query/project_group.py @@ -57,6 +57,8 @@ class TDTestCase: tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2"); tdSql.checkRows(2) + tdSql.query('select col1 > 0 and col2 > 0 from stb') + tdSql.checkRows(12) def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/system-test/2-query/round.py b/tests/system-test/2-query/round.py index d647f516ae..f87f234fa3 100644 --- a/tests/system-test/2-query/round.py +++ b/tests/system-test/2-query/round.py @@ -53,7 +53,7 @@ class TDTestCase: ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) - ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) @@ -232,6 +232,9 @@ class TDTestCase: tdSql.checkData(3, 4, 33) tdSql.checkData(5, 5, None) + tdSql.query(f"select round(c5) from {dbname}.t1") + tdSql.checkData(4 , 0, 0) + self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), round(c2) ,round(c3), round(c4), round(c5) from {dbname}.t1") # used for sub table diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index 1386814f0c..ede3ef427a 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -41,8 +41,9 @@ class TDTestCase: time.sleep(10) tdSql.execute("use test", queryTimes=100) tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") - tdLog.debug("========create stream and insert data ok========") + time.sleep(5) + tdLog.debug("========create stream and insert data ok========") tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") rowCnt = tdSql.getRows() results_meters = tdSql.queryResult