Merge branch '3.0' into fix/3_liaohj
This commit is contained in:
commit
ecda2a4f12
|
@ -4,7 +4,7 @@ sidebar_label: Taos-Explorer
|
||||||
description: User guide about taosExplorer
|
description: User guide about taosExplorer
|
||||||
---
|
---
|
||||||
|
|
||||||
taos-explorer is a web service which provides GUI based interactive database management tool.
|
taos-explorer is a web service which provides GUI based interactive database management tool. To ensure the best experience when accessing taosExplorer, please use Chrome version 79 or higher, Edge version 79 or higher.
|
||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ sidebar_label: taosExplorer
|
||||||
toc_max_heading_level: 4
|
toc_max_heading_level: 4
|
||||||
---
|
---
|
||||||
|
|
||||||
taosExplorer 是一个为用户提供 TDengine 实例的可视化管理交互工具的 web 服务。本节主要讲述其安装和部署。它的各项功能都是基于简单易上手的图形界面,可以直接尝试,如果有需要也可以考高级功能和运维指南中的相关内容。
|
taosExplorer 是一个为用户提供 TDengine 实例的可视化管理交互工具的 web 服务。本节主要讲述其安装和部署。它的各项功能都是基于简单易上手的图形界面,可以直接尝试,如果有需要也可以考高级功能和运维指南中的相关内容。为了确保访问 taosExplorer 的最佳体验,请使用 Chrome 79 及以上版本,或 Edge 79 及以上版本。
|
||||||
|
|
||||||
## 安装
|
## 安装
|
||||||
|
|
||||||
|
|
|
@ -233,6 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||||
* @brief find how many rows already in order start from first row
|
* @brief find how many rows already in order start from first row
|
||||||
*/
|
*/
|
||||||
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||||
|
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk);
|
||||||
|
|
||||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
||||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||||
|
|
|
@ -74,26 +74,26 @@ int32_t tGetMachineId(char **result);
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
#define GRANTS_SCHEMA \
|
#define GRANTS_SCHEMA \
|
||||||
static const SSysDbTableSchema grantsSchema[] = { \
|
static const SSysDbTableSchema grantsSchema[] = { \
|
||||||
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "version", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "timeseries", .bytes = 43 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "dnodes", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "cpu_cores", .bytes = 13 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "cpu_cores", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
#define GRANTS_SCHEMA \
|
#define GRANTS_SCHEMA \
|
||||||
static const SSysDbTableSchema grantsSchema[] = { \
|
static const SSysDbTableSchema grantsSchema[] = { \
|
||||||
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "version", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "timeseries", .bytes = 43 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "dnodes", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
{.name = "cpu_cores", .bytes = 13 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
{.name = "cpu_cores", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
// #define GRANT_CFG_ADD
|
// #define GRANT_CFG_ADD
|
||||||
|
|
|
@ -731,6 +731,10 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
|
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
|
||||||
|
if (numOfRows == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (numOfRows >= pBlock->info.rows) {
|
if (numOfRows >= pBlock->info.rows) {
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
return;
|
return;
|
||||||
|
@ -2936,6 +2940,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
||||||
|
|
||||||
// return length of encoded data, return -1 if failed
|
// return length of encoded data, return -1 if failed
|
||||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||||
|
blockDataCheck(pBlock, false);
|
||||||
|
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
|
@ -3177,6 +3183,9 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
|
||||||
}
|
}
|
||||||
|
|
||||||
*pEndPos = pStart;
|
*pEndPos = pStart;
|
||||||
|
|
||||||
|
blockDataCheck(pBlock, false);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3386,3 +3395,77 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
||||||
|
|
||||||
return nextRowIdx;
|
return nextRowIdx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#define BLOCK_DATA_CHECK_TRESSA(o) ;
|
||||||
|
//#define BLOCK_DATA_CHECK_TRESSA(o) A S S E R T(o)
|
||||||
|
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
|
||||||
|
|
||||||
|
if (!pDataBlock->info.dataLoad && !forceChk) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isVarType = false;
|
||||||
|
int32_t colLen = 0;
|
||||||
|
int32_t nextPos = 0;
|
||||||
|
int64_t checkRows = 0;
|
||||||
|
int64_t typeValue = 0;
|
||||||
|
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
|
for (int32_t i = 0; i < colNum; ++i) {
|
||||||
|
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
|
isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
|
||||||
|
checkRows = pDataBlock->info.rows;
|
||||||
|
|
||||||
|
if (isVarType) {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
|
||||||
|
} else {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
|
||||||
|
}
|
||||||
|
|
||||||
|
nextPos = 0;
|
||||||
|
for (int64_t r = 0; r < checkRows; ++r) {
|
||||||
|
if (!colDataIsNull_s(pCol, r)) {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->pData);
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
|
||||||
|
|
||||||
|
if (isVarType) {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] < pCol->varmeta.length);
|
||||||
|
if (pCol->reassigned) {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
|
||||||
|
} else if (0 == r) {
|
||||||
|
nextPos = pCol->varmeta.offset[r];
|
||||||
|
} else {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
|
||||||
|
}
|
||||||
|
|
||||||
|
colLen = varDataTLen(pCol->pData + pCol->varmeta.offset[r]);
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
|
||||||
|
|
||||||
|
if (pCol->reassigned) {
|
||||||
|
BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
|
||||||
|
} else {
|
||||||
|
nextPos += colLen;
|
||||||
|
BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
|
||||||
|
} else {
|
||||||
|
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2685,12 +2685,14 @@ static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarli
|
||||||
switch (state->state) {
|
switch (state->state) {
|
||||||
case SMEMNEXTROW_ENTER: {
|
case SMEMNEXTROW_ENTER: {
|
||||||
if (state->pMem != NULL) {
|
if (state->pMem != NULL) {
|
||||||
|
/*
|
||||||
if (state->pMem->maxKey <= state->lastTs) {
|
if (state->pMem->maxKey <= state->lastTs) {
|
||||||
*ppRow = NULL;
|
*ppRow = NULL;
|
||||||
*pIgnoreEarlierTs = true;
|
*pIgnoreEarlierTs = true;
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
|
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
|
||||||
|
|
||||||
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
|
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
|
||||||
|
|
|
@ -528,6 +528,7 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
|
||||||
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
|
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
|
||||||
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
|
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
|
||||||
if (*ppRes && (code == 0)) {
|
if (*ppRes && (code == 0)) {
|
||||||
|
blockDataCheck(*ppRes, false);
|
||||||
pPost->isStarted = true;
|
pPost->isStarted = true;
|
||||||
pStbJoin->execInfo.postBlkNum++;
|
pStbJoin->execInfo.postBlkNum++;
|
||||||
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
|
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
|
||||||
|
|
|
@ -692,8 +692,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
|
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
|
||||||
pTaskInfo->paramSet = true;
|
pTaskInfo->paramSet = true;
|
||||||
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
|
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
|
||||||
|
blockDataCheck(pRes, false);
|
||||||
} else {
|
} else {
|
||||||
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
||||||
|
blockDataCheck(pRes, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -740,6 +742,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
}
|
}
|
||||||
|
|
||||||
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
||||||
|
blockDataCheck(pRes, false);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -839,6 +842,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blockDataCheck(*pRes, false);
|
||||||
|
|
||||||
uint64_t el = (taosGetTimestampUs() - st);
|
uint64_t el = (taosGetTimestampUs() - st);
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += el;
|
pTaskInfo->cost.elapsedTime += el;
|
||||||
|
|
|
@ -617,6 +617,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
blockDataCheck(pBlock, true);
|
||||||
|
|
||||||
colDataDestroy(p);
|
colDataDestroy(p);
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -733,6 +733,8 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blockDataCheck(pBlock, false);
|
||||||
|
|
||||||
*ppRes = pBlock;
|
*ppRes = pBlock;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -474,8 +474,11 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
|
||||||
pCtx->midRemains = false;
|
pCtx->midRemains = false;
|
||||||
} else {
|
} else {
|
||||||
int32_t copyRows = pMore->info.capacity - pMore->info.rows;
|
int32_t copyRows = pMore->info.capacity - pMore->info.rows;
|
||||||
MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
|
if (copyRows > 0) {
|
||||||
blockDataShrinkNRows(pLess, copyRows);
|
MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
|
||||||
|
blockDataShrinkNRows(pLess, copyRows);
|
||||||
|
}
|
||||||
|
|
||||||
pCtx->midRemains = true;
|
pCtx->midRemains = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1742,6 +1745,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc
|
||||||
if (pBlock && pBlock->info.rows > 0) {
|
if (pBlock && pBlock->info.rows > 0) {
|
||||||
*pResBlock = pBlock;
|
*pResBlock = pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,7 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
|
||||||
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||||
|
blockDataCheck(*ppBlock, false);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,6 +525,7 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
|
|
||||||
if ((*pResBlock) != NULL) {
|
if ((*pResBlock) != NULL) {
|
||||||
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
|
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
|
||||||
|
blockDataCheck(*pResBlock, false);
|
||||||
} else {
|
} else {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
|
@ -868,12 +868,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
|
||||||
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
|
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||||
SSDataBlock* p = NULL;
|
SSDataBlock* p = NULL;
|
||||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
|
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
|
||||||
|
blockDataCheck(p, false);
|
||||||
return (code == 0)? p:NULL;
|
return (code == 0)? p:NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
|
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||||
SSDataBlock* p = NULL;
|
SSDataBlock* p = NULL;
|
||||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
|
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
|
||||||
|
blockDataCheck(p, false);
|
||||||
return (code == 0)? p:NULL;
|
return (code == 0)? p:NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -328,7 +328,9 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
||||||
|
|
||||||
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||||
return pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||||
|
blockDataCheck(*ppBlock, false);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor: merged with fetch fp
|
// todo refactor: merged with fetch fp
|
||||||
|
@ -611,6 +613,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (block != NULL) {
|
if (block != NULL) {
|
||||||
|
blockDataCheck(block, false);
|
||||||
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
|
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
|
||||||
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
|
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
|
||||||
*ppBlock = block;
|
*ppBlock = block;
|
||||||
|
|
|
@ -140,6 +140,7 @@ void* rpcMallocCont(int64_t contLen) {
|
||||||
char* start = taosMemoryCalloc(1, size);
|
char* start = taosMemoryCalloc(1, size);
|
||||||
if (start == NULL) {
|
if (start == NULL) {
|
||||||
tError("failed to malloc msg, size:%" PRId64, size);
|
tError("failed to malloc msg, size:%" PRId64, size);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
tTrace("malloc mem:%p size:%" PRId64, start, size);
|
tTrace("malloc mem:%p size:%" PRId64, start, size);
|
||||||
|
@ -155,10 +156,13 @@ void* rpcReallocCont(void* ptr, int64_t contLen) {
|
||||||
|
|
||||||
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
|
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
|
||||||
int64_t sz = contLen + TRANS_MSG_OVERHEAD;
|
int64_t sz = contLen + TRANS_MSG_OVERHEAD;
|
||||||
st = taosMemoryRealloc(st, sz);
|
char* nst = taosMemoryRealloc(st, sz);
|
||||||
if (st == NULL) {
|
if (nst == NULL) {
|
||||||
|
taosMemoryFree(st);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
} else {
|
||||||
|
st = nst;
|
||||||
}
|
}
|
||||||
|
|
||||||
return st + TRANS_MSG_OVERHEAD;
|
return st + TRANS_MSG_OVERHEAD;
|
||||||
|
@ -168,7 +172,7 @@ int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64
|
||||||
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
||||||
}
|
}
|
||||||
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||||
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0|| pRid == NULL) {
|
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
|
||||||
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
||||||
} else {
|
} else {
|
||||||
return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
|
return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
|
||||||
|
|
Loading…
Reference in New Issue