memory handling
This commit is contained in:
parent
4404919d3b
commit
136189e3c8
|
@ -64,88 +64,6 @@ int compareSmlColKv(const void* p1, const void* p2) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
|
||||
schema->tags = taosArrayInit(8, sizeof(SSchema));
|
||||
schema->fields = taosArrayInit(64, sizeof(SSchema));
|
||||
schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return TSDB_CODE_TSC_DISCONNECTED;
|
||||
}
|
||||
|
||||
char sql[256];
|
||||
snprintf(sql, 256, "describe %s", tableName);
|
||||
TAOS_RES* res = taos_query(taos, sql);
|
||||
code = taos_errno(res);
|
||||
if (code != 0) {
|
||||
taos_free_result(res);
|
||||
return code;
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||
pSql->pTscObj = taos;
|
||||
pSql->signature = pSql;
|
||||
pSql->fp = NULL;
|
||||
|
||||
SStrToken tableToken = {.z=tableName, .n=strlen(tableName), .type=TK_ID};
|
||||
tGetToken(tableName, &tableToken.type);
|
||||
// Check if the table name available or not
|
||||
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
sprintf(pSql->cmd.payload, "table name is invalid");
|
||||
return code;
|
||||
}
|
||||
|
||||
SName sname = {0};
|
||||
if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
memset(fullTableName, 0, tListLen(fullTableName));
|
||||
tNameExtractFullName(&sname, fullTableName);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscFreeSqlObj(pSql);
|
||||
return code;
|
||||
}
|
||||
|
||||
tscFreeSqlObj(pSql);
|
||||
|
||||
uint32_t size = tscGetTableMetaMaxSize();
|
||||
STableMeta* tableMeta = calloc(1, size);
|
||||
taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1);
|
||||
|
||||
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
|
||||
schema->precision = tableMeta->tableInfo.precision;
|
||||
for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
|
||||
SSchema field;
|
||||
tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
|
||||
field.type = tableMeta->schema[i].type;
|
||||
field.bytes = tableMeta->schema[i].bytes;
|
||||
SSchema* pField = taosArrayPush(schema->fields, &field);
|
||||
taosHashPut(schema->fieldHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
|
||||
int j = i + tableMeta->tableInfo.numOfColumns;
|
||||
SSchema field;
|
||||
tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
|
||||
field.type = tableMeta->schema[j].type;
|
||||
field.bytes = tableMeta->schema[j].bytes;
|
||||
SSchema* pField = taosArrayPush(schema->tags, &field);
|
||||
taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
|
||||
}
|
||||
free(tableMeta); tableMeta = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
SCHEMA_ACTION_CREATE_STABLE,
|
||||
SCHEMA_ACTION_ADD_COLUMN,
|
||||
|
@ -173,7 +91,7 @@ typedef struct {
|
|||
};
|
||||
} SSchemaAction;
|
||||
|
||||
int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
|
||||
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
|
||||
if (!IS_VAR_DATA_TYPE(kv->type)) {
|
||||
*bytes = tDataTypes[kv->type].bytes;
|
||||
} else {
|
||||
|
@ -191,7 +109,7 @@ int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
|
||||
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
|
||||
SSchema* pField = NULL;
|
||||
SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
|
||||
if (ppField) {
|
||||
|
@ -227,7 +145,55 @@ int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* a
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[],
|
||||
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) {
|
||||
SHashObj* sname2shema = taosHashInit(32,
|
||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
for (int i = 0; i < numPoint; ++i) {
|
||||
TAOS_SML_DATA_POINT* point = &points[i];
|
||||
size_t stableNameLen = strlen(point->stableName);
|
||||
SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen);
|
||||
SSmlSTableSchema* pStableSchema = NULL;
|
||||
if (ppStableSchema) {
|
||||
pStableSchema= *ppStableSchema;
|
||||
} else {
|
||||
SSmlSTableSchema schema;
|
||||
strncpy(schema.sTableName, point->stableName, stableNameLen);
|
||||
schema.sTableName[stableNameLen] = '\0';
|
||||
schema.fields = taosArrayInit(64, sizeof(SSchema));
|
||||
schema.tags = taosArrayInit(8, sizeof(SSchema));
|
||||
schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
pStableSchema = taosArrayPush(stableSchemas, &schema);
|
||||
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int j = 0; j < point->tagNum; ++j) {
|
||||
TAOS_SML_KV* tagKv = point->tags + j;
|
||||
buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags);
|
||||
}
|
||||
|
||||
for (int j = 0; j < point->fieldNum; ++j) {
|
||||
TAOS_SML_KV* fieldKv = point->fields + j;
|
||||
buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
|
||||
}
|
||||
|
||||
point->schema = pStableSchema;
|
||||
}
|
||||
|
||||
size_t numStables = taosArrayGetSize(stableSchemas);
|
||||
for (int32_t i = 0; i < numStables; ++i) {
|
||||
SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
|
||||
taosHashCleanup(schema->tagHash);
|
||||
taosHashCleanup(schema->fieldHash);
|
||||
}
|
||||
taosHashCleanup(sname2shema);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[],
|
||||
SSchemaAction* action, bool* actionNeeded) {
|
||||
SSchema** ppDbAttr = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name));
|
||||
if (ppDbAttr) {
|
||||
|
@ -262,7 +228,7 @@ int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t buildColumnDescription(SSchema* field,
|
||||
static int32_t buildColumnDescription(SSchema* field,
|
||||
char* buf, int32_t bufSize, int32_t* outBytes) {
|
||||
uint8_t type = field->type;
|
||||
|
||||
|
@ -283,7 +249,8 @@ int32_t buildColumnDescription(SSchema* field,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
|
||||
|
||||
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
|
||||
int32_t code = 0;
|
||||
int32_t capacity = TSDB_MAX_BINARY_LEN;
|
||||
int32_t outBytes = 0;
|
||||
|
@ -357,7 +324,160 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
|
||||
static int32_t destorySmlSTableSchema(SSmlSTableSchema* schema) {
|
||||
taosHashCleanup(schema->tagHash);
|
||||
taosHashCleanup(schema->fieldHash);
|
||||
taosArrayDestroy(schema->tags);
|
||||
taosArrayDestroy(schema->fields);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
|
||||
int32_t code = 0;
|
||||
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return TSDB_CODE_TSC_DISCONNECTED;
|
||||
}
|
||||
|
||||
char sql[256];
|
||||
snprintf(sql, 256, "describe %s", tableName);
|
||||
TAOS_RES* res = taos_query(taos, sql);
|
||||
code = taos_errno(res);
|
||||
if (code != 0) {
|
||||
taos_free_result(res);
|
||||
return code;
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||
pSql->pTscObj = taos;
|
||||
pSql->signature = pSql;
|
||||
pSql->fp = NULL;
|
||||
|
||||
SStrToken tableToken = {.z=tableName, .n=strlen(tableName), .type=TK_ID};
|
||||
tGetToken(tableName, &tableToken.type);
|
||||
// Check if the table name available or not
|
||||
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
sprintf(pSql->cmd.payload, "table name is invalid");
|
||||
return code;
|
||||
}
|
||||
|
||||
SName sname = {0};
|
||||
if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
memset(fullTableName, 0, tListLen(fullTableName));
|
||||
tNameExtractFullName(&sname, fullTableName);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscFreeSqlObj(pSql);
|
||||
return code;
|
||||
}
|
||||
|
||||
tscFreeSqlObj(pSql);
|
||||
|
||||
schema->tags = taosArrayInit(8, sizeof(SSchema));
|
||||
schema->fields = taosArrayInit(64, sizeof(SSchema));
|
||||
schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
uint32_t size = tscGetTableMetaMaxSize();
|
||||
STableMeta* tableMeta = calloc(1, size);
|
||||
taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1);
|
||||
|
||||
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
|
||||
schema->precision = tableMeta->tableInfo.precision;
|
||||
for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
|
||||
SSchema field;
|
||||
tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
|
||||
field.type = tableMeta->schema[i].type;
|
||||
field.bytes = tableMeta->schema[i].bytes;
|
||||
SSchema* pField = taosArrayPush(schema->fields, &field);
|
||||
taosHashPut(schema->fieldHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
|
||||
int j = i + tableMeta->tableInfo.numOfColumns;
|
||||
SSchema field;
|
||||
tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
|
||||
field.type = tableMeta->schema[j].type;
|
||||
field.bytes = tableMeta->schema[j].bytes;
|
||||
SSchema* pField = taosArrayPush(schema->tags, &field);
|
||||
taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
|
||||
}
|
||||
free(tableMeta); tableMeta = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) {
|
||||
int32_t code = 0;
|
||||
size_t numStable = taosArrayGetSize(stableSchemas);
|
||||
for (int i = 0; i < numStable; ++i) {
|
||||
SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
|
||||
SSmlSTableSchema dbSchema = {0};
|
||||
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
|
||||
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
|
||||
SSchemaAction schemaAction = {0};
|
||||
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
||||
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
|
||||
memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
|
||||
schemaAction.createSTable.tags = pointSchema->tags;
|
||||
schemaAction.createSTable.fields = pointSchema->fields;
|
||||
applySchemaAction(taos, &schemaAction);
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
|
||||
pointSchema->precision = dbSchema.precision;
|
||||
|
||||
destorySmlSTableSchema(&dbSchema);
|
||||
} else if (code == TSDB_CODE_SUCCESS) {
|
||||
size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
|
||||
size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);
|
||||
|
||||
SHashObj* dbTagHash = dbSchema.tagHash;
|
||||
SHashObj* dbFieldHash = dbSchema.fieldHash;
|
||||
|
||||
for (int j = 0; j < pointTagSize; ++j) {
|
||||
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
|
||||
SSchemaAction schemaAction = {0};
|
||||
bool actionNeeded = false;
|
||||
generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
|
||||
if (actionNeeded) {
|
||||
applySchemaAction(taos, &schemaAction);
|
||||
}
|
||||
}
|
||||
|
||||
SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
|
||||
SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
|
||||
memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);
|
||||
|
||||
for (int j = 1; j < pointFieldSize; ++j) {
|
||||
SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
|
||||
SSchemaAction schemaAction = {0};
|
||||
bool actionNeeded = false;
|
||||
generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded);
|
||||
if (actionNeeded) {
|
||||
applySchemaAction(taos, &schemaAction);
|
||||
}
|
||||
}
|
||||
|
||||
pointSchema->precision = dbSchema.precision;
|
||||
|
||||
destorySmlSTableSchema(&dbSchema);
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
|
||||
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
|
||||
|
||||
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
|
||||
|
@ -384,7 +504,7 @@ int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tabl
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
|
||||
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
|
||||
size_t numTags = taosArrayGetSize(tagsSchema);
|
||||
char sql[TSDB_MAX_BINARY_LEN] = {0};
|
||||
int freeBytes = TSDB_MAX_BINARY_LEN;
|
||||
|
@ -428,10 +548,10 @@ int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const cha
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t insertBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) {
|
||||
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) {
|
||||
size_t numCols = taosArrayGetSize(colsSchema);
|
||||
char sql[4096];
|
||||
int32_t freeBytes = 4096;
|
||||
char sql[TSDB_MAX_BINARY_LEN];
|
||||
int32_t freeBytes = TSDB_MAX_BINARY_LEN;
|
||||
sprintf(sql, "insert into ? (");
|
||||
|
||||
for (int i = 0; i < numCols; ++i) {
|
||||
|
@ -458,6 +578,7 @@ int32_t insertBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* r
|
|||
printf("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t rows = taosArrayGetSize(rowsBind);
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i);
|
||||
|
@ -479,13 +600,11 @@ int32_t insertBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* r
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
taos_stmt_close(stmt);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) {
|
||||
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points) {
|
||||
for (int32_t i = 0; i < numPoints; ++i) {
|
||||
TAOS_SML_DATA_POINT * point = points + i;
|
||||
if (!point->childTableName) {
|
||||
|
@ -526,6 +645,14 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
taosArrayPush(cTablePoints, &point);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) {
|
||||
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
||||
true, false);
|
||||
arrangePointsByChildTableName(points, numPoints, cname2points);
|
||||
|
||||
int isNullColBind = TSDB_TRUE;
|
||||
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
|
||||
while (pCTablePoints) {
|
||||
|
@ -577,111 +704,44 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
}
|
||||
|
||||
creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds);
|
||||
insertBatch(taos, point->childTableName, point->schema->fields, rowsBind);
|
||||
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
|
||||
free(bind->length);
|
||||
}
|
||||
taosArrayDestroy(tagBinds);
|
||||
|
||||
insertChildTableBatch(taos, point->childTableName, point->schema->fields, rowsBind);
|
||||
for (int i = 0; i < rows; ++i) {
|
||||
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
|
||||
for (int j = 0; j < numCols; ++j) {
|
||||
TAOS_BIND* bind = colBinds + j;
|
||||
free(bind->length);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(rowsBind);
|
||||
taosArrayDestroy(cTablePoints);
|
||||
|
||||
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
|
||||
}
|
||||
|
||||
taosHashCleanup(cname2points);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
|
||||
SHashObj* sname2shema = taosHashInit(32,
|
||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
for (int i = 0; i < numPoint; ++i) {
|
||||
TAOS_SML_DATA_POINT* point = &points[i];
|
||||
size_t stableNameLen = strlen(point->stableName);
|
||||
SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen);
|
||||
SSmlSTableSchema* pStableSchema = NULL;
|
||||
if (ppStableSchema) {
|
||||
pStableSchema= *ppStableSchema;
|
||||
} else {
|
||||
SSmlSTableSchema schema;
|
||||
strncpy(schema.sTableName, point->stableName, stableNameLen);
|
||||
schema.sTableName[stableNameLen] = '\0';
|
||||
schema.fields = taosArrayInit(64, sizeof(SSchema));
|
||||
schema.tags = taosArrayInit(8, sizeof(SSchema));
|
||||
schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
pStableSchema = taosArrayPush(stableArray, &schema);
|
||||
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int j = 0; j < point->tagNum; ++j) {
|
||||
TAOS_SML_KV* tagKv = point->tags + j;
|
||||
addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags);
|
||||
}
|
||||
|
||||
for (int j = 0; j < point->fieldNum; ++j) {
|
||||
TAOS_SML_KV* fieldKv = point->fields + j;
|
||||
addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
|
||||
}
|
||||
|
||||
point->schema = pStableSchema;
|
||||
}
|
||||
|
||||
size_t numStable = taosArrayGetSize(stableArray);
|
||||
for (int i = 0; i < numStable; ++i) {
|
||||
SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i);
|
||||
SSmlSTableSchema dbSchema = {0};
|
||||
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
|
||||
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
|
||||
SSchemaAction schemaAction = {0};
|
||||
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
||||
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
|
||||
memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
|
||||
schemaAction.createSTable.tags = pointSchema->tags;
|
||||
schemaAction.createSTable.fields = pointSchema->fields;
|
||||
applySchemaAction(taos, &schemaAction);
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
pointSchema->precision = dbSchema.precision;
|
||||
} else if (code == TSDB_CODE_SUCCESS) {
|
||||
size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
|
||||
size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);
|
||||
|
||||
SHashObj* dbTagHash = dbSchema.tagHash;
|
||||
SHashObj* dbFieldHash = dbSchema.fieldHash;
|
||||
|
||||
for (int j = 0; j < pointTagSize; ++j) {
|
||||
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
|
||||
SSchemaAction schemaAction = {0};
|
||||
bool actionNeeded = false;
|
||||
generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
|
||||
if (actionNeeded) {
|
||||
applySchemaAction(taos, &schemaAction);
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
pointSchema->precision = dbSchema.precision;
|
||||
}
|
||||
}
|
||||
|
||||
SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
|
||||
SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
|
||||
memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);
|
||||
|
||||
for (int j = 1; j < pointFieldSize; ++j) {
|
||||
SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
|
||||
SSchemaAction schemaAction = {0};
|
||||
bool actionNeeded = false;
|
||||
generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded);
|
||||
if (actionNeeded) {
|
||||
applySchemaAction(taos, &schemaAction);
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
pointSchema->precision = dbSchema.precision;
|
||||
}
|
||||
}
|
||||
|
||||
pointSchema->precision = dbSchema.precision;
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
|
||||
buildDataPointSchemas(points, numPoint, stableSchemas);
|
||||
reconcileDBSchemas(taos, stableSchemas);
|
||||
insertPoints(taos, points, numPoint);
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(stableSchemas); ++i) {
|
||||
SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
|
||||
taosArrayDestroy(schema->fields);
|
||||
taosArrayDestroy(schema->tags);
|
||||
}
|
||||
taosArrayDestroy(stableSchemas);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#include <string.h>
|
||||
#include <taos.h>
|
||||
#include <unistd.h>
|
||||
#include <tconfig.h>
|
||||
|
||||
static void prepare_data(TAOS* taos) {
|
||||
TAOS_RES *result;
|
||||
|
@ -965,23 +966,13 @@ int32_t verify_schema_less(TAOS* taos) {
|
|||
|
||||
char* lines[] = {
|
||||
"st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000",
|
||||
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640000000",
|
||||
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000",
|
||||
"st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000",
|
||||
"ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532",
|
||||
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532"
|
||||
};
|
||||
|
||||
// int code = taos_insert_by_lines(taos, lines , 5);
|
||||
int code = taos_insert_by_lines(taos, &(lines[0]), 1);
|
||||
|
||||
code = taos_insert_by_lines(taos, &(lines[1]), 1);
|
||||
|
||||
// code = taos_insert_by_lines(taos, &(lines[2]), 1);
|
||||
//
|
||||
// code = taos_insert_by_lines(taos, &(lines[3]), 1);
|
||||
//
|
||||
// code = taos_insert_by_lines(taos, &(lines[4]), 1);
|
||||
|
||||
int code = taos_insert_by_lines(taos, lines , 5);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -992,7 +983,7 @@ int main(int argc, char *argv[]) {
|
|||
const char* passwd = "taosdata";
|
||||
|
||||
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
|
||||
taos_options(TSDB_OPTION_CONFIGDIR, "/etc/taos");
|
||||
taosDumpGlobalCfg();
|
||||
TAOS* taos = taos_connect(host, user, passwd, "", 0);
|
||||
if (taos == NULL) {
|
||||
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 2000
|
||||
sql connect
|
||||
|
||||
print =============== step1
|
||||
$db = testlp
|
||||
$mte = ste
|
||||
$mt = st
|
||||
sql drop database $db -x step1
|
||||
step1:
|
||||
sql create database $db precision 'us'
|
||||
sql use $db
|
||||
sql create stable $mte (ts timestamp, f int) TAGS(t1 bigint)
|
||||
|
||||
line_insert st,t1=3i,t2=4,t3="t3" c1=3i,c3=L"passit",c2=false,c4=4 1626006833639000000
|
||||
line_insert st,t1=4i,t3="t4",t2=5 c1=3i,c3=L"passitagain",c2=true,c4=5,c5=5 1626006833640000000
|
||||
line_insert st,t1=4i,t2=5,t3="t4" c1=3i,c3=L"passitagain",c2=true,c4=5 1626006833642000000
|
||||
line_insert ste,t2=5,t3=L"ste" c1=true,c2=4,c3="iam" 1626056811823316532
|
||||
line_insert ste,t2=5,t3=L"ste2" c3="iamszhou",c4=false 1626056811843316532
|
||||
|
||||
#print =============== clear
|
||||
#sql drop database $db
|
||||
#sql show databases
|
||||
#if $rows != 0 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue