fix: primary key

This commit is contained in:
factosea 2024-04-15 15:07:45 +08:00
parent f831870efb
commit d14f7889c7
5 changed files with 39 additions and 27 deletions

View File

@ -177,9 +177,8 @@ typedef struct SColumnDefNode {
ENodeType type; ENodeType type;
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
SDataType dataType; SDataType dataType;
SColumnOptions* pOptions; SNode* pOptions;
bool sma; bool sma;
bool is_pk;
} SColumnDefNode; } SColumnDefNode;
typedef struct SCreateTableStmt { typedef struct SCreateTableStmt {

View File

@ -129,8 +129,8 @@ static int32_t columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
static int32_t columnDefNodeCopy(const SColumnDefNode* pSrc, SColumnDefNode* pDst) { static int32_t columnDefNodeCopy(const SColumnDefNode* pSrc, SColumnDefNode* pDst) {
COPY_CHAR_ARRAY_FIELD(colName); COPY_CHAR_ARRAY_FIELD(colName);
COPY_OBJECT_FIELD(dataType, sizeof(SDataType)); COPY_OBJECT_FIELD(dataType, sizeof(SDataType));
COPY_SCALAR_FIELD(sma); COPY_SCALAR_FIELD(sma);;
COPY_SCALAR_FIELD(is_pk); CLONE_NODE_FIELD(pOptions);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -800,7 +800,10 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(((SDataBlockDescNode*)pNode)->pSlots); nodesDestroyList(((SDataBlockDescNode*)pNode)->pSlots);
break; break;
case QUERY_NODE_SLOT_DESC: // no pointer field case QUERY_NODE_SLOT_DESC: // no pointer field
case QUERY_NODE_COLUMN_DEF: // no pointer field break;
case QUERY_NODE_COLUMN_DEF:
nodesDestroyNode(((SColumnDefNode*)pNode)->pOptions);
break;
case QUERY_NODE_DOWNSTREAM_SOURCE: // no pointer field case QUERY_NODE_DOWNSTREAM_SOURCE: // no pointer field
break; break;
case QUERY_NODE_DATABASE_OPTIONS: { case QUERY_NODE_DATABASE_OPTIONS: {

View File

@ -1630,7 +1630,7 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, SToken* pColName, SDataType
CHECK_OUT_OF_MEM(pCol); CHECK_OUT_OF_MEM(pCol);
COPY_STRING_FORM_ID_TOKEN(pCol->colName, pColName); COPY_STRING_FORM_ID_TOKEN(pCol->colName, pColName);
pCol->dataType = dataType; pCol->dataType = dataType;
pCol->pOptions = (SColumnOptions*)pNode; pCol->pOptions = pNode;
pCol->sma = true; pCol->sma = true;
return (SNode*)pCol; return (SNode*)pCol;
} }

View File

@ -5911,11 +5911,11 @@ static int32_t checkColumnOptions(SNodeList* pList) {
FOREACH(pNode, pList) { FOREACH(pNode, pList) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode; SColumnDefNode* pCol = (SColumnDefNode*)pNode;
if (!pCol->pOptions) return TSDB_CODE_TSC_ENCODE_PARAM_NULL; if (!pCol->pOptions) return TSDB_CODE_TSC_ENCODE_PARAM_NULL;
if (!checkColumnEncodeOrSetDefault(pCol->dataType.type, pCol->pOptions->encode)) if (!checkColumnEncodeOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->encode))
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
if (!checkColumnCompressOrSetDefault(pCol->dataType.type, pCol->pOptions->compress)) if (!checkColumnCompressOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compress))
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
if (!checkColumnLevelOrSetDefault(pCol->dataType.type, pCol->pOptions->compressLevel)) if (!checkColumnLevelOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compressLevel))
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -5929,14 +5929,14 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray, bool calB
SFieldWithOptions field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)}; SFieldWithOptions field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
strcpy(field.name, pCol->colName); strcpy(field.name, pCol->colName);
if (pCol->pOptions) { if (pCol->pOptions) {
setColEncode(&field.compress, columnEncodeVal(pCol->pOptions->encode)); setColEncode(&field.compress, columnEncodeVal(((SColumnOptions*)pCol->pOptions)->encode));
setColCompress(&field.compress, columnCompressVal(pCol->pOptions->compress)); setColCompress(&field.compress, columnCompressVal(((SColumnOptions*)pCol->pOptions)->compress));
setColLevel(&field.compress, columnLevelVal(pCol->pOptions->compressLevel)); setColLevel(&field.compress, columnLevelVal(((SColumnOptions*)pCol->pOptions)->compressLevel));
} }
if (pCol->sma) { if (pCol->sma) {
field.flags |= COL_SMA_ON; field.flags |= COL_SMA_ON;
} }
if (pCol->is_pk) { if (pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey) {
field.flags |= COL_IS_KEY; field.flags |= COL_IS_KEY;
} }
taosArrayPush(*pArray, &field); taosArrayPush(*pArray, &field);
@ -6116,10 +6116,10 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FIRST_COLUMN); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FIRST_COLUMN);
} }
} }
if (TSDB_CODE_SUCCESS == code && pCol->pOptions && pCol->pOptions->bPrimaryKey && colIndex != 1) { if (TSDB_CODE_SUCCESS == code && pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey && colIndex != 1) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SECOND_COL_PK); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SECOND_COL_PK);
} }
if (TSDB_CODE_SUCCESS == code && pCol->is_pk && if (TSDB_CODE_SUCCESS == code && pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey &&
!(TSDB_DATA_TYPE_INT == pCol->dataType.type || TSDB_DATA_TYPE_UINT == pCol->dataType.type || !(TSDB_DATA_TYPE_INT == pCol->dataType.type || TSDB_DATA_TYPE_UINT == pCol->dataType.type ||
TSDB_DATA_TYPE_BIGINT == pCol->dataType.type || TSDB_DATA_TYPE_UBIGINT == pCol->dataType.type || TSDB_DATA_TYPE_BIGINT == pCol->dataType.type || TSDB_DATA_TYPE_UBIGINT == pCol->dataType.type ||
TSDB_DATA_TYPE_VARCHAR == pCol->dataType.type)) { TSDB_DATA_TYPE_VARCHAR == pCol->dataType.type)) {
@ -6350,7 +6350,7 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem
if (pCol->sma) { if (pCol->sma) {
flags |= COL_SMA_ON; flags |= COL_SMA_ON;
} }
if (pCol->pOptions && pCol->pOptions->bPrimaryKey) { if (pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey) {
flags |= COL_IS_KEY; flags |= COL_IS_KEY;
} }
pSchema->colId = colId; pSchema->colId = colId;
@ -7832,6 +7832,16 @@ static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* a
return; return;
} }
static void setColumnDefNodePrimaryKey(SColumnDefNode* pNode, bool isPk) {
if (!pNode) return;
if (!isPk && !pNode->pOptions) return;
if(!pNode->pOptions) {
pNode->pOptions = nodesMakeNode(QUERY_NODE_COLUMN_OPTIONS);
}
((SColumnOptions*)pNode->pOptions)->bPrimaryKey = isPk;
return;
}
static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect, static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect,
SHashObj* pUserAliasSet, SNodeList* pCols, SCMCreateStreamReq* pReq) { SHashObj* pUserAliasSet, SNodeList* pCols, SCMCreateStreamReq* pReq) {
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0); SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
@ -7855,7 +7865,7 @@ static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSele
strcpy(pColDef->colName, pFunc->node.aliasName); strcpy(pColDef->colName, pFunc->node.aliasName);
pColDef->dataType = pFunc->node.resType; pColDef->dataType = pFunc->node.resType;
pColDef->sma = true; pColDef->sma = true;
pColDef->is_pk = false; setColumnDefNodePrimaryKey(pColDef, false);
code = nodesListPushFront(pCols, (SNode*)pColDef); code = nodesListPushFront(pCols, (SNode*)pColDef);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -7994,7 +8004,7 @@ static int32_t addColDefNodeByProj(SNodeList** ppCols, SNode* pProject, int8_t f
strcpy(pColDef->colName, pExpr->userAlias); strcpy(pColDef->colName, pExpr->userAlias);
pColDef->dataType = pExpr->resType; pColDef->dataType = pExpr->resType;
pColDef->sma = flags & COL_SMA_ON; pColDef->sma = flags & COL_SMA_ON;
pColDef->is_pk = flags & COL_IS_KEY; setColumnDefNodePrimaryKey(pColDef, flags & COL_IS_KEY);
return nodesListMakeAppend(ppCols, (SNode*)pColDef); return nodesListMakeAppend(ppCols, (SNode*)pColDef);
} }
@ -8168,7 +8178,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
strcpy(pColDef->colName, pSchema->name); strcpy(pColDef->colName, pSchema->name);
pColDef->dataType = dt; pColDef->dataType = dt;
pColDef->sma = pSchema->flags & COL_SMA_ON; pColDef->sma = pSchema->flags & COL_SMA_ON;
pColDef->is_pk = pSchema->flags & COL_IS_KEY; setColumnDefNodePrimaryKey(pColDef, pSchema->flags & COL_IS_KEY);
int32_t code = nodesListMakeAppend(ppCols, (SNode*)pColDef); int32_t code = nodesListMakeAppend(ppCols, (SNode*)pColDef);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
@ -8663,9 +8673,9 @@ static int32_t checkAndAdjStreamDestTableSchema(STranslateContext* pCxt, SCreate
pNode = nodesListGetNode(pStmt->pCols, 1); pNode = nodesListGetNode(pStmt->pCols, 1);
pCol = (SColumnDefNode*)pNode; pCol = (SColumnDefNode*)pNode;
if (STREAM_CREATE_STABLE_TRUE == pReq->createStb) { if (STREAM_CREATE_STABLE_TRUE == pReq->createStb) {
pCol->is_pk = true; setColumnDefNodePrimaryKey(pCol, true);
} }
if (!pCol->is_pk) { if (!pCol->pOptions || !((SColumnOptions*)pCol->pOptions)->bPrimaryKey) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Source table has primary key, dest table must has primary key"); "Source table has primary key, dest table must has primary key");
} }
@ -10227,8 +10237,8 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
if (pColDef->pOptions) { if (pColDef->pOptions) {
req.colCmpr.pColCmpr[index].id = index + 1; req.colCmpr.pColCmpr[index].id = index + 1;
int8_t valid = setColCompressByOption( int8_t valid = setColCompressByOption(
pScheam->type, columnEncodeVal(pColDef->pOptions->encode), columnCompressVal(pColDef->pOptions->compress), pScheam->type, columnEncodeVal(((SColumnOptions*)pColDef->pOptions)->encode), columnCompressVal(((SColumnOptions*)pColDef->pOptions)->compress),
columnLevelVal(pColDef->pOptions->compressLevel), true, &req.colCmpr.pColCmpr[index].alg); columnLevelVal(((SColumnOptions*)pColDef->pOptions)->compressLevel), true, &req.colCmpr.pColCmpr[index].alg);
if (!valid) { if (!valid) {
tdDestroySVCreateTbReq(&req); tdDestroySVCreateTbReq(&req);
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;