before getChildTableName and insertBatch
This commit is contained in:
parent
35cba8c21f
commit
dbac6f2081
|
@ -15,263 +15,6 @@
|
|||
#include "tskiplist.h"
|
||||
#include "tscUtil.h"
|
||||
|
||||
typedef enum {
|
||||
LP_ITEM_TAG,
|
||||
LP_ITEM_FIELD
|
||||
} LPItemKind;
|
||||
|
||||
typedef struct {
|
||||
SStrToken key;
|
||||
SStrToken value;
|
||||
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
int8_t type;
|
||||
int16_t bytes;
|
||||
|
||||
char* payload;
|
||||
}SLPItem;
|
||||
|
||||
typedef struct {
|
||||
SStrToken measToken;
|
||||
SStrToken tsToken;
|
||||
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
SArray* tags;
|
||||
SArray* fields;
|
||||
int64_t ts;
|
||||
|
||||
} SLPPoint;
|
||||
|
||||
typedef enum {
|
||||
LP_MEASUREMENT,
|
||||
LP_TAG_KEY,
|
||||
LP_TAG_VALUE,
|
||||
LP_FIELD_KEY,
|
||||
LP_FIELD_VALUE
|
||||
} LPPart;
|
||||
|
||||
int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) {
|
||||
for (int32_t i = start; i < s.n; ++i) {
|
||||
if (s.z[i] == ',' || s.z[i] == ' ') {
|
||||
*index = i;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) {
|
||||
for (int32_t i = start; i < s.n; ++i) {
|
||||
if (s.z[i] == '=') {
|
||||
*index = i;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t setPointMeasurement(SLPPoint* point, SStrToken token) {
|
||||
point->measToken = token;
|
||||
if (point->measToken.n < TSDB_TABLE_NAME_LEN) {
|
||||
strncpy(point->sTableName, point->measToken.z, point->measToken.n);
|
||||
point->sTableName[point->measToken.n] = '\0';
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) {
|
||||
item->key = key;
|
||||
if (item->key.n < TSDB_COL_NAME_LEN) {
|
||||
strncpy(item->name, item->key.z, item->key.n);
|
||||
item->name[item->key.n] = '\0';
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) {
|
||||
item->value = value;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t parseItemValue(SLPItem* item, LPItemKind kind) {
|
||||
char* sv = item->value.z;
|
||||
char* last = item->value.z + item->value.n - 1;
|
||||
|
||||
if (isdigit(sv[0]) || sv[0] == '-') {
|
||||
if (*last == 'i') {
|
||||
item->type = TSDB_DATA_TYPE_BIGINT;
|
||||
item->bytes = (int16_t)tDataTypes[item->type].bytes;
|
||||
item->payload = malloc(item->bytes);
|
||||
char* endptr = NULL;
|
||||
*(item->payload) = strtoll(sv, &endptr, 10);
|
||||
} else {
|
||||
item->type = TSDB_DATA_TYPE_DOUBLE;
|
||||
item->bytes = (int16_t)tDataTypes[item->type].bytes;
|
||||
item->payload = malloc(item->bytes);
|
||||
char* endptr = NULL;
|
||||
*(item->payload) = strtold(sv, &endptr);
|
||||
}
|
||||
} else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) {
|
||||
if (sv[0] == 'L') {
|
||||
item->type = TSDB_DATA_TYPE_NCHAR;
|
||||
uint32_t bytes = item->value.n - 3;
|
||||
// uint32_t len = bytes;
|
||||
// char* ucs = malloc(len);
|
||||
// int32_t ncharBytes = 0;
|
||||
// taosMbsToUcs4(sv+2, len, ucs, len, &ncharBytes);
|
||||
// item->bytes = ncharBytes;
|
||||
// item->payload = malloc(ncharBytes);
|
||||
// memcpy(item->payload, ucs, ncharBytes);
|
||||
// free(ucs);
|
||||
item->bytes = bytes;
|
||||
item->payload = malloc(bytes);
|
||||
memcpy(item->payload, sv+1, bytes);
|
||||
} else if (sv[0]=='"'){
|
||||
item->type = TSDB_DATA_TYPE_BINARY;
|
||||
uint32_t bytes = item->value.n - 2;
|
||||
item->bytes = bytes;
|
||||
item->payload = malloc(bytes);
|
||||
memcpy(item->payload, sv+1, bytes);
|
||||
}
|
||||
} else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') {
|
||||
item->type = TSDB_DATA_TYPE_BOOL;
|
||||
item->bytes = tDataTypes[item->type].bytes;
|
||||
item->payload = malloc(tDataTypes[item->type].bytes);
|
||||
*(item->payload) = tolower(sv[0])=='t' ? true : false;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t compareLPItemKey(const void* p1, const void* p2) {
|
||||
const SLPItem* t1 = p1;
|
||||
const SLPItem* t2 = p2;
|
||||
uint32_t min = (t1->key.n < t2->key.n) ? t1->key.n : t2->key.n;
|
||||
int res = strncmp(t1->key.z, t2->key.z, min);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
} else {
|
||||
return (int)(t1->key.n) - (int)(t2->key.n);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) {
|
||||
point->tsToken = tsToken;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t parsePointTime(SLPPoint* point) {
|
||||
if (point->tsToken.n <= 0) {
|
||||
point->ts = taosGetTimestampNs();
|
||||
} else {
|
||||
char* endptr = NULL;
|
||||
point->ts = strtoll(point->tsToken.z, &endptr, 10);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tscParseLine(SStrToken line, SLPPoint* point) {
|
||||
int32_t pos = 0;
|
||||
|
||||
int32_t start = 0;
|
||||
int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT);
|
||||
if (err != 0) {
|
||||
tscError("a");
|
||||
return err;
|
||||
}
|
||||
|
||||
SStrToken measurement = {.z = line.z+start, .n = pos-start};
|
||||
setPointMeasurement(point, measurement);
|
||||
point->tags = taosArrayInit(64, sizeof(SLPItem));
|
||||
start = pos + 1;
|
||||
while (line.z[start] == ',') {
|
||||
SLPItem item;
|
||||
|
||||
err = scanToEqual(line, start, &pos);
|
||||
if (err != 0) {
|
||||
tscError("b");
|
||||
goto error;
|
||||
}
|
||||
|
||||
SStrToken tagKey = {.z = line.z + start, .n = pos-start};
|
||||
setItemKey(&item, tagKey, LP_TAG_KEY);
|
||||
|
||||
start = pos + 1;
|
||||
err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE);
|
||||
if (err != 0) {
|
||||
tscError("c");
|
||||
goto error;
|
||||
}
|
||||
|
||||
SStrToken tagValue = {.z = line.z + start, .n = pos-start};
|
||||
setItemValue(&item, tagValue, LP_TAG_VALUE);
|
||||
|
||||
parseItemValue(&item, LP_ITEM_TAG);
|
||||
taosArrayPush(point->tags, &item);
|
||||
|
||||
start = pos + 1;
|
||||
}
|
||||
|
||||
taosArraySort(point->tags, compareLPItemKey);
|
||||
|
||||
point->fields = taosArrayInit(64, sizeof(SLPItem));
|
||||
do {
|
||||
SLPItem item;
|
||||
err = scanToEqual(line, start, &pos);
|
||||
if (err != 0) {
|
||||
goto error;
|
||||
}
|
||||
SStrToken fieldKey = {.z = line.z + start, .n = pos- start};
|
||||
setItemKey(&item, fieldKey, LP_FIELD_KEY);
|
||||
|
||||
start = pos + 1;
|
||||
err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE);
|
||||
if (err != 0) {
|
||||
goto error;
|
||||
}
|
||||
SStrToken fieldValue = {.z = line.z + start, .n = pos - start};
|
||||
setItemValue(&item, fieldValue, LP_TAG_VALUE);
|
||||
|
||||
parseItemValue(&item, LP_ITEM_FIELD);
|
||||
taosArrayPush(point->fields, &item);
|
||||
|
||||
start = pos + 1;
|
||||
} while (line.z[pos] == ',');
|
||||
|
||||
taosArraySort(point->fields, compareLPItemKey);
|
||||
|
||||
SStrToken tsToken = {.z = line.z+start, .n = line.n-start};
|
||||
setPointTimeStamp(point, tsToken);
|
||||
parsePointTime(point);
|
||||
|
||||
goto done;
|
||||
|
||||
error:
|
||||
// free array
|
||||
return err;
|
||||
done:
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
|
||||
for (int32_t i = 0; i < numLines; ++i) {
|
||||
SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1};
|
||||
SLPPoint point;
|
||||
tscParseLine(tkLine, &point);
|
||||
taosArrayPush(points, &point);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) {
|
||||
SArray* points = taosArrayInit(numLines, sizeof(SLPPoint));
|
||||
tscParseLines(lines, numLines, points, NULL);
|
||||
|
||||
|
||||
return NULL;
|
||||
}
|
||||
//=================================================================================================
|
||||
|
||||
typedef struct {
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
SHashObj* tagHash;
|
||||
|
@ -305,6 +48,8 @@ typedef struct {
|
|||
SSmlSTableSchema* schema;
|
||||
} TAOS_SML_DATA_POINT;
|
||||
|
||||
//=================================================================================================
|
||||
|
||||
int compareSmlColKv(const void* p1, const void* p2) {
|
||||
TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1;
|
||||
TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2;
|
||||
|
@ -318,32 +63,6 @@ int compareSmlColKv(const void* p1, const void* p2) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
|
||||
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
|
||||
|
||||
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
|
||||
taosStringBuilderAppendString(&sb, point->stableName);
|
||||
for (int j = 0; j < point->tagNum; ++j) {
|
||||
TAOS_SML_KV* tagKv = point->tags + j;
|
||||
taosStringBuilderAppendChar(&sb, ',');
|
||||
taosStringBuilderAppendString(&sb, tagKv->key);
|
||||
taosStringBuilderAppendChar(&sb, '=');
|
||||
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
|
||||
}
|
||||
size_t len = 0;
|
||||
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
|
||||
MD5_CTX context;
|
||||
MD5Init(&context);
|
||||
MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
|
||||
MD5Final(&context);
|
||||
*tableNameLen = snprintf(tableName, *tableNameLen,
|
||||
"%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
|
||||
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
|
||||
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
|
||||
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -393,15 +112,14 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
|
|||
|
||||
tscFreeSqlObj(pSql);
|
||||
|
||||
|
||||
uint32_t size = tscGetTableMetaMaxSize();
|
||||
STableMeta* tableMeta = calloc(1, size);
|
||||
taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1);
|
||||
|
||||
tstrncpy(schema->sTableName, tableName, strlen(tableName));
|
||||
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
|
||||
for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
|
||||
SSchema field;
|
||||
tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name));
|
||||
tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
|
||||
field.type = tableMeta->schema[i].type;
|
||||
field.bytes = tableMeta->schema[i].bytes;
|
||||
SSchema* pField = taosArrayPush(schema->fields, &field);
|
||||
|
@ -411,13 +129,13 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
|
|||
for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
|
||||
int j = i + tableMeta->tableInfo.numOfColumns;
|
||||
SSchema field;
|
||||
tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name));
|
||||
tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
|
||||
field.type = tableMeta->schema[j].type;
|
||||
field.bytes = tableMeta->schema[j].bytes;
|
||||
SSchema* pField = taosArrayPush(schema->tags, &field);
|
||||
taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
|
||||
}
|
||||
|
||||
free(tableMeta); tableMeta = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -586,6 +304,7 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
|
|||
capacity-n, &outBytes);
|
||||
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
break;
|
||||
}
|
||||
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
|
||||
int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
|
||||
|
@ -631,15 +350,46 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
|
||||
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
|
||||
|
||||
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
|
||||
taosStringBuilderAppendString(&sb, point->stableName);
|
||||
for (int j = 0; j < point->tagNum; ++j) {
|
||||
taosStringBuilderAppendChar(&sb, ',');
|
||||
TAOS_SML_KV* tagKv = point->tags + j;
|
||||
taosStringBuilderAppendString(&sb, tagKv->key);
|
||||
taosStringBuilderAppendChar(&sb, '=');
|
||||
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
|
||||
}
|
||||
size_t len = 0;
|
||||
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
|
||||
MD5_CTX context;
|
||||
MD5Init(&context);
|
||||
MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
|
||||
MD5Final(&context);
|
||||
*tableNameLen = snprintf(tableName, *tableNameLen,
|
||||
"tbl%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
|
||||
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
|
||||
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
|
||||
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
|
||||
taosStringBuilderDestroy(&sb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) {
|
||||
size_t numTags = taosArrayGetSize(tagsSchema);
|
||||
size_t numCols = taosArrayGetSize(colsSchema);
|
||||
sprintf(result, "insert into ? using %s(", sTableName);
|
||||
for (int i = 0; i < numTags; ++i) {
|
||||
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
|
||||
snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name);
|
||||
}
|
||||
snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") tags (");
|
||||
sprintf(result, "insert into ? using %s", sTableName);
|
||||
|
||||
// snprintf(result+strlen(result), freeBytes-strlen(result), "(");
|
||||
// for (int i = 0; i < numTags; ++i) {
|
||||
// SSchema* tagSchema = taosArrayGet(tagsSchema, i);
|
||||
// snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name);
|
||||
// }
|
||||
// snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")");
|
||||
|
||||
snprintf(result + strlen(result), freeBytes-strlen(result), " tags (");
|
||||
|
||||
for (int i = 0; i < numTags; ++i) {
|
||||
snprintf(result+strlen(result), freeBytes-strlen(result), "?,");
|
||||
|
@ -661,17 +411,38 @@ int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsS
|
|||
|
||||
int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) {
|
||||
TAOS_STMT* stmt = taos_stmt_init(taos);
|
||||
taos_stmt_prepare(stmt, sql, strlen(sql));
|
||||
|
||||
taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
|
||||
size_t rows = taosArrayGetSize(rowsBind);
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
TAOS_BIND* colBind = taosArrayGetP(rowsBind, i);
|
||||
taos_stmt_bind_param(stmt, colBind);
|
||||
taos_stmt_add_batch(stmt);
|
||||
int32_t code;
|
||||
code = taos_stmt_prepare(stmt, sql, strlen(sql));
|
||||
if (code != 0) {
|
||||
printf("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
|
||||
taos_stmt_execute(stmt);
|
||||
code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
|
||||
if (code != 0) {
|
||||
printf("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
size_t rows = taosArrayGetSize(rowsBind);
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
SArray* colBind = taosArrayGetP(rowsBind, i);
|
||||
code = taos_stmt_bind_param(stmt, TARRAY_GET_START(colBind));
|
||||
if (code != 0) {
|
||||
printf("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
code = taos_stmt_add_batch(stmt);
|
||||
if (code != 0) {
|
||||
printf("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = taos_stmt_execute(stmt);
|
||||
if (code != 0) {
|
||||
printf("%s", taos_stmt_errstr(stmt));
|
||||
return code;
|
||||
}
|
||||
TAOS_RES* res = taos_stmt_use_result(stmt);
|
||||
return taos_errno(res);
|
||||
}
|
||||
|
@ -682,7 +453,7 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
TAOS_SML_DATA_POINT * point = points + i;
|
||||
if (!point->childTableName) {
|
||||
char childTableName[TSDB_TABLE_NAME_LEN];
|
||||
int32_t tableNameLen;
|
||||
int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
|
||||
getChildTableName(point, childTableName, &tableNameLen);
|
||||
point->childTableName = calloc(1, tableNameLen+1);
|
||||
strncpy(point->childTableName, childTableName, tableNameLen);
|
||||
|
@ -696,28 +467,26 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
cTablePoints = taosArrayInit(64, sizeof(point));
|
||||
taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
|
||||
}
|
||||
taosArrayPush(cTablePoints, point);
|
||||
taosArrayPush(cTablePoints, &point);
|
||||
}
|
||||
|
||||
int isNullColBind = TSDB_TRUE;
|
||||
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
|
||||
while (pCTablePoints) {
|
||||
SArray* cTablePoints = *pCTablePoints;
|
||||
TAOS_SML_DATA_POINT * point = taosArrayGet(cTablePoints, 0);
|
||||
|
||||
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0);
|
||||
int32_t numTags = taosArrayGetSize(point->schema->tags);
|
||||
int32_t numCols = taosArrayGetSize(point->schema->fields);
|
||||
char* stableName = point->stableName;
|
||||
char* ctableName = point->childTableName;
|
||||
char sql[TSDB_MAX_BINARY_LEN];
|
||||
getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN);
|
||||
|
||||
size_t rows = taosArrayGetSize(cTablePoints);
|
||||
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
|
||||
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
|
||||
|
||||
for (int i = 0; i < rows; ++i) {
|
||||
point = taosArrayGet(cTablePoints, i);
|
||||
|
||||
taosArraySetSize(tagBinds, numTags);
|
||||
for (int j = 0; j < numTags; ++j) {
|
||||
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
|
||||
bind->is_null = &isNullColBind;
|
||||
}
|
||||
for (int j = 0; j < point->tagNum; ++j) {
|
||||
TAOS_SML_KV* kv = point->tags + j;
|
||||
int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema);
|
||||
|
@ -725,10 +494,21 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
bind->buffer_type = kv->type;
|
||||
bind->length = (uintptr_t*)&kv->length;
|
||||
bind->buffer = kv->value;
|
||||
bind->is_null = NULL;
|
||||
}
|
||||
|
||||
size_t rows = taosArrayGetSize(cTablePoints);
|
||||
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
|
||||
|
||||
for (int i = 0; i < rows; ++i) {
|
||||
point = taosArrayGetP(cTablePoints, i);
|
||||
|
||||
SArray* colBinds = taosArrayInit(numCols, sizeof(TAOS_BIND));
|
||||
taosArraySetSize(colBinds, numCols);
|
||||
for (int j = 0; j < numCols; ++j) {
|
||||
TAOS_BIND* bind = taosArrayGet(colBinds, j);
|
||||
bind->is_null = &isNullColBind;
|
||||
}
|
||||
for (int j = 0; j < point->fieldNum; ++j) {
|
||||
TAOS_SML_KV* kv = point->fields + j;
|
||||
int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema);
|
||||
|
@ -736,10 +516,13 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
bind->buffer_type = kv->type;
|
||||
bind->length = (uintptr_t*)&kv->length;
|
||||
bind->buffer = kv->value;
|
||||
bind->is_null = NULL;
|
||||
}
|
||||
taosArrayPush(rowsBind, &colBinds);
|
||||
}
|
||||
|
||||
char sql[TSDB_MAX_BINARY_LEN];
|
||||
getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN);
|
||||
insertBatch(taos, sql, ctableName, tagBinds, rowsBind);
|
||||
|
||||
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
|
||||
|
@ -747,7 +530,6 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints)
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
|
||||
|
@ -792,11 +574,13 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
|||
for (int i = 0; i < numStable; ++i) {
|
||||
SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i);
|
||||
SSmlSTableSchema dbSchema = {0};
|
||||
dbSchema.fields = taosArrayInit(64, sizeof(SSchema));
|
||||
dbSchema.tags = taosArrayInit(8, sizeof(SSchema));
|
||||
dbSchema.fields = taosArrayInit(64, sizeof(SSchema));
|
||||
dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
|
||||
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
|
||||
|
||||
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
|
||||
SSchemaAction schemaAction = {0};
|
||||
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
||||
|
@ -822,8 +606,12 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
|||
}
|
||||
}
|
||||
|
||||
for (int j = 0; j < pointFieldSize; ++j) {
|
||||
SSchema* pointCol = taosArrayGet(pointSchema->tags, j);
|
||||
SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
|
||||
SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
|
||||
memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);
|
||||
|
||||
for (int j = 1; j < pointFieldSize; ++j) {
|
||||
SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
|
||||
SSchemaAction schemaAction = {0};
|
||||
bool actionNeeded = false;
|
||||
generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded);
|
||||
|
@ -849,3 +637,317 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
|
|||
//todo: table/column length check
|
||||
//todo: type check
|
||||
//todo: taosmbs2ucs4 check
|
||||
|
||||
//=========================================================================
|
||||
|
||||
typedef enum {
|
||||
LP_ITEM_TAG,
|
||||
LP_ITEM_FIELD
|
||||
} LPItemKind;
|
||||
|
||||
typedef struct {
|
||||
SStrToken keyToken;
|
||||
SStrToken valueToken;
|
||||
|
||||
char key[TSDB_COL_NAME_LEN];
|
||||
int8_t type;
|
||||
int16_t length;
|
||||
|
||||
char* value;
|
||||
}SLPItem;
|
||||
|
||||
typedef struct {
|
||||
SStrToken measToken;
|
||||
SStrToken tsToken;
|
||||
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
SArray* tags;
|
||||
SArray* fields;
|
||||
int64_t ts;
|
||||
|
||||
} SLPPoint;
|
||||
|
||||
typedef enum {
|
||||
LP_MEASUREMENT,
|
||||
LP_TAG_KEY,
|
||||
LP_TAG_VALUE,
|
||||
LP_FIELD_KEY,
|
||||
LP_FIELD_VALUE
|
||||
} LPPart;
|
||||
|
||||
int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) {
|
||||
for (int32_t i = start; i < s.n; ++i) {
|
||||
if (s.z[i] == ',' || s.z[i] == ' ') {
|
||||
*index = i;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) {
|
||||
for (int32_t i = start; i < s.n; ++i) {
|
||||
if (s.z[i] == '=') {
|
||||
*index = i;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t setPointMeasurement(SLPPoint* point, SStrToken token) {
|
||||
point->measToken = token;
|
||||
if (point->measToken.n < TSDB_TABLE_NAME_LEN) {
|
||||
strncpy(point->sTableName, point->measToken.z, point->measToken.n);
|
||||
point->sTableName[point->measToken.n] = '\0';
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) {
|
||||
item->keyToken = key;
|
||||
if (item->keyToken.n < TSDB_COL_NAME_LEN) {
|
||||
strncpy(item->key, item->keyToken.z, item->keyToken.n);
|
||||
item->key[item->keyToken.n] = '\0';
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) {
|
||||
item->valueToken = value;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t parseItemValue(SLPItem* item, LPItemKind kind) {
|
||||
char* sv = item->valueToken.z;
|
||||
char* last = item->valueToken.z + item->valueToken.n - 1;
|
||||
|
||||
if (isdigit(sv[0]) || sv[0] == '-') {
|
||||
if (*last == 'i') {
|
||||
item->type = TSDB_DATA_TYPE_BIGINT;
|
||||
item->length = (int16_t)tDataTypes[item->type].bytes;
|
||||
item->value = malloc(item->length);
|
||||
char* endptr = NULL;
|
||||
*(item->value) = strtoll(sv, &endptr, 10);
|
||||
} else {
|
||||
item->type = TSDB_DATA_TYPE_DOUBLE;
|
||||
item->length = (int16_t)tDataTypes[item->type].bytes;
|
||||
item->value = malloc(item->length);
|
||||
char* endptr = NULL;
|
||||
*(item->value) = strtold(sv, &endptr);
|
||||
}
|
||||
} else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) {
|
||||
if (sv[0] == 'L') {
|
||||
item->type = TSDB_DATA_TYPE_NCHAR;
|
||||
uint32_t bytes = item->valueToken.n - 3;
|
||||
item->length = bytes;
|
||||
item->value = malloc(bytes);
|
||||
memcpy(item->value, sv+1, bytes);
|
||||
} else if (sv[0]=='"'){
|
||||
item->type = TSDB_DATA_TYPE_BINARY;
|
||||
uint32_t bytes = item->valueToken.n - 2;
|
||||
item->length = bytes;
|
||||
item->value = malloc(bytes);
|
||||
memcpy(item->value, sv+1, bytes);
|
||||
}
|
||||
} else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') {
|
||||
item->type = TSDB_DATA_TYPE_BOOL;
|
||||
item->length = tDataTypes[item->type].bytes;
|
||||
item->value = malloc(tDataTypes[item->type].bytes);
|
||||
*(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t compareLPItemKey(const void* p1, const void* p2) {
|
||||
const SLPItem* t1 = p1;
|
||||
const SLPItem* t2 = p2;
|
||||
uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n;
|
||||
int res = strncmp(t1->keyToken.z, t2->keyToken.z, min);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
} else {
|
||||
return (int)(t1->keyToken.n) - (int)(t2->keyToken.n);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) {
|
||||
point->tsToken = tsToken;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t parsePointTime(SLPPoint* point) {
|
||||
if (point->tsToken.n <= 0) {
|
||||
point->ts = taosGetTimestampNs();
|
||||
} else {
|
||||
char* endptr = NULL;
|
||||
point->ts = strtoll(point->tsToken.z, &endptr, 10);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tscParseLine(SStrToken line, SLPPoint* point) {
|
||||
int32_t pos = 0;
|
||||
|
||||
int32_t start = 0;
|
||||
int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT);
|
||||
if (err != 0) {
|
||||
tscError("a");
|
||||
return err;
|
||||
}
|
||||
|
||||
SStrToken measurement = {.z = line.z+start, .n = pos-start};
|
||||
setPointMeasurement(point, measurement);
|
||||
point->tags = taosArrayInit(64, sizeof(SLPItem));
|
||||
start = pos;
|
||||
while (line.z[start] == ',') {
|
||||
SLPItem item;
|
||||
|
||||
start++;
|
||||
err = scanToEqual(line, start, &pos);
|
||||
if (err != 0) {
|
||||
tscError("b");
|
||||
goto error;
|
||||
}
|
||||
|
||||
SStrToken tagKey = {.z = line.z + start, .n = pos-start};
|
||||
setItemKey(&item, tagKey, LP_TAG_KEY);
|
||||
|
||||
start = pos + 1;
|
||||
err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE);
|
||||
if (err != 0) {
|
||||
tscError("c");
|
||||
goto error;
|
||||
}
|
||||
|
||||
SStrToken tagValue = {.z = line.z + start, .n = pos-start};
|
||||
setItemValue(&item, tagValue, LP_TAG_VALUE);
|
||||
|
||||
parseItemValue(&item, LP_ITEM_TAG);
|
||||
taosArrayPush(point->tags, &item);
|
||||
|
||||
start = pos;
|
||||
}
|
||||
|
||||
taosArraySort(point->tags, compareLPItemKey);
|
||||
|
||||
point->fields = taosArrayInit(64, sizeof(SLPItem));
|
||||
|
||||
start++;
|
||||
do {
|
||||
SLPItem item;
|
||||
|
||||
err = scanToEqual(line, start, &pos);
|
||||
if (err != 0) {
|
||||
goto error;
|
||||
}
|
||||
SStrToken fieldKey = {.z = line.z + start, .n = pos- start};
|
||||
setItemKey(&item, fieldKey, LP_FIELD_KEY);
|
||||
|
||||
start = pos + 1;
|
||||
err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE);
|
||||
if (err != 0) {
|
||||
goto error;
|
||||
}
|
||||
SStrToken fieldValue = {.z = line.z + start, .n = pos - start};
|
||||
setItemValue(&item, fieldValue, LP_TAG_VALUE);
|
||||
|
||||
parseItemValue(&item, LP_ITEM_FIELD);
|
||||
taosArrayPush(point->fields, &item);
|
||||
|
||||
start = pos + 1;
|
||||
} while (line.z[pos] == ',');
|
||||
|
||||
taosArraySort(point->fields, compareLPItemKey);
|
||||
|
||||
SStrToken tsToken = {.z = line.z+start, .n = line.n-start};
|
||||
setPointTimeStamp(point, tsToken);
|
||||
parsePointTime(point);
|
||||
|
||||
goto done;
|
||||
|
||||
error:
|
||||
// free array
|
||||
return err;
|
||||
done:
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
|
||||
for (int32_t i = 0; i < numLines; ++i) {
|
||||
SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1};
|
||||
SLPPoint point;
|
||||
tscParseLine(tkLine, &point);
|
||||
taosArrayPush(points, &point);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) {
|
||||
SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint));
|
||||
tscParseLines(lines, numLines, lpPoints, NULL);
|
||||
|
||||
size_t numPoints = taosArrayGetSize(lpPoints);
|
||||
TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT));
|
||||
for (int i = 0; i < numPoints; ++i) {
|
||||
SLPPoint* lpPoint = taosArrayGet(lpPoints, i);
|
||||
TAOS_SML_DATA_POINT* point = points+i;
|
||||
point->stableName = calloc(1, strlen(lpPoint->sTableName)+1);
|
||||
strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName));
|
||||
point->stableName[strlen(lpPoint->sTableName)] = '\0';
|
||||
|
||||
size_t lpTagSize = taosArrayGetSize(lpPoint->tags);
|
||||
point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV));
|
||||
point->tagNum = lpTagSize;
|
||||
for (int j=0; j<lpTagSize; ++j) {
|
||||
SLPItem* lpTag = taosArrayGet(lpPoint->tags, j);
|
||||
TAOS_SML_KV* tagKv = point->tags + j;
|
||||
|
||||
size_t kenLen = strlen(lpTag->key);
|
||||
tagKv->key = calloc(1, kenLen+1);
|
||||
strncpy(tagKv->key, lpTag->key, kenLen);
|
||||
tagKv->key[kenLen] = '\0';
|
||||
|
||||
tagKv->type = lpTag->type;
|
||||
tagKv->length = lpTag->length;
|
||||
tagKv->value = malloc(tagKv->length);
|
||||
memcpy(tagKv->value, lpTag->value, tagKv->length);
|
||||
}
|
||||
|
||||
size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields);
|
||||
point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV));
|
||||
point->fieldNum = lpFieldsSize + 1;
|
||||
|
||||
TAOS_SML_KV* tsField = point->fields + 0;
|
||||
char tsKey[256];
|
||||
snprintf(tsKey, 256, "_%s_ts", point->stableName);
|
||||
size_t tsKeyLen = strlen(tsKey);
|
||||
tsField->key = calloc(1, tsKeyLen+1);
|
||||
strncpy(tsField->key, tsKey, tsKeyLen);
|
||||
tsField->key[tsKeyLen] = '\0';
|
||||
tsField->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
tsField->value = malloc(tsField->length);
|
||||
memcpy(tsField->value, &(lpPoint->ts), tsField->length);
|
||||
|
||||
for (int j=0; j<lpFieldsSize; ++j) {
|
||||
SLPItem* lpField = taosArrayGet(lpPoint->fields, j);
|
||||
TAOS_SML_KV* fieldKv = point->fields + j + 1;
|
||||
|
||||
size_t kenLen = strlen(lpField->key);
|
||||
fieldKv->key = calloc(1, kenLen+1);
|
||||
strncpy(fieldKv->key, lpField->key, kenLen);
|
||||
fieldKv->key[kenLen] = '\0';
|
||||
|
||||
fieldKv->type = lpField->type;
|
||||
fieldKv->length = lpField->length;
|
||||
fieldKv->value = malloc(fieldKv->length);
|
||||
memcpy(fieldKv->value, lpField->value, fieldKv->length);
|
||||
}
|
||||
}
|
||||
|
||||
taos_sml_insert(taos, points, numPoints);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -169,6 +169,8 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
|
|||
|
||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
|
||||
|
||||
DLL_EXPORT int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -949,6 +949,15 @@ void verify_stream(TAOS* taos) {
|
|||
taos_close_stream(strm);
|
||||
}
|
||||
|
||||
int32_t verify_schema_less(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
char* lines[] = {
|
||||
"st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639162922"
|
||||
};
|
||||
int code = taos_insert_by_lines(taos, lines , 1);
|
||||
return code;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
const char* host = "127.0.0.1";
|
||||
const char* user = "root";
|
||||
|
@ -967,6 +976,12 @@ int main(int argc, char *argv[]) {
|
|||
info = taos_get_client_info(taos);
|
||||
printf("client info: %s\n", info);
|
||||
|
||||
printf("************ verify query *************\n");
|
||||
int code = verify_schema_less(taos);
|
||||
if (code == 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
printf("************ verify query *************\n");
|
||||
verify_query(taos);
|
||||
|
||||
|
|
Loading…
Reference in New Issue