fix:[TS-5441] error if cols not equal in write_raw_block

This commit is contained in:
wangmm0220 2024-10-31 14:51:56 +08:00
parent 6c155d949e
commit 3a66d2fb3b
2 changed files with 141 additions and 275 deletions

View File

@ -147,7 +147,7 @@ int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchem
}
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
SArray* tagName, uint8_t tagNum, int32_t ttl) {
SArray* tagName, uint8_t tagNum, int32_t ttl) {
pTbReq->type = TD_CHILD_TABLE;
pTbReq->ctb.pTag = (uint8_t*)pTag;
pTbReq->name = taosStrdup(tname);
@ -174,7 +174,7 @@ static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
SSchema* pSchemas = getTableColumnSchema(pTableMeta);
int32_t code = 0;
int32_t code = 0;
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
if (NULL == taosArrayPush(pValues, &val)) {
@ -886,19 +886,77 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
return false;
}
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
if (*fields != pColSchema->type) {
if (errstr != NULL)
snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
return TSDB_CODE_INVALID_PARA;
}
if (IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) > pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
return TSDB_CODE_INVALID_PARA;
}
if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
return TSDB_CODE_INVALID_PARA;
}
return 0;
}
#define PRCESS_DATA(i, j) \
ret = checkSchema(pColSchema, fields, errstr, errstrLen); \
if (ret != 0) { \
goto end; \
} \
\
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { \
hasTs = true; \
} \
\
int8_t* offset = pStart; \
if (IS_VAR_DATA_TYPE(pColSchema->type)) { \
pStart += numOfRows * sizeof(int32_t); \
} else { \
pStart += BitmapLen(numOfRows); \
} \
char* pData = pStart; \
\
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j); \
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
if (ret != 0) { \
goto end; \
} \
fields += sizeof(int8_t) + sizeof(int32_t); \
if (needChangeLength && version == BLOCK_VERSION_1) { \
pStart += htonl(colLength[i]); \
} else { \
pStart += colLength[i]; \
} \
boundInfo->pColIndex[j] = -1;
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
int ret = 0;
if(data == NULL) {
if (data == NULL) {
uError("rawBlockBindData, data is NULL");
return TSDB_CODE_APP_ERROR;
}
void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
SVCreateTbReq *pCreateReqTmp = NULL;
if (tmp == NULL && pCreateTb != NULL){
SVCreateTbReq* pCreateReqTmp = NULL;
if (tmp == NULL && pCreateTb != NULL) {
ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
if (ret != TSDB_CODE_SUCCESS){
if (ret != TSDB_CODE_SUCCESS) {
uError("cloneSVreateTbReq error");
goto end;
}
@ -906,7 +964,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
STableDataCxt* pTableCxt = NULL;
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
if (pCreateReqTmp != NULL) {
tdDestroySVCreateTbReq(pCreateReqTmp);
taosMemoryFree(pCreateReqTmp);
@ -963,121 +1021,48 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
// if (tFields != NULL && numFields > boundInfo->numOfBound) {
// if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
// ret = TSDB_CODE_INVALID_PARA;
// goto end;
// }
if (tFields == NULL && numOfCols != boundInfo->numOfBound) {
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
bool hasTs = false;
if (tFields == NULL) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (ret != 0) {
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[j]);
} else {
pStart += colLength[j];
}
int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
for (int j = 0; j < len; j++) {
SSchema* pColSchema = &pSchema[j];
PRCESS_DATA(j, j)
}
} else {
bool hasTs = false;
for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
char* fieldName = NULL;
char* fieldName = NULL;
if (raw) {
fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
} else {
fieldName = ((TAOS_FIELD*)tFields)[i].name;
}
if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
hasTs = true;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
// for(int k = 0; k < numOfRows; k++) {
// if(!colDataIsNull_f(offset, k) && pColSchema->type == TSDB_DATA_TYPE_INT){
// printf("colName:%s,val:%d", fieldName, *(int32_t*)(pStart + k * sizeof(int32_t)));
// }
// }
}
char* pData = pStart;
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (ret != 0) {
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[i]);
} else {
pStart += colLength[i];
}
boundInfo->pColIndex[j] = -1;
PRCESS_DATA(i, j)
break;
}
}
}
}
if (!hasTs) {
if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (!hasTs) {
if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
if (ret != 0) {
goto end;
}
} else {
boundInfo->pColIndex[c] = c; // restore for next block
// process NULL data
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
if (ret != 0) {
goto end;
}
} else {
boundInfo->pColIndex[c] = c; // restore for next block
}
}

View File

@ -19,196 +19,77 @@
#include "taos.h"
#include "types.h"
int buildStable(TAOS* pConn) {
TAOS_RES* pRes = taos_query(pConn,
"CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS "
"(`groupid` INT, `location` VARCHAR(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
TAOS* pConn = NULL;
void action(char* sql) {
TAOS_RES* pRes = taos_query(pConn, sql);
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table ntba(ts timestamp, addr binary(32))");
if (taos_errno(pRes) != 0) {
printf("failed to create ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table ntbb(ts timestamp, addr binary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create ntbb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ntba values(now,'123456789abcdefg123456789')");
if (taos_errno(pRes) != 0) {
printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ntba values(now + 1s,'hello')");
if (taos_errno(pRes) != 0) {
printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
int32_t ret = -1;
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_raw");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_raw vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_raw");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
buildStable(pConn);
pRes = taos_query(pConn, "select * from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
void *data = NULL;
int32_t test_write_raw_block(char* query, char* dst) {
TAOS_RES* pRes = taos_query(pConn, query);
ASSERT(taos_errno(pRes) == 0);
void* data = NULL;
int32_t numOfRows = 0;
int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){
printf("error fetch raw block, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_write_raw_block(pConn, numOfRows, data, "d1");
ASSERT(error_code == 0);
error_code = taos_write_raw_block(pConn, numOfRows, data, dst);
taos_free_result(pRes);
return error_code;
}
pRes = taos_query(pConn, "select ts,phase from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){
printf("error fetch raw block, reason:%s\n", taos_errstr(pRes));
goto END;
}
int32_t test_write_raw_block_with_fields(char* query, char* dst) {
TAOS_RES* pRes = taos_query(pConn, query);
ASSERT(taos_errno(pRes) == 0);
void* data = NULL;
int32_t numOfRows = 0;
int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
ASSERT(error_code == 0);
int numFields = taos_num_fields(pRes);
TAOS_FIELD *fields = taos_fetch_fields(pRes);
taos_write_raw_block_with_fields(pConn, numOfRows, data, "d2", fields, numFields);
TAOS_FIELD* fields = taos_fetch_fields(pRes);
error_code = taos_write_raw_block_with_fields(pConn, numOfRows, data, dst, fields, numFields);
taos_free_result(pRes);
return error_code;
}
// check error msg
pRes = taos_query(pConn, "select * from ntba");
if (taos_errno(pRes) != 0) {
printf("error select * from ntba, reason:%s\n", taos_errstr(pRes));
goto END;
}
void init_env() {
pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT(pConn);
data = NULL;
numOfRows = 0;
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){
printf("error fetch select * from ntba, reason:%s\n", taos_errstr(pRes));
goto END;
}
error_code = taos_write_raw_block(pConn, numOfRows, data, "ntbb");
if(error_code == 0) {
printf(" taos_write_raw_block to ntbb expect failed , but success!\n");
goto END;
}
action("drop database if exists db_raw");
action("create database if not exists db_raw vgroups 2");
action("use db_raw");
// pass NULL return last error code describe
const char* err = tmq_err2str(error_code);
printf("write_raw_block return code =0x%x err=%s\n", error_code, err);
if(strcmp(err, "success") == 0) {
printf("expect failed , but error string is success! err=%s\n", err);
goto END;
}
action(
"CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, "
"`location` VARCHAR(16))");
action("create table d0 using meters tags(1, 'San Francisco')");
action("create table d1 using meters tags(2, 'San Francisco')");
action("create table d2 using meters tags(3, 'San Francisco')");
action("insert into d0 (ts, current) values (now, 120)");
// no exist table
error_code = taos_write_raw_block(pConn, numOfRows, data, "no-exist-table");
if(error_code == 0) {
printf(" taos_write_raw_block to no-exist-table expect failed , but success!\n");
goto END;
}
action("create table ntba(ts timestamp, addr binary(32))");
action("create table ntbb(ts timestamp, addr binary(8))");
action("create table ntbc(ts timestamp, addr binary(8), c2 int)");
err = tmq_err2str(error_code);
printf("write_raw_block no exist table return code =0x%x err=%s\n", error_code, err);
if(strcmp(err, "success") == 0) {
printf("expect failed write no exist table, but error string is success! err=%s\n", err);
goto END;
}
// success
ret = 0;
END:
// free
if(pRes) taos_free_result(pRes);
if(pConn) taos_close(pConn);
return ret;
action("insert into ntba values(now,'123456789abcdefg123456789')");
action("insert into ntbb values(now + 1s,'hello')");
action("insert into ntbc values(now + 13s, 'sdf', 123)");
}
int main(int argc, char* argv[]) {
printf("test write_raw_block...\n");
int ret = init_env();
if (ret < 0) {
printf("test write_raw_block failed.\n");
return ret;
}
printf("test write_raw_block ok.\n");
printf("test write_raw_block start.\n");
init_env();
ASSERT(test_write_raw_block("select * from d0", "d1") == 0); // test schema same
ASSERT(test_write_raw_block("select * from ntbb", "ntba") == 0); // test schema compatible
ASSERT(test_write_raw_block("select * from ntbb", "ntbc") == 0); // test schema small
ASSERT(test_write_raw_block("select * from ntbc", "ntbb") == 0); // test schema bigger
ASSERT(test_write_raw_block("select * from ntba", "ntbb") != 0); // test schema mismatch
ASSERT(test_write_raw_block("select * from ntba", "no-exist-table") != 0); // test no exist table
ASSERT(test_write_raw_block("select addr from ntba", "ntbb") != 0); // test without ts
ASSERT(test_write_raw_block_with_fields("select ts,phase from d0", "d2") == 0); // test with fields
printf("test write_raw_block end.\n");
return 0;
}