add colDataSetVal func desc

This commit is contained in:
xsren 2024-11-03 23:00:31 +08:00
parent 8c3e4ce63d
commit 7041e64744
2 changed files with 37 additions and 6 deletions

View File

@ -189,7 +189,11 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint
int32_t getJsonValueLen(const char* data); int32_t getJsonValueLen(const char* data);
// For the VAR_DATA_TYPE type, new data is inserted strictly according to the position of SVarColAttr.length.
// If the same row is inserted repeatedly, data holes will result.
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
// For the VAR_DATA_TYPE type, if a row already has data before inserting it (judged by offset != -1),
// it will be inserted at the original position and the old data will be overwritten.
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows,

View File

@ -70,21 +70,15 @@ static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData*
SNode* pNode; SNode* pNode;
int32_t numOfCols = 0; int32_t numOfCols = 0;
int32_t realOutputRowSize = 0;
FOREACH(pNode, pHandle->pSchema->pSlots) { FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) { if (pSlotDesc->output) {
realOutputRowSize += pSlotDesc->dataType.bytes;
++numOfCols; ++numOfCols;
} else { } else {
// Slots must be sorted, and slots with 'output' set to true must come first // Slots must be sorted, and slots with 'output' set to true must come first
break; break;
} }
} }
if (realOutputRowSize != pSchema->outputRowSize) {
qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pSchema->outputRowSize);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (numOfCols > taosArrayGetSize(pInput->pData->pDataBlock)) { if (numOfCols > taosArrayGetSize(pInput->pData->pDataBlock)) {
qError("invalid column number, schema:%d, input:%zu", numOfCols, taosArrayGetSize(pInput->pData->pDataBlock)); qError("invalid column number, schema:%d, input:%zu", numOfCols, taosArrayGetSize(pInput->pData->pDataBlock));
@ -397,8 +391,41 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) {
if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
return TSDB_CODE_SUCCESS;
}
if (pInputDataBlockDesc == NULL) {
qError("invalid schema");
return TSDB_CODE_QRY_INVALID_INPUT;
}
SNode* pNode;
int32_t realOutputRowSize = 0;
FOREACH(pNode, pInputDataBlockDesc->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
realOutputRowSize += pSlotDesc->dataType.bytes;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
if (realOutputRowSize != pInputDataBlockDesc->outputRowSize) {
qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pInputDataBlockDesc->outputRowSize);
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
int32_t code; int32_t code;
code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
if (code) {
qError("failed to check input data block desc, code:%d", code);
return code;
}
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) { if (NULL == dispatcher) {