enh: primary key column should not be null

This commit is contained in:
kailixu 2024-04-10 18:41:20 +08:00
parent 9811a9e216
commit 7f11a3682b
4 changed files with 14 additions and 32 deletions

View File

@ -101,16 +101,18 @@ typedef struct {
int32_t kvRowSize;
} SRowBuildScanInfo;
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
sinfo->numOfNone++;
return 0;
}
static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
sinfo->numOfNull++;
sinfo->kvMaxOffset = sinfo->kvPayloadSize;
sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId);
return 0;
}
static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) {
@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal
}
static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) {
int32_t code = 0;
int32_t colValIndex = 1;
int32_t numOfColVals = TARRAY_SIZE(colVals);
SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals);
@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
for (int32_t i = 1; i < schema->numOfCols; i++) {
for (;;) {
if (colValIndex >= numOfColVals) {
tRowBuildScanAddNone(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
break;
}
@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) {
tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i);
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) {
tRowBuildScanAddNull(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit;
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) {
tRowBuildScanAddNone(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
}
colValIndex++;
break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) {
tRowBuildScanAddNone(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
break;
} else { // skip useless value
colValIndex++;
@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
+ sinfo->kvIndexSize // index array
+ sinfo->kvPayloadSize; // payload
return 0;
_exit:
return code;
}
static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema,

View File

@ -216,11 +216,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) {
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
@ -248,11 +244,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
goto _end;
}
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);

View File

@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
}
if (pSchema->flags & COL_IS_KEY) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z);
}
pVal->flag = CV_FLAG_NULL;
return TSDB_CODE_SUCCESS;

View File

@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
pBind = bind + c;
}
if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
if (code) {
goto _return;
@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
pBind = bind;
}
if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) {
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);