Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

commit 7501c5a865

@@ -90,7 +90,7 @@ If `maven` is used to manage the projects, what needs to be done is only adding
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 ```

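Note: for readers picking up the version bump above, a minimal connection sketch against the upgraded `taos-jdbcdriver` 3.3.2 over the REST/WebSocket endpoint might look like the following. It is not part of this commit; the host, port, and credentials are placeholder assumptions.

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class ConnectExample {
    public static void main(String[] args) throws Exception {
        // REST endpoint on the default port 6041; adjust host/credentials as needed.
        String url = "jdbc:TAOS-RS://localhost:6041/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url)) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}
```
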
@@ -1384,7 +1384,7 @@ SELECT SERVER_VERSION();
 SELECT SERVER_STATUS();
 ```

-**Description**: The server status.
+**Description**: The server status. When checking the status of a cluster, the recommended way is to use `SHOW CLUSTER ALIVE;`. Unlike `SELECT SERVER_STATUS();`, it does not return an error when some nodes in the cluster are unavailable; instead, it returns different status codes. Please check [SHOW CLUSTER ALIVE](https://docs.tdengine.com/reference/taos-sql/show/#show-cluster-alive) for details.

 ### CURRENT_USER

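Note: a hedged sketch (not part of this commit) of the recommended check via JDBC. Per the linked docs, `SHOW CLUSTER ALIVE;` reports a status code instead of failing when only part of the cluster is down; the result-set layout assumed here (a single integer column, e.g. 0 = unavailable, 1 = fully available, 2 = partially available) should be verified against the documentation.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ClusterAliveCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:TAOS-RS://localhost:6041/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW CLUSTER ALIVE;")) {
            if (rs.next()) {
                // Assumed: the first column carries the liveness status code.
                System.out.println("cluster alive status: " + rs.getInt(1));
            }
        }
    }
}
```
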
@@ -42,6 +42,7 @@ REST connection supports all platforms that can run Java.

 | taos-jdbcdriver version | major changes | TDengine version |
 | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
+| 3.3.2 | 1. Optimized websocket prepareStatement performance; 2. Improved mybatis support | - |
 | 3.3.0 | 1. Optimized data transmission performance under Websocket connection; 2. SSL validation skipping is supported but disabled by default | 3.3.2.0 or later |
 | 3.2.11 | Fixed the result set closing bug when using a native connection. | - |
 | 3.2.10 | 1. Automatic compression/decompression for data transmission, disabled by default; 2. Automatic reconnection for websocket with configurable parameter, disabled by default; 3. A new method for schemaless writing is added in the connection class; 4. Optimized performance for data fetching on native connection; 5. Fixes for some known issues; 6. The list of supported functions can be returned by the API for retrieving metadata | - |

@@ -179,7 +180,7 @@ Add the following dependency in the `pom.xml` file of your Maven project:
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 ```

@@ -19,7 +19,7 @@
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 <dependency>
     <groupId>org.locationtech.jts</groupId>

@@ -18,7 +18,7 @@
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 <!-- druid -->
 <dependency>

@@ -17,7 +17,7 @@
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 <dependency>
     <groupId>com.google.guava</groupId>

@@ -67,7 +67,7 @@
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
     <!-- <scope>system</scope>-->
     <!-- <systemPath>${project.basedir}/src/main/resources/lib/taos-jdbcdriver-2.0.15-dist.jar</systemPath>-->
 </dependency>

@@ -22,7 +22,7 @@
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 <!-- ANCHOR_END: dep-->

@@ -89,7 +89,7 @@ TDengine provides a rich set of application development interfaces; to help users get started quickly
 <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.3.0</version>
+    <version>3.3.2</version>
 </dependency>
 ```

@@ -1374,7 +1374,7 @@ SELECT SERVER_VERSION();
 SELECT SERVER_STATUS();
 ```

-**Description**: Checks whether all dnodes on the server side are online; returns success if so, otherwise returns an error that the connection cannot be established.
+**Description**: Checks whether all dnodes on the server side are online; returns success if so, otherwise returns an error that the connection cannot be established. To query the status of a cluster, `SHOW CLUSTER ALIVE;` is recommended. Unlike `SELECT SERVER_STATUS();`, it does not return an error when some nodes in the cluster are unavailable; instead, it returns different status codes. See [SHOW CLUSTER ALIVE](https://docs.taosdata.com/reference/taos-sql/show/#show-cluster-alive) for details.

 ### CURRENT_USER

@@ -33,6 +33,7 @@ REST connection supports all platforms that can run Java.

 | taos-jdbcdriver version | major changes | TDengine version |
 | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
+| 3.3.2 | 1. Optimized parameter binding performance under Websocket connection; 2. Improved mybatis support | - |
 | 3.3.0 | 1. Optimized data transmission performance under Websocket connection; 2. SSL validation skipping is supported but disabled by default | 3.3.2.0 and later |
 | 3.2.11 | Fixed a bug in closing the result set on a native connection | - |
 | 3.2.10 | 1. REST/WebSocket connections support data compression during transmission; 2. Automatic reconnection for Websocket, disabled by default; 3. The Connection class provides a schemaless writing method; 4. Optimized data fetching performance for native connections; 5. Fixed some known issues; 6. The metadata retrieval function can return the list of supported functions | - |

@@ -705,7 +705,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
 void    streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
 bool    streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
 void    streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
-void    streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
+int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
 void    streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
 int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
                                            SRpcHandleInfo* pInfo, int32_t code);

@@ -810,6 +810,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
 int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
 int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
 int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes);
+void    streamTaskSetCheckpointFailed(SStreamTask* pTask);

 // stream task state machine, and event handling
 int32_t streamCreateStateMachine(SStreamTask* pTask);

@@ -3009,6 +3009,12 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
       data += colSizes[col];
     }

+    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
+      uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
+      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
+      return -1;
+    }
+
     colSizes[col] = htonl(colSizes[col]);
     // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
     //        htonl(colSizes[col]), colSizes[col]);

@@ -3036,6 +3042,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
   // total rows sizeof(int32_t)
   int32_t numOfRows = *(int32_t*)pStart;
   pStart += sizeof(int32_t);
+  if (numOfRows <= 0) {
+    uError("block decode numOfRows:%d error", numOfRows);
+    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
+    return terrno;
+  }

   // total columns sizeof(int32_t)
   int32_t numOfCols = *(int32_t*)pStart;

@@ -3115,14 +3126,19 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
       pStart += BitmapLen(numOfRows);
     }

-    if (colLen[i] > 0) {
-      memcpy(pColInfoData->pData, pStart, colLen[i]);
-    }
-
     // TODO
     // setting this flag to true temporarily so aggregate function on stable will
     // examine NULL value for non-primary key column
     pColInfoData->hasNull = true;

+    if (colLen[i] > 0) {
+      memcpy(pColInfoData->pData, pStart, colLen[i]);
+    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
+      uError("block decode colLen:%d error, colIdx:%d, type:%d", colLen[i], i, pColInfoData->info.type);
+      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
+      return terrno;
+    }
+
     pStart += colLen[i];
   }

(File diff suppressed because it is too large.)

@@ -251,7 +251,7 @@ void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
     int32_t code = mndKillTrans(pMnode, pTrans);
     mndReleaseTrans(pMnode, pTrans);
     if (code) {
-      mError("failed to kill trans:%d", pTrans->id);
+      mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
     }
   } else {
     mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);

@@ -1129,7 +1129,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)

   SStreamTask* pTask = NULL;
   code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
-  if (pTask == NULL) {
+  if (pTask == NULL || code != 0) {
     tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
             " transId:%d it may have been destroyed",
             vgId, req.taskId, req.checkpointId, req.transId);

@@ -410,7 +410,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
   tDecoderClear(&decoder);

   if (code) {
-    tqError("vgId:%d failed to decode retrieve msg, quit handling it", pMeta->vgId);
+    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
     return code;
   }

@@ -420,9 +420,16 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
     tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
             req.dstTaskId);
     tCleanupStreamRetrieveReq(&req);
-    return -1;
+    return code;
   }

+  // enqueue
+  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
+          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);
+
+  // if task is in ck status, set current ck failed
+  streamTaskSetCheckpointFailed(pTask);
+
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     code = streamProcessRetrieveReq(pTask, &req);
   } else {

@@ -431,14 +438,19 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
     code = streamTaskBroadcastRetrieveReq(pTask, &req);
   }

-  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
-  streamTaskSendRetrieveRsp(&req, &rsp);
+  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
+    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
+            req.srcTaskId, tstrerror(code));
+  } else {  // send rsp manually only on success.
+    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
+    streamTaskSendRetrieveRsp(&req, &rsp);
+  }

   streamMetaReleaseTask(pMeta, pTask);
   tCleanupStreamRetrieveReq(&req);

-  // always return success, to disable the auto rsp
-  return TSDB_CODE_SUCCESS;
+  return code;
 }

 int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {

@@ -1743,8 +1743,6 @@ static int32_t initRowMergeIfNeeded(STsdbReader* pReader, int64_t uid) {
     if (ps == NULL) {
       return terrno;
     }

    code = tsdbRowMergerInit(pMerger, ps);
  }

  return code;

@@ -51,7 +51,7 @@ typedef struct SAggOperatorInfo {
 } SAggOperatorInfo;

 static void destroyAggOperatorInfo(void* param);
-static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
+static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);

 static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
 static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);

@@ -63,7 +63,7 @@ static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, in

 static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);

-static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
+static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);

 static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
 static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);

@@ -184,7 +184,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
   if (pBlock) {
     pAggInfo->pNewGroupBlock = NULL;
     tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
-    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
+    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
+    QUERY_CHECK_CODE(code, lino, _end);
     code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
     QUERY_CHECK_CODE(code, lino, _end);

@@ -225,12 +226,19 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
       break;
     }
     // the pDataBlock are always the same one, no need to call this again
-    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
+    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
+    if (code != TSDB_CODE_SUCCESS) {
+      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
+      T_LONG_JMP(pTaskInfo->env, code);
+    }
     code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
-    QUERY_CHECK_CODE(code, lino, _end);
+    if (code != TSDB_CODE_SUCCESS) {
+      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
+      T_LONG_JMP(pTaskInfo->env, code);
+    }

     code = doAggregateImpl(pOperator, pSup->pCtx);
-    if (code != 0) {
+    if (code != TSDB_CODE_SUCCESS) {
       destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
       T_LONG_JMP(pTaskInfo->env, code);
     }

@@ -427,20 +435,24 @@ void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
   *ppBlock = NULL;
 }

-void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
+int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
+  int32_t           code = TSDB_CODE_SUCCESS;
   SAggOperatorInfo* pAggInfo = pOperator->info;
   if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
-    return;
+    return code;
   }

-  doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
+  code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

   // record the current active group id
   pAggInfo->groupId = groupId;
+  return code;
 }

-void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
+int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
   // for simple group by query without interval, all the tables belong to one group result.
+  int32_t           code = TSDB_CODE_SUCCESS;
+  int32_t           lino = 0;
   SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
   SAggOperatorInfo* pAggInfo = pOperator->info;

@@ -452,23 +464,27 @@ int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
       doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
                              groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
   if (pResultRow == NULL || pTaskInfo->code != 0) {
-    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
+    code = pTaskInfo->code;
+    lino = __LINE__;
+    goto _end;
   }
   /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
   if (pResultRow->pageId == -1) {
-    int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
-    if (ret != TSDB_CODE_SUCCESS) {
-      T_LONG_JMP(pTaskInfo->env, terrno);
-    }
+    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
+    QUERY_CHECK_CODE(code, lino, _end);
   }

-  int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
-  if (ret != TSDB_CODE_SUCCESS) {
-    T_LONG_JMP(pTaskInfo->env, ret);
+  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
+  QUERY_CHECK_CODE(code, lino, _end);
+
+_end:
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
   }
+  return code;
 }

 // a new buffer page for each table. Needs to opt this design

@@ -464,7 +464,10 @@ _error:

 void destroyExchangeOperatorInfo(void* param) {
   SExchangeInfo* pExInfo = (SExchangeInfo*)param;
-  (void)taosRemoveRef(exchangeObjRefPool, pExInfo->self);
+  int32_t code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+  }
 }

 void freeBlock(void* pParam) {

@@ -505,7 +508,10 @@ void doDestroyExchangeOperatorInfo(void* param) {
   blockDataDestroy(pExInfo->pDummyBlock);
   tSimpleHashCleanup(pExInfo->pHashSources);

-  (void)tsem_destroy(&pExInfo->ready);
+  int32_t code = tsem_destroy(&pExInfo->ready);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+  }
   taosMemoryFreeClear(pExInfo->pTaskId);

   taosMemoryFreeClear(param);

@@ -561,9 +567,13 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
   if (code != TSDB_CODE_SUCCESS) {
     code = TAOS_SYSTEM_ERROR(code);
     qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
     return code;
   }

-  (void)taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
+  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+  }
   return code;
 }

@@ -1190,7 +1200,14 @@ static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeIn
       return pTask->code;
     }
   }
-  (void)tsem_wait(&pExchangeInfo->ready);
+
+  code = tsem_wait(&pExchangeInfo->ready);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+    pTask->code = code;
+    return pTask->code;
+  }

   if (pTask->pWorkerCb) {
     code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
     if (code != TSDB_CODE_SUCCESS) {

@@ -601,7 +601,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
   SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
   (void)taosThreadOnce(&initPoolOnce, initRefPool);

-  qDebug("start to create task, TID:0x%" PRIx64 "QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
+  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);

   int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
   if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {

@@ -904,8 +904,14 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
     }
     SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
     if (pExchangeInfo) {
-      (void)tsem_post(&pExchangeInfo->ready);
-      (void)taosReleaseRef(exchangeObjRefPool, pStop->refId);
+      int32_t code = tsem_post(&pExchangeInfo->ready);
+      if (code != TSDB_CODE_SUCCESS) {
+        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+      }
+      code = taosReleaseRef(exchangeObjRefPool, pStop->refId);
+      if (code != TSDB_CODE_SUCCESS) {
+        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+      }
     }
   }

@@ -288,7 +288,7 @@ void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst) {
   memcpy(p, "TID:0x", offset);
   offset += tintToHex(taskId, &p[offset]);

-  memcpy(&p[offset], "QID:0x", 7);
+  memcpy(&p[offset], " QID:0x", 7);
   offset += 7;
   offset += tintToHex(queryId, &p[offset]);

@@ -671,7 +671,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
     STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
     val = *pVal;

-    (void)taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
+    bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
+    qTrace("release LRU cache, res %d", bRes);
   }

   qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,

@@ -893,7 +894,10 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
   if (pInfo->base.pTableListInfo->groupOffset) {
     pInfo->countState = TABLE_COUNT_STATE_PROCESSED;
   } else {
-    (void)taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
+    int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
+    if (code != TSDB_CODE_SUCCESS) {
+      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+    }
   }
 }

@@ -4726,6 +4730,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
         SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
+        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
         code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
         QUERY_CHECK_CODE(code, lino, _end);
       }
     }
   } else {

@@ -4737,6 +4742,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
         SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
+        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
         code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
         QUERY_CHECK_CODE(code, lino, _end);
       }
     }
   }

@@ -723,6 +723,11 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
   SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
   SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
   SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
+  SColumnInfoData* pTbName = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
+  SColumnInfoData* pPrimaryKey = NULL;
+  if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
+    pPrimaryKey = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
+  }
   for (; (*pIndex) < size; (*pIndex)++) {
     SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
     code = colDataSetVal(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);

@@ -740,6 +745,11 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
     code = colDataSetVal(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
     QUERY_CHECK_CODE(code, lino, _end);

+    colDataSetNULL(pTbName, pBlock->info.rows);
+    if (pPrimaryKey != NULL) {
+      colDataSetNULL(pPrimaryKey, pBlock->info.rows);
+    }
+
     pBlock->info.rows++;
   }
   if ((*pIndex) == size) {

@@ -2177,7 +2177,12 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
     T_LONG_JMP(pTaskInfo->env, code);
   }

-  (void)tsem_wait(&pInfo->ready);
+  code = tsem_wait(&pInfo->ready);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+    pTaskInfo->code = code;
+    T_LONG_JMP(pTaskInfo->env, code);
+  }

   if (pTaskInfo->code) {
     qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),

@@ -2327,7 +2332,10 @@ void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNo

 void destroySysScanOperator(void* param) {
   SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
-  (void)tsem_destroy(&pInfo->ready);
+  int32_t code = tsem_destroy(&pInfo->ready);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+  }
   blockDataDestroy(pInfo->pRes);

   if (pInfo->name.type == TSDB_TABLE_NAME_T) {

@@ -2383,7 +2391,10 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
     }
   }

-  (void)tsem_post(&pScanResInfo->ready);
+  int32_t res = tsem_post(&pScanResInfo->ready);
+  if (res != TSDB_CODE_SUCCESS) {
+    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(res));
+  }
   return TSDB_CODE_SUCCESS;
 }

@@ -1231,7 +1231,7 @@ void destroyIntervalOperatorInfo(void* param) {
   cleanupAggSup(&pInfo->aggSup);
   cleanupExprSupp(&pInfo->scalarSupp);

-  (void)tdListFree(pInfo->binfo.resultRowInfo.openWindow);
+  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);

   taosArrayDestroy(pInfo->pInterpCols);
   pInfo->pInterpCols = NULL;

@@ -2132,7 +2132,7 @@ typedef struct SGroupTimeWindow {

 void destroyMergeIntervalOperatorInfo(void* param) {
   SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
-  (void)tdListFree(miaInfo->groupIntervals);
+  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
   destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);

   taosMemoryFreeClear(param);

@@ -2162,7 +2162,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t

     STimeWindow* prevWin = &prevGrpWin->window;
     if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
-      (void)tdListPopNode(miaInfo->groupIntervals, listNode);
+      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
+      taosMemoryFreeClear(tmp);
     }
   }

@@ -678,7 +678,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t

   // set result type
   if (numOfParams > 2) {
-    pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_VARCHAR};
+    pFunc->node.resType = (SDataType){.bytes = 3200, .type = TSDB_DATA_TYPE_VARCHAR};
   } else {
     pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
   }

@@ -2105,7 +2105,8 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
   tMemBucket* pMemBucket = ppInfo->pMemBucket;
   if (pMemBucket != NULL && pMemBucket->total > 0) {  // check for null
     if (pCtx->numOfParams > 2) {
-      char buf[512] = {0};
+      char buf[3200] = {0};
+      // max length of double num is 317, e.g. use %.6lf to print -1.0e+308; considering the commas and brackets, 3200 is enough.
       size_t len = 1;

       varDataVal(buf)[0] = '[';

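Note: the sizing comment added above can be sanity-checked. Printing -1.0e+308 with six fixed decimals yields 1 sign + 309 integer digits + 1 decimal point + 6 decimals = 317 characters, so ten comma-separated results plus brackets stay well under 3200 bytes. A quick check, written in Java as a stand-in for the C `%.6lf` reasoning (not part of this commit):

```java
public class DoubleWidthCheck {
    public static void main(String[] args) {
        // Longest fixed-point rendering of a double at 6 decimal places.
        int len = String.format("%.6f", -1.0e308).length();
        System.out.println(len);  // prints 317; 10 * 317 + 9 commas + 2 brackets = 3181 < 3200
    }
}
```
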
@@ -6008,6 +6009,7 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
   pInfo->buf = taosMemoryMalloc(pInfo->colBytes);
   if (NULL == pInfo->buf) {
     taosHashCleanup(pInfo->pHash);
+    pInfo->pHash = NULL;
     return TSDB_CODE_OUT_OF_MEMORY;
   }

@@ -6016,6 +6018,7 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {

 static void modeFunctionCleanup(SModeInfo * pInfo) {
   taosHashCleanup(pInfo->pHash);
+  pInfo->pHash = NULL;
   taosMemoryFreeClear(pInfo->buf);
 }

@@ -1274,6 +1274,16 @@ int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) {
   return TSDB_CODE_SUCCESS;
 }

+static void filterFreeGroup(void *pItem) {
+  if (pItem == NULL) {
+    return;
+  }
+
+  SFilterGroup *p = (SFilterGroup *)pItem;
+  taosMemoryFreeClear(p->unitIdxs);
+  taosMemoryFreeClear(p->unitFlags);
+}
+
 int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
   SOperatorNode *node = (SOperatorNode *)tree;
   int32_t        ret = TSDB_CODE_SUCCESS;

@@ -1336,9 +1346,11 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
       SFilterGroup fgroup = {0};
       code = filterAddUnitToGroup(&fgroup, uidx);
       if (TSDB_CODE_SUCCESS != code) {
+        filterFreeGroup((void*)&fgroup);
         break;
       }
       if (NULL == taosArrayPush(group, &fgroup)) {
+        filterFreeGroup((void*)&fgroup);
        code = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }

@@ -1658,16 +1670,6 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
   return TSDB_CODE_SUCCESS;
 }

-static void filterFreeGroup(void *pItem) {
-  if (pItem == NULL) {
-    return;
-  }
-
-  SFilterGroup *p = (SFilterGroup *)pItem;
-  taosMemoryFreeClear(p->unitIdxs);
-  taosMemoryFreeClear(p->unitFlags);
-}
-
 EDealRes fltTreeToGroup(SNode *pNode, void *pContext) {
   int32_t  code = TSDB_CODE_SUCCESS;
   SArray  *preGroup = NULL;

@@ -2944,25 +2946,44 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t gResNum
       for (int32_t n = 0; n < usize; ++n) {
         SFilterUnit *u = (SFilterUnit *)taosArrayGetP((SArray *)colInfo->info, n);
         if (NULL == u) {
-          FLT_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
+          code = TSDB_CODE_OUT_OF_RANGE;
+          break;
         }
-        FLT_ERR_JRET(filterAddUnitFromUnit(info, &oinfo, u, &uidx));
-        FLT_ERR_JRET(filterAddUnitToGroup(&ng, uidx));
+        code = filterAddUnitFromUnit(info, &oinfo, u, &uidx);
+        if (TSDB_CODE_SUCCESS != code) {
+          break;
+        }
+        code = filterAddUnitToGroup(&ng, uidx);
+        if (TSDB_CODE_SUCCESS != code) {
+          break;
+        }
+      }
+      if (TSDB_CODE_SUCCESS != code) {
+        break;
       }

       continue;
     }
+    if (TSDB_CODE_SUCCESS != code) {
+      filterFreeGroup((void*)&ng);
+      FLT_ERR_JRET(code);
+    }

     if (colInfo->type != RANGE_TYPE_MR_CTX) {
       fltError("filterRewrite get invalid col type : %d", colInfo->type);
       FLT_ERR_JRET(TSDB_CODE_QRY_FILTER_INVALID_TYPE);
     }

-    FLT_ERR_JRET(filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group));
+    code = filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group);
+    if (TSDB_CODE_SUCCESS != code) {
+      filterFreeGroup((void*)&ng);
+      FLT_ERR_JRET(code);
+    }
   }

   if (ng.unitNum > 0) {
     if (NULL == taosArrayPush(group, &ng)) {
+      filterFreeGroup((void*)&ng);
       FLT_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
     }
   }

@@ -65,12 +65,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
             ", prev:%" PRId64,
             id, upstreamTaskId, vgId, stage, pInfo->stage);
     // record the checkpoint failure id and sent to mnode
-    streamMutexLock(&pTask->lock);
-    ETaskStatus status = streamTaskGetStatus(pTask).state;
-    if (status == TASK_STATUS__CK) {
-      streamTaskSetFailedCheckpointId(pTask);
-    }
-    streamMutexUnlock(&pTask->lock);
+    streamTaskSetCheckpointFailed(pTask);
   }

   if (pInfo->stage != stage) {

@@ -673,6 +673,15 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
   }
 }

+void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
+  streamMutexLock(&pTask->lock);
+  ETaskStatus status = streamTaskGetStatus(pTask).state;
+  if (status == TASK_STATUS__CK) {
+    streamTaskSetFailedCheckpointId(pTask);
+  }
+  streamMutexUnlock(&pTask->lock);
+}
+
 static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
   int32_t code = 0;
   int32_t cap = strlen(path) + 64;

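Note: the new helper folds the lock / status-check / unlock sequence that was previously inlined at its call sites into one place. A hedged Java analogue of the same pattern (illustrative names only, not part of this commit):

```java
import java.util.concurrent.locks.ReentrantLock;

class StreamTaskSketch {
    enum Status { CK, READY }

    private final ReentrantLock lock = new ReentrantLock();
    private Status status = Status.CK;
    private boolean checkpointFailed = false;

    // Mirrors streamTaskSetCheckpointFailed(): mark the checkpoint failed
    // only while holding the task lock and only in checkpoint state.
    void setCheckpointFailed() {
        lock.lock();
        try {
            if (status == Status.CK) {
                checkpointFailed = true;  // stands in for streamTaskSetFailedCheckpointId()
            }
        } finally {
            lock.unlock();
        }
    }
}
```
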
@@ -1111,26 +1120,20 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_

 // record the dispatch checkpoint trigger info in the list
 // memory insufficient may cause the stream computing stopped
-void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
+int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
   SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

   int64_t now = taosGetTimestampMs();

   streamMutexLock(&pInfo->lock);

-  // outputQ should be empty here
-  if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
-    stFatal("s-task:%s items are still in outputQ, failed to init trigger dispatch info", pTask->id.idStr);
-    return;
-  }
-
   pInfo->dispatchTrigger = true;
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

     STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
     void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
-    if (px == NULL) {
-      // pause the stream task, if memory not enough
+    if (px == NULL) {  // pause the stream task, if memory not enough
+      streamMutexUnlock(&pInfo->lock);
+      return terrno;
     }
   } else {
     for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {

@@ -1141,13 +1144,15 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {

       STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
       void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
-      if (px == NULL) {
-        // pause the stream task, if memory not enough
+      if (px == NULL) {  // pause the stream task, if memory not enough
+        streamMutexUnlock(&pInfo->lock);
+        return terrno;
       }
     }
   }

   streamMutexUnlock(&pInfo->lock);
+  return 0;
 }

 int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {

@@ -727,6 +727,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S

 int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
   const char*       id = pTask->id.idStr;
+  int32_t           code = 0;
+  SStreamDataBlock* pBlock = NULL;
+
   int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
   if (numOfElems > 0) {
     double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));

@@ -755,7 +758,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
   }

-  SStreamDataBlock* pBlock = NULL;
   while (1) {
     streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock);
     if (pBlock == NULL) {
       atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);

@@ -776,14 +779,28 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
     streamMutexUnlock(&pTask->msgInfo.lock);

-    int32_t code = doBuildDispatchMsg(pTask, pBlock);
+    code = doBuildDispatchMsg(pTask, pBlock);
     if (code == 0) {
       destroyStreamDataBlock(pBlock);
     } else {  // todo handle build dispatch msg failed
     }

     if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
-      streamTaskInitTriggerDispatchInfo(pTask);
+      // outputQ should be empty here, otherwise, set the checkpoint failed due to the retrieve req happens
+      if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
+        stError("s-task:%s items are still in outputQ due to downstream retrieve, failed to init trigger dispatch",
+                pTask->id.idStr);
+        streamTaskSetCheckpointFailed(pTask);
+        clearBufferedDispatchMsg(pTask);
+        continue;
+      }
+
+      code = streamTaskInitTriggerDispatchInfo(pTask);
+      if (code != TSDB_CODE_SUCCESS) {  // todo handle error
+      }
     }

     break;
   }

   code = sendDispatchMsg(pTask, pTask->msgInfo.pData);

@@ -98,14 +98,13 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
 void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
   int32_t code = TSDB_CODE_SUCCESS;
   void*   pExecutor = pTask->exec.pExecutor;

-  *totalBlocks = 0;
-  *totalSize = 0;
-
   int32_t size = 0;
   int32_t numOfBlocks = 0;
   SArray* pRes = NULL;

+  *totalBlocks = 0;
+  *totalSize = 0;
+
   while (1) {
     if (pRes == NULL) {
       pRes = taosArrayInit(4, sizeof(SSDataBlock));

@@ -131,6 +130,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
     if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
       SSDataBlock block = {0};
       const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;

      int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
      if (num != 1) {
        stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);

@@ -596,12 +596,32 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
 // static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
 static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }

+static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize,
+                               double st, const char* id) {
+  double el = (taosGetTimestampMs() - st) / 1000.0;
+
+  stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id,
+          el, SIZE_IN_MiB(totalSize), totalBlocks);
+
+  pInfo->outputDataBlocks += totalBlocks;
+  pInfo->outputDataSize += totalSize;
+  if (fabs(el - 0.0) <= DBL_EPSILON) {
+    pInfo->procsThroughput = 0;
+    pInfo->outputThroughput = 0;
+  } else {
+    pInfo->outputThroughput = (totalSize / el);
+    pInfo->procsThroughput = (blockSize / el);
+  }
+}
+
 static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
   const char*      id = pTask->id.idStr;
   int32_t          blockSize = 0;
   int64_t          st = taosGetTimestampMs();
   SCheckpointInfo* pInfo = &pTask->chkInfo;
   int64_t          ver = pInfo->processedVer;
+  int64_t          totalSize = 0;
+  int32_t          totalBlocks = 0;

   stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));

@@ -611,23 +631,8 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
     return;
   }

-  int64_t totalSize = 0;
-  int32_t totalBlocks = 0;
   streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
-
-  double el = (taosGetTimestampMs() - st) / 1000.0;
-  stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
-          SIZE_IN_MiB(totalSize), totalBlocks);
-
-  pTask->execInfo.outputDataBlocks += totalBlocks;
-  pTask->execInfo.outputDataSize += totalSize;
-  if (fabs(el - 0.0) <= DBL_EPSILON) {
-    pTask->execInfo.procsThroughput = 0;
-    pTask->execInfo.outputThroughput = 0;
-  } else {
-    pTask->execInfo.outputThroughput = (totalSize / el);
-    pTask->execInfo.procsThroughput = (blockSize / el);
-  }
+  doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);

   // update the currentVer if processing the submit blocks.
   if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {

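Note: the extracted `doRecordThroughput()` helper keeps the old guard: if the elapsed interval is numerically zero (compared against `DBL_EPSILON`), throughput is reported as zero rather than dividing by zero. A hedged Java sketch of that guard (illustrative only, not part of this commit):

```java
public class ThroughputGuard {
    // Math.ulp(1.0) equals 2^-52, the same value as C's DBL_EPSILON.
    static double throughput(long totalBytes, double elapsedSeconds) {
        if (Math.abs(elapsedSeconds) <= Math.ulp(1.0)) {
            return 0.0;  // avoid division by (near-)zero
        }
        return totalBytes / elapsedSeconds;
    }

    public static void main(String[] args) {
        System.out.println(throughput(1_048_576, 0.0));  // 0.0
        System.out.println(throughput(1_048_576, 2.0));  // 524288.0
    }
}
```
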
@@ -1254,16 +1254,7 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
       continue;
     }

-    streamMutexLock(&pTask->lock);
-
-    SStreamTaskState pState = streamTaskGetStatus(pTask);
-    if (pState.state == TASK_STATUS__CK) {
-      streamTaskSetFailedCheckpointId(pTask);
-    } else {
-      stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState.name);
-    }
-
-    streamMutexUnlock(&pTask->lock);
+    streamTaskSetCheckpointFailed(pTask);
     streamMetaReleaseTask(pMeta, pTask);
   }

@@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
         "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
         pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
       streamDataSubmitDestroy(px);
-      return -1;
+      return TSDB_CODE_OUT_OF_MEMORY;
     }

     int32_t msgLen = px->submit.msgLen;

@@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
       stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
               pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
       streamFreeQitem(pItem);
-      return -1;
+      return TSDB_CODE_OUT_OF_MEMORY;
     }

     int32_t code = taosWriteQitem(pQueue, pItem);

@@ -1098,15 +1098,12 @@ static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq*
     return terrno = code;
   }

-  // enqueue
-  stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
-          pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
   pData->type = STREAM_INPUT__DATA_RETRIEVE;
   pData->srcVgId = 0;

   code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
   if (code != TSDB_CODE_SUCCESS) {
     stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
     taosFreeQitem(pData);
     return code;
   }

@@ -124,6 +124,9 @@ class TDTestCase:
         tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.ntbname}')
         tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]')

+        tdSql.query(f'select percentile(col1 * 1e+200, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.ntbname}')
+        tdSql.checkRows(1)
+
         tdSql.error(f'select percentile(col1) from {self.ntbname}')
         tdSql.error(f'select percentile(col1, -1) from {self.ntbname}')
         tdSql.error(f'select percentile(col1, 101) from {self.ntbname}')

@@ -166,6 +169,9 @@ class TDTestCase:
         tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.stbname}_0')
         tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]')

+        tdSql.query(f'select percentile(col1 * 1e+200, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.stbname}_0')
+        tdSql.checkRows(1)
+
         tdSql.error(f'select percentile(col1) from {self.stbname}_0')
         tdSql.error(f'select percentile(col1, -1) from {self.stbname}_0')
         tdSql.error(f'select percentile(col1, 101) from {self.stbname}_0')