diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 8dfa9c2851..5c1a833e05 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -459,12 +459,17 @@ TO_JSON(str_literal) #### TO_UNIXTIMESTAMP ```sql -TO_UNIXTIMESTAMP(expr) +TO_UNIXTIMESTAMP(expr [, return_timestamp]) + +return_timestamp: { + 0 + | 1 +} ``` **Description**: UNIX timestamp converted from a string of date/time format -**Return value type**: BIGINT +**Return value type**: BIGINT, TIMESTAMP **Applicable column types**: VARCHAR and NCHAR @@ -476,6 +481,7 @@ TO_UNIXTIMESTAMP(expr) - The input string must be compatible with ISO8601/RFC3339 standard, NULL will be returned if the string can't be converted - The precision of the returned timestamp is same as the precision set for the current data base in use +- return_timestamp indicates whether the returned value type is TIMESTAMP or not. If this parameter set to 1, function will return TIMESTAMP type. Otherwise function will return BIGINT type. If parameter is omitted, default return value type is BIGINT. ### Time and Date Functions diff --git a/docs/en/12-taos-sql/23-perf.md b/docs/en/12-taos-sql/23-perf.md index fc369ec663..43ff8e3091 100644 --- a/docs/en/12-taos-sql/23-perf.md +++ b/docs/en/12-taos-sql/23-perf.md @@ -69,7 +69,7 @@ Provides information about SQL queries currently running. Similar to SHOW QUERIE | 1 | consumer_id | BIGINT | Consumer ID | | 2 | consumer_group | BINARY(192) | Consumer group | | 3 | client_id | BINARY(192) | Client ID (user-defined) | -| 4 | status | BINARY(20) | Consumer status | +| 4 | status | BINARY(20) | Consumer status. All possible status include: ready(consumer is in normal state), lost(the connection between consumer and mnode is broken), rebalance(the redistribution of vgroups that belongs to current consumer is now in progress), unknown(consumer is in invalid state) | 5 | topics | BINARY(204) | Subscribed topic. Returns one row for each topic. | | 6 | up_time | TIMESTAMP | Time of first connection to TDengine Server | | 7 | subscribe_time | TIMESTAMP | Time of first subscription | diff --git a/docs/zh/12-taos-sql/02-database.md b/docs/zh/12-taos-sql/02-database.md index b54998e08d..dd30635660 100644 --- a/docs/zh/12-taos-sql/02-database.md +++ b/docs/zh/12-taos-sql/02-database.md @@ -36,7 +36,6 @@ database_option: { | TSDB_PAGESIZE value | WAL_RETENTION_PERIOD value | WAL_RETENTION_SIZE value - | WAL_ROLL_PERIOD value | WAL_SEGMENT_SIZE value } ``` diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 94f8052051..50e82e6b90 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -459,12 +459,17 @@ TO_JSON(str_literal) #### TO_UNIXTIMESTAMP ```sql -TO_UNIXTIMESTAMP(expr) +TO_UNIXTIMESTAMP(expr [, return_timestamp]) + +return_timestamp: { + 0 + | 1 +} ``` **功能说明**:将日期时间格式的字符串转换成为 UNIX 时间戳。 -**返回结果数据类型**:BIGINT。 +**返回结果数据类型**:BIGINT, TIMESTAMP。 **应用字段**:VARCHAR, NCHAR。 @@ -476,6 +481,7 @@ TO_UNIXTIMESTAMP(expr) - 输入的日期时间字符串须符合 ISO8601/RFC3339 标准,无法转换的字符串格式将返回 NULL。 - 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。 +- return_timestamp 指定函数返回值是否为时间戳类型,设置为1时返回 TIMESTAMP 类型,设置为0时返回 BIGINT 类型。如不指定缺省返回 BIGINT 类型。 ### 时间和日期函数 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b3b665d323..4a1d8d71f1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) { } static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || - (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); + (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT); } static inline bool syncUtilUserCommit(tmsg_t msgType) { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ab89466a19..0d9292cc6b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -547,6 +547,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) #define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) +#define TSDB_CODE_SYN_NEGO_WIN_EXCEEDED TAOS_DEF_ERROR_CODE(0, 0X0918) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/include/util/tdef.h b/include/util/tdef.h index 1e05903407..664f22eeed 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -287,7 +287,9 @@ typedef enum ELogicConditionType { #define TSDB_MAX_REPLICA 5 #define TSDB_MAX_LEARNER_REPLICA 10 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 -#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4) +#define TSDB_SYNC_LOG_BUFFER_RETENTION 256 +#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 +#define TSDB_SYNC_NEGOTIATION_WIN 512 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ae65e2ba3f..a67f246e73 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit); -void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52a..2576fce998 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -378,7 +378,6 @@ struct SVnode { STQ* pTq; SSink* pSink; tsem_t canCommit; - SVCommitSched commitSched; int64_t sync; TdThreadMutex lock; bool blocked; @@ -387,9 +386,6 @@ struct SVnode { int32_t blockSec; int64_t blockSeq; SQHandle* pQuery; -#if 0 - SRpcHandleInfo blockInfo; -#endif }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 95981c2f08..d6e819d84a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 p->ts = pCol->ts; p->colVal = pCol->colVal; singleTableLastTs = pCol->ts; - - // only set value for last row query - if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { - if (taosArrayGetSize(pTableUidList) == 0) { - taosArrayPush(pTableUidList, &pKeyInfo->uid); - } else { - taosArraySet(pTableUidList, 0, &pKeyInfo->uid); - } - } } } else { SLastCol* p = taosArrayGet(pLastCols, slotId); @@ -417,6 +408,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } + if (taosArrayGetSize(pTableUidList) == 0) { + taosArrayPush(pTableUidList, &pKeyInfo->uid); + } else { + taosArraySet(pTableUidList, 0, &pKeyInfo->uid); + } + tsdbCacheRelease(lruCache, h); } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 3eb813f394..847125018c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -143,23 +143,13 @@ _exit: return code; } -void vnodeUpdCommitSched(SVnode *pVnode) { - int64_t randNum = taosRand(); - pVnode->commitSched.commitMs = taosGetMonoTimestampMs(); - pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs); -} - int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - SVCommitSched *pSched = &pVnode->commitSched; - int64_t nowMs = taosGetMonoTimestampMs(); bool diskAvail = osDataSpaceAvailable(); bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { - needCommit = - ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || - ((pVnode->inUse->size > 0) && atExit); + needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit); } taosThreadMutexUnlock(&pVnode->mutex); return needCommit; @@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); - vnodeUpdCommitSched(pVnode); - // persist wal before starting if (walPersist(pVnode->pWal) < 0) { vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index b432e8ba4e..deeb0af42a 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -309,8 +309,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); - vnodeUpdCommitSched(pVnode); - int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index bc69378487..fccac5c625 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak pVnode->blocked = true; pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSeq = seq; -#if 0 - pVnode->blockInfo = pMsg->info; -#endif } taosThreadMutexUnlock(&pVnode->lock); @@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) { } else { tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg); } - - vnodeUpdCommitSched(pVnode); } #if BATCH_ENABLE diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index f6fc332b37..61f024ebb7 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); -#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) +#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; tableListDestroy(pTableListInfo); @@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, + pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); } - setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = @@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->cost.openCost = 0; return pOperator; - _error: +_error: pTaskInfo->code = code; destroyCacheScanOperator(pInfo); taosMemoryFree(pOperator); @@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } SCacheRowsScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = pInfo->pTableList; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableListInfo* pTableList = pInfo->pTableList; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); @@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pRes->info.rows = 1; SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, - pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, + pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } STableKeyInfo* pList = NULL; - int32_t num = 0; + int32_t num = 0; int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); if (code != TSDB_CODE_SUCCESS) { @@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->pRes->info.id.groupId = pKeyInfo->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { - ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); - pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); - code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, - GET_TASKID(pTaskInfo), NULL); + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC return TSDB_CODE_SUCCESS; } - size_t size = taosArrayGetSize(pColMatchInfo->pList); + size_t size = taosArrayGetSize(pColMatchInfo->pList); SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); for (int32_t i = 0; i < size; ++i) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a293f45238..fe98a1dd53 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1933,14 +1933,35 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l } static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - if (1 != LIST_LENGTH(pFunc->pParameterList)) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + int16_t resType = TSDB_DATA_TYPE_BIGINT; + + if (1 != numOfParams && 2 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - if (!IS_STR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) { + uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_STR_DATA_TYPE(para1Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } + if (2 == numOfParams) { + uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + if (!IS_INTEGER_TYPE(para2Type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); + if (pValue->datum.i == 1) { + resType = TSDB_DATA_TYPE_TIMESTAMP; + } else if (pValue->datum.i == 0) { + resType = TSDB_DATA_TYPE_BIGINT; + } else { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "TO_UNIXTIMESTAMP function second parameter should be 0/1"); + } + } + // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); @@ -1948,7 +1969,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int return code; } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c6e70798a7..25e92a55ec 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3035,12 +3035,13 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect } static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL == pSelect->pGroupByList && NULL != pSelect->pHaving) { + if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && + NULL != pSelect->pHaving) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); } pCxt->currClause = SQL_CLAUSE_HAVING; int32_t code = translateExpr(pCxt, &pSelect->pHaving); - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow)) { code = checkExprForGroupBy(pCxt, &pSelect->pHaving); } return code; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index c9ee83a647..6544898be9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -740,6 +740,13 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets); } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { + pWindow->node.pConditions = nodesCloneNode(pSelect->pHaving); + if (NULL == pWindow->node.pConditions) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + pSelect->hasAggFuncs = false; if (TSDB_CODE_SUCCESS == code) { @@ -1132,6 +1139,14 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving && !pSelect->hasAggFuncs && NULL == pSelect->pGroupByList && + NULL == pSelect->pWindow) { + pPartition->node.pConditions = nodesCloneNode(pSelect->pHaving); + if (NULL == pPartition->node.pConditions) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pPartition; } else { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 52bb03466c..07ea110d7e 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group *pNotOptimize = false; return TSDB_CODE_SUCCESS; } - + switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: { SScanLogicNode* pScan = (SScanLogicNode*)pNode; @@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt { bool hasCol; } SLastRowScanOptLastParaCkCxt; -static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { +static EDealRes lastRowScanOptLastParaIsTagImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SLastRowScanOptLastParaCkCxt* pCxt = pContext; if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { @@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static bool lastRowScanOptLastParaCheck(SNode* pExpr) { +static bool lastRowScanOptLastParaIsTag(SNode* pExpr) { SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false}; - nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt); - return !cxt.hasTag && cxt.hasCol; + nodesWalkExpr(pExpr, lastRowScanOptLastParaIsTagImpl, &cxt); + return cxt.hasTag && !cxt.hasCol; } static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) { @@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { SFunctionNode* pAggFunc = (SFunctionNode*)pFunc; if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { - if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) { + if (hasSelectFunc || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) { return false; } hasLastFunc = true; - } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { + } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) { if (hasLastFunc) { return false; } hasSelectFunc = true; + } else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { + if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) { + return false; + } } else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) { return false; } @@ -2237,7 +2241,7 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols) { +static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, bool erase) { SNode* pTarget = NULL; WHERE_EACH(pTarget, pTargets) { bool found = false; @@ -2249,7 +2253,7 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo break; } } - if (!found) { + if (!found && erase) { ERASE_NODE(pTargets); continue; } @@ -2290,9 +2294,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic pScan->igLastNull = pAgg->hasLast ? true : false; if (NULL != cxt.pLastCols) { cxt.doAgg = false; - lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols); - NODES_DESTORY_LIST(pScan->pScanPseudoCols); - lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols); + lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true); + lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false); nodesClearList(cxt.pLastCols); } pAgg->hasLastRow = false; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 24b25cec80..835264eabe 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1124,7 +1124,8 @@ _end: int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t type = GET_PARAM_TYPE(pInput); int64_t timePrec; - GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); + int32_t idx = (inputNum == 2) ? 1 : 2; + GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[idx]), pInput[idx].columnData->pData); for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 81dbead506..85c18220a6 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -53,8 +53,15 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _err; } + if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) { + terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED; + sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(), + index, pBuf->commitIndex); + goto _err; + } + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); - if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) { + if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) { terrno = TSDB_CODE_SYN_WRITE_STALL; sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64, pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex); @@ -83,6 +90,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt _err: syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); + taosMsleep(1); return -1; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 056a597777..cf796c3862 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -202,21 +202,22 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo // restore error code terrno = errCode; - + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); + if (pNode != NULL) { taosPrintLog(flags, level, dflag, - "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 - ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 + "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", applied-index:%" PRId64 + ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s", - pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex, - logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, - pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, - pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, - pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, - bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr); + pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, appliedIndex, + logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, + aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, + pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, + pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr); } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 002d605793..34b09761c8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -426,6 +426,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restor TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGO_WIN_EXCEEDED, "Sync negotiation win exceeded") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq diff --git a/tests/system-test/2-query/To_unixtimestamp.py b/tests/system-test/2-query/To_unixtimestamp.py index 8ee2007450..424ebff6c5 100644 --- a/tests/system-test/2-query/To_unixtimestamp.py +++ b/tests/system-test/2-query/To_unixtimestamp.py @@ -26,7 +26,7 @@ class TDTestCase: 'c1':'int', 'c2':'float', 'c3':'binary(20)' - + } # structure of tag self.tag_dict = { @@ -60,7 +60,7 @@ class TDTestCase: if tb_type == 'ntb' or tb_type == 'ctb': tdSql.checkRows(len(values_list)) elif tb_type == 'stb': - tdSql.checkRows(len(self.values_list)*tb_num) + tdSql.checkRows(len(self.values_list)*tb_num) for time in ['2020-01-32T08:00:00','2020-13-32T08:00:00','acd']: tdSql.query(f"select to_unixtimestamp('{time}') from {tbname}") if tb_type == 'ntb' or tb_type == 'ctb': @@ -74,7 +74,7 @@ class TDTestCase: if tb_type == 'ntb' or tb_type == 'ctb': tdSql.checkRows(len(values_list)) elif tb_type == 'stb': - tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkRows(len(values_list)*tb_num) for time in self.error_param: tdSql.error(f"select to_unixtimestamp({time}) from {tbname}") def timestamp_change_check_ntb(self): @@ -95,9 +95,20 @@ class TDTestCase: self.data_check(f'{self.stbname}_{i}',self.values_list,'ctb') self.data_check(self.stbname,self.values_list,'stb',self.tbnum) tdSql.execute(f'drop database {self.dbname}') + def timestamp_change_return_type(self): + tdSql.query(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 0);") + tdSql.checkEqual(tdSql.queryResult[0][0], 0) + tdSql.query(f"select to_unixtimestamp('1970-01-01 00:00:00', 1);") + tdSql.checkData(0, 0, '1970-01-01 00:00:00') + tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 2);") + tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 1.5);") + tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 'abc');") + tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', true);") + tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 1, 3);") def run(self): # sourcery skip: extract-duplicate-method self.timestamp_change_check_ntb() self.timestamp_change_check_stb() + self.timestamp_change_return_type() def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed")