Merge pull request #18037 from taosdata/feature/stream
fix(stream): tbname max len to 192
This commit is contained in:
commit
b28f05e6fb
|
@ -244,7 +244,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
||||||
|
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
|
||||||
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
|
tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
|
||||||
|
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
int32_t schemaLen = 0;
|
int32_t schemaLen = 0;
|
||||||
|
@ -486,7 +486,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
blkHead->uid = 0;
|
blkHead->uid = 0;
|
||||||
blkHead->schemaLen = 0;
|
blkHead->schemaLen = 0;
|
||||||
|
|
||||||
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
|
tqDebug("tq sink, convert block2 %d, rows: %d", i, rows);
|
||||||
|
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
|
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
|
||||||
|
@ -514,6 +514,9 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
|
||||||
} else {
|
} else {
|
||||||
void* colData = colDataGetData(pColData, j);
|
void* colData = colDataGetData(pColData, j);
|
||||||
|
if (k == 0) {
|
||||||
|
tqDebug("tq sink, row %d ts %" PRId64, j, *(int64_t*)colData);
|
||||||
|
}
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -816,12 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
|
||||||
|
pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
|
||||||
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
|
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -900,7 +900,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||||
// TODO check tbname validity
|
// TODO check tbname validity
|
||||||
if (pData != (void*)-1) {
|
if (pData != (void*)-1) {
|
||||||
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN);
|
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||||
memcpy(pDest->info.parTbName, varDataVal(pData), len);
|
memcpy(pDest->info.parTbName, varDataVal(pData), len);
|
||||||
/*pDest->info.parTbName[len + 1] = 0;*/
|
/*pDest->info.parTbName[len + 1] = 0;*/
|
||||||
} else {
|
} else {
|
||||||
|
@ -1099,11 +1099,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL,
|
pOperator->fpSet =
|
||||||
destroyStreamPartitionOperatorInfo, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL);
|
||||||
|
|
||||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -946,7 +946,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pInfo->currentGroupId = -1;
|
pInfo->currentGroupId = -1;
|
||||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
|
pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
|
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
|
||||||
|
@ -980,7 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
||||||
pInfo->dataReader = pReadHandle;
|
pInfo->dataReader = pReadHandle;
|
||||||
// pInfo->prevGroupId = -1;
|
// pInfo->prevGroupId = -1;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
@ -1136,8 +1138,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -1581,7 +1585,7 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
|
||||||
// TODO check tbname validation
|
// TODO check tbname validation
|
||||||
if (pData != (void*)-1 && pData != NULL) {
|
if (pData != (void*)-1 && pData != NULL) {
|
||||||
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN);
|
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||||
memcpy(pBlock->info.parTbName, varDataVal(pData), len);
|
memcpy(pBlock->info.parTbName, varDataVal(pData), len);
|
||||||
/*pBlock->info.parTbName[len + 1] = 0;*/
|
/*pBlock->info.parTbName[len + 1] = 0;*/
|
||||||
} else {
|
} else {
|
||||||
|
@ -2351,7 +2355,8 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
pInfo->vnode = pHandle->vnode;
|
pInfo->vnode = pHandle->vnode;
|
||||||
|
|
||||||
pInfo->sContext = pHandle->sContext;
|
pInfo->sContext = pHandle->sContext;
|
||||||
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
|
pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -2535,7 +2540,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||||
pInfo->partitionSup.needCalc = false;
|
pInfo->partitionSup.needCalc = false;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
|
pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||||
|
@ -4173,7 +4179,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
pInfo->readHandle = *(SReadHandle*)readHandle;
|
pInfo->readHandle = *(SReadHandle*)readHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -4303,7 +4310,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
|
pTaskInfo);
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
@ -4813,11 +4821,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
|
||||||
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
|
getTableMergeScanExplainExecInfo);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue