Merge branch '3.0' into feat/learner

This commit is contained in:
dm chen 2023-04-20 19:11:06 +08:00 committed by GitHub
commit d594e4963f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 138 additions and 89 deletions

View File

@ -459,12 +459,17 @@ TO_JSON(str_literal)
#### TO_UNIXTIMESTAMP
```sql
TO_UNIXTIMESTAMP(expr)
TO_UNIXTIMESTAMP(expr [, return_timestamp])
return_timestamp: {
0
| 1
}
```
**Description**: UNIX timestamp converted from a string of date/time format
**Return value type**: BIGINT
**Return value type**: BIGINT, TIMESTAMP
**Applicable column types**: VARCHAR and NCHAR
@ -476,6 +481,7 @@ TO_UNIXTIMESTAMP(expr)
- The input string must be compatible with ISO8601/RFC3339 standard, NULL will be returned if the string can't be converted
- The precision of the returned timestamp is same as the precision set for the current data base in use
- return_timestamp indicates whether the returned value type is TIMESTAMP or not. If this parameter set to 1, function will return TIMESTAMP type. Otherwise function will return BIGINT type. If parameter is omitted, default return value type is BIGINT.
### Time and Date Functions

View File

@ -69,7 +69,7 @@ Provides information about SQL queries currently running. Similar to SHOW QUERIE
| 1 | consumer_id | BIGINT | Consumer ID |
| 2 | consumer_group | BINARY(192) | Consumer group |
| 3 | client_id | BINARY(192) | Client ID (user-defined) |
| 4 | status | BINARY(20) | Consumer status |
| 4 | status | BINARY(20) | Consumer status. All possible status include: ready(consumer is in normal state), lost(the connection between consumer and mnode is broken), rebalance(the redistribution of vgroups that belongs to current consumer is now in progress), unknown(consumer is in invalid state)
| 5 | topics | BINARY(204) | Subscribed topic. Returns one row for each topic. |
| 6 | up_time | TIMESTAMP | Time of first connection to TDengine Server |
| 7 | subscribe_time | TIMESTAMP | Time of first subscription |

View File

@ -36,7 +36,6 @@ database_option: {
| TSDB_PAGESIZE value
| WAL_RETENTION_PERIOD value
| WAL_RETENTION_SIZE value
| WAL_ROLL_PERIOD value
| WAL_SEGMENT_SIZE value
}
```

View File

@ -459,12 +459,17 @@ TO_JSON(str_literal)
#### TO_UNIXTIMESTAMP
```sql
TO_UNIXTIMESTAMP(expr)
TO_UNIXTIMESTAMP(expr [, return_timestamp])
return_timestamp: {
0
| 1
}
```
**功能说明**:将日期时间格式的字符串转换成为 UNIX 时间戳。
**返回结果数据类型**BIGINT。
**返回结果数据类型**BIGINT, TIMESTAMP
**应用字段**VARCHAR, NCHAR。
@ -476,6 +481,7 @@ TO_UNIXTIMESTAMP(expr)
- 输入的日期时间字符串须符合 ISO8601/RFC3339 标准,无法转换的字符串格式将返回 NULL。
- 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。
- return_timestamp 指定函数返回值是否为时间戳类型设置为1时返回 TIMESTAMP 类型设置为0时返回 BIGINT 类型。如不指定缺省返回 BIGINT 类型。
### 时间和日期函数

View File

@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) {
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
}
static inline bool syncUtilUserCommit(tmsg_t msgType) {

View File

@ -547,6 +547,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_NEGO_WIN_EXCEEDED TAOS_DEF_ERROR_CODE(0, 0X0918)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq

View File

@ -287,7 +287,9 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_REPLICA 5
#define TSDB_MAX_LEARNER_REPLICA 10
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4)
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta

View File

@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir);

View File

@ -378,7 +378,6 @@ struct SVnode {
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
SVCommitSched commitSched;
int64_t sync;
TdThreadMutex lock;
bool blocked;
@ -387,9 +386,6 @@ struct SVnode {
int32_t blockSec;
int64_t blockSeq;
SQHandle* pQuery;
#if 0
SRpcHandleInfo blockInfo;
#endif
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)

View File

@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
p->ts = pCol->ts;
p->colVal = pCol->colVal;
singleTableLastTs = pCol->ts;
// only set value for last row query
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
}
} else {
SLastCol* p = taosArrayGet(pLastCols, slotId);
@ -417,6 +408,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
tsdbCacheRelease(lruCache, h);
}

View File

@ -143,23 +143,13 @@ _exit:
return code;
}
void vnodeUpdCommitSched(SVnode *pVnode) {
int64_t randNum = taosRand();
pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs);
}
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
bool diskAvail = osDataSpaceAvailable();
bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) {
needCommit =
((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
((pVnode->inUse->size > 0) && atExit);
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit);
}
taosThreadMutexUnlock(&pVnode->mutex);
return needCommit;
@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
vnodeUpdCommitSched(pVnode);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());

View File

@ -309,8 +309,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
vnodeUpdCommitSched(pVnode);
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool

View File

@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
} else {
tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
vnodeUpdCommitSched(pVnode);
}
#if BATCH_ENABLE

View File

@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
}
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0;
return pOperator;
_error:
_error:
pTaskInfo->code = code;
destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator);
@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes->info.rows = 1;
SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t num = 0;
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
GET_TASKID(pTaskInfo), NULL);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
return TSDB_CODE_SUCCESS;
}
size_t size = taosArrayGetSize(pColMatchInfo->pList);
size_t size = taosArrayGetSize(pColMatchInfo->pList);
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
for (int32_t i = 0; i < size; ++i) {

View File

@ -1933,14 +1933,35 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
}
static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
int16_t resType = TSDB_DATA_TYPE_BIGINT;
if (1 != numOfParams && 2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_STR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) {
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_STR_DATA_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (2 == numOfParams) {
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
if (pValue->datum.i == 1) {
resType = TSDB_DATA_TYPE_TIMESTAMP;
} else if (pValue->datum.i == 0) {
resType = TSDB_DATA_TYPE_BIGINT;
} else {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TO_UNIXTIMESTAMP function second parameter should be 0/1");
}
}
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
@ -1948,7 +1969,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
return code;
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
pFunc->node.resType = (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType};
return TSDB_CODE_SUCCESS;
}

View File

@ -3035,12 +3035,13 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
}
static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pGroupByList && NULL != pSelect->pHaving) {
if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow &&
NULL != pSelect->pHaving) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
}
pCxt->currClause = SQL_CLAUSE_HAVING;
int32_t code = translateExpr(pCxt, &pSelect->pHaving);
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow)) {
code = checkExprForGroupBy(pCxt, &pSelect->pHaving);
}
return code;

View File

@ -740,6 +740,13 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
pWindow->node.pConditions = nodesCloneNode(pSelect->pHaving);
if (NULL == pWindow->node.pConditions) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
pSelect->hasAggFuncs = false;
if (TSDB_CODE_SUCCESS == code) {
@ -1132,6 +1139,14 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving && !pSelect->hasAggFuncs && NULL == pSelect->pGroupByList &&
NULL == pSelect->pWindow) {
pPartition->node.pConditions = nodesCloneNode(pSelect->pHaving);
if (NULL == pPartition->node.pConditions) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pPartition;
} else {

View File

@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
*pNotOptimize = false;
return TSDB_CODE_SUCCESS;
}
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt {
bool hasCol;
} SLastRowScanOptLastParaCkCxt;
static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
static EDealRes lastRowScanOptLastParaIsTagImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptLastParaCkCxt* pCxt = pContext;
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static bool lastRowScanOptLastParaCheck(SNode* pExpr) {
static bool lastRowScanOptLastParaIsTag(SNode* pExpr) {
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt);
return !cxt.hasTag && cxt.hasCol;
nodesWalkExpr(pExpr, lastRowScanOptLastParaIsTagImpl, &cxt);
return cxt.hasTag && !cxt.hasCol;
}
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) {
if (hasSelectFunc || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
hasLastFunc = true;
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) {
if (hasLastFunc) {
return false;
}
hasSelectFunc = true;
} else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
return false;
}
@ -2237,7 +2241,7 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols) {
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, bool erase) {
SNode* pTarget = NULL;
WHERE_EACH(pTarget, pTargets) {
bool found = false;
@ -2249,7 +2253,7 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
break;
}
}
if (!found) {
if (!found && erase) {
ERASE_NODE(pTargets);
continue;
}
@ -2290,9 +2294,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
pScan->igLastNull = pAgg->hasLast ? true : false;
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols);
NODES_DESTORY_LIST(pScan->pScanPseudoCols);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols);
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
nodesClearList(cxt.pLastCols);
}
pAgg->hasLastRow = false;

View File

@ -1124,7 +1124,8 @@ _end:
int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int32_t type = GET_PARAM_TYPE(pInput);
int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
int32_t idx = (inputNum == 2) ? 1 : 2;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[idx]), pInput[idx].columnData->pData);
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) {

View File

@ -53,8 +53,15 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _err;
}
if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED;
sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(),
index, pBuf->commitIndex);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) {
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
@ -83,6 +90,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
_err:
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
taosMsleep(1);
return -1;
}

View File

@ -202,21 +202,22 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
// restore error code
terrno = errCode;
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode != NULL) {
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", applied-index:%" PRId64
", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems,
pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser,
bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, appliedIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum,
aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
}
}

View File

@ -426,6 +426,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restor
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGO_WIN_EXCEEDED, "Sync negotiation win exceeded")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq

View File

@ -26,7 +26,7 @@ class TDTestCase:
'c1':'int',
'c2':'float',
'c3':'binary(20)'
}
# structure of tag
self.tag_dict = {
@ -60,7 +60,7 @@ class TDTestCase:
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(len(values_list))
elif tb_type == 'stb':
tdSql.checkRows(len(self.values_list)*tb_num)
tdSql.checkRows(len(self.values_list)*tb_num)
for time in ['2020-01-32T08:00:00','2020-13-32T08:00:00','acd']:
tdSql.query(f"select to_unixtimestamp('{time}') from {tbname}")
if tb_type == 'ntb' or tb_type == 'ctb':
@ -74,7 +74,7 @@ class TDTestCase:
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(len(values_list))
elif tb_type == 'stb':
tdSql.checkRows(len(values_list)*tb_num)
tdSql.checkRows(len(values_list)*tb_num)
for time in self.error_param:
tdSql.error(f"select to_unixtimestamp({time}) from {tbname}")
def timestamp_change_check_ntb(self):
@ -95,9 +95,20 @@ class TDTestCase:
self.data_check(f'{self.stbname}_{i}',self.values_list,'ctb')
self.data_check(self.stbname,self.values_list,'stb',self.tbnum)
tdSql.execute(f'drop database {self.dbname}')
def timestamp_change_return_type(self):
tdSql.query(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 0);")
tdSql.checkEqual(tdSql.queryResult[0][0], 0)
tdSql.query(f"select to_unixtimestamp('1970-01-01 00:00:00', 1);")
tdSql.checkData(0, 0, '1970-01-01 00:00:00')
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 2);")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 1.5);")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 'abc');")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', true);")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 1, 3);")
def run(self): # sourcery skip: extract-duplicate-method
self.timestamp_change_check_ntb()
self.timestamp_change_check_stb()
self.timestamp_change_return_type()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")