Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format
This commit is contained in:
commit
2bcdf9b3d4
|
@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
|
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
|
||||||
SSDataBlock* loadNextDataBlock(void* param);
|
SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
|
@ -4603,7 +4603,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) {
|
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) {
|
||||||
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||||
if (pCol == NULL) {
|
if (pCol == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -4611,9 +4611,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType)
|
||||||
}
|
}
|
||||||
|
|
||||||
pCol->slotId = slotId;
|
pCol->slotId = slotId;
|
||||||
pCol->bytes = pType->bytes;
|
pCol->colId = colId;
|
||||||
pCol->type = pType->type;
|
pCol->bytes = pType->bytes;
|
||||||
pCol->scale = pType->scale;
|
pCol->type = pType->type;
|
||||||
|
pCol->scale = pType->scale;
|
||||||
pCol->precision = pType->precision;
|
pCol->precision = pType->precision;
|
||||||
pCol->dataBlockId = blockId;
|
pCol->dataBlockId = blockId;
|
||||||
|
|
||||||
|
@ -4656,7 +4657,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
SDataType* pType = &pColNode->node.resType;
|
SDataType* pType = &pColNode->node.resType;
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||||
pType->precision, pColNode->colName);
|
pType->precision, pColNode->colName);
|
||||||
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
|
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType);
|
||||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||||
} else if (type == QUERY_NODE_VALUE) {
|
} else if (type == QUERY_NODE_VALUE) {
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
||||||
|
@ -4708,7 +4709,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
SColumnNode* pcn = (SColumnNode*)p1;
|
SColumnNode* pcn = (SColumnNode*)p1;
|
||||||
|
|
||||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
||||||
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType);
|
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType);
|
||||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||||
SValueNode* pvn = (SValueNode*)p1;
|
SValueNode* pvn = (SValueNode*)p1;
|
||||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||||
|
@ -4788,7 +4789,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
|
|
||||||
tsdbReaderT pDataReader = NULL;
|
tsdbReaderT pDataReader = NULL;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
|
@ -4797,6 +4798,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
||||||
queryId, taskId);
|
queryId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
qDebug("pDataReader is NULL");
|
qDebug("pDataReader is NULL");
|
||||||
// return NULL;
|
// return NULL;
|
||||||
|
@ -4816,12 +4818,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SInterval interval = extractIntervalInfo(pTableScanNode);
|
SInterval interval = extractIntervalInfo(pTableScanNode);
|
||||||
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(
|
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
|
|
||||||
pResBlockDumy, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
|
||||||
|
|
||||||
// int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
|
||||||
// queryId, taskId);
|
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "filter.h"
|
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
#include "filter.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
|
@ -1408,35 +1408,33 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
char str[512] = {0};
|
char str[512] = {0};
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
||||||
|
|
||||||
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
|
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
|
||||||
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
|
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
|
||||||
|
metaGetTableEntryByUid(&mr, item->uid);
|
||||||
|
|
||||||
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
|
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
|
|
||||||
// refactor later
|
// refactor later
|
||||||
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
||||||
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
|
||||||
metaGetTableEntryByUid(&mr, item->uid);
|
|
||||||
|
|
||||||
STR_TO_VARSTR(str, mr.me.name);
|
STR_TO_VARSTR(str, mr.me.name);
|
||||||
metaReaderClear(&mr);
|
|
||||||
|
|
||||||
colDataAppend(pDst, count, str, false);
|
colDataAppend(pDst, count, str, false);
|
||||||
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
} else { // it is a tag value
|
||||||
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
|
||||||
// doSetTagValueToResultBuf(dst, data, type, bytes);
|
colDataAppend(pDst, count, p, (p == NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
count += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
count += 1;
|
||||||
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
|
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
|
|
|
@ -19,17 +19,18 @@ int32_t udf2_destroy() {
|
||||||
|
|
||||||
int32_t udf2_start(SUdfInterBuf *buf) {
|
int32_t udf2_start(SUdfInterBuf *buf) {
|
||||||
*(int64_t*)(buf->buf) = 0;
|
*(int64_t*)(buf->buf) = 0;
|
||||||
buf->bufLen = sizeof(int64_t);
|
buf->bufLen = sizeof(double);
|
||||||
buf->numOfResult = 0;
|
buf->numOfResult = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
||||||
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
double sumSquares = *(double*)interBuf->buf;
|
||||||
int8_t numOutput = 0;
|
int8_t numOutput = 0;
|
||||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||||
SUdfColumn* col = block->udfCols[i];
|
SUdfColumn* col = block->udfCols[i];
|
||||||
if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
|
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT ||
|
||||||
|
col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
|
||||||
return TSDB_CODE_UDF_INVALID_INPUT;
|
return TSDB_CODE_UDF_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,17 +40,29 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
|
||||||
if (udfColDataIsNull(col, j)) {
|
if (udfColDataIsNull(col, j)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
switch (col->colMeta.type) {
|
||||||
char* cell = udfColDataGetData(col, j);
|
case TSDB_DATA_TYPE_INT: {
|
||||||
int32_t num = *(int32_t*)cell;
|
char* cell = udfColDataGetData(col, j);
|
||||||
sumSquares += num * num;
|
int32_t num = *(int32_t*)cell;
|
||||||
|
sumSquares += num * num;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
char* cell = udfColDataGetData(col, j);
|
||||||
|
double num = *(double*)cell;
|
||||||
|
sumSquares += num * num;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
numOutput = 1;
|
numOutput = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOutput == 1) {
|
if (numOutput == 1) {
|
||||||
*(int64_t*)(newInterBuf->buf) = sumSquares;
|
*(double*)(newInterBuf->buf) = sumSquares;
|
||||||
newInterBuf->bufLen = sizeof(int64_t);
|
newInterBuf->bufLen = sizeof(double);
|
||||||
}
|
}
|
||||||
newInterBuf->numOfResult = numOutput;
|
newInterBuf->numOfResult = numOutput;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -60,7 +73,7 @@ int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
|
||||||
resultData->numOfResult = 0;
|
resultData->numOfResult = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int64_t sumSquares = *(int64_t*)(buf->buf);
|
double sumSquares = *(double*)(buf->buf);
|
||||||
*(double*)(resultData->buf) = sqrt(sumSquares);
|
*(double*)(resultData->buf) = sqrt(sumSquares);
|
||||||
resultData->bufLen = sizeof(double);
|
resultData->bufLen = sizeof(double);
|
||||||
resultData->numOfResult = 1;
|
resultData->numOfResult = 1;
|
||||||
|
|
|
@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
system sh/cfg.sh -n dnode1 -c wallevel -v 2
|
system sh/cfg.sh -n dnode1 -c wallevel -v 2
|
||||||
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
|
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
|
||||||
system sh/cfg.sh -n dnode1 -c startUdfd -v 1
|
system sh/cfg.sh -n dnode1 -c udf -v 1
|
||||||
|
|
||||||
print ========= start dnode1 as LEADER
|
print ========= start dnode1 as LEADER
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
Loading…
Reference in New Issue