From c4f06ca30b1a37b42b60a89285fbceba21f2cde8 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 6 Jul 2021 18:05:27 +0800 Subject: [PATCH 01/10] develop schemaless --- src/client/src/tscParseLineProtocol.c | 731 ++++++++++++++++++++++++++ 1 file changed, 731 insertions(+) create mode 100644 src/client/src/tscParseLineProtocol.c diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c new file mode 100644 index 0000000000..79e8955f9a --- /dev/null +++ b/src/client/src/tscParseLineProtocol.c @@ -0,0 +1,731 @@ +#include +#include +#include +#include +#include "os.h" +#include "osString.h" +#include "ttype.h" +#include "tmd5.h" +#include "tstrbuild.h" +#include "tname.h" +#include "taos.h" +#include "tsclient.h" +#include "tscLog.h" +#include "hash.h" +#include "tskiplist.h" +#include "tscUtil.h" + +typedef enum { + LP_ITEM_TAG, + LP_ITEM_FIELD +} LPItemKind; + +typedef struct { + SStrToken key; + SStrToken value; + + char name[TSDB_COL_NAME_LEN]; + int8_t type; + int16_t bytes; + + char* payload; +}SLPItem; + +typedef struct { + SStrToken measToken; + SStrToken tsToken; + + char sTableName[TSDB_TABLE_NAME_LEN]; + SArray* tags; + SArray* fields; + int64_t ts; + +} SLPPoint; + +typedef enum { + LP_MEASUREMENT, + LP_TAG_KEY, + LP_TAG_VALUE, + LP_FIELD_KEY, + LP_FIELD_VALUE +} LPPart; + +int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == ',' || s.z[i] == ' ') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == '=') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { + point->measToken = token; + if (point->measToken.n < TSDB_TABLE_NAME_LEN) { + strncpy(point->sTableName, point->measToken.z, point->measToken.n); + point->sTableName[point->measToken.n] = '\0'; + } + return 0; +} + +int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { + item->key = key; + if (item->key.n < TSDB_COL_NAME_LEN) { + strncpy(item->name, item->key.z, item->key.n); + item->name[item->key.n] = '\0'; + } + return 0; +} + +int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { + item->value = value; + return 0; +} + +int32_t parseItemValue(SLPItem* item, LPItemKind kind) { + char* sv = item->value.z; + char* last = item->value.z + item->value.n - 1; + + if (isdigit(sv[0]) || sv[0] == '-') { + if (*last == 'i') { + item->type = TSDB_DATA_TYPE_BIGINT; + item->bytes = (int16_t)tDataTypes[item->type].bytes; + item->payload = malloc(item->bytes); + char* endptr = NULL; + *(item->payload) = strtoll(sv, &endptr, 10); + } else { + item->type = TSDB_DATA_TYPE_DOUBLE; + item->bytes = (int16_t)tDataTypes[item->type].bytes; + item->payload = malloc(item->bytes); + char* endptr = NULL; + *(item->payload) = strtold(sv, &endptr); + } + } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { + if (sv[0] == 'L') { + item->type = TSDB_DATA_TYPE_NCHAR; + uint32_t bytes = item->value.n - 3; +// uint32_t len = bytes; +// char* ucs = malloc(len); +// int32_t ncharBytes = 0; +// taosMbsToUcs4(sv+2, len, ucs, len, &ncharBytes); +// item->bytes = ncharBytes; +// item->payload = malloc(ncharBytes); +// memcpy(item->payload, ucs, ncharBytes); +// free(ucs); + item->bytes = bytes; + item->payload = malloc(bytes); + memcpy(item->payload, sv+1, bytes); + } else if (sv[0]=='"'){ + item->type = TSDB_DATA_TYPE_BINARY; + uint32_t bytes = item->value.n - 2; + item->bytes = bytes; + item->payload = malloc(bytes); + memcpy(item->payload, sv+1, bytes); + } + } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { + item->type = TSDB_DATA_TYPE_BOOL; + item->bytes = tDataTypes[item->type].bytes; + item->payload = malloc(tDataTypes[item->type].bytes); + *(item->payload) = tolower(sv[0])=='t' ? true : false; + } + return 0; +} + +int32_t compareLPItemKey(const void* p1, const void* p2) { + const SLPItem* t1 = p1; + const SLPItem* t2 = p2; + uint32_t min = (t1->key.n < t2->key.n) ? t1->key.n : t2->key.n; + int res = strncmp(t1->key.z, t2->key.z, min); + if (res != 0) { + return res; + } else { + return (int)(t1->key.n) - (int)(t2->key.n); + } +} + +int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { + point->tsToken = tsToken; + return 0; +} + +int32_t parsePointTime(SLPPoint* point) { + if (point->tsToken.n <= 0) { + point->ts = taosGetTimestampNs(); + } else { + char* endptr = NULL; + point->ts = strtoll(point->tsToken.z, &endptr, 10); + } + return 0; +} + +int32_t tscParseLine(SStrToken line, SLPPoint* point) { + int32_t pos = 0; + + int32_t start = 0; + int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); + if (err != 0) { + tscError("a"); + return err; + } + + SStrToken measurement = {.z = line.z+start, .n = pos-start}; + setPointMeasurement(point, measurement); + point->tags = taosArrayInit(64, sizeof(SLPItem)); + start = pos + 1; + while (line.z[start] == ',') { + SLPItem item; + + err = scanToEqual(line, start, &pos); + if (err != 0) { + tscError("b"); + goto error; + } + + SStrToken tagKey = {.z = line.z + start, .n = pos-start}; + setItemKey(&item, tagKey, LP_TAG_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); + if (err != 0) { + tscError("c"); + goto error; + } + + SStrToken tagValue = {.z = line.z + start, .n = pos-start}; + setItemValue(&item, tagValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_TAG); + taosArrayPush(point->tags, &item); + + start = pos + 1; + } + + taosArraySort(point->tags, compareLPItemKey); + + point->fields = taosArrayInit(64, sizeof(SLPItem)); + do { + SLPItem item; + err = scanToEqual(line, start, &pos); + if (err != 0) { + goto error; + } + SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; + setItemKey(&item, fieldKey, LP_FIELD_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); + if (err != 0) { + goto error; + } + SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; + setItemValue(&item, fieldValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_FIELD); + taosArrayPush(point->fields, &item); + + start = pos + 1; + } while (line.z[pos] == ','); + + taosArraySort(point->fields, compareLPItemKey); + + SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; + setPointTimeStamp(point, tsToken); + parsePointTime(point); + + goto done; + +error: + // free array + return err; +done: + return 0; +} + + +int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { + for (int32_t i = 0; i < numLines; ++i) { + SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1}; + SLPPoint point; + tscParseLine(tkLine, &point); + taosArrayPush(points, &point); + } + return 0; +} + +TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { + SArray* points = taosArrayInit(numLines, sizeof(SLPPoint)); + tscParseLines(lines, numLines, points, NULL); + + + return NULL; +} +//================================================================================================= + +typedef struct { + char* key; + uint8_t type; + int16_t length; + char* value; +} TAOS_SML_KV; + +typedef struct { + char* stableName; + + char* childTableName; + TAOS_SML_KV* tags; + int tagNum; + + // first kv must be timestamp + TAOS_SML_KV* fields; + int fieldNum; + +} TAOS_SML_DATA_POINT; + +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SHashObj* tagHash; + SHashObj* fieldHash; + SArray* tags; //SArray + SArray* fields; //SArray +} SSmlSTableSchema; + + +int compareSmlColKv(const void* p1, const void* p2) { + TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; + TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; + int kvLen1 = (int)strlen(kv1->key); + int kvLen2 = (int)strlen(kv2->key); + int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2)); + if (res != 0) { + return res; + } else { + return kvLen1-kvLen2; + } +} + +int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { + qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); + + SStringBuilder sb; memset(&sb, 0, sizeof(sb)); + taosStringBuilderAppendString(&sb, point->stableName); + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* tagKv = point->tags + j; + taosStringBuilderAppendChar(&sb, ','); + taosStringBuilderAppendString(&sb, tagKv->key); + taosStringBuilderAppendChar(&sb, '='); + taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); + } + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); + MD5_CTX context; + MD5Init(&context); + MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + MD5Final(&context); + *tableNameLen = snprintf(tableName, *tableNameLen, + "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], + context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], + context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + return 0; +} + +int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { + int32_t code = 0; + + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return TSDB_CODE_TSC_DISCONNECTED; + } + + char sql[256]; + snprintf(sql, 256, "describe %s", tableName); + TAOS_RES* res = taos_query(taos, sql); + code = taos_errno(res); + if (code != 0) { + taos_free_result(res); + return code; + } + taos_free_result(res); + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + pSql->pTscObj = taos; + pSql->signature = pSql; + pSql->fp = NULL; + + SStrToken tableToken = {.z=tableName, .n=strlen(tableName), .type=TK_ID}; + tGetToken(tableName, &tableToken.type); + // Check if the table name available or not + if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; + sprintf(pSql->cmd.payload, "table name is invalid"); + return code; + } + + SName sname = {0}; + if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) { + return code; + } + + char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; + memset(fullTableName, 0, tListLen(fullTableName)); + tNameExtractFullName(&sname, fullTableName); + + if (code != TSDB_CODE_SUCCESS) { + tscFreeSqlObj(pSql); + return code; + } + + tscFreeSqlObj(pSql); + + + uint32_t size = tscGetTableMetaMaxSize(); + STableMeta* tableMeta = calloc(1, size); + taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); + + tstrncpy(schema->sTableName, tableName, strlen(tableName)); + for (int i=0; itableInfo.numOfColumns; ++i) { + SSchema field; + tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)); + field.type = tableMeta->schema[i].type; + field.bytes = tableMeta->schema[i].bytes; + SSchema* pField = taosArrayPush(schema->fields, &field); + taosHashPut(schema->fieldHash, field.name, strlen(field.name), &pField, POINTER_BYTES); + } + + for (int i=0; itableInfo.numOfTags; ++i) { + int j = i + tableMeta->tableInfo.numOfColumns; + SSchema field; + tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)); + field.type = tableMeta->schema[j].type; + field.bytes = tableMeta->schema[j].bytes; + SSchema* pField = taosArrayPush(schema->tags, &field); + taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES); + } + + return code; + +} + +typedef enum { + SCHEMA_ACTION_CREATE_STABLE, + SCHEMA_ACTION_ADD_COLUMN, + SCHEMA_ACTION_ADD_TAG, + SCHEMA_ACTION_CHANGE_COLUMN_SIZE, + SCHEMA_ACTION_CHANGE_TAG_SIZE, + SCHEMA_ACTION_CREATE_CTABLE +} ESchemaAction; + +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SArray* tags; //SArray + SArray* fields; //SArray +} SCreateSTableActionInfo; + +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SSchema* field; +} SAlterSTableActionInfo; + +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + char cTableName[TSDB_TABLE_NAME_LEN]; + TAOS_SML_KV* tags; + int tagNum; +} SCreateCTableActionInfo; + +typedef struct { + ESchemaAction action; + union { + SCreateSTableActionInfo createSTable; + SAlterSTableActionInfo alterSTable; + SCreateCTableActionInfo createCTable; + }; +} SSchemaAction; + +int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { + if (!IS_VAR_DATA_TYPE(kv->type)) { + *bytes = tDataTypes[kv->type].bytes; + } else { + if (kv->type == TSDB_DATA_TYPE_NCHAR) { + char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1); + int32_t bytesNeeded = 0; + //todo check conversion succeed + taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); + free(ucs); + *bytes = bytesNeeded + VARSTR_HEADER_SIZE; + + } else if (kv->type == TSDB_DATA_TYPE_BINARY) { + *bytes = kv->length + VARSTR_HEADER_SIZE; + } + } + return 0; +} + +int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) { + SSchema* pField = NULL; + SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + if (ppField) { + pField = *ppField; + + if (pField->type != smlKv->type) { + //TODO: + tscError("type mismatch"); + return -1; + } + + int32_t bytes = 0; + getFieldBytesFromSmlKv(smlKv, &bytes); + pField->bytes = MAX(pField->bytes, bytes); + + } else { + SSchema field; + size_t tagKeyLen = strlen(smlKv->key); + strncpy(field.name, smlKv->key, tagKeyLen); + field.name[tagKeyLen] = '\0'; + field.type = smlKv->type; + + int32_t bytes = 0; + getFieldBytesFromSmlKv(smlKv, &bytes); + field.bytes = bytes; + + pField = taosArrayPush(array, &field); + taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); + } + return 0; +} + +int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[], + SSchemaAction* action, bool* actionNeeded) { + SSchema** ppDbAttr = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name)); + if (*ppDbAttr) { + SSchema* dbAttr = *ppDbAttr; + if (pointColField->type != dbAttr->type) { + //todo error + return -5; + } + + if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) { + if (isTag) { + action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE; + } else { + action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE; + } + memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); + memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); + action->alterSTable.field = pointColField; + *actionNeeded = true; + } + } else { + if (isTag) { + action->action = SCHEMA_ACTION_ADD_TAG; + } else { + action->action = SCHEMA_ACTION_ADD_COLUMN; + } + memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); + memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); + action->alterSTable.field = pointColField; + *actionNeeded = true; + } + return 0; +} + +int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray + SHashObj* sname2shema = taosHashInit(32, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + for (int i = 0; i < numPoint; ++i) { + TAOS_SML_DATA_POINT* point = &points[i]; + SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN); + SSmlSTableSchema* pStableSchema = NULL; + if (ppStableSchema) { + pStableSchema= *ppStableSchema; + } else { + SSmlSTableSchema schema; + size_t stableNameLen = strlen(point->stableName); + strncpy(schema.sTableName, point->stableName, stableNameLen); + schema.sTableName[stableNameLen] = '\0'; + schema.fields = taosArrayInit(64, sizeof(SSchema)); + schema.tags = taosArrayInit(8, sizeof(SSchema)); + schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + pStableSchema = taosArrayPush(stableArray, &schema); + taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); + } + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* tagKv = point->tags + j; + addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags); + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* fieldKv = point->fields + j; + addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + } + } + + SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction)); + size_t numStable = taosArrayGetSize(stableArray); + for (int i = 0; i < numStable; ++i) { + SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); + SSmlSTableSchema dbSchema = {0}; + dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); + dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); + dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { + SSchemaAction schemaAction = {0}; + schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; + memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); + memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); + schemaAction.createSTable.tags = pointSchema->tags; + schemaAction.createSTable.fields = pointSchema->fields; + taosArrayPush(schemaActions, &schemaAction); + }else if (code == TSDB_CODE_SUCCESS) { + size_t pointTagSize = taosArrayGetSize(pointSchema->tags); + size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); + + SHashObj* dbTagHash = dbSchema.tagHash; + SHashObj* dbFieldHash = dbSchema.fieldHash; + + for (int j = 0; j < pointTagSize; ++j) { + SSchema* pointTag = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + taosArrayPush(schemaActions, &schemaAction); + } + } + + for (int j = 0; j < pointFieldSize; ++j) { + SSchema* pointCol = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + taosArrayPush(schemaActions, &schemaAction); + } + } + } else { + return code; + } + } + + return code; +} + + +int32_t buildColumnDescription(SSchema* field, + char* buf, int32_t bufSize, int32_t* outBytes) { + uint8_t type = field->type; + + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + int32_t bytes = field->bytes - VARSTR_HEADER_SIZE; + if (type == TSDB_DATA_TYPE_NCHAR) { + bytes = bytes/TSDB_NCHAR_SIZE; + } + int out = snprintf(buf, bufSize,"%s %s(%d)", + field->name,tDataTypes[field->type].name, bytes); + *outBytes = out; + } else { + int out = snprintf(buf, bufSize, "%s %s", + field->name, tDataTypes[type].name); + *outBytes = out; + } + + return 0; +} + +int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { + int32_t code = 0; + int32_t capacity = TSDB_MAX_BINARY_LEN; + int32_t outBytes = 0; + char *result = (char *)calloc(1, capacity); + + switch (action->action) { + case SCHEMA_ACTION_ADD_COLUMN: { + int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + break; + } + case SCHEMA_ACTION_ADD_TAG: { + int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, + result+n, capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + break; + } + case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { + int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, result+n, + capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + } + case SCHEMA_ACTION_CHANGE_TAG_SIZE: { + int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName); + buildColumnDescription(action->alterSTable.field, result+n, + capacity-n, &outBytes); + TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + code = taos_errno(res); + break; + } + case SCHEMA_ACTION_CREATE_STABLE: { + int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); + char* pos = result + n; int freeBytes = capacity - n; + int numCols = taosArrayGetSize(action->createSTable.fields); + for (int32_t i = 0; i < numCols; ++i) { + SSchema* field = taosArrayGet(action->createSTable.fields, i); + buildColumnDescription(field, pos, freeBytes, &outBytes); + pos += outBytes; freeBytes -= outBytes; + *pos = ','; ++pos; --freeBytes; + } + --pos; ++freeBytes; + outBytes = snprintf(pos, freeBytes, ") tags ("); + int numTags = taosArrayGetSize(action->createSTable.tags); + pos += outBytes; freeBytes -= outBytes; + for (int32_t i = 0; i < numTags; ++i) { + SSchema* field = taosArrayGet(action->createSTable.tags, i); + buildColumnDescription(field, pos, freeBytes, &outBytes); + pos += outBytes; freeBytes -= outBytes; + *pos = ','; ++pos; --freeBytes; + } + pos--; ++freeBytes; + outBytes = snprintf(pos, freeBytes, ")"); + TAOS_RES* res = taos_query(taos, result); + code = taos_errno(res); + break; + } + case SCHEMA_ACTION_CREATE_CTABLE: { + + break; + } + default: + break; + } + free(result); + return code; +} + +//todo: table/column length check +//todo: type check +//todo: taosmbs2ucs4 check From d5ab8d7ced3a76013cfc64419ec3bf970d3a822b Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 9 Jul 2021 19:37:15 +0800 Subject: [PATCH 02/10] before taos_bind/taos_multi_bind generation --- src/client/src/tscParseLineProtocol.c | 99 +++++++++++++++++++++++++-- 1 file changed, 95 insertions(+), 4 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 79e8955f9a..10d2429b5e 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -277,6 +277,9 @@ typedef struct { uint8_t type; int16_t length; char* value; + + //=================================== + SSchema* fieldSchema; } TAOS_SML_KV; typedef struct { @@ -415,7 +418,6 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { } return code; - } typedef enum { @@ -465,7 +467,6 @@ int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); free(ucs); *bytes = bytesNeeded + VARSTR_HEADER_SIZE; - } else if (kv->type == TSDB_DATA_TYPE_BINARY) { *bytes = kv->length + VARSTR_HEADER_SIZE; } @@ -503,6 +504,9 @@ int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* a pField = taosArrayPush(array, &field); taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); } + + smlKv->fieldSchema = pField; + return 0; } @@ -700,9 +704,11 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { *pos = ','; ++pos; --freeBytes; } --pos; ++freeBytes; + outBytes = snprintf(pos, freeBytes, ") tags ("); - int numTags = taosArrayGetSize(action->createSTable.tags); pos += outBytes; freeBytes -= outBytes; + + int numTags = taosArrayGetSize(action->createSTable.tags); for (int32_t i = 0; i < numTags; ++i) { SSchema* field = taosArrayGet(action->createSTable.tags, i); buildColumnDescription(field, pos, freeBytes, &outBytes); @@ -716,7 +722,43 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { break; } case SCHEMA_ACTION_CREATE_CTABLE: { - +// SCreateCTableActionInfo* pInfo = &action->createCTable; +// SArray* bindParams = taosArrayInit(2 + 2 * pInfo->tagNum, sizeof(TAOS_BIND)); +// outBytes = sprintf(result, "create table ? using ?("); +// char* pos = result + outBytes; int32_t freeBytes = capacity-outBytes; +// uintptr_t lenSTableName = strlen(pInfo->sTableName); +// uintptr_t lenCTableName = strlen(pInfo->cTableName); +// TAOS_BIND tbCTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, +// .buffer = pInfo->cTableName, .length = &lenCTableName}; +// TAOS_BIND tbSTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, +// .buffer = pInfo->sTableName, .length = &lenSTableName}; +// taosArrayPush(bindParams, &tbCTableName); +// taosArrayPush(bindParams, &tbSTableName); +// for (int32_t i = 0; i < pInfo->tagNum; ++i) { +// outBytes = snprintf(pos, freeBytes, "?,"); +// +// TAOS_SML_KV* tagKv = pInfo->tags + i; +// TAOS_BIND tbTag = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, +// .buffer = tagKv->key, .length = }; +// pos += outBytes; freeBytes -= outBytes; +// } +// --pos; ++freeBytes; +// +// outBytes = snprintf(pos, freeBytes, ") tags ("); +// pos += outBytes; freeBytes -= outBytes; +// for (int32_t i = 0; i < pInfo->tagNum; ++i) { +// TAOS_SML_KV* tagKv = pInfo->tags + i; +// outBytes = snprintf(pos, freeBytes, "?,"); +// pos += outBytes; freeBytes -= outBytes; +// } +// pos--; ++freeBytes; +// outBytes = snprintf(pos, freeBytes, ")"); +// +// TAOS_STMT* stmt = taos_stmt_init(taos); +// taos_stmt_prepare(stmt, result, strlen(result)); +// +// +// taos_stmt_bind_param(stmt, (TAOS_BIND*)bindParams); break; } default: @@ -726,6 +768,55 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { return code; } +int32_t transformIntoPreparedStatement(SArray* points) { + size_t numPoints = taosArrayGetSize(points); + +// SHashObj* tag2bind = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); +// SHashObj* field2multiBind = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + for (int32_t i = 0; i < numPoints; ++i) { + TAOS_SML_DATA_POINT * point = taosArrayGet(points, i); + char tableKey[256]; + snprintf(tableKey, 256, "%s.%s", point->stableName, point->childTableName); + + } + return 0; +} + +int32_t insertBatch(TAOS* taos, const char* sTableName, char* cTableName, SSchema* tagsSchema, int numTags, TAOS_BIND* tagBind, + SSchema* colsSchema, int numCols, TAOS_MULTI_BIND* colBind) { + TAOS_STMT* stmt = taos_stmt_init(taos); + + char result[TSDB_MAX_BINARY_LEN] = {0}; + sprintf(result, "insert into ? using %s(", sTableName); + for (int i = 0; i < numTags; ++i) { + snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", tagsSchema[i].name); + } + snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") tags ("); + + for (int i = 0; i < numTags; ++i) { + snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,"); + } + snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", colsSchema[i].name); + } + snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") values ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,"); + } + snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ")"); + + int32_t code = 0; + code = taos_stmt_prepare(stmt, result, strlen(result)); + + code = taos_stmt_set_tbname_tags(stmt, cTableName, tagBind); + code = taos_stmt_bind_param_batch(stmt, colBind); + code = taos_stmt_execute(stmt); + return code; +} //todo: table/column length check //todo: type check //todo: taosmbs2ucs4 check From 12e8b038d176b8c0ded55fced17185d531c61cac Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 10 Jul 2021 16:18:38 +0800 Subject: [PATCH 03/10] before debugging --- src/client/src/tscParseLineProtocol.c | 393 ++++++++++++++------------ 1 file changed, 211 insertions(+), 182 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 10d2429b5e..2738fe6f7a 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -272,6 +272,14 @@ TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { } //================================================================================================= +typedef struct { + char sTableName[TSDB_TABLE_NAME_LEN]; + SHashObj* tagHash; + SHashObj* fieldHash; + SArray* tags; //SArray + SArray* fields; //SArray +} SSmlSTableSchema; + typedef struct { char* key; uint8_t type; @@ -279,7 +287,7 @@ typedef struct { char* value; //=================================== - SSchema* fieldSchema; + SSchema* schema; } TAOS_SML_KV; typedef struct { @@ -293,17 +301,10 @@ typedef struct { TAOS_SML_KV* fields; int fieldNum; + //================================ + SSmlSTableSchema* schema; } TAOS_SML_DATA_POINT; -typedef struct { - char sTableName[TSDB_TABLE_NAME_LEN]; - SHashObj* tagHash; - SHashObj* fieldHash; - SArray* tags; //SArray - SArray* fields; //SArray -} SSmlSTableSchema; - - int compareSmlColKv(const void* p1, const void* p2) { TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; @@ -426,7 +427,6 @@ typedef enum { SCHEMA_ACTION_ADD_TAG, SCHEMA_ACTION_CHANGE_COLUMN_SIZE, SCHEMA_ACTION_CHANGE_TAG_SIZE, - SCHEMA_ACTION_CREATE_CTABLE } ESchemaAction; typedef struct { @@ -440,19 +440,11 @@ typedef struct { SSchema* field; } SAlterSTableActionInfo; -typedef struct { - char sTableName[TSDB_TABLE_NAME_LEN]; - char cTableName[TSDB_TABLE_NAME_LEN]; - TAOS_SML_KV* tags; - int tagNum; -} SCreateCTableActionInfo; - typedef struct { ESchemaAction action; union { SCreateSTableActionInfo createSTable; SAlterSTableActionInfo alterSTable; - SCreateCTableActionInfo createCTable; }; } SSchemaAction; @@ -505,7 +497,7 @@ int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* a taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); } - smlKv->fieldSchema = pField; + smlKv->schema = pField; return 0; } @@ -545,96 +537,6 @@ int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool return 0; } -int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { - int32_t code = TSDB_CODE_SUCCESS; - SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray - SHashObj* sname2shema = taosHashInit(32, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - for (int i = 0; i < numPoint; ++i) { - TAOS_SML_DATA_POINT* point = &points[i]; - SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN); - SSmlSTableSchema* pStableSchema = NULL; - if (ppStableSchema) { - pStableSchema= *ppStableSchema; - } else { - SSmlSTableSchema schema; - size_t stableNameLen = strlen(point->stableName); - strncpy(schema.sTableName, point->stableName, stableNameLen); - schema.sTableName[stableNameLen] = '\0'; - schema.fields = taosArrayInit(64, sizeof(SSchema)); - schema.tags = taosArrayInit(8, sizeof(SSchema)); - schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - pStableSchema = taosArrayPush(stableArray, &schema); - taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); - } - - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* tagKv = point->tags + j; - addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags); - } - - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* fieldKv = point->fields + j; - addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); - } - } - - SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction)); - size_t numStable = taosArrayGetSize(stableArray); - for (int i = 0; i < numStable; ++i) { - SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); - SSmlSTableSchema dbSchema = {0}; - dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); - dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); - dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); - if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { - SSchemaAction schemaAction = {0}; - schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; - memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); - memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); - schemaAction.createSTable.tags = pointSchema->tags; - schemaAction.createSTable.fields = pointSchema->fields; - taosArrayPush(schemaActions, &schemaAction); - }else if (code == TSDB_CODE_SUCCESS) { - size_t pointTagSize = taosArrayGetSize(pointSchema->tags); - size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); - - SHashObj* dbTagHash = dbSchema.tagHash; - SHashObj* dbFieldHash = dbSchema.fieldHash; - - for (int j = 0; j < pointTagSize; ++j) { - SSchema* pointTag = taosArrayGet(pointSchema->tags, j); - SSchemaAction schemaAction = {0}; - bool actionNeeded = false; - generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); - if (actionNeeded) { - taosArrayPush(schemaActions, &schemaAction); - } - } - - for (int j = 0; j < pointFieldSize; ++j) { - SSchema* pointCol = taosArrayGet(pointSchema->tags, j); - SSchemaAction schemaAction = {0}; - bool actionNeeded = false; - generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); - if (actionNeeded) { - taosArrayPush(schemaActions, &schemaAction); - } - } - } else { - return code; - } - } - - return code; -} - - int32_t buildColumnDescription(SSchema* field, char* buf, int32_t bufSize, int32_t* outBytes) { uint8_t type = field->type; @@ -721,46 +623,7 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { code = taos_errno(res); break; } - case SCHEMA_ACTION_CREATE_CTABLE: { -// SCreateCTableActionInfo* pInfo = &action->createCTable; -// SArray* bindParams = taosArrayInit(2 + 2 * pInfo->tagNum, sizeof(TAOS_BIND)); -// outBytes = sprintf(result, "create table ? using ?("); -// char* pos = result + outBytes; int32_t freeBytes = capacity-outBytes; -// uintptr_t lenSTableName = strlen(pInfo->sTableName); -// uintptr_t lenCTableName = strlen(pInfo->cTableName); -// TAOS_BIND tbCTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, -// .buffer = pInfo->cTableName, .length = &lenCTableName}; -// TAOS_BIND tbSTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, -// .buffer = pInfo->sTableName, .length = &lenSTableName}; -// taosArrayPush(bindParams, &tbCTableName); -// taosArrayPush(bindParams, &tbSTableName); -// for (int32_t i = 0; i < pInfo->tagNum; ++i) { -// outBytes = snprintf(pos, freeBytes, "?,"); -// -// TAOS_SML_KV* tagKv = pInfo->tags + i; -// TAOS_BIND tbTag = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY, -// .buffer = tagKv->key, .length = }; -// pos += outBytes; freeBytes -= outBytes; -// } -// --pos; ++freeBytes; -// -// outBytes = snprintf(pos, freeBytes, ") tags ("); -// pos += outBytes; freeBytes -= outBytes; -// for (int32_t i = 0; i < pInfo->tagNum; ++i) { -// TAOS_SML_KV* tagKv = pInfo->tags + i; -// outBytes = snprintf(pos, freeBytes, "?,"); -// pos += outBytes; freeBytes -= outBytes; -// } -// pos--; ++freeBytes; -// outBytes = snprintf(pos, freeBytes, ")"); -// -// TAOS_STMT* stmt = taos_stmt_init(taos); -// taos_stmt_prepare(stmt, result, strlen(result)); -// -// -// taos_stmt_bind_param(stmt, (TAOS_BIND*)bindParams); - break; - } + default: break; } @@ -768,55 +631,221 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { return code; } -int32_t transformIntoPreparedStatement(SArray* points) { - size_t numPoints = taosArrayGetSize(points); +int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) { + size_t numTags = taosArrayGetSize(tagsSchema); + size_t numCols = taosArrayGetSize(colsSchema); + sprintf(result, "insert into ? using %s(", sTableName); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") tags ("); -// SHashObj* tag2bind = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); -// SHashObj* field2multiBind = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + for (int i = 0; i < numTags; ++i) { + snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") ("); + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", colSchema->name); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") values ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); + } + snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")"); + return 0; +} + +int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) { + TAOS_STMT* stmt = taos_stmt_init(taos); + taos_stmt_prepare(stmt, sql, strlen(sql)); + + taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); + size_t rows = taosArrayGetSize(rowsBind); + for (int32_t i = 0; i < rows; ++i) { + TAOS_BIND* colBind = taosArrayGetP(rowsBind, i); + taos_stmt_bind_param(stmt, colBind); + taos_stmt_add_batch(stmt); + } + + taos_stmt_execute(stmt); + TAOS_RES* res = taos_stmt_use_result(stmt); + return taos_errno(res); +} + +int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) { + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); for (int32_t i = 0; i < numPoints; ++i) { - TAOS_SML_DATA_POINT * point = taosArrayGet(points, i); - char tableKey[256]; - snprintf(tableKey, 256, "%s.%s", point->stableName, point->childTableName); + TAOS_SML_DATA_POINT * point = points + i; + if (!point->childTableName) { + char childTableName[TSDB_TABLE_NAME_LEN]; + int32_t tableNameLen; + getChildTableName(point, childTableName, &tableNameLen); + point->childTableName = calloc(1, tableNameLen+1); + strncpy(point->childTableName, childTableName, tableNameLen); + point->childTableName[tableNameLen] = '\0'; + } + SArray* cTablePoints = NULL; + SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName)); + if (pCTablePoints) { + cTablePoints = *pCTablePoints; + } else { + cTablePoints = taosArrayInit(64, sizeof(point)); + taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); + } + taosArrayPush(cTablePoints, point); + } + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* cTablePoints = *pCTablePoints; + TAOS_SML_DATA_POINT * point = taosArrayGet(cTablePoints, 0); + int32_t numTags = taosArrayGetSize(point->schema->tags); + int32_t numCols = taosArrayGetSize(point->schema->fields); + char* stableName = point->stableName; + char* ctableName = point->childTableName; + char sql[TSDB_MAX_BINARY_LEN]; + getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); + + size_t rows = taosArrayGetSize(cTablePoints); + SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + + for (int i = 0; i < rows; ++i) { + point = taosArrayGet(cTablePoints, i); + + taosArraySetSize(tagBinds, numTags); + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); + TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + bind->buffer_type = kv->type; + bind->length = (uintptr_t*)&kv->length; + bind->buffer = kv->value; + } + + SArray* colBinds = taosArrayInit(numCols, sizeof(TAOS_BIND)); + taosArraySetSize(colBinds, numCols); + for (int j = 0; jfieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); + TAOS_BIND* bind = taosArrayGet(colBinds, idx); + bind->buffer_type = kv->type; + bind->length = (uintptr_t*)&kv->length; + bind->buffer = kv->value; + } + taosArrayPush(rowsBind, &colBinds); + } + + insertBatch(taos, sql, ctableName, tagBinds, rowsBind); + + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } return 0; } -int32_t insertBatch(TAOS* taos, const char* sTableName, char* cTableName, SSchema* tagsSchema, int numTags, TAOS_BIND* tagBind, - SSchema* colsSchema, int numCols, TAOS_MULTI_BIND* colBind) { - TAOS_STMT* stmt = taos_stmt_init(taos); - char result[TSDB_MAX_BINARY_LEN] = {0}; - sprintf(result, "insert into ? using %s(", sTableName); - for (int i = 0; i < numTags; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", tagsSchema[i].name); +int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray + SHashObj* sname2shema = taosHashInit(32, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + for (int i = 0; i < numPoint; ++i) { + TAOS_SML_DATA_POINT* point = &points[i]; + SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN); + SSmlSTableSchema* pStableSchema = NULL; + if (ppStableSchema) { + pStableSchema= *ppStableSchema; + } else { + SSmlSTableSchema schema; + size_t stableNameLen = strlen(point->stableName); + strncpy(schema.sTableName, point->stableName, stableNameLen); + schema.sTableName[stableNameLen] = '\0'; + schema.fields = taosArrayInit(64, sizeof(SSchema)); + schema.tags = taosArrayInit(8, sizeof(SSchema)); + schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + + pStableSchema = taosArrayPush(stableArray, &schema); + taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); + } + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* tagKv = point->tags + j; + addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags); + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* fieldKv = point->fields + j; + addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields); + } + + point->schema = pStableSchema; } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") tags ("); - for (int i = 0; i < numTags; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,"); + SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction)); + size_t numStable = taosArrayGetSize(stableArray); + for (int i = 0; i < numStable; ++i) { + SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); + SSmlSTableSchema dbSchema = {0}; + dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); + dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); + dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { + SSchemaAction schemaAction = {0}; + schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; + memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); + memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); + schemaAction.createSTable.tags = pointSchema->tags; + schemaAction.createSTable.fields = pointSchema->fields; + taosArrayPush(schemaActions, &schemaAction); + }else if (code == TSDB_CODE_SUCCESS) { + size_t pointTagSize = taosArrayGetSize(pointSchema->tags); + size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); + + SHashObj* dbTagHash = dbSchema.tagHash; + SHashObj* dbFieldHash = dbSchema.fieldHash; + + for (int j = 0; j < pointTagSize; ++j) { + SSchema* pointTag = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + taosArrayPush(schemaActions, &schemaAction); + } + } + + for (int j = 0; j < pointFieldSize; ++j) { + SSchema* pointCol = taosArrayGet(pointSchema->tags, j); + SSchemaAction schemaAction = {0}; + bool actionNeeded = false; + generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); + if (actionNeeded) { + taosArrayPush(schemaActions, &schemaAction); + } + } + } else { + return code; + } } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") ("); - for (int i = 0; i < numCols; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", colsSchema[i].name); + for (int i = 0; i < taosArrayGetSize(schemaActions); ++i) { + SSchemaAction* action = taosArrayGet(schemaActions, i); + applySchemaAction(taos, action); } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") values ("); - for (int i = 0; i < numCols; ++i) { - snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,"); - } - snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ")"); - - int32_t code = 0; - code = taos_stmt_prepare(stmt, result, strlen(result)); - - code = taos_stmt_set_tbname_tags(stmt, cTableName, tagBind); - code = taos_stmt_bind_param_batch(stmt, colBind); - code = taos_stmt_execute(stmt); + insertPoints(taos, points, numPoint); return code; } + + //todo: table/column length check //todo: type check //todo: taosmbs2ucs4 check From 2d0005d3f179b3a4f734fd75dcd104058537a3a6 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 11 Jul 2021 23:52:19 +0800 Subject: [PATCH 04/10] before getChildTableName and insertBatch --- src/client/src/tscParseLineProtocol.c | 758 +++++++++++++++----------- src/inc/taos.h | 2 + tests/examples/c/apitest.c | 15 + 3 files changed, 447 insertions(+), 328 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 2738fe6f7a..3a88d2e906 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -15,263 +15,6 @@ #include "tskiplist.h" #include "tscUtil.h" -typedef enum { - LP_ITEM_TAG, - LP_ITEM_FIELD -} LPItemKind; - -typedef struct { - SStrToken key; - SStrToken value; - - char name[TSDB_COL_NAME_LEN]; - int8_t type; - int16_t bytes; - - char* payload; -}SLPItem; - -typedef struct { - SStrToken measToken; - SStrToken tsToken; - - char sTableName[TSDB_TABLE_NAME_LEN]; - SArray* tags; - SArray* fields; - int64_t ts; - -} SLPPoint; - -typedef enum { - LP_MEASUREMENT, - LP_TAG_KEY, - LP_TAG_VALUE, - LP_FIELD_KEY, - LP_FIELD_VALUE -} LPPart; - -int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { - for (int32_t i = start; i < s.n; ++i) { - if (s.z[i] == ',' || s.z[i] == ' ') { - *index = i; - return 0; - } - } - return -1; -} - -int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { - for (int32_t i = start; i < s.n; ++i) { - if (s.z[i] == '=') { - *index = i; - return 0; - } - } - return -1; -} - -int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { - point->measToken = token; - if (point->measToken.n < TSDB_TABLE_NAME_LEN) { - strncpy(point->sTableName, point->measToken.z, point->measToken.n); - point->sTableName[point->measToken.n] = '\0'; - } - return 0; -} - -int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { - item->key = key; - if (item->key.n < TSDB_COL_NAME_LEN) { - strncpy(item->name, item->key.z, item->key.n); - item->name[item->key.n] = '\0'; - } - return 0; -} - -int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { - item->value = value; - return 0; -} - -int32_t parseItemValue(SLPItem* item, LPItemKind kind) { - char* sv = item->value.z; - char* last = item->value.z + item->value.n - 1; - - if (isdigit(sv[0]) || sv[0] == '-') { - if (*last == 'i') { - item->type = TSDB_DATA_TYPE_BIGINT; - item->bytes = (int16_t)tDataTypes[item->type].bytes; - item->payload = malloc(item->bytes); - char* endptr = NULL; - *(item->payload) = strtoll(sv, &endptr, 10); - } else { - item->type = TSDB_DATA_TYPE_DOUBLE; - item->bytes = (int16_t)tDataTypes[item->type].bytes; - item->payload = malloc(item->bytes); - char* endptr = NULL; - *(item->payload) = strtold(sv, &endptr); - } - } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { - if (sv[0] == 'L') { - item->type = TSDB_DATA_TYPE_NCHAR; - uint32_t bytes = item->value.n - 3; -// uint32_t len = bytes; -// char* ucs = malloc(len); -// int32_t ncharBytes = 0; -// taosMbsToUcs4(sv+2, len, ucs, len, &ncharBytes); -// item->bytes = ncharBytes; -// item->payload = malloc(ncharBytes); -// memcpy(item->payload, ucs, ncharBytes); -// free(ucs); - item->bytes = bytes; - item->payload = malloc(bytes); - memcpy(item->payload, sv+1, bytes); - } else if (sv[0]=='"'){ - item->type = TSDB_DATA_TYPE_BINARY; - uint32_t bytes = item->value.n - 2; - item->bytes = bytes; - item->payload = malloc(bytes); - memcpy(item->payload, sv+1, bytes); - } - } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { - item->type = TSDB_DATA_TYPE_BOOL; - item->bytes = tDataTypes[item->type].bytes; - item->payload = malloc(tDataTypes[item->type].bytes); - *(item->payload) = tolower(sv[0])=='t' ? true : false; - } - return 0; -} - -int32_t compareLPItemKey(const void* p1, const void* p2) { - const SLPItem* t1 = p1; - const SLPItem* t2 = p2; - uint32_t min = (t1->key.n < t2->key.n) ? t1->key.n : t2->key.n; - int res = strncmp(t1->key.z, t2->key.z, min); - if (res != 0) { - return res; - } else { - return (int)(t1->key.n) - (int)(t2->key.n); - } -} - -int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { - point->tsToken = tsToken; - return 0; -} - -int32_t parsePointTime(SLPPoint* point) { - if (point->tsToken.n <= 0) { - point->ts = taosGetTimestampNs(); - } else { - char* endptr = NULL; - point->ts = strtoll(point->tsToken.z, &endptr, 10); - } - return 0; -} - -int32_t tscParseLine(SStrToken line, SLPPoint* point) { - int32_t pos = 0; - - int32_t start = 0; - int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); - if (err != 0) { - tscError("a"); - return err; - } - - SStrToken measurement = {.z = line.z+start, .n = pos-start}; - setPointMeasurement(point, measurement); - point->tags = taosArrayInit(64, sizeof(SLPItem)); - start = pos + 1; - while (line.z[start] == ',') { - SLPItem item; - - err = scanToEqual(line, start, &pos); - if (err != 0) { - tscError("b"); - goto error; - } - - SStrToken tagKey = {.z = line.z + start, .n = pos-start}; - setItemKey(&item, tagKey, LP_TAG_KEY); - - start = pos + 1; - err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); - if (err != 0) { - tscError("c"); - goto error; - } - - SStrToken tagValue = {.z = line.z + start, .n = pos-start}; - setItemValue(&item, tagValue, LP_TAG_VALUE); - - parseItemValue(&item, LP_ITEM_TAG); - taosArrayPush(point->tags, &item); - - start = pos + 1; - } - - taosArraySort(point->tags, compareLPItemKey); - - point->fields = taosArrayInit(64, sizeof(SLPItem)); - do { - SLPItem item; - err = scanToEqual(line, start, &pos); - if (err != 0) { - goto error; - } - SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; - setItemKey(&item, fieldKey, LP_FIELD_KEY); - - start = pos + 1; - err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); - if (err != 0) { - goto error; - } - SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; - setItemValue(&item, fieldValue, LP_TAG_VALUE); - - parseItemValue(&item, LP_ITEM_FIELD); - taosArrayPush(point->fields, &item); - - start = pos + 1; - } while (line.z[pos] == ','); - - taosArraySort(point->fields, compareLPItemKey); - - SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; - setPointTimeStamp(point, tsToken); - parsePointTime(point); - - goto done; - -error: - // free array - return err; -done: - return 0; -} - - -int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { - for (int32_t i = 0; i < numLines; ++i) { - SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1}; - SLPPoint point; - tscParseLine(tkLine, &point); - taosArrayPush(points, &point); - } - return 0; -} - -TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { - SArray* points = taosArrayInit(numLines, sizeof(SLPPoint)); - tscParseLines(lines, numLines, points, NULL); - - - return NULL; -} -//================================================================================================= - typedef struct { char sTableName[TSDB_TABLE_NAME_LEN]; SHashObj* tagHash; @@ -305,6 +48,8 @@ typedef struct { SSmlSTableSchema* schema; } TAOS_SML_DATA_POINT; +//================================================================================================= + int compareSmlColKv(const void* p1, const void* p2) { TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; @@ -318,32 +63,6 @@ int compareSmlColKv(const void* p1, const void* p2) { } } -int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { - qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); - - SStringBuilder sb; memset(&sb, 0, sizeof(sb)); - taosStringBuilderAppendString(&sb, point->stableName); - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* tagKv = point->tags + j; - taosStringBuilderAppendChar(&sb, ','); - taosStringBuilderAppendString(&sb, tagKv->key); - taosStringBuilderAppendChar(&sb, '='); - taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); - } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); - MD5Final(&context); - *tableNameLen = snprintf(tableName, *tableNameLen, - "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], - context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); - return 0; -} - int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { int32_t code = 0; @@ -393,15 +112,14 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { tscFreeSqlObj(pSql); - uint32_t size = tscGetTableMetaMaxSize(); STableMeta* tableMeta = calloc(1, size); taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); - tstrncpy(schema->sTableName, tableName, strlen(tableName)); + tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); for (int i=0; itableInfo.numOfColumns; ++i) { SSchema field; - tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)); + tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1); field.type = tableMeta->schema[i].type; field.bytes = tableMeta->schema[i].bytes; SSchema* pField = taosArrayPush(schema->fields, &field); @@ -411,13 +129,13 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { for (int i=0; itableInfo.numOfTags; ++i) { int j = i + tableMeta->tableInfo.numOfColumns; SSchema field; - tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)); + tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1); field.type = tableMeta->schema[j].type; field.bytes = tableMeta->schema[j].bytes; SSchema* pField = taosArrayPush(schema->tags, &field); taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES); } - + free(tableMeta); tableMeta = NULL; return code; } @@ -586,6 +304,7 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { capacity-n, &outBytes); TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery code = taos_errno(res); + break; } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName); @@ -631,15 +350,46 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { return code; } +int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { + qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); + + SStringBuilder sb; memset(&sb, 0, sizeof(sb)); + taosStringBuilderAppendString(&sb, point->stableName); + for (int j = 0; j < point->tagNum; ++j) { + taosStringBuilderAppendChar(&sb, ','); + TAOS_SML_KV* tagKv = point->tags + j; + taosStringBuilderAppendString(&sb, tagKv->key); + taosStringBuilderAppendChar(&sb, '='); + taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); + } + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); + MD5_CTX context; + MD5Init(&context); + MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + MD5Final(&context); + *tableNameLen = snprintf(tableName, *tableNameLen, + "tbl%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], + context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], + context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + taosStringBuilderDestroy(&sb); + return 0; +} + int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) { size_t numTags = taosArrayGetSize(tagsSchema); size_t numCols = taosArrayGetSize(colsSchema); - sprintf(result, "insert into ? using %s(", sTableName); - for (int i = 0; i < numTags; ++i) { - SSchema* tagSchema = taosArrayGet(tagsSchema, i); - snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); - } - snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") tags ("); + sprintf(result, "insert into ? using %s", sTableName); + +// snprintf(result+strlen(result), freeBytes-strlen(result), "("); +// for (int i = 0; i < numTags; ++i) { +// SSchema* tagSchema = taosArrayGet(tagsSchema, i); +// snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); +// } +// snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")"); + + snprintf(result + strlen(result), freeBytes-strlen(result), " tags ("); for (int i = 0; i < numTags; ++i) { snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); @@ -661,17 +411,38 @@ int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsS int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) { TAOS_STMT* stmt = taos_stmt_init(taos); - taos_stmt_prepare(stmt, sql, strlen(sql)); - - taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); - size_t rows = taosArrayGetSize(rowsBind); - for (int32_t i = 0; i < rows; ++i) { - TAOS_BIND* colBind = taosArrayGetP(rowsBind, i); - taos_stmt_bind_param(stmt, colBind); - taos_stmt_add_batch(stmt); + int32_t code; + code = taos_stmt_prepare(stmt, sql, strlen(sql)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; } - taos_stmt_execute(stmt); + code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + size_t rows = taosArrayGetSize(rowsBind); + for (int32_t i = 0; i < rows; ++i) { + SArray* colBind = taosArrayGetP(rowsBind, i); + code = taos_stmt_bind_param(stmt, TARRAY_GET_START(colBind)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + code = taos_stmt_add_batch(stmt); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } TAOS_RES* res = taos_stmt_use_result(stmt); return taos_errno(res); } @@ -682,7 +453,7 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) TAOS_SML_DATA_POINT * point = points + i; if (!point->childTableName) { char childTableName[TSDB_TABLE_NAME_LEN]; - int32_t tableNameLen; + int32_t tableNameLen = TSDB_TABLE_NAME_LEN; getChildTableName(point, childTableName, &tableNameLen); point->childTableName = calloc(1, tableNameLen+1); strncpy(point->childTableName, childTableName, tableNameLen); @@ -696,50 +467,62 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) cTablePoints = taosArrayInit(64, sizeof(point)); taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); } - taosArrayPush(cTablePoints, point); + taosArrayPush(cTablePoints, &point); } + int isNullColBind = TSDB_TRUE; SArray** pCTablePoints = taosHashIterate(cname2points, NULL); while (pCTablePoints) { SArray* cTablePoints = *pCTablePoints; - TAOS_SML_DATA_POINT * point = taosArrayGet(cTablePoints, 0); + + TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); int32_t numTags = taosArrayGetSize(point->schema->tags); int32_t numCols = taosArrayGetSize(point->schema->fields); char* stableName = point->stableName; char* ctableName = point->childTableName; - char sql[TSDB_MAX_BINARY_LEN]; - getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); + + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + taosArraySetSize(tagBinds, numTags); + for (int j = 0; j < numTags; ++j) { + TAOS_BIND* bind = taosArrayGet(tagBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); + TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + bind->buffer_type = kv->type; + bind->length = (uintptr_t*)&kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; + } size_t rows = taosArrayGetSize(cTablePoints); SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); - SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); for (int i = 0; i < rows; ++i) { - point = taosArrayGet(cTablePoints, i); - - taosArraySetSize(tagBinds, numTags); - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* kv = point->tags + j; - int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); - TAOS_BIND* bind = taosArrayGet(tagBinds, idx); - bind->buffer_type = kv->type; - bind->length = (uintptr_t*)&kv->length; - bind->buffer = kv->value; - } + point = taosArrayGetP(cTablePoints, i); SArray* colBinds = taosArrayInit(numCols, sizeof(TAOS_BIND)); taosArraySetSize(colBinds, numCols); - for (int j = 0; jfieldNum; ++j) { + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = taosArrayGet(colBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); TAOS_BIND* bind = taosArrayGet(colBinds, idx); bind->buffer_type = kv->type; bind->length = (uintptr_t*)&kv->length; bind->buffer = kv->value; + bind->is_null = NULL; } taosArrayPush(rowsBind, &colBinds); } + char sql[TSDB_MAX_BINARY_LEN]; + getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); insertBatch(taos, sql, ctableName, tagBinds, rowsBind); pCTablePoints = taosHashIterate(cname2points, pCTablePoints); @@ -747,7 +530,6 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) return 0; } - int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { int32_t code = TSDB_CODE_SUCCESS; SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray @@ -792,11 +574,13 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { for (int i = 0; i < numStable; ++i) { SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); SSmlSTableSchema dbSchema = {0}; - dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); + dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { SSchemaAction schemaAction = {0}; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; @@ -813,19 +597,23 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { SHashObj* dbFieldHash = dbSchema.fieldHash; for (int j = 0; j < pointTagSize; ++j) { - SSchema* pointTag = taosArrayGet(pointSchema->tags, j); + SSchema* pointTag = taosArrayGet(pointSchema->tags, j); SSchemaAction schemaAction = {0}; - bool actionNeeded = false; + bool actionNeeded = false; generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { taosArrayPush(schemaActions, &schemaAction); } } - for (int j = 0; j < pointFieldSize; ++j) { - SSchema* pointCol = taosArrayGet(pointSchema->tags, j); + SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0); + SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0); + memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN); + + for (int j = 1; j < pointFieldSize; ++j) { + SSchema* pointCol = taosArrayGet(pointSchema->fields, j); SSchemaAction schemaAction = {0}; - bool actionNeeded = false; + bool actionNeeded = false; generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { taosArrayPush(schemaActions, &schemaAction); @@ -849,3 +637,317 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { //todo: table/column length check //todo: type check //todo: taosmbs2ucs4 check + +//========================================================================= + +typedef enum { + LP_ITEM_TAG, + LP_ITEM_FIELD +} LPItemKind; + +typedef struct { + SStrToken keyToken; + SStrToken valueToken; + + char key[TSDB_COL_NAME_LEN]; + int8_t type; + int16_t length; + + char* value; +}SLPItem; + +typedef struct { + SStrToken measToken; + SStrToken tsToken; + + char sTableName[TSDB_TABLE_NAME_LEN]; + SArray* tags; + SArray* fields; + int64_t ts; + +} SLPPoint; + +typedef enum { + LP_MEASUREMENT, + LP_TAG_KEY, + LP_TAG_VALUE, + LP_FIELD_KEY, + LP_FIELD_VALUE +} LPPart; + +int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == ',' || s.z[i] == ' ') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == '=') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { + point->measToken = token; + if (point->measToken.n < TSDB_TABLE_NAME_LEN) { + strncpy(point->sTableName, point->measToken.z, point->measToken.n); + point->sTableName[point->measToken.n] = '\0'; + } + return 0; +} + +int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { + item->keyToken = key; + if (item->keyToken.n < TSDB_COL_NAME_LEN) { + strncpy(item->key, item->keyToken.z, item->keyToken.n); + item->key[item->keyToken.n] = '\0'; + } + return 0; +} + +int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { + item->valueToken = value; + return 0; +} + +int32_t parseItemValue(SLPItem* item, LPItemKind kind) { + char* sv = item->valueToken.z; + char* last = item->valueToken.z + item->valueToken.n - 1; + + if (isdigit(sv[0]) || sv[0] == '-') { + if (*last == 'i') { + item->type = TSDB_DATA_TYPE_BIGINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(item->value) = strtoll(sv, &endptr, 10); + } else { + item->type = TSDB_DATA_TYPE_DOUBLE; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(item->value) = strtold(sv, &endptr); + } + } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { + if (sv[0] == 'L') { + item->type = TSDB_DATA_TYPE_NCHAR; + uint32_t bytes = item->valueToken.n - 3; + item->length = bytes; + item->value = malloc(bytes); + memcpy(item->value, sv+1, bytes); + } else if (sv[0]=='"'){ + item->type = TSDB_DATA_TYPE_BINARY; + uint32_t bytes = item->valueToken.n - 2; + item->length = bytes; + item->value = malloc(bytes); + memcpy(item->value, sv+1, bytes); + } + } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { + item->type = TSDB_DATA_TYPE_BOOL; + item->length = tDataTypes[item->type].bytes; + item->value = malloc(tDataTypes[item->type].bytes); + *(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE; + } + return 0; +} + +int32_t compareLPItemKey(const void* p1, const void* p2) { + const SLPItem* t1 = p1; + const SLPItem* t2 = p2; + uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n; + int res = strncmp(t1->keyToken.z, t2->keyToken.z, min); + if (res != 0) { + return res; + } else { + return (int)(t1->keyToken.n) - (int)(t2->keyToken.n); + } +} + +int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { + point->tsToken = tsToken; + return 0; +} + +int32_t parsePointTime(SLPPoint* point) { + if (point->tsToken.n <= 0) { + point->ts = taosGetTimestampNs(); + } else { + char* endptr = NULL; + point->ts = strtoll(point->tsToken.z, &endptr, 10); + } + return 0; +} + +int32_t tscParseLine(SStrToken line, SLPPoint* point) { + int32_t pos = 0; + + int32_t start = 0; + int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); + if (err != 0) { + tscError("a"); + return err; + } + + SStrToken measurement = {.z = line.z+start, .n = pos-start}; + setPointMeasurement(point, measurement); + point->tags = taosArrayInit(64, sizeof(SLPItem)); + start = pos; + while (line.z[start] == ',') { + SLPItem item; + + start++; + err = scanToEqual(line, start, &pos); + if (err != 0) { + tscError("b"); + goto error; + } + + SStrToken tagKey = {.z = line.z + start, .n = pos-start}; + setItemKey(&item, tagKey, LP_TAG_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); + if (err != 0) { + tscError("c"); + goto error; + } + + SStrToken tagValue = {.z = line.z + start, .n = pos-start}; + setItemValue(&item, tagValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_TAG); + taosArrayPush(point->tags, &item); + + start = pos; + } + + taosArraySort(point->tags, compareLPItemKey); + + point->fields = taosArrayInit(64, sizeof(SLPItem)); + + start++; + do { + SLPItem item; + + err = scanToEqual(line, start, &pos); + if (err != 0) { + goto error; + } + SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; + setItemKey(&item, fieldKey, LP_FIELD_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); + if (err != 0) { + goto error; + } + SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; + setItemValue(&item, fieldValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_FIELD); + taosArrayPush(point->fields, &item); + + start = pos + 1; + } while (line.z[pos] == ','); + + taosArraySort(point->fields, compareLPItemKey); + + SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; + setPointTimeStamp(point, tsToken); + parsePointTime(point); + + goto done; + + error: + // free array + return err; + done: + return 0; +} + + +int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { + for (int32_t i = 0; i < numLines; ++i) { + SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1}; + SLPPoint point; + tscParseLine(tkLine, &point); + taosArrayPush(points, &point); + } + return 0; +} + +int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { + SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint)); + tscParseLines(lines, numLines, lpPoints, NULL); + + size_t numPoints = taosArrayGetSize(lpPoints); + TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT)); + for (int i = 0; i < numPoints; ++i) { + SLPPoint* lpPoint = taosArrayGet(lpPoints, i); + TAOS_SML_DATA_POINT* point = points+i; + point->stableName = calloc(1, strlen(lpPoint->sTableName)+1); + strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName)); + point->stableName[strlen(lpPoint->sTableName)] = '\0'; + + size_t lpTagSize = taosArrayGetSize(lpPoint->tags); + point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV)); + point->tagNum = lpTagSize; + for (int j=0; jtags, j); + TAOS_SML_KV* tagKv = point->tags + j; + + size_t kenLen = strlen(lpTag->key); + tagKv->key = calloc(1, kenLen+1); + strncpy(tagKv->key, lpTag->key, kenLen); + tagKv->key[kenLen] = '\0'; + + tagKv->type = lpTag->type; + tagKv->length = lpTag->length; + tagKv->value = malloc(tagKv->length); + memcpy(tagKv->value, lpTag->value, tagKv->length); + } + + size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields); + point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV)); + point->fieldNum = lpFieldsSize + 1; + + TAOS_SML_KV* tsField = point->fields + 0; + char tsKey[256]; + snprintf(tsKey, 256, "_%s_ts", point->stableName); + size_t tsKeyLen = strlen(tsKey); + tsField->key = calloc(1, tsKeyLen+1); + strncpy(tsField->key, tsKey, tsKeyLen); + tsField->key[tsKeyLen] = '\0'; + tsField->type = TSDB_DATA_TYPE_TIMESTAMP; + tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; + tsField->value = malloc(tsField->length); + memcpy(tsField->value, &(lpPoint->ts), tsField->length); + + for (int j=0; jfields, j); + TAOS_SML_KV* fieldKv = point->fields + j + 1; + + size_t kenLen = strlen(lpField->key); + fieldKv->key = calloc(1, kenLen+1); + strncpy(fieldKv->key, lpField->key, kenLen); + fieldKv->key[kenLen] = '\0'; + + fieldKv->type = lpField->type; + fieldKv->length = lpField->length; + fieldKv->value = malloc(fieldKv->length); + memcpy(fieldKv->value, lpField->value, fieldKv->length); + } + } + + taos_sml_insert(taos, points, numPoints); + return 0; +} + diff --git a/src/inc/taos.h b/src/inc/taos.h index 9f72945ef0..ca18c4fb93 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -169,6 +169,8 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); +DLL_EXPORT int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines); + #ifdef __cplusplus } #endif diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 0f24df0f47..dc77677774 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -949,6 +949,15 @@ void verify_stream(TAOS* taos) { taos_close_stream(strm); } +int32_t verify_schema_less(TAOS* taos) { + prepare_data(taos); + char* lines[] = { + "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639162922" + }; + int code = taos_insert_by_lines(taos, lines , 1); + return code; +} + int main(int argc, char *argv[]) { const char* host = "127.0.0.1"; const char* user = "root"; @@ -967,6 +976,12 @@ int main(int argc, char *argv[]) { info = taos_get_client_info(taos); printf("client info: %s\n", info); + printf("************ verify query *************\n"); + int code = verify_schema_less(taos); + if (code == 0) { + return code; + } + printf("************ verify query *************\n"); verify_query(taos); From cb5a0012f411675b7d5a40d0ec6cd88361c157ec Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Jul 2021 00:39:16 +0800 Subject: [PATCH 05/10] before time precision and create table with tags instead of autocreate --- src/client/src/tscParseLineProtocol.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 3a88d2e906..4b9191a91f 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -492,7 +492,8 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); TAOS_BIND* bind = taosArrayGet(tagBinds, idx); bind->buffer_type = kv->type; - bind->length = (uintptr_t*)&kv->length; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; bind->buffer = kv->value; bind->is_null = NULL; } @@ -514,7 +515,8 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); TAOS_BIND* bind = taosArrayGet(colBinds, idx); bind->buffer_type = kv->type; - bind->length = (uintptr_t*)&kv->length; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; bind->buffer = kv->value; bind->is_null = NULL; } From 3d25ecdf29c3d912bb91e81f47c72b97fb737cea Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Jul 2021 08:38:51 +0800 Subject: [PATCH 06/10] can add two lines of one supertable/diffrent subtable --- src/client/src/tscParseLineProtocol.c | 12 ++++++------ tests/examples/c/apitest.c | 8 +++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 4b9191a91f..1fde741c2b 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -540,13 +540,13 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { for (int i = 0; i < numPoint; ++i) { TAOS_SML_DATA_POINT* point = &points[i]; - SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN); + size_t stableNameLen = strlen(point->stableName); + SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen); SSmlSTableSchema* pStableSchema = NULL; if (ppStableSchema) { pStableSchema= *ppStableSchema; } else { SSmlSTableSchema schema; - size_t stableNameLen = strlen(point->stableName); strncpy(schema.sTableName, point->stableName, stableNameLen); schema.sTableName[stableNameLen] = '\0'; schema.fields = taosArrayInit(64, sizeof(SSchema)); @@ -730,13 +730,13 @@ int32_t parseItemValue(SLPItem* item, LPItemKind kind) { item->length = (int16_t)tDataTypes[item->type].bytes; item->value = malloc(item->length); char* endptr = NULL; - *(item->value) = strtoll(sv, &endptr, 10); + *(int64_t*)(item->value) = strtoll(sv, &endptr, 10); } else { item->type = TSDB_DATA_TYPE_DOUBLE; item->length = (int16_t)tDataTypes[item->type].bytes; item->value = malloc(item->length); char* endptr = NULL; - *(item->value) = strtold(sv, &endptr); + *(double*)(item->value) = strtold(sv, &endptr); } } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { if (sv[0] == 'L') { @@ -744,7 +744,7 @@ int32_t parseItemValue(SLPItem* item, LPItemKind kind) { uint32_t bytes = item->valueToken.n - 3; item->length = bytes; item->value = malloc(bytes); - memcpy(item->value, sv+1, bytes); + memcpy(item->value, sv+2, bytes); } else if (sv[0]=='"'){ item->type = TSDB_DATA_TYPE_BINARY; uint32_t bytes = item->valueToken.n - 2; @@ -756,7 +756,7 @@ int32_t parseItemValue(SLPItem* item, LPItemKind kind) { item->type = TSDB_DATA_TYPE_BOOL; item->length = tDataTypes[item->type].bytes; item->value = malloc(tDataTypes[item->type].bytes); - *(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE; + *(uint8_t*)(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE; } return 0; } diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index dc77677774..04a03df6fa 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -952,9 +952,11 @@ void verify_stream(TAOS* taos) { int32_t verify_schema_less(TAOS* taos) { prepare_data(taos); char* lines[] = { - "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639162922" + "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639", + "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640", + "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642" }; - int code = taos_insert_by_lines(taos, lines , 1); + int code = taos_insert_by_lines(taos, lines , 2); return code; } @@ -976,7 +978,7 @@ int main(int argc, char *argv[]) { info = taos_get_client_info(taos); printf("client info: %s\n", info); - printf("************ verify query *************\n"); + printf("************ verify shemaless *************\n"); int code = verify_schema_less(taos); if (code == 0) { return code; From 4a75ebe4afacbcce488ec35fbdefc5e900ca781f Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Jul 2021 09:17:15 +0800 Subject: [PATCH 07/10] create child table with tags --- src/client/src/tscParseLineProtocol.c | 87 +++++++++++++++++---------- tests/examples/c/apitest.c | 2 +- 2 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 1fde741c2b..dd0e64ba84 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -377,39 +377,26 @@ int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tabl return 0; } -int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) { +int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) { size_t numTags = taosArrayGetSize(tagsSchema); - size_t numCols = taosArrayGetSize(colsSchema); - sprintf(result, "insert into ? using %s", sTableName); + char sql[TSDB_MAX_BINARY_LEN] = {0}; + int freeBytes = TSDB_MAX_BINARY_LEN; + sprintf(sql, "create table if not exists %s using %s", cTableName, sTableName); -// snprintf(result+strlen(result), freeBytes-strlen(result), "("); -// for (int i = 0; i < numTags; ++i) { -// SSchema* tagSchema = taosArrayGet(tagsSchema, i); -// snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); -// } -// snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")"); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "("); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); - snprintf(result + strlen(result), freeBytes-strlen(result), " tags ("); + snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); for (int i = 0; i < numTags; ++i) { - snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); } - snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") ("); + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); - for (int i = 0; i < numCols; ++i) { - SSchema* colSchema = taosArrayGet(colsSchema, i); - snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", colSchema->name); - } - snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") values ("); - - for (int i = 0; i < numCols; ++i) { - snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); - } - snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")"); - return 0; -} - -int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) { TAOS_STMT* stmt = taos_stmt_init(taos); int32_t code; code = taos_stmt_prepare(stmt, sql, strlen(sql)); @@ -418,7 +405,47 @@ int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, S return code; } - code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); + code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + TAOS_RES* res = taos_stmt_use_result(stmt); + return taos_errno(res); +} + +int32_t insertBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) { + size_t numCols = taosArrayGetSize(colsSchema); + char sql[TSDB_MAX_BINARY_LEN]; + int32_t freeBytes = TSDB_MAX_BINARY_LEN; + sprintf(sql, "insert into ? ("); + + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); + + TAOS_STMT* stmt = taos_stmt_init(taos); + int32_t code; + code = taos_stmt_prepare(stmt, sql, strlen(sql)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_set_tbname(stmt, cTableName); if (code != 0) { printf("%s", taos_stmt_errstr(stmt)); return code; @@ -478,7 +505,6 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); int32_t numTags = taosArrayGetSize(point->schema->tags); int32_t numCols = taosArrayGetSize(point->schema->fields); - char* stableName = point->stableName; char* ctableName = point->childTableName; SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); @@ -523,9 +549,8 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) taosArrayPush(rowsBind, &colBinds); } - char sql[TSDB_MAX_BINARY_LEN]; - getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); - insertBatch(taos, sql, ctableName, tagBinds, rowsBind); + creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds); + insertBatch(taos, ctableName, point->schema->fields, rowsBind); pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 04a03df6fa..5272dafaf1 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -956,7 +956,7 @@ int32_t verify_schema_less(TAOS* taos) { "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640", "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642" }; - int code = taos_insert_by_lines(taos, lines , 2); + int code = taos_insert_by_lines(taos, lines , 3); return code; } From 62e4b0b599ba7a066cd6a0be435c60bac3a3cabd Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Jul 2021 10:16:30 +0800 Subject: [PATCH 08/10] add timestamp precision support --- src/client/src/tscParseLineProtocol.c | 39 ++++++++++++++++++++------- tests/examples/c/apitest.c | 18 +++++++++---- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index dd0e64ba84..c0cc2ea3ae 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -21,6 +21,7 @@ typedef struct { SHashObj* fieldHash; SArray* tags; //SArray SArray* fields; //SArray + uint8_t precision; } SSmlSTableSchema; typedef struct { @@ -117,6 +118,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); + schema->precision = tableMeta->tableInfo.precision; for (int i=0; itableInfo.numOfColumns; ++i) { SSchema field; tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1); @@ -486,6 +488,25 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) strncpy(point->childTableName, childTableName, tableNameLen); point->childTableName[tableNameLen] = '\0'; } + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t ts = *(int64_t*)(kv->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + *(int64_t*)(kv->value) = ts; + } + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t ts = *(int64_t*)(kv->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + *(int64_t*)(kv->value) = ts; + } + } + SArray* cTablePoints = NULL; SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName)); if (pCTablePoints) { @@ -596,7 +617,6 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { point->schema = pStableSchema; } - SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction)); size_t numStable = taosArrayGetSize(stableArray); for (int i = 0; i < numStable; ++i) { SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); @@ -615,8 +635,10 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN); schemaAction.createSTable.tags = pointSchema->tags; schemaAction.createSTable.fields = pointSchema->fields; - taosArrayPush(schemaActions, &schemaAction); - }else if (code == TSDB_CODE_SUCCESS) { + applySchemaAction(taos, &schemaAction); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + pointSchema->precision = dbSchema.precision; + } else if (code == TSDB_CODE_SUCCESS) { size_t pointTagSize = taosArrayGetSize(pointSchema->tags); size_t pointFieldSize = taosArrayGetSize(pointSchema->fields); @@ -629,7 +651,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { bool actionNeeded = false; generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { - taosArrayPush(schemaActions, &schemaAction); + applySchemaAction(taos, &schemaAction); } } @@ -643,19 +665,16 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { bool actionNeeded = false; generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { - taosArrayPush(schemaActions, &schemaAction); + applySchemaAction(taos, &schemaAction); } } + + pointSchema->precision = dbSchema.precision; } else { return code; } } - for (int i = 0; i < taosArrayGetSize(schemaActions); ++i) { - SSchemaAction* action = taosArrayGet(schemaActions, i); - applySchemaAction(taos, action); - } - insertPoints(taos, points, numPoint); return code; } diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 5272dafaf1..656d5fd217 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -12,7 +12,7 @@ static void prepare_data(TAOS* taos) { result = taos_query(taos, "drop database if exists test;"); taos_free_result(result); usleep(100000); - result = taos_query(taos, "create database test;"); + result = taos_query(taos, "create database test precision 'us';"); taos_free_result(result); usleep(100000); taos_select_db(taos, "test"); @@ -950,11 +950,19 @@ void verify_stream(TAOS* taos) { } int32_t verify_schema_less(TAOS* taos) { - prepare_data(taos); + TAOS_RES *result; + result = taos_query(taos, "drop database if exists test;"); + taos_free_result(result); + usleep(100000); + result = taos_query(taos, "create database test precision 'us';"); + taos_free_result(result); + usleep(100000); + taos_select_db(taos, "test"); + char* lines[] = { - "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639", - "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640", - "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642" + "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000", + "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640000000", + "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000" }; int code = taos_insert_by_lines(taos, lines , 3); return code; From d8c57f0be0bb51f2a81157f12d8885f5afa70f23 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Jul 2021 10:30:54 +0800 Subject: [PATCH 09/10] fixing exsiting stable --- src/client/src/tscParseLineProtocol.c | 2 +- tests/examples/c/apitest.c | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index c0cc2ea3ae..61f9cd9e76 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -225,7 +225,7 @@ int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* a int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[], SSchemaAction* action, bool* actionNeeded) { SSchema** ppDbAttr = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name)); - if (*ppDbAttr) { + if (ppDbAttr) { SSchema* dbAttr = *ppDbAttr; if (pointColField->type != dbAttr->type) { //todo error diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 656d5fd217..5cb3f5762a 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -958,13 +958,15 @@ int32_t verify_schema_less(TAOS* taos) { taos_free_result(result); usleep(100000); taos_select_db(taos, "test"); + result = taos_query(taos, "create stable ste(ts timestamp, f int) tags(t1 bigint)"); char* lines[] = { "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000", "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640000000", - "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000" + "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000", + "ste,t2=5,t3=L\"ste\" c1=true,c2=4 1626056811823316532" }; - int code = taos_insert_by_lines(taos, lines , 3); + int code = taos_insert_by_lines(taos, lines , 4); return code; } From 958b89b4630c11c3c2c360d7d757c82213862461 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Jul 2021 11:27:34 +0800 Subject: [PATCH 10/10] before tsim/log/memory leak handling --- tests/examples/c/apitest.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 5cb3f5762a..409140e0f2 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -957,16 +957,20 @@ int32_t verify_schema_less(TAOS* taos) { result = taos_query(taos, "create database test precision 'us';"); taos_free_result(result); usleep(100000); + taos_select_db(taos, "test"); result = taos_query(taos, "create stable ste(ts timestamp, f int) tags(t1 bigint)"); + taos_free_result(result); + usleep(100000); char* lines[] = { "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000", "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833640000000", "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000", - "ste,t2=5,t3=L\"ste\" c1=true,c2=4 1626056811823316532" + "ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532", + "ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532" }; - int code = taos_insert_by_lines(taos, lines , 4); + int code = taos_insert_by_lines(taos, lines , 5); return code; }