Merge pull request #12285 from taosdata/feature/TD-14761
feat: schemaless write
This commit is contained in:
commit
74b125924b
|
@ -17,6 +17,7 @@
|
|||
#define _TD_COMMON_NAME_H_
|
||||
|
||||
#include "tdef.h"
|
||||
#include "tarray.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
|||
|
||||
bool tNameDBNameEqual(SName* left, SName* right);
|
||||
|
||||
typedef struct {
|
||||
// input
|
||||
SArray *tags; // element is SSmlKV
|
||||
const char *sTableName; // super table name
|
||||
uint8_t sTableNameLen; // the length of super table name
|
||||
|
||||
// output
|
||||
char *childTableName; // must have size of TSDB_TABLE_NAME_LEN;
|
||||
uint64_t uid; // child table uid, may be useful
|
||||
} RandTableName;
|
||||
|
||||
void buildChildTableName(RandTableName *rName);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "catalog.h"
|
||||
#include "clientInt.h"
|
||||
#include "tname.h"
|
||||
//=================================================================================================
|
||||
|
||||
#define SPACE ' '
|
||||
|
@ -97,6 +98,21 @@ typedef struct {
|
|||
char *buf;
|
||||
} SSmlMsgBuf;
|
||||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
int32_t lineNum;
|
||||
|
||||
int32_t numOfSTables;
|
||||
int32_t numOfCTables;
|
||||
int32_t numOfCreateSTables;
|
||||
|
||||
int64_t parseTime;
|
||||
int64_t schemaTime;
|
||||
int64_t insertBindTime;
|
||||
int64_t insertRpcTime;
|
||||
int64_t endTime;
|
||||
} SSmlCostInfo;
|
||||
|
||||
typedef struct {
|
||||
uint64_t id;
|
||||
|
||||
|
@ -114,6 +130,7 @@ typedef struct {
|
|||
SRequestObj *pRequest;
|
||||
SQuery *pQuery;
|
||||
|
||||
SSmlCostInfo cost;
|
||||
int32_t affectedRows;
|
||||
SSmlMsgBuf msgBuf;
|
||||
SHashObj *dumplicateKey; // for dumplicate key
|
||||
|
@ -147,45 +164,6 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
|
|||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
|
||||
static int smlCompareKv(const void* p1, const void* p2) {
|
||||
SSmlKv* kv1 = *(SSmlKv**)p1;
|
||||
SSmlKv* kv2 = *(SSmlKv**)p2;
|
||||
int32_t kvLen1 = kv1->keyLen;
|
||||
int32_t kvLen2 = kv2->keyLen;
|
||||
int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
|
||||
if (res != 0) {
|
||||
return res;
|
||||
} else {
|
||||
return kvLen1-kvLen2;
|
||||
}
|
||||
}
|
||||
|
||||
static void smlBuildChildTableName(SSmlTableInfo *tags) {
|
||||
int32_t size = taosArrayGetSize(tags->tags);
|
||||
ASSERT(size > 0);
|
||||
taosArraySort(tags->tags, smlCompareKv);
|
||||
|
||||
SStringBuilder sb = {0};
|
||||
taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen);
|
||||
for (int j = 0; j < size; ++j) {
|
||||
SSmlKv *tagKv = taosArrayGetP(tags->tags, j);
|
||||
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
|
||||
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
|
||||
}
|
||||
size_t len = 0;
|
||||
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
|
||||
T_MD5_CTX context;
|
||||
tMD5Init(&context);
|
||||
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
|
||||
tMD5Final(&context);
|
||||
uint64_t digest1 = *(uint64_t*)(context.digest);
|
||||
//uint64_t digest2 = *(uint64_t*)(context.digest + 8);
|
||||
//snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
|
||||
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1);
|
||||
taosStringBuilderDestroy(&sb);
|
||||
tags->uid = digest1;
|
||||
}
|
||||
|
||||
static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
|
||||
SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) {
|
||||
// char fieldName[TSDB_COL_NAME_LEN] = {0};
|
||||
|
@ -444,6 +422,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
|||
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName);
|
||||
return code;
|
||||
}
|
||||
info->cost.numOfCreateSTables++;
|
||||
}else if (code == TSDB_CODE_SUCCESS) {
|
||||
} else {
|
||||
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
|
||||
|
@ -926,20 +905,6 @@ static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlHandle* info) {
|
||||
char *val = NULL;
|
||||
val = taosHashGet(pHash, key, strlen(key));
|
||||
if (val) {
|
||||
uError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
|
||||
return true;
|
||||
}
|
||||
|
||||
uint8_t dummy_val = 0;
|
||||
taosHashPut(pHash, key, strlen(key), &dummy_val, sizeof(uint8_t));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){
|
||||
if(!sql) return TSDB_CODE_SML_INVALID_DATA;
|
||||
while (*sql != '\0') { // jump the space at the begining
|
||||
|
@ -1546,8 +1511,10 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
|
|||
|
||||
tinfo->sTableName = elements.measure;
|
||||
tinfo->sTableNameLen = elements.measureLen;
|
||||
smlBuildChildTableName(tinfo);
|
||||
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName);
|
||||
RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=tinfo->sTableNameLen,
|
||||
.childTableName=tinfo->childTableName};
|
||||
buildChildTableName(&rName);
|
||||
tinfo->uid = rName.uid;
|
||||
|
||||
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
|
||||
if(tableMeta){ // update meta
|
||||
|
@ -1604,7 +1571,7 @@ static void smlDestroyInfo(SSmlHandle* info){
|
|||
|
||||
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSmlHandle* info = taosMemoryMalloc(sizeof(SSmlHandle));
|
||||
SSmlHandle* info = taosMemoryCalloc(1, sizeof(SSmlHandle));
|
||||
if (NULL == info) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1699,12 +1666,23 @@ static int32_t smlInsertData(SSmlHandle* info) {
|
|||
}
|
||||
|
||||
smlBuildOutput(info->exec, info->pVgHash);
|
||||
info->cost.insertRpcTime = taosGetTimestampUs();
|
||||
|
||||
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
|
||||
|
||||
info->affectedRows = taos_affected_rows(info->pRequest);
|
||||
return info->pRequest->code;
|
||||
}
|
||||
|
||||
static void smlPrintStatisticInfo(SSmlHandle *info){
|
||||
uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
|
||||
parse cost:%"PRId64",schema cost:%"PRId64",bind cost:%"PRId64",rpc cost:%"PRId64",total cost:%"PRId64"", info->id, info->cost.code,
|
||||
info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables,
|
||||
info->cost.schemaTime-info->cost.parseTime, info->cost.insertBindTime-info->cost.schemaTime,
|
||||
info->cost.insertRpcTime-info->cost.insertBindTime, info->cost.endTime-info->cost.insertRpcTime,
|
||||
info->cost.endTime-info->cost.parseTime);
|
||||
}
|
||||
|
||||
static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -1714,6 +1692,7 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
|
|||
goto cleanup;
|
||||
}
|
||||
|
||||
info->cost.parseTime = taosGetTimestampUs();
|
||||
for (int32_t i = 0; i < numLines; ++i) {
|
||||
code = smlParseLine(info, lines[i]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1721,24 +1700,29 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
|
|||
goto cleanup;
|
||||
}
|
||||
}
|
||||
uDebug("SML:0x%"PRIx64" smlInsertLines parse success. tables %d", info->id, taosHashGetSize(info->childTables));
|
||||
uDebug("SML:0x%"PRIx64" smlInsertLines parse success. super tables %d", info->id, taosHashGetSize(info->superTables));
|
||||
|
||||
info->cost.lineNum = numLines;
|
||||
info->cost.numOfSTables = taosHashGetSize(info->superTables);
|
||||
info->cost.numOfCTables = taosHashGetSize(info->childTables);
|
||||
|
||||
info->cost.schemaTime = taosGetTimestampUs();
|
||||
code = smlModifyDBSchemas(info);
|
||||
if (code != 0) {
|
||||
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
info->cost.insertBindTime = taosGetTimestampUs();
|
||||
code = smlInsertData(info);
|
||||
if (code != 0) {
|
||||
uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
uDebug("SML:0x%"PRIx64" smlInsertLines finish inserting %d lines.", info->id, numLines);
|
||||
info->cost.endTime = taosGetTimestampUs();
|
||||
|
||||
cleanup:
|
||||
info->cost.code = code;
|
||||
smlPrintStatisticInfo(info);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1790,7 +1774,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
|||
}
|
||||
smlDestroyInfo(info);
|
||||
|
||||
end:
|
||||
return (TAOS_RES*)request;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tname.h"
|
||||
#include "tcommon.h"
|
||||
#include "tstrbuild.h"
|
||||
|
||||
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
|
||||
|
||||
|
@ -294,4 +296,43 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int compareKv(const void* p1, const void* p2) {
|
||||
SSmlKv* kv1 = *(SSmlKv**)p1;
|
||||
SSmlKv* kv2 = *(SSmlKv**)p2;
|
||||
int32_t kvLen1 = kv1->keyLen;
|
||||
int32_t kvLen2 = kv2->keyLen;
|
||||
int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
|
||||
if (res != 0) {
|
||||
return res;
|
||||
} else {
|
||||
return kvLen1-kvLen2;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* use stable name and tags to grearate child table name
|
||||
*/
|
||||
void buildChildTableName(RandTableName *rName) {
|
||||
int32_t size = taosArrayGetSize(rName->tags);
|
||||
ASSERT(size > 0);
|
||||
taosArraySort(rName->tags, compareKv);
|
||||
|
||||
SStringBuilder sb = {0};
|
||||
taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
|
||||
for (int j = 0; j < size; ++j) {
|
||||
SSmlKv *tagKv = taosArrayGetP(rName->tags, j);
|
||||
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
|
||||
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
|
||||
}
|
||||
size_t len = 0;
|
||||
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
|
||||
T_MD5_CTX context;
|
||||
tMD5Init(&context);
|
||||
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
|
||||
tMD5Final(&context);
|
||||
uint64_t digest1 = *(uint64_t*)(context.digest);
|
||||
uint64_t digest2 = *(uint64_t*)(context.digest + 8);
|
||||
snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
|
||||
taosStringBuilderDestroy(&sb);
|
||||
rName->uid = digest1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue