TD-5478]:add modify child tag value through line protocol
This commit is contained in:
parent
6ff1991d35
commit
7d2cc83a38
|
@ -570,6 +570,40 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) {
|
||||
char sql[512];
|
||||
sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName);
|
||||
|
||||
int32_t code;
|
||||
TAOS_STMT* stmt = taos_stmt_init(taos);
|
||||
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
|
||||
|
||||
if (code != 0) {
|
||||
tscError("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = taos_stmt_bind_param(stmt, bind);
|
||||
if (code != 0) {
|
||||
tscError("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = taos_stmt_execute(stmt);
|
||||
if (code != 0) {
|
||||
tscError("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = taos_stmt_close(stmt);
|
||||
if (code != 0) {
|
||||
tscError("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
|
||||
size_t numTags = taosArrayGetSize(tagsSchema);
|
||||
char* sql = malloc(tsMaxSQLStringLen+1);
|
||||
|
@ -742,31 +776,39 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
||||
true, false);
|
||||
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
|
||||
|
||||
int isNullColBind = TSDB_TRUE;
|
||||
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
|
||||
while (pCTablePoints) {
|
||||
SArray* cTablePoints = *pCTablePoints;
|
||||
|
||||
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0);
|
||||
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
|
||||
static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
|
||||
SSmlSTableSchema* sTableSchema, SArray* cTablePoints) {
|
||||
size_t numTags = taosArrayGetSize(sTableSchema->tags);
|
||||
size_t numCols = taosArrayGetSize(sTableSchema->fields);
|
||||
size_t rows = taosArrayGetSize(cTablePoints);
|
||||
|
||||
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
|
||||
for (int i= 0; i < rows; ++i) {
|
||||
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
|
||||
for (int j = 0; j < pDataPoint->tagNum; ++j) {
|
||||
TAOS_SML_KV* kv = pDataPoint->tags + j;
|
||||
tagKVs[kv->fieldSchemaIdx] = kv;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t notNullTagsIndices[TSDB_MAX_TAGS] = {0};
|
||||
int32_t numNotNullTags = 0;
|
||||
for (int32_t i = 0; i < numTags; ++i) {
|
||||
if (tagKVs[i] != NULL) {
|
||||
notNullTagsIndices[numNotNullTags] = i;
|
||||
++numNotNullTags;
|
||||
}
|
||||
}
|
||||
|
||||
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
|
||||
taosArraySetSize(tagBinds, numTags);
|
||||
int isNullColBind = TSDB_TRUE;
|
||||
for (int j = 0; j < numTags; ++j) {
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
|
||||
bind->is_null = &isNullColBind;
|
||||
}
|
||||
for (int j = 0; j < point->tagNum; ++j) {
|
||||
TAOS_SML_KV* kv = point->tags + j;
|
||||
for (int j = 0; j < numTags; ++j) {
|
||||
if (tagKVs[j] == NULL) continue;
|
||||
TAOS_SML_KV* kv = tagKVs[j];
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
|
||||
bind->buffer_type = kv->type;
|
||||
bind->length = malloc(sizeof(uintptr_t*));
|
||||
|
@ -775,17 +817,86 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
|||
bind->is_null = NULL;
|
||||
}
|
||||
|
||||
// select tag1,tag2,... from stable where tbname in (ctable)
|
||||
char sql[TSDB_MAX_BINARY_LEN];
|
||||
int capacity = TSDB_MAX_BINARY_LEN;
|
||||
snprintf(sql, capacity, "select tbname, ");
|
||||
for (int i = 0; i < numNotNullTags ; ++i) {
|
||||
snprintf(sql + strlen(sql), capacity-strlen(sql), "%s,", tagKVs[notNullTagsIndices[i]]->key);
|
||||
}
|
||||
|
||||
snprintf(sql + strlen(sql) - 1, capacity - strlen(sql) + 1,
|
||||
" from %s where tbname in (\'%s\')", sTableName, cTableName);
|
||||
TAOS_RES* result = taos_query(taos, sql);
|
||||
int32_t code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
tscError("%s", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
// check tag value and set tag values if different
|
||||
TAOS_ROW row = taos_fetch_row(result);
|
||||
if (row != NULL) {
|
||||
int numFields = taos_field_count(result);
|
||||
TAOS_FIELD* fields = taos_fetch_fields(result);
|
||||
int* lengths = taos_fetch_lengths(result);
|
||||
for (int i = 1; i < numFields; ++i) {
|
||||
uint8_t type = fields[i].type;
|
||||
int32_t length = lengths[i];
|
||||
char* val = row[i];
|
||||
|
||||
TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]];
|
||||
if (tagKV->type != type) {
|
||||
tscError("child table %s tag %s type mismatch. point type : %d, db type : %d",
|
||||
cTableName, tagKV->key, tagKV->type, type);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
if (memcmp(tagKV->value, val, length) != 0) {
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx);
|
||||
code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind);
|
||||
if (code != 0) {
|
||||
tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds);
|
||||
if (code != 0) {
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
|
||||
free(bind->length);
|
||||
}
|
||||
taosArrayDestroy(tagBinds);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
size_t numCols = taosArrayGetSize(sTableSchema->fields);
|
||||
size_t rows = taosArrayGetSize(cTablePoints);
|
||||
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
|
||||
|
||||
for (int i = 0; i < rows; ++i) {
|
||||
point = taosArrayGetP(cTablePoints, i);
|
||||
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
|
||||
|
||||
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
|
||||
if (colBinds == NULL) {
|
||||
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
|
||||
"num of rows: %zu, num of cols: %zu", rows, numCols);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int isNullColBind = TSDB_TRUE;
|
||||
for (int j = 0; j < numCols; ++j) {
|
||||
TAOS_BIND* bind = colBinds + j;
|
||||
bind->is_null = &isNullColBind;
|
||||
|
@ -802,21 +913,11 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
|||
taosArrayPush(rowsBind, &colBinds);
|
||||
}
|
||||
|
||||
code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds);
|
||||
if (code == 0) {
|
||||
code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind);
|
||||
code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind);
|
||||
if (code != 0) {
|
||||
tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code));
|
||||
tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code));
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
|
||||
free(bind->length);
|
||||
}
|
||||
taosArrayDestroy(tagBinds);
|
||||
for (int i = 0; i < rows; ++i) {
|
||||
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
|
||||
for (int j = 0; j < numCols; ++j) {
|
||||
|
@ -826,13 +927,41 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
|||
free(colBinds);
|
||||
}
|
||||
taosArrayDestroy(rowsBind);
|
||||
taosArrayDestroy(cTablePoints);
|
||||
if (code != 0) {
|
||||
break;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
|
||||
|
||||
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
|
||||
while (pCTablePoints) {
|
||||
SArray* cTablePoints = *pCTablePoints;
|
||||
|
||||
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
|
||||
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
|
||||
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints);
|
||||
if (code != 0) {
|
||||
tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
|
||||
goto cleanup;
|
||||
}
|
||||
code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints);
|
||||
if (code != 0) {
|
||||
tscError("Apply child table fields failed. child table %s, error %s", point->childTableName, tstrerror(code));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
pCTablePoints = taosHashIterate(cname2points, NULL);
|
||||
while (pCTablePoints) {
|
||||
SArray* pPoints = *pCTablePoints;
|
||||
taosArrayDestroy(pPoints);
|
||||
}
|
||||
taosHashCleanup(cname2points);
|
||||
return code;
|
||||
}
|
||||
|
@ -855,7 +984,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
|||
goto clean_up;
|
||||
}
|
||||
|
||||
code = insertPoints(taos, points, numPoint, stableSchemas);
|
||||
code = applyDataPoints(taos, points, numPoint, stableSchemas);
|
||||
if (code != 0) {
|
||||
tscError("error insert points : %s", tstrerror(code));
|
||||
}
|
||||
|
|
|
@ -954,7 +954,7 @@ int32_t verify_schema_less(TAOS* taos) {
|
|||
result = taos_query(taos, "drop database if exists test;");
|
||||
taos_free_result(result);
|
||||
usleep(100000);
|
||||
result = taos_query(taos, "create database test precision 'us';");
|
||||
result = taos_query(taos, "create database test precision 'us' update 1;");
|
||||
taos_free_result(result);
|
||||
usleep(100000);
|
||||
|
||||
|
@ -963,23 +963,11 @@ int32_t verify_schema_less(TAOS* taos) {
|
|||
taos_free_result(result);
|
||||
usleep(100000);
|
||||
|
||||
char* lines[] = {
|
||||
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
|
||||
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
|
||||
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
|
||||
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
|
||||
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
|
||||
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
|
||||
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
|
||||
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
|
||||
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
|
||||
};
|
||||
|
||||
int code = 0;
|
||||
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
|
||||
char* lines2[] = {
|
||||
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
|
||||
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
|
||||
"zqlbgs,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns",
|
||||
"zqlbgs,t9=f,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t11=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t10=L\"ncharTagValue\" c10=f,c0=f,c1=127i8,c12=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\" 1626006833639000000ns"
|
||||
};
|
||||
code = taos_insert_lines(taos, &lines2[0], 1);
|
||||
code = taos_insert_lines(taos, &lines2[1], 1);
|
||||
|
|
Loading…
Reference in New Issue