enh: cal output col counts

This commit is contained in:
factosea 2024-11-05 15:42:57 +08:00
parent 7041e64744
commit a51bd9a24f
1 changed files with 24 additions and 14 deletions

View File

@ -45,6 +45,7 @@ typedef struct SDataDispatchHandle {
SDataBlockDescNode* pSchema; SDataBlockDescNode* pSchema;
STaosQueue* pDataBlocks; STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput; SDataDispatchBuf nextOutput;
int32_t outPutColCounts;
int32_t status; int32_t status;
bool queryEnd; bool queryEnd;
uint64_t useconds; uint64_t useconds;
@ -68,23 +69,12 @@ static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData*
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SNode* pNode; if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
int32_t numOfCols = 0; qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock));
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
++numOfCols;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
if (numOfCols > taosArrayGetSize(pInput->pData->pDataBlock)) {
qError("invalid column number, schema:%d, input:%zu", numOfCols, taosArrayGetSize(pInput->pData->pDataBlock));
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SNode* pNode;
int32_t colNum = 0; int32_t colNum = 0;
FOREACH(pNode, pHandle->pSchema->pSlots) { FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
@ -419,6 +409,25 @@ static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
if (pInputDataBlockDesc == NULL) {
qError("invalid schema");
return 0;
}
SNode* pNode;
int32_t numOfCols = 0;
FOREACH(pNode, pInputDataBlockDesc->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
++numOfCols;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
return numOfCols;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
int32_t code; int32_t code;
code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
@ -443,6 +452,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->pManager = pManager; dispatcher->pManager = pManager;
pManager = NULL; pManager = NULL;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
dispatcher->status = DS_BUF_EMPTY; dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false; dispatcher->queryEnd = false;
code = taosOpenQueue(&dispatcher->pDataBlocks); code = taosOpenQueue(&dispatcher->pDataBlocks);