fix bug that save index instead of pointer when taosarray reallocation cause point invalidate
This commit is contained in:
parent
76fc259bd0
commit
aadbf21ddd
|
@ -34,7 +34,7 @@ typedef struct {
|
||||||
char* value;
|
char* value;
|
||||||
|
|
||||||
//===================================
|
//===================================
|
||||||
SSchema* schema;
|
size_t fieldSchemaIdx;
|
||||||
} TAOS_SML_KV;
|
} TAOS_SML_KV;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -49,7 +49,7 @@ typedef struct {
|
||||||
int fieldNum;
|
int fieldNum;
|
||||||
|
|
||||||
//================================
|
//================================
|
||||||
SSmlSTableSchema* schema;
|
size_t schemaIdx;
|
||||||
} TAOS_SML_DATA_POINT;
|
} TAOS_SML_DATA_POINT;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -126,10 +126,12 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
|
||||||
|
|
||||||
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
|
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
|
||||||
SSchema* pField = NULL;
|
SSchema* pField = NULL;
|
||||||
SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
|
size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
|
||||||
|
size_t fieldIdx = -1;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (ppField) {
|
if (pFieldIdx) {
|
||||||
pField = *ppField;
|
fieldIdx = *pFieldIdx;
|
||||||
|
pField = taosArrayGet(array, fieldIdx);
|
||||||
|
|
||||||
if (pField->type != smlKv->type) {
|
if (pField->type != smlKv->type) {
|
||||||
tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type);
|
tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type);
|
||||||
|
@ -158,10 +160,11 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
|
||||||
field.bytes = bytes;
|
field.bytes = bytes;
|
||||||
|
|
||||||
pField = taosArrayPush(array, &field);
|
pField = taosArrayPush(array, &field);
|
||||||
taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES);
|
fieldIdx = taosArrayGetSize(array) - 1;
|
||||||
|
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
|
||||||
}
|
}
|
||||||
|
|
||||||
smlKv->schema = pField;
|
smlKv->fieldSchemaIdx = fieldIdx;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -174,10 +177,12 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
|
||||||
for (int i = 0; i < numPoint; ++i) {
|
for (int i = 0; i < numPoint; ++i) {
|
||||||
TAOS_SML_DATA_POINT* point = &points[i];
|
TAOS_SML_DATA_POINT* point = &points[i];
|
||||||
size_t stableNameLen = strlen(point->stableName);
|
size_t stableNameLen = strlen(point->stableName);
|
||||||
SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen);
|
size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
|
||||||
SSmlSTableSchema* pStableSchema = NULL;
|
SSmlSTableSchema* pStableSchema = NULL;
|
||||||
if (ppStableSchema) {
|
size_t stableIdx = -1;
|
||||||
pStableSchema= *ppStableSchema;
|
if (pStableIdx) {
|
||||||
|
pStableSchema= taosArrayGet(stableSchemas, *pStableIdx);
|
||||||
|
stableIdx = *pStableIdx;
|
||||||
} else {
|
} else {
|
||||||
SSmlSTableSchema schema;
|
SSmlSTableSchema schema;
|
||||||
strncpy(schema.sTableName, point->stableName, stableNameLen);
|
strncpy(schema.sTableName, point->stableName, stableNameLen);
|
||||||
|
@ -188,7 +193,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
|
||||||
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||||
|
|
||||||
pStableSchema = taosArrayPush(stableSchemas, &schema);
|
pStableSchema = taosArrayPush(stableSchemas, &schema);
|
||||||
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES);
|
stableIdx = taosArrayGetSize(stableSchemas) - 1;
|
||||||
|
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int j = 0; j < point->tagNum; ++j) {
|
for (int j = 0; j < point->tagNum; ++j) {
|
||||||
|
@ -209,7 +215,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
point->schema = pStableSchema;
|
point->schemaIdx = stableIdx;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t numStables = taosArrayGetSize(stableSchemas);
|
size_t numStables = taosArrayGetSize(stableSchemas);
|
||||||
|
@ -601,8 +607,12 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_stmt_close(stmt);
|
code = taos_stmt_close(stmt);
|
||||||
return 0;
|
if (code != 0) {
|
||||||
|
tscError("%s", taos_stmt_errstr(stmt));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) {
|
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) {
|
||||||
|
@ -674,7 +684,8 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points) {
|
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
|
||||||
|
SHashObj* cname2points, SArray* stableSchemas) {
|
||||||
for (int32_t i = 0; i < numPoints; ++i) {
|
for (int32_t i = 0; i < numPoints; ++i) {
|
||||||
TAOS_SML_DATA_POINT * point = points + i;
|
TAOS_SML_DATA_POINT * point = points + i;
|
||||||
if (!point->childTableName) {
|
if (!point->childTableName) {
|
||||||
|
@ -686,11 +697,13 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
|
||||||
point->childTableName[tableNameLen] = '\0';
|
point->childTableName[tableNameLen] = '\0';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
|
||||||
|
|
||||||
for (int j = 0; j < point->tagNum; ++j) {
|
for (int j = 0; j < point->tagNum; ++j) {
|
||||||
TAOS_SML_KV* kv = point->tags + j;
|
TAOS_SML_KV* kv = point->tags + j;
|
||||||
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
int64_t ts = *(int64_t*)(kv->value);
|
int64_t ts = *(int64_t*)(kv->value);
|
||||||
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision);
|
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
|
||||||
*(int64_t*)(kv->value) = ts;
|
*(int64_t*)(kv->value) = ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -699,7 +712,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
|
||||||
TAOS_SML_KV* kv = point->fields + j;
|
TAOS_SML_KV* kv = point->fields + j;
|
||||||
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
int64_t ts = *(int64_t*)(kv->value);
|
int64_t ts = *(int64_t*)(kv->value);
|
||||||
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision);
|
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
|
||||||
*(int64_t*)(kv->value) = ts;
|
*(int64_t*)(kv->value) = ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -718,10 +731,12 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) {
|
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
||||||
true, false);
|
true, false);
|
||||||
arrangePointsByChildTableName(points, numPoints, cname2points);
|
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
|
||||||
|
|
||||||
int isNullColBind = TSDB_TRUE;
|
int isNullColBind = TSDB_TRUE;
|
||||||
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
|
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
|
||||||
|
@ -729,8 +744,9 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
||||||
SArray* cTablePoints = *pCTablePoints;
|
SArray* cTablePoints = *pCTablePoints;
|
||||||
|
|
||||||
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0);
|
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0);
|
||||||
size_t numTags = taosArrayGetSize(point->schema->tags);
|
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
|
||||||
size_t numCols = taosArrayGetSize(point->schema->fields);
|
size_t numTags = taosArrayGetSize(sTableSchema->tags);
|
||||||
|
size_t numCols = taosArrayGetSize(sTableSchema->fields);
|
||||||
|
|
||||||
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
|
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
|
||||||
taosArraySetSize(tagBinds, numTags);
|
taosArraySetSize(tagBinds, numTags);
|
||||||
|
@ -740,8 +756,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
||||||
}
|
}
|
||||||
for (int j = 0; j < point->tagNum; ++j) {
|
for (int j = 0; j < point->tagNum; ++j) {
|
||||||
TAOS_SML_KV* kv = point->tags + j;
|
TAOS_SML_KV* kv = point->tags + j;
|
||||||
size_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema);
|
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
|
||||||
TAOS_BIND* bind = taosArrayGet(tagBinds, idx);
|
|
||||||
bind->buffer_type = kv->type;
|
bind->buffer_type = kv->type;
|
||||||
bind->length = malloc(sizeof(uintptr_t*));
|
bind->length = malloc(sizeof(uintptr_t*));
|
||||||
*bind->length = kv->length;
|
*bind->length = kv->length;
|
||||||
|
@ -762,8 +777,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
||||||
}
|
}
|
||||||
for (int j = 0; j < point->fieldNum; ++j) {
|
for (int j = 0; j < point->fieldNum; ++j) {
|
||||||
TAOS_SML_KV* kv = point->fields + j;
|
TAOS_SML_KV* kv = point->fields + j;
|
||||||
size_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema);
|
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
|
||||||
TAOS_BIND* bind = colBinds + idx;
|
|
||||||
bind->buffer_type = kv->type;
|
bind->buffer_type = kv->type;
|
||||||
bind->length = malloc(sizeof(uintptr_t*));
|
bind->length = malloc(sizeof(uintptr_t*));
|
||||||
*bind->length = kv->length;
|
*bind->length = kv->length;
|
||||||
|
@ -773,14 +787,21 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
||||||
taosArrayPush(rowsBind, &colBinds);
|
taosArrayPush(rowsBind, &colBinds);
|
||||||
}
|
}
|
||||||
|
|
||||||
creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds);
|
code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds);
|
||||||
|
if (code == 0) {
|
||||||
|
code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind);
|
||||||
|
if (code != 0) {
|
||||||
|
tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
|
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
|
||||||
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
|
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
|
||||||
free(bind->length);
|
free(bind->length);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tagBinds);
|
taosArrayDestroy(tagBinds);
|
||||||
|
|
||||||
insertChildTableBatch(taos, point->childTableName, point->schema->fields, rowsBind);
|
|
||||||
for (int i = 0; i < rows; ++i) {
|
for (int i = 0; i < rows; ++i) {
|
||||||
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
|
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
|
||||||
for (int j = 0; j < numCols; ++j) {
|
for (int j = 0; j < numCols; ++j) {
|
||||||
|
@ -791,12 +812,14 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
|
||||||
}
|
}
|
||||||
taosArrayDestroy(rowsBind);
|
taosArrayDestroy(rowsBind);
|
||||||
taosArrayDestroy(cTablePoints);
|
taosArrayDestroy(cTablePoints);
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
|
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(cname2points);
|
taosHashCleanup(cname2points);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
||||||
|
@ -817,7 +840,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
||||||
goto clean_up;
|
goto clean_up;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = insertPoints(taos, points, numPoint);
|
code = insertPoints(taos, points, numPoint, stableSchemas);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscError("error insert points : %s", tstrerror(code));
|
tscError("error insert points : %s", tstrerror(code));
|
||||||
}
|
}
|
||||||
|
@ -1619,13 +1642,17 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
|
||||||
if (isField) {
|
if (isField) {
|
||||||
if ((*num_kvs + 2) > capacity) {
|
if ((*num_kvs + 2) > capacity) {
|
||||||
capacity *= 3; capacity /= 2;
|
capacity *= 3; capacity /= 2;
|
||||||
}
|
|
||||||
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
|
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
|
||||||
|
} else {
|
||||||
|
more_kvs = *pKVs;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if ((*num_kvs + 1) > capacity) {
|
if ((*num_kvs + 1) > capacity) {
|
||||||
capacity *= 3; capacity /= 2;
|
capacity *= 3; capacity /= 2;
|
||||||
}
|
|
||||||
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
|
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
|
||||||
|
} else {
|
||||||
|
more_kvs = *pKVs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!more_kvs) {
|
if (!more_kvs) {
|
||||||
|
@ -1642,7 +1669,6 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
|
||||||
goto done;
|
goto done;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
free(*pKVs);
|
|
||||||
return ret;
|
return ret;
|
||||||
done:
|
done:
|
||||||
*index = cur;
|
*index = cur;
|
||||||
|
@ -1738,7 +1764,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) {
|
||||||
moveTimeStampToFirstKv(&smlData, timestamp);
|
moveTimeStampToFirstKv(&smlData, timestamp);
|
||||||
tscDebug("Parse timestamp finished");
|
tscDebug("Parse timestamp finished");
|
||||||
|
|
||||||
return true;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
//=========================================================================
|
//=========================================================================
|
||||||
|
@ -1746,8 +1772,8 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) {
|
||||||
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
|
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
|
||||||
for (int32_t i = 0; i < numLines; ++i) {
|
for (int32_t i = 0; i < numLines; ++i) {
|
||||||
TAOS_SML_DATA_POINT point = {0};
|
TAOS_SML_DATA_POINT point = {0};
|
||||||
bool succ = tscParseLine(lines[i], &point);
|
int32_t code = tscParseLine(lines[i], &point);
|
||||||
if (!succ) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tscError("data point line parse failed. line %d", i);
|
tscError("data point line parse failed. line %d", i);
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue