Merge branch '3.0' of https://github.com/taosdata/TDengine into doc/TD-30393
This commit is contained in:
commit
cebb168133
|
@ -80,7 +80,7 @@ If a stream is created with PARTITION BY clause and SUBTABLE clause, the name of
|
|||
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
|
||||
```
|
||||
|
||||
IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name(Starting from 3.2.3.0, in order to avoid the expression in subtable being unable to distinguish between different subtables, add '_groupId' to the end of subtable name).
|
||||
IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name(Starting from 3.2.3.0, in order to avoid the expression in subtable being unable to distinguish between different subtables, add '_stableName_groupId' to the end of subtable name).
|
||||
|
||||
If the output length exceeds the limitation of TDengine(192), the name will be truncated. If the generated name is occupied by some other table, the creation and writing of the new subtable will be failed.
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WIN
|
|||
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
|
||||
```
|
||||
|
||||
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名(从3.2.3.0开始,为了避免 sutable 中的表达式无法区分各个子表,即误将多个相同时间线写入一个子表,在指定的子表名后面加上 _groupId)。
|
||||
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名(从3.2.3.0开始,为了避免 sutable 中的表达式无法区分各个子表,即误将多个相同时间线写入一个子表,在指定的子表名后面加上 __stableName_groupId)。
|
||||
|
||||
注意,子表名的长度若超过 TDengine 的限制,将被截断。若要生成的子表名已经存在于另一超级表,由于 TDengine 的子表名是唯一的,因此对应新子表的创建以及数据的写入将会失败。
|
||||
|
||||
|
|
|
@ -104,6 +104,8 @@ typedef struct SNodeAllocator SNodeAllocator;
|
|||
int32_t nodesInitAllocatorSet();
|
||||
void nodesDestroyAllocatorSet();
|
||||
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId);
|
||||
int32_t nodesSimAcquireAllocator(int64_t allocatorId);
|
||||
int32_t nodesSimReleaseAllocator(int64_t allocatorId);
|
||||
int32_t nodesAcquireAllocator(int64_t allocatorId);
|
||||
int32_t nodesReleaseAllocator(int64_t allocatorId);
|
||||
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId);
|
||||
|
|
|
@ -89,24 +89,28 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
"current:%d, app current:%d",
|
||||
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
||||
|
||||
if (pRequest->pQuery && pRequest->pQuery->pRoot) {
|
||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
|
||||
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
|
||||
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||
reqType = SLOW_LOG_TYPE_INSERT;
|
||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
|
||||
if (pRequest->pQuery && pRequest->pQuery->pRoot) {
|
||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
|
||||
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
|
||||
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||
reqType = SLOW_LOG_TYPE_INSERT;
|
||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||
reqType = SLOW_LOG_TYPE_QUERY;
|
||||
}
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||
reqType = SLOW_LOG_TYPE_QUERY;
|
||||
}
|
||||
}
|
||||
|
||||
nodesSimReleaseAllocator(pRequest->allocatorRefId);
|
||||
}
|
||||
|
||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
|
||||
|
@ -491,7 +495,10 @@ void doDestroyRequest(void *p) {
|
|||
}
|
||||
taosMemoryFree(pRequest->body.interParam);
|
||||
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
nodesSimReleaseAllocator(pRequest->allocatorRefId);
|
||||
}
|
||||
nodesDestroyAllocator(pRequest->allocatorRefId);
|
||||
|
||||
taosMemoryFreeClear(pRequest->effectiveUser);
|
||||
|
|
|
@ -939,7 +939,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
|
|||
pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SFieldWithOptions));
|
||||
for (int32_t i = 0; i < pReq.numOfColumns; ++i) {
|
||||
SField *pField = taosArrayGet(pColumns, i);
|
||||
SFieldWithOptions fieldWithOption;
|
||||
SFieldWithOptions fieldWithOption = {0};
|
||||
setFieldWithOptions(&fieldWithOption, pField);
|
||||
setDefaultOptionsForField(&fieldWithOption);
|
||||
taosArrayPush(pReq.pColumns, &fieldWithOption);
|
||||
|
|
|
@ -233,7 +233,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
|||
|
||||
SSmlKv kvTs = {0};
|
||||
smlBuildTsKv(&kvTs, ts);
|
||||
if (needConverTime) {
|
||||
if (needConverTime && info->currSTableMeta != NULL) {
|
||||
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
|
||||
}
|
||||
|
||||
|
|
|
@ -872,7 +872,10 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
tscError("tmqSendHbReq asyncSendMsgToServer failed");
|
||||
}
|
||||
|
||||
OVER:
|
||||
tDestroySMqHbReq(&req);
|
||||
|
@ -1240,12 +1243,15 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(buf);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
SMqSubscribeCbParam param = {.rspErr = 0};
|
||||
if (tsem_init(¶m.rspSem, 0, 0) != 0) {
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(sendInfo);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1265,10 +1271,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
// avoid double free if msg is sent
|
||||
buf = NULL;
|
||||
sendInfo = NULL;
|
||||
|
||||
tsem_wait(¶m.rspSem);
|
||||
tsem_destroy(¶m.rspSem);
|
||||
|
||||
|
@ -1307,8 +1309,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
|
||||
FAIL:
|
||||
taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(sendInfo);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1381,7 +1381,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
if(pMsg->pData == NULL){
|
||||
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
|
||||
goto FAIL;
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
goto END;
|
||||
}
|
||||
|
||||
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
|
||||
|
@ -1410,7 +1411,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDecoderClear(&decoder);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
goto END;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
|
@ -1426,7 +1427,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDecoderClear(&decoder);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
goto END;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
|
@ -1437,7 +1438,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDecoderClear(&decoder);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
goto END;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
|
@ -1795,6 +1796,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||
if (pParam == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(msg);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1806,6 +1808,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(msg);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1832,7 +1835,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
|
||||
return 0;
|
||||
FAIL:
|
||||
taosMemoryFreeClear(msg);
|
||||
return tmqPollCb(pParam, NULL, code);
|
||||
}
|
||||
|
||||
|
@ -1938,8 +1940,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
terrno = pRspWrapper->code;
|
||||
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
|
||||
tstrerror(pRspWrapper->code));
|
||||
taosFreeQitem(pRspWrapper);
|
||||
return NULL;
|
||||
} else {
|
||||
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
|
||||
askEp(tmq, NULL, false, true);
|
||||
|
@ -2653,6 +2653,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
|
||||
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
taosMemoryFree(pReq);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -2660,6 +2661,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
if (pParam == NULL) {
|
||||
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pReq);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -2670,6 +2672,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pReq);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -2691,8 +2694,6 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
}
|
||||
|
||||
FAIL:
|
||||
taosMemoryFreeClear(pParam);
|
||||
taosMemoryFreeClear(pReq);
|
||||
askEpCb(pParam, NULL, code);
|
||||
}
|
||||
|
||||
|
@ -2799,7 +2800,6 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pParam);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2903,8 +2903,6 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
|||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(sendInfo);
|
||||
tsem_destroy(&pParam->sem);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
|
@ -3161,6 +3159,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
sendInfo->requestId = req.reqId;
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->paramFreeFp = taosMemoryFree;
|
||||
sendInfo->fp = tmqGetWalInfoCb;
|
||||
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
|
||||
|
||||
|
@ -3172,8 +3171,6 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pParam);
|
||||
taosMemoryFree(msg);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
@ -3329,8 +3326,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
|||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(msg);
|
||||
taosMemoryFree(sendInfo);
|
||||
tsem_destroy(&pParam->sem);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
|
|
|
@ -240,6 +240,26 @@ int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAlloc
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t nodesSimAcquireAllocator(int64_t allocatorId) {
|
||||
if (allocatorId <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
|
||||
if (NULL == pAllocator) {
|
||||
return terrno;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t nodesSimReleaseAllocator(int64_t allocatorId) {
|
||||
if (allocatorId <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
|
||||
}
|
||||
|
||||
int32_t nodesAcquireAllocator(int64_t allocatorId) {
|
||||
if (allocatorId <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -70,6 +70,14 @@ class TDTestCase(TBase):
|
|||
tdSql.query(sql1)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
row1 = ['2023-11-30 23:59:27.255', "201000008", 51412.900999999998021]
|
||||
row2 = ['2023-12-04 23:11:28.179', "201000008", 51458.900999999998021]
|
||||
row3 = ['2023-12-04 23:12:28.180', "201000008", 51458.800999999999476]
|
||||
row4 = ['2023-12-31 23:59:36.108', "201000008", 52855.400999999998021]
|
||||
|
||||
rows = [row1, row2, row3, row4]
|
||||
tdSql.checkDataMem(sql1, rows)
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
|
Loading…
Reference in New Issue