refactor:modify schemaless function to speed
This commit is contained in:
parent
1025c3c708
commit
d418abd07f
|
@ -105,7 +105,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
|
||||||
|
|
||||||
void* smlInitHandle(SQuery *pQuery);
|
void* smlInitHandle(SQuery *pQuery);
|
||||||
void smlDestroyHandle(void *pHandle);
|
void smlDestroyHandle(void *pHandle);
|
||||||
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen);
|
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen);
|
||||||
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
|
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -37,8 +37,8 @@ typedef enum {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||||
SHashObj *tags;
|
SArray *tags;
|
||||||
SHashObj *fields;
|
SArray *fields;
|
||||||
} SCreateSTableActionInfo;
|
} SCreateSTableActionInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -78,14 +78,17 @@ typedef struct {
|
||||||
// colsFormat store cols formated, for quick parse, if info->formatData is true
|
// colsFormat store cols formated, for quick parse, if info->formatData is true
|
||||||
SArray *colsFormat; // elements are SArray<SSmlKv*>
|
SArray *colsFormat; // elements are SArray<SSmlKv*>
|
||||||
|
|
||||||
// cols & colsColumn store cols un formated
|
// cols store cols un formated
|
||||||
SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
|
SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
|
||||||
SHashObj *columnsHash; // elements are <cols key string, 1>, just for judge if key exists quickly.
|
|
||||||
} SSmlTableInfo;
|
} SSmlTableInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SHashObj *tagHash;
|
SArray *tags; // save the origin order to create table
|
||||||
|
SHashObj *tagHash; // elements are <key, index in tags>
|
||||||
|
|
||||||
|
SArray *cols;
|
||||||
SHashObj *fieldHash;
|
SHashObj *fieldHash;
|
||||||
|
|
||||||
STableMeta *tableMeta;
|
STableMeta *tableMeta;
|
||||||
} SSmlSTableMeta;
|
} SSmlSTableMeta;
|
||||||
|
|
||||||
|
@ -350,37 +353,26 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
|
||||||
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
|
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
|
||||||
char* pos = result + n; int freeBytes = capacity - n;
|
char* pos = result + n; int freeBytes = capacity - n;
|
||||||
|
|
||||||
size_t size = taosHashGetSize(action->createSTable.fields);
|
SArray *cols = action->createSTable.tags;
|
||||||
SArray *cols = taosArrayInit(size, POINTER_BYTES);
|
|
||||||
SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL);
|
|
||||||
while(kv){
|
|
||||||
if(strncmp((*kv)->key, TS, strlen(TS)) == 0 && (*kv)->type == TSDB_DATA_TYPE_TIMESTAMP){
|
|
||||||
taosArrayInsert(cols, 0, kv);
|
|
||||||
}else{
|
|
||||||
taosArrayPush(cols, kv);
|
|
||||||
}
|
|
||||||
kv = taosHashIterate(action->createSTable.fields, kv);
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int i = 0; i < taosArrayGetSize(cols); i++){
|
for(int i = 0; i < taosArrayGetSize(cols); i++){
|
||||||
SSmlKv *kvNew = taosArrayGetP(cols, i);
|
SSmlKv *kv = taosArrayGetP(cols, i);
|
||||||
smlBuildColumnDescription(kvNew, pos, freeBytes, &outBytes);
|
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
|
||||||
pos += outBytes; freeBytes -= outBytes;
|
pos += outBytes; freeBytes -= outBytes;
|
||||||
*pos = ','; ++pos; --freeBytes;
|
*pos = ','; ++pos; --freeBytes;
|
||||||
}
|
}
|
||||||
taosArrayDestroy(cols);
|
|
||||||
|
|
||||||
--pos; ++freeBytes;
|
--pos; ++freeBytes;
|
||||||
|
|
||||||
outBytes = snprintf(pos, freeBytes, ") tags (");
|
outBytes = snprintf(pos, freeBytes, ") tags (");
|
||||||
pos += outBytes; freeBytes -= outBytes;
|
pos += outBytes; freeBytes -= outBytes;
|
||||||
|
|
||||||
kv = taosHashIterate(action->createSTable.tags, NULL);
|
cols = action->createSTable.tags;
|
||||||
while(kv){
|
for(int i = 0; i < taosArrayGetSize(cols); i++){
|
||||||
smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes);
|
SSmlKv *kv = taosArrayGetP(cols, i);
|
||||||
|
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
|
||||||
pos += outBytes; freeBytes -= outBytes;
|
pos += outBytes; freeBytes -= outBytes;
|
||||||
*pos = ','; ++pos; --freeBytes;
|
*pos = ','; ++pos; --freeBytes;
|
||||||
kv = taosHashIterate(action->createSTable.tags, kv);
|
|
||||||
}
|
}
|
||||||
pos--; ++freeBytes;
|
pos--; ++freeBytes;
|
||||||
outBytes = snprintf(pos, freeBytes, ")");
|
outBytes = snprintf(pos, freeBytes, ")");
|
||||||
|
@ -419,7 +411,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
||||||
|
|
||||||
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
|
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
|
||||||
while (tableMetaSml) {
|
while (tableMetaSml) {
|
||||||
SSmlSTableMeta* cTablePoints = *tableMetaSml;
|
SSmlSTableMeta* sTableData = *tableMetaSml;
|
||||||
|
|
||||||
STableMeta *pTableMeta = NULL;
|
STableMeta *pTableMeta = NULL;
|
||||||
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
||||||
|
@ -436,8 +428,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
||||||
SSchemaAction schemaAction = {0};
|
SSchemaAction schemaAction = {0};
|
||||||
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
||||||
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
||||||
schemaAction.createSTable.tags = cTablePoints->tagHash;
|
schemaAction.createSTable.tags = sTableData->tags;
|
||||||
schemaAction.createSTable.fields = cTablePoints->fieldHash;
|
schemaAction.createSTable.fields = sTableData->cols;
|
||||||
code = smlApplySchemaAction(info, &schemaAction);
|
code = smlApplySchemaAction(info, &schemaAction);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
|
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
|
||||||
|
@ -454,7 +446,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
||||||
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
|
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
cTablePoints->tableMeta = pTableMeta;
|
sTableData->tableMeta = pTableMeta;
|
||||||
|
|
||||||
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
|
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
|
||||||
}
|
}
|
||||||
|
@ -1295,14 +1287,19 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
|
||||||
SSmlKv *kv = taosArrayGetP(tags, i);
|
SSmlKv *kv = taosArrayGetP(tags, i);
|
||||||
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
|
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
|
||||||
|
|
||||||
SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
|
uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
|
||||||
if(value){
|
if(index){
|
||||||
|
SSmlKv **value = taosArrayGet(tableMeta->tags, *index);
|
||||||
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
|
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
|
||||||
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
|
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
|
||||||
*value = kv;
|
*value = kv;
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
size_t tmp = taosArrayGetSize(tableMeta->tags);
|
||||||
|
ASSERT(tmp <= UINT8_MAX);
|
||||||
|
uint8_t size = tmp;
|
||||||
|
taosArrayPush(tableMeta->tags, &kv);
|
||||||
|
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1310,8 +1307,10 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
|
||||||
if(cols){
|
if(cols){
|
||||||
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
|
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
|
||||||
SSmlKv *kv = taosArrayGetP(cols, i);
|
SSmlKv *kv = taosArrayGetP(cols, i);
|
||||||
SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
|
|
||||||
if(value){
|
int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
|
||||||
|
if(index){
|
||||||
|
SSmlKv **value = taosArrayGet(tableMeta->cols, *index);
|
||||||
if(kv->type != (*value)->type){
|
if(kv->type != (*value)->type){
|
||||||
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
|
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
|
||||||
return false;
|
return false;
|
||||||
|
@ -1323,7 +1322,11 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
size_t tmp = taosArrayGetSize(tableMeta->cols);
|
||||||
|
ASSERT(tmp <= INT16_MAX);
|
||||||
|
int16_t size = tmp;
|
||||||
|
taosArrayPush(tableMeta->cols, &kv);
|
||||||
|
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1332,16 +1335,18 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
|
||||||
|
|
||||||
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
|
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
|
||||||
if(tags){
|
if(tags){
|
||||||
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
|
for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) {
|
||||||
SSmlKv *kv = taosArrayGetP(tags, i);
|
SSmlKv *kv = taosArrayGetP(tags, i);
|
||||||
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
taosArrayPush(tableMeta->tags, &kv);
|
||||||
|
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(cols){
|
if(cols){
|
||||||
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
|
||||||
SSmlKv *kv = taosArrayGetP(cols, i);
|
SSmlKv *kv = taosArrayGetP(cols, i);
|
||||||
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
taosArrayPush(tableMeta->cols, &kv);
|
||||||
|
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1364,12 +1369,6 @@ static SSmlTableInfo* smlBuildTableInfo(bool format){
|
||||||
uError("SML:smlParseLine failed to allocate memory");
|
uError("SML:smlParseLine failed to allocate memory");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
tag->columnsHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
if (tag->columnsHash == NULL) {
|
|
||||||
uError("SML:smlParseLine failed to allocate memory");
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tag->tags = taosArrayInit(16, POINTER_BYTES);
|
tag->tags = taosArrayInit(16, POINTER_BYTES);
|
||||||
|
@ -1399,7 +1398,6 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
|
||||||
}
|
}
|
||||||
taosHashCleanup(kvHash);
|
taosHashCleanup(kvHash);
|
||||||
}
|
}
|
||||||
taosHashCleanup(tag->columnsHash);
|
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tag->tags);
|
taosArrayDestroy(tag->tags);
|
||||||
taosMemoryFreeClear(tag);
|
taosMemoryFreeClear(tag);
|
||||||
|
@ -1408,23 +1406,20 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
|
||||||
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
|
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
|
||||||
if(dataFormat){
|
if(dataFormat){
|
||||||
taosArrayPush(oneTable->colsFormat, &cols);
|
taosArrayPush(oneTable->colsFormat, &cols);
|
||||||
}else{
|
return TSDB_CODE_SUCCESS;
|
||||||
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
if(!kvHash){
|
|
||||||
uError("SML:smlDealCols failed to allocate memory");
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
|
|
||||||
SSmlKv *kv = taosArrayGetP(cols, i);
|
|
||||||
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
|
|
||||||
|
|
||||||
if(taosHashGet(oneTable->columnsHash, kv->key, kv->keyLen) != NULL){
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
taosHashPut(oneTable->columnsHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
|
||||||
}
|
|
||||||
taosArrayPush(oneTable->cols, &kvHash);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
if(!kvHash){
|
||||||
|
uError("SML:smlDealCols failed to allocate memory");
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
|
||||||
|
SSmlKv *kv = taosArrayGetP(cols, i);
|
||||||
|
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
|
||||||
|
}
|
||||||
|
taosArrayPush(oneTable->cols, &kvHash);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1444,6 +1439,18 @@ static SSmlSTableMeta* smlBuildSTableMeta(){
|
||||||
uError("SML:smlBuildSTableMeta failed to allocate memory");
|
uError("SML:smlBuildSTableMeta failed to allocate memory");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
meta->tags = taosArrayInit(32, POINTER_BYTES);
|
||||||
|
if (meta->tags == NULL) {
|
||||||
|
uError("SML:smlBuildSTableMeta failed to allocate memory");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
meta->cols = taosArrayInit(32, POINTER_BYTES);
|
||||||
|
if (meta->cols == NULL) {
|
||||||
|
uError("SML:smlBuildSTableMeta failed to allocate memory");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
return meta;
|
return meta;
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
|
@ -1454,6 +1461,8 @@ cleanup:
|
||||||
static void smlDestroySTableMeta(SSmlSTableMeta *meta){
|
static void smlDestroySTableMeta(SSmlSTableMeta *meta){
|
||||||
taosHashCleanup(meta->tagHash);
|
taosHashCleanup(meta->tagHash);
|
||||||
taosHashCleanup(meta->fieldHash);
|
taosHashCleanup(meta->fieldHash);
|
||||||
|
taosArrayDestroy(meta->tags);
|
||||||
|
taosArrayDestroy(meta->cols);
|
||||||
taosMemoryFree(meta->tableMeta);
|
taosMemoryFree(meta->tableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1500,45 +1509,45 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
SSmlTableInfo *tag = smlBuildTableInfo(info->dataFormat);
|
SSmlTableInfo *tinfo = smlBuildTableInfo(info->dataFormat);
|
||||||
if(!tag){
|
if(!tinfo){
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
ret = smlDealCols(tag, info->dataFormat, cols);
|
ret = smlDealCols(tinfo, info->dataFormat, cols);
|
||||||
if(ret != TSDB_CODE_SUCCESS){
|
if(ret != TSDB_CODE_SUCCESS){
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = smlParseCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf);
|
ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, &info->msgBuf);
|
||||||
if(ret != TSDB_CODE_SUCCESS){
|
if(ret != TSDB_CODE_SUCCESS){
|
||||||
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
|
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){
|
if(taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
|
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
tag->sTableName = elements.measure;
|
tinfo->sTableName = elements.measure;
|
||||||
tag->sTableNameLen = elements.measureLen;
|
tinfo->sTableNameLen = elements.measureLen;
|
||||||
smlBuildChildTableName(tag);
|
smlBuildChildTableName(tinfo);
|
||||||
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tag->childTableName);
|
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName);
|
||||||
|
|
||||||
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
|
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
|
||||||
if(tableMeta){ // update meta
|
if(tableMeta){ // update meta
|
||||||
ret = smlUpdateMeta(*tableMeta, tag->tags, cols, &info->msgBuf);
|
ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf);
|
||||||
if(!ret){
|
if(!ret){
|
||||||
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
|
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
SSmlSTableMeta *meta = smlBuildSTableMeta();
|
SSmlSTableMeta *meta = smlBuildSTableMeta();
|
||||||
smlInsertMeta(meta, tag->tags, cols);
|
smlInsertMeta(meta, tinfo->tags, cols);
|
||||||
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
|
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tag, POINTER_BYTES);
|
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1651,7 +1660,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
|
||||||
(*pMeta)->tableMeta->vgId = vg.vgId;
|
(*pMeta)->tableMeta->vgId = vg.vgId;
|
||||||
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
||||||
|
|
||||||
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, tableData->columnsHash,
|
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, (*pMeta)->cols,
|
||||||
tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
|
tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
if(code != TSDB_CODE_SUCCESS){
|
||||||
return code;
|
return code;
|
||||||
|
@ -1730,7 +1739,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, false);
|
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true);
|
||||||
if(!info){
|
if(!info){
|
||||||
return (TAOS_RES*)request;
|
return (TAOS_RES*)request;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1642,7 +1642,7 @@ static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDat
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format,
|
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format,
|
||||||
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
|
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
|
||||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
|
|
||||||
|
@ -1673,21 +1673,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
|
||||||
|
|
||||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||||
|
|
||||||
|
ret = smlBoundColumns(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
|
||||||
if(format){
|
|
||||||
ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema);
|
|
||||||
}else{
|
|
||||||
SArray *columns = taosArrayInit(16, POINTER_BYTES);
|
|
||||||
void **p1 = taosHashIterate(colsHash, NULL);
|
|
||||||
while (p1) {
|
|
||||||
SSmlKv* kv = *p1;
|
|
||||||
taosArrayPush(columns, &kv);
|
|
||||||
p1 = taosHashIterate(colsHash, p1);
|
|
||||||
}
|
|
||||||
ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema);
|
|
||||||
taosArrayDestroy(columns);
|
|
||||||
}
|
|
||||||
|
|
||||||
if(ret != TSDB_CODE_SUCCESS){
|
if(ret != TSDB_CODE_SUCCESS){
|
||||||
buildInvalidOperationMsg(&pBuf, "bound cols error");
|
buildInvalidOperationMsg(&pBuf, "bound cols error");
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -1712,8 +1698,10 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
|
||||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
|
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
|
||||||
tdSRowResetBuf(pBuilder, row);
|
tdSRowResetBuf(pBuilder, row);
|
||||||
void *rowData = NULL;
|
void *rowData = NULL;
|
||||||
|
bool eleEqual = false;
|
||||||
if(format){
|
if(format){
|
||||||
rowData = taosArrayGetP(colsFormat, r);
|
rowData = taosArrayGetP(colsFormat, r);
|
||||||
|
eleEqual = (taosArrayGetSize(rowData) == spd->numOfBound);
|
||||||
}else{
|
}else{
|
||||||
rowData = taosArrayGetP(cols, r);
|
rowData = taosArrayGetP(cols, r);
|
||||||
}
|
}
|
||||||
|
@ -1728,17 +1716,22 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
|
||||||
SSmlKv *kv = NULL;
|
SSmlKv *kv = NULL;
|
||||||
if(format){
|
if(format){
|
||||||
kv = taosArrayGetP(rowData, c);
|
kv = taosArrayGetP(rowData, c);
|
||||||
if (!kv){
|
do{
|
||||||
char msg[64] = {0};
|
if (!eleEqual && kv && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){
|
||||||
sprintf(msg, "cols num not the same like before:%d", r);
|
MemRowAppend(&pBuf, NULL, 0, ¶m);
|
||||||
return buildInvalidOperationMsg(&pBuf, msg);
|
c++;
|
||||||
}
|
if(c >= spd->numOfBound) break;
|
||||||
|
pColSchema = &pSchema[spd->boundColumns[c] - 1];
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}while(1);
|
||||||
}else{
|
}else{
|
||||||
void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
||||||
kv = *p;
|
if(p) kv = *p;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (kv->length == 0) {
|
if (!kv || kv->length == 0) {
|
||||||
MemRowAppend(&pBuf, NULL, 0, ¶m);
|
MemRowAppend(&pBuf, NULL, 0, ¶m);
|
||||||
} else {
|
} else {
|
||||||
int32_t colLen = pColSchema->bytes;
|
int32_t colLen = pColSchema->bytes;
|
||||||
|
|
Loading…
Reference in New Issue