Merge pull request #28589 from taosdata/enh/TS-5441-3.0

fix:[TS-5441] error if cols not equal in write_raw_block
This commit is contained in:
Shengliang Guan 2024-10-31 17:36:58 +08:00 committed by GitHub
commit 765a0f4a03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 141 additions and 275 deletions

View File

@ -886,6 +886,64 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
return false; return false;
} }
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
if (*fields != pColSchema->type) {
if (errstr != NULL)
snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
return TSDB_CODE_INVALID_PARA;
}
if (IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) > pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
return TSDB_CODE_INVALID_PARA;
}
if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
return TSDB_CODE_INVALID_PARA;
}
return 0;
}
#define PRCESS_DATA(i, j) \
ret = checkSchema(pColSchema, fields, errstr, errstrLen); \
if (ret != 0) { \
goto end; \
} \
\
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { \
hasTs = true; \
} \
\
int8_t* offset = pStart; \
if (IS_VAR_DATA_TYPE(pColSchema->type)) { \
pStart += numOfRows * sizeof(int32_t); \
} else { \
pStart += BitmapLen(numOfRows); \
} \
char* pData = pStart; \
\
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j); \
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
if (ret != 0) { \
goto end; \
} \
fields += sizeof(int8_t) + sizeof(int32_t); \
if (needChangeLength && version == BLOCK_VERSION_1) { \
pStart += htonl(colLength[i]); \
} else { \
pStart += colLength[i]; \
} \
boundInfo->pColIndex[j] = -1;
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) { int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
int ret = 0; int ret = 0;
@ -963,51 +1021,15 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
// if (tFields != NULL && numFields > boundInfo->numOfBound) {
// if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
// ret = TSDB_CODE_INVALID_PARA;
// goto end;
// }
if (tFields == NULL && numOfCols != boundInfo->numOfBound) {
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (tFields == NULL) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (ret != 0) {
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[j]);
} else {
pStart += colLength[j];
}
}
} else {
bool hasTs = false; bool hasTs = false;
if (tFields == NULL) {
int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
for (int j = 0; j < len; j++) {
SSchema* pColSchema = &pSchema[j];
PRCESS_DATA(j, j)
}
} else {
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++) { for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j]; SSchema* pColSchema = &pSchema[j];
@ -1018,49 +1040,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
fieldName = ((TAOS_FIELD*)tFields)[i].name; fieldName = ((TAOS_FIELD*)tFields)[i].name;
} }
if (strcmp(pColSchema->name, fieldName) == 0) { if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { PRCESS_DATA(i, j)
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
hasTs = true;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
// for(int k = 0; k < numOfRows; k++) {
// if(!colDataIsNull_f(offset, k) && pColSchema->type == TSDB_DATA_TYPE_INT){
// printf("colName:%s,val:%d", fieldName, *(int32_t*)(pStart + k * sizeof(int32_t)));
// }
// }
}
char* pData = pStart;
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (ret != 0) {
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[i]);
} else {
pStart += colLength[i];
}
boundInfo->pColIndex[j] = -1;
break; break;
} }
} }
} }
}
if (!hasTs) { if (!hasTs) {
if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data"); if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
@ -1068,6 +1053,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
// process NULL data
for (int c = 0; c < boundInfo->numOfBound; ++c) { for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) { if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
@ -1079,7 +1065,6 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
boundInfo->pColIndex[c] = c; // restore for next block boundInfo->pColIndex[c] = c; // restore for next block
} }
} }
}
end: end:
return ret; return ret;

View File

@ -19,196 +19,77 @@
#include "taos.h" #include "taos.h"
#include "types.h" #include "types.h"
int buildStable(TAOS* pConn) { TAOS* pConn = NULL;
TAOS_RES* pRes = taos_query(pConn, void action(char* sql) {
"CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS " TAOS_RES* pRes = taos_query(pConn, sql);
"(`groupid` INT, `location` VARCHAR(16))"); ASSERT(taos_errno(pRes) == 0);
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table ntba(ts timestamp, addr binary(32))");
if (taos_errno(pRes) != 0) {
printf("failed to create ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table ntbb(ts timestamp, addr binary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create ntbb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ntba values(now,'123456789abcdefg123456789')");
if (taos_errno(pRes) != 0) {
printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ntba values(now + 1s,'hello')");
if (taos_errno(pRes) != 0) {
printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
} }
int32_t init_env() { int32_t test_write_raw_block(char* query, char* dst) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS_RES* pRes = taos_query(pConn, query);
if (pConn == NULL) { ASSERT(taos_errno(pRes) == 0);
return -1;
}
int32_t ret = -1;
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_raw");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_raw vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_raw");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
buildStable(pConn);
pRes = taos_query(pConn, "select * from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
void* data = NULL; void* data = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data); int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){ ASSERT(error_code == 0);
printf("error fetch raw block, reason:%s\n", taos_errstr(pRes)); error_code = taos_write_raw_block(pConn, numOfRows, data, dst);
goto END;
}
taos_write_raw_block(pConn, numOfRows, data, "d1");
taos_free_result(pRes); taos_free_result(pRes);
return error_code;
}
pRes = taos_query(pConn, "select ts,phase from d0"); int32_t test_write_raw_block_with_fields(char* query, char* dst) {
if (taos_errno(pRes) != 0) { TAOS_RES* pRes = taos_query(pConn, query);
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); ASSERT(taos_errno(pRes) == 0);
goto END; void* data = NULL;
} int32_t numOfRows = 0;
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data); int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){ ASSERT(error_code == 0);
printf("error fetch raw block, reason:%s\n", taos_errstr(pRes));
goto END;
}
int numFields = taos_num_fields(pRes); int numFields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS_FIELD* fields = taos_fetch_fields(pRes);
taos_write_raw_block_with_fields(pConn, numOfRows, data, "d2", fields, numFields); error_code = taos_write_raw_block_with_fields(pConn, numOfRows, data, dst, fields, numFields);
taos_free_result(pRes); taos_free_result(pRes);
return error_code;
// check error msg
pRes = taos_query(pConn, "select * from ntba");
if (taos_errno(pRes) != 0) {
printf("error select * from ntba, reason:%s\n", taos_errstr(pRes));
goto END;
} }
data = NULL; void init_env() {
numOfRows = 0; pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data); ASSERT(pConn);
if(error_code !=0 ){
printf("error fetch select * from ntba, reason:%s\n", taos_errstr(pRes));
goto END;
}
error_code = taos_write_raw_block(pConn, numOfRows, data, "ntbb");
if(error_code == 0) {
printf(" taos_write_raw_block to ntbb expect failed , but success!\n");
goto END;
}
// pass NULL return last error code describe action("drop database if exists db_raw");
const char* err = tmq_err2str(error_code); action("create database if not exists db_raw vgroups 2");
printf("write_raw_block return code =0x%x err=%s\n", error_code, err); action("use db_raw");
if(strcmp(err, "success") == 0) {
printf("expect failed , but error string is success! err=%s\n", err);
goto END;
}
// no exist table action(
error_code = taos_write_raw_block(pConn, numOfRows, data, "no-exist-table"); "CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, "
if(error_code == 0) { "`location` VARCHAR(16))");
printf(" taos_write_raw_block to no-exist-table expect failed , but success!\n"); action("create table d0 using meters tags(1, 'San Francisco')");
goto END; action("create table d1 using meters tags(2, 'San Francisco')");
} action("create table d2 using meters tags(3, 'San Francisco')");
action("insert into d0 (ts, current) values (now, 120)");
err = tmq_err2str(error_code); action("create table ntba(ts timestamp, addr binary(32))");
printf("write_raw_block no exist table return code =0x%x err=%s\n", error_code, err); action("create table ntbb(ts timestamp, addr binary(8))");
if(strcmp(err, "success") == 0) { action("create table ntbc(ts timestamp, addr binary(8), c2 int)");
printf("expect failed write no exist table, but error string is success! err=%s\n", err);
goto END;
}
// success action("insert into ntba values(now,'123456789abcdefg123456789')");
ret = 0; action("insert into ntbb values(now + 1s,'hello')");
action("insert into ntbc values(now + 13s, 'sdf', 123)");
END:
// free
if(pRes) taos_free_result(pRes);
if(pConn) taos_close(pConn);
return ret;
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
printf("test write_raw_block...\n"); printf("test write_raw_block start.\n");
int ret = init_env(); init_env();
if (ret < 0) { ASSERT(test_write_raw_block("select * from d0", "d1") == 0); // test schema same
printf("test write_raw_block failed.\n"); ASSERT(test_write_raw_block("select * from ntbb", "ntba") == 0); // test schema compatible
return ret; ASSERT(test_write_raw_block("select * from ntbb", "ntbc") == 0); // test schema small
} ASSERT(test_write_raw_block("select * from ntbc", "ntbb") == 0); // test schema bigger
printf("test write_raw_block ok.\n"); ASSERT(test_write_raw_block("select * from ntba", "ntbb") != 0); // test schema mismatch
ASSERT(test_write_raw_block("select * from ntba", "no-exist-table") != 0); // test no exist table
ASSERT(test_write_raw_block("select addr from ntba", "ntbb") != 0); // test without ts
ASSERT(test_write_raw_block_with_fields("select ts,phase from d0", "d2") == 0); // test with fields
printf("test write_raw_block end.\n");
return 0; return 0;
} }