Merge remote-tracking branch 'origin/3.0' into feature/check

This commit is contained in:
Shengliang Guan 2022-05-09 09:29:15 +08:00
commit 495bc3a36b
14 changed files with 305 additions and 157 deletions

View File

@ -164,7 +164,6 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_TOPICS_STMT, QUERY_NODE_SHOW_TOPICS_STMT,
QUERY_NODE_SHOW_CONSUMERS_STMT, QUERY_NODE_SHOW_CONSUMERS_STMT,
QUERY_NODE_SHOW_SUBSCRIBES_STMT, QUERY_NODE_SHOW_SUBSCRIBES_STMT,
QUERY_NODE_SHOW_TRANS_STMT,
QUERY_NODE_SHOW_SMAS_STMT, QUERY_NODE_SHOW_SMAS_STMT,
QUERY_NODE_SHOW_CONFIGS_STMT, QUERY_NODE_SHOW_CONFIGS_STMT,
QUERY_NODE_SHOW_CONNECTIONS_STMT, QUERY_NODE_SHOW_CONNECTIONS_STMT,

View File

@ -105,7 +105,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery *pQuery); void* smlInitHandle(SQuery *pQuery);
void smlDestroyHandle(void *pHandle); void smlDestroyHandle(void *pHandle);
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen); int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -37,8 +37,8 @@ typedef enum {
typedef struct { typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN]; char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj *tags; SArray *tags;
SHashObj *fields; SArray *fields;
} SCreateSTableActionInfo; } SCreateSTableActionInfo;
typedef struct { typedef struct {
@ -78,14 +78,17 @@ typedef struct {
// colsFormat store cols formated, for quick parse, if info->formatData is true // colsFormat store cols formated, for quick parse, if info->formatData is true
SArray *colsFormat; // elements are SArray<SSmlKv*> SArray *colsFormat; // elements are SArray<SSmlKv*>
// cols & colsColumn store cols un formated // cols store cols un formated
SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
SHashObj *columnsHash; // elements are <cols key string, 1>, just for judge if key exists quickly.
} SSmlTableInfo; } SSmlTableInfo;
typedef struct { typedef struct {
SHashObj *tagHash; SArray *tags; // save the origin order to create table
SHashObj *tagHash; // elements are <key, index in tags>
SArray *cols;
SHashObj *fieldHash; SHashObj *fieldHash;
STableMeta *tableMeta; STableMeta *tableMeta;
} SSmlSTableMeta; } SSmlSTableMeta;
@ -113,6 +116,8 @@ typedef struct {
int32_t affectedRows; int32_t affectedRows;
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
SArray *colsContainer; // for cols parse, if is dataFormat == false
} SSmlHandle; } SSmlHandle;
//================================================================================================= //=================================================================================================
@ -143,8 +148,8 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
} }
static int smlCompareKv(const void* p1, const void* p2) { static int smlCompareKv(const void* p1, const void* p2) {
SSmlKv* kv1 = (SSmlKv*)p1; SSmlKv* kv1 = *(SSmlKv**)p1;
SSmlKv* kv2 = (SSmlKv*)p2; SSmlKv* kv2 = *(SSmlKv**)p2;
int32_t kvLen1 = kv1->keyLen; int32_t kvLen1 = kv1->keyLen;
int32_t kvLen2 = kv2->keyLen; int32_t kvLen2 = kv2->keyLen;
int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2)); int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
@ -174,8 +179,9 @@ static void smlBuildChildTableName(SSmlTableInfo *tags) {
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
tMD5Final(&context); tMD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest); uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8); //uint64_t digest2 = *(uint64_t*)(context.digest + 8);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); //snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1);
taosStringBuilderDestroy(&sb); taosStringBuilderDestroy(&sb);
tags->uid = digest1; tags->uid = digest1;
} }
@ -350,37 +356,26 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
char* pos = result + n; int freeBytes = capacity - n; char* pos = result + n; int freeBytes = capacity - n;
size_t size = taosHashGetSize(action->createSTable.fields); SArray *cols = action->createSTable.fields;
SArray *cols = taosArrayInit(size, POINTER_BYTES);
SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL);
while(kv){
if(strncmp((*kv)->key, TS, strlen(TS)) == 0 && (*kv)->type == TSDB_DATA_TYPE_TIMESTAMP){
taosArrayInsert(cols, 0, kv);
}else{
taosArrayPush(cols, kv);
}
kv = taosHashIterate(action->createSTable.fields, kv);
}
for(int i = 0; i < taosArrayGetSize(cols); i++){ for(int i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kvNew = taosArrayGetP(cols, i); SSmlKv *kv = taosArrayGetP(cols, i);
smlBuildColumnDescription(kvNew, pos, freeBytes, &outBytes); smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes; pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes; *pos = ','; ++pos; --freeBytes;
} }
taosArrayDestroy(cols);
--pos; ++freeBytes; --pos; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ") tags ("); outBytes = snprintf(pos, freeBytes, ") tags (");
pos += outBytes; freeBytes -= outBytes; pos += outBytes; freeBytes -= outBytes;
kv = taosHashIterate(action->createSTable.tags, NULL); cols = action->createSTable.tags;
while(kv){ for(int i = 0; i < taosArrayGetSize(cols); i++){
smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes); SSmlKv *kv = taosArrayGetP(cols, i);
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes; pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes; *pos = ','; ++pos; --freeBytes;
kv = taosHashIterate(action->createSTable.tags, kv);
} }
pos--; ++freeBytes; pos--; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ")"); outBytes = snprintf(pos, freeBytes, ")");
@ -419,7 +414,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL); SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
while (tableMetaSml) { while (tableMetaSml) {
SSmlSTableMeta* cTablePoints = *tableMetaSml; SSmlSTableMeta* sTableData = *tableMetaSml;
STableMeta *pTableMeta = NULL; STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
@ -436,8 +431,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
SSchemaAction schemaAction = {0}; SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen); memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = cTablePoints->tagHash; schemaAction.createSTable.tags = sTableData->tags;
schemaAction.createSTable.fields = cTablePoints->fieldHash; schemaAction.createSTable.fields = sTableData->cols;
code = smlApplySchemaAction(info, &schemaAction); code = smlApplySchemaAction(info, &schemaAction);
if (code != 0) { if (code != 0) {
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName); uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
@ -454,7 +449,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code; return code;
} }
cTablePoints->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml); tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
} }
@ -1034,7 +1029,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SSmlMsgBuf *msg){ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){ if(isTag && len == 0){
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = TAG; kv->key = TAG;
@ -1062,6 +1057,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if(taosHashGet(dumplicateKey, key, keyLen)){
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_SML_INVALID_DATA;
}else{
taosHashPut(dumplicateKey, key, keyLen, key, CHAR_BYTES);
}
// parse value // parse value
i++; i++;
const char *value = data + i; const char *value = data + i;
@ -1295,14 +1297,19 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
SSmlKv *kv = taosArrayGetP(tags, i); SSmlKv *kv = taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR); ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(value){ if(index){
SSmlKv **value = taosArrayGet(tableMeta->tags, *index);
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR); ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
*value = kv; *value = kv;
} }
}else{ }else{
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); size_t tmp = taosArrayGetSize(tableMeta->tags);
ASSERT(tmp <= UINT8_MAX);
uint8_t size = tmp;
taosArrayPush(tableMeta->tags, &kv);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES);
} }
} }
} }
@ -1310,8 +1317,10 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
if(cols){ if(cols){
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
SSmlKv *kv = taosArrayGetP(cols, i); SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(value){ int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(index){
SSmlKv **value = taosArrayGet(tableMeta->cols, *index);
if(kv->type != (*value)->type){ if(kv->type != (*value)->type){
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return false; return false;
@ -1323,7 +1332,11 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
} }
} }
}else{ }else{
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); size_t tmp = taosArrayGetSize(tableMeta->cols);
ASSERT(tmp <= INT16_MAX);
int16_t size = tmp;
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
} }
} }
} }
@ -1332,16 +1345,18 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(tags){ if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) { for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i); SSmlKv *kv = taosArrayGetP(tags, i);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); taosArrayPush(tableMeta->tags, &kv);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
} }
} }
if(cols){ if(cols){
for (int i = 0; i < taosArrayGetSize(cols); ++i) { for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = taosArrayGetP(cols, i); SSmlKv *kv = taosArrayGetP(cols, i);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
} }
} }
} }
@ -1364,12 +1379,6 @@ static SSmlTableInfo* smlBuildTableInfo(bool format){
uError("SML:smlParseLine failed to allocate memory"); uError("SML:smlParseLine failed to allocate memory");
goto cleanup; goto cleanup;
} }
tag->columnsHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (tag->columnsHash == NULL) {
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
} }
tag->tags = taosArrayInit(16, POINTER_BYTES); tag->tags = taosArrayInit(16, POINTER_BYTES);
@ -1399,7 +1408,6 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
} }
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
} }
taosHashCleanup(tag->columnsHash);
} }
taosArrayDestroy(tag->tags); taosArrayDestroy(tag->tags);
taosMemoryFreeClear(tag); taosMemoryFreeClear(tag);
@ -1408,7 +1416,9 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){ if(dataFormat){
taosArrayPush(oneTable->colsFormat, &cols); taosArrayPush(oneTable->colsFormat, &cols);
}else{ return TSDB_CODE_SUCCESS;
}
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!kvHash){ if(!kvHash){
uError("SML:smlDealCols failed to allocate memory"); uError("SML:smlDealCols failed to allocate memory");
@ -1417,14 +1427,9 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
for(size_t i = 0; i < taosArrayGetSize(cols); i++){ for(size_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i); SSmlKv *kv = taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
if(taosHashGet(oneTable->columnsHash, kv->key, kv->keyLen) != NULL){
continue;
}
taosHashPut(oneTable->columnsHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
} }
taosArrayPush(oneTable->cols, &kvHash); taosArrayPush(oneTable->cols, &kvHash);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1444,6 +1449,18 @@ static SSmlSTableMeta* smlBuildSTableMeta(){
uError("SML:smlBuildSTableMeta failed to allocate memory"); uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup; goto cleanup;
} }
meta->tags = taosArrayInit(32, POINTER_BYTES);
if (meta->tags == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->cols = taosArrayInit(32, POINTER_BYTES);
if (meta->cols == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
return meta; return meta;
cleanup: cleanup:
@ -1454,6 +1471,8 @@ cleanup:
static void smlDestroySTableMeta(SSmlSTableMeta *meta){ static void smlDestroySTableMeta(SSmlSTableMeta *meta){
taosHashCleanup(meta->tagHash); taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->fieldHash); taosHashCleanup(meta->fieldHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta); taosMemoryFree(meta->tableMeta);
} }
@ -1465,18 +1484,23 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return ret; return ret;
} }
SArray *cols = taosArrayInit(16, POINTER_BYTES); SArray *cols = NULL;
if(info->dataFormat){ // if dataFormat, cols need new memory to save data
cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) { if (cols == NULL) {
uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id); uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
}else{ // if dataFormat is false, cols do not need to save data, there is another new memory to save data
cols = info->colsContainer;
}
ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseTS failed", info->id); uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
return ret; return ret;
} }
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf); ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
return ret; return ret;
@ -1500,46 +1524,51 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return ret; return ret;
} }
}else{ }else{
SSmlTableInfo *tag = smlBuildTableInfo(info->dataFormat); SSmlTableInfo *tinfo = smlBuildTableInfo(info->dataFormat);
if(!tag){ if(!tinfo){
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
ret = smlDealCols(tag, info->dataFormat, cols); ret = smlDealCols(tinfo, info->dataFormat, cols);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
return ret; return ret;
} }
ret = smlParseCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf); ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
return ret; return ret;
} }
if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){ if(taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
tag->sTableName = elements.measure; tinfo->sTableName = elements.measure;
tag->sTableNameLen = elements.measureLen; tinfo->sTableNameLen = elements.measureLen;
smlBuildChildTableName(tag); smlBuildChildTableName(tinfo);
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tag->childTableName); uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName);
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta if(tableMeta){ // update meta
ret = smlUpdateMeta(*tableMeta, tag->tags, cols, &info->msgBuf); ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf);
if(!ret){ if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id); uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
}else{ }else{
SSmlSTableMeta *meta = smlBuildSTableMeta(); SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, tag->tags, cols); smlInsertMeta(meta, tinfo->tags, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES); taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
} }
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tag, POINTER_BYTES); taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
} }
if(!info->dataFormat){
taosArrayClear(info->colsContainer);
}
taosHashClear(info->dumplicateKey);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1568,6 +1597,7 @@ static void smlDestroyInfo(SSmlHandle* info){
// destroy info->pVgHash // destroy info->pVgHash
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey);
taosMemoryFreeClear(info); taosMemoryFreeClear(info);
} }
@ -1614,8 +1644,17 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!dataFormat){
info->colsContainer = taosArrayInit(32, POINTER_BYTES);
if(NULL == info->colsContainer){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
}
if(NULL == info->exec || NULL == info->childTables if(NULL == info->exec || NULL == info->childTables
|| NULL == info->superTables || NULL == info->pVgHash){ || NULL == info->superTables || NULL == info->pVgHash
|| NULL == info->dumplicateKey){
uError("SML:0x%"PRIx64" create info failed", info->id); uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup; goto cleanup;
} }
@ -1651,7 +1690,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
(*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, tableData->columnsHash, code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, (*pMeta)->cols,
tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len); tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
return code; return code;
@ -1730,7 +1769,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL; return NULL;
} }
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, false); SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true);
if(!info){ if(!info){
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }

View File

@ -190,17 +190,21 @@ TEST(testCase, smlParseCols_Error_Test) {
"c=-3.402823466e+39u64", "c=-3.402823466e+39u64",
"c=-339u64", "c=-339u64",
"c=18446744073709551616u64", "c=18446744073709551616u64",
"c=1,c=2"
}; };
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for(int i = 0; i < sizeof(data)/sizeof(data[0]); i++){ for(int i = 0; i < sizeof(data)/sizeof(data[0]); i++){
char msg[256] = {0}; char msg[256] = {0};
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
msgBuf.buf = msg; msgBuf.buf = msg;
msgBuf.len = 256; msgBuf.len = 256;
int32_t len = strlen(data[i]); int32_t len = strlen(data[i]);
int32_t ret = smlParseCols(data[i], len, NULL, false, &msgBuf); int32_t ret = smlParseCols(data[i], len, NULL, false, dumplicateKey, &msgBuf);
ASSERT_NE(ret, TSDB_CODE_SUCCESS); ASSERT_NE(ret, TSDB_CODE_SUCCESS);
taosHashClear(dumplicateKey);
} }
taosHashCleanup(dumplicateKey);
} }
TEST(testCase, smlParseCols_tag_Test) { TEST(testCase, smlParseCols_tag_Test) {
@ -211,11 +215,12 @@ TEST(testCase, smlParseCols_tag_Test) {
SArray *cols = taosArrayInit(16, POINTER_BYTES); SArray *cols = taosArrayInit(16, POINTER_BYTES);
ASSERT_NE(cols, NULL); ASSERT_NE(cols, NULL);
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data = const char *data =
"cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data); int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, true, &msgBuf); int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
int32_t size = taosArrayGetSize(cols); int32_t size = taosArrayGetSize(cols);
ASSERT_EQ(size, 19); ASSERT_EQ(size, 19);
@ -239,10 +244,14 @@ TEST(testCase, smlParseCols_tag_Test) {
taosMemoryFree(kv); taosMemoryFree(kv);
taosArrayClear(cols); taosArrayClear(cols);
// test tag is null
data = "t=3e"; data = "t=3e";
len = 0; len = 0;
memset(msgBuf.buf, 0, msgBuf.len); memset(msgBuf.buf, 0, msgBuf.len);
ret = smlParseCols(data, len, cols, true, &msgBuf); taosHashClear(dumplicateKey);
ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
size = taosArrayGetSize(cols); size = taosArrayGetSize(cols);
ASSERT_EQ(size, 1); ASSERT_EQ(size, 1);
@ -255,6 +264,9 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(kv->valueLen, strlen(TAG)); ASSERT_EQ(kv->valueLen, strlen(TAG));
ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0); ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0);
taosMemoryFree(kv); taosMemoryFree(kv);
taosArrayDestroy(cols);
taosHashCleanup(dumplicateKey);
} }
TEST(testCase, smlParseCols_Test) { TEST(testCase, smlParseCols_Test) {
@ -266,9 +278,11 @@ TEST(testCase, smlParseCols_Test) {
SArray *cols = taosArrayInit(16, POINTER_BYTES); SArray *cols = taosArrayInit(16, POINTER_BYTES);
ASSERT_NE(cols, NULL); ASSERT_NE(cols, NULL);
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data); int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, false, &msgBuf); int32_t ret = smlParseCols(data, len, cols, false, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
int32_t size = taosArrayGetSize(cols); int32_t size = taosArrayGetSize(cols);
ASSERT_EQ(size, 19); ASSERT_EQ(size, 19);
@ -450,6 +464,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv); taosMemoryFree(kv);
taosArrayDestroy(cols); taosArrayDestroy(cols);
taosHashCleanup(dumplicateKey);
} }
TEST(testCase, smlParseLine_Test) { TEST(testCase, smlParseLine_Test) {
@ -468,17 +483,47 @@ TEST(testCase, smlParseLine_Test) {
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL); ASSERT_NE(info, NULL);
const char *sql[3] = { const char *sql[9] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619400000000000",
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000" "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
}; };
smlInsertLines(info, sql, 3); smlInsertLines(info, sql, 9);
// for (int i = 0; i < 3; i++) { // for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]); // smlParseLine(info, sql[i]);
// } // }
} }
TEST(testCase, smlParseLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, NULL);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, NULL);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL);
const char *sql[2] = {
"measure,t1=3 c1=8",
"measure,t2=3 c1=8u8"
};
int ret = smlInsertLines(info, sql, 2);
ASSERT_NE(ret, 0);
}
// TEST(testCase, smlParseTS_Test) { // TEST(testCase, smlParseTS_Test) {
// char msg[256] = {0}; // char msg[256] = {0};
// SSmlMsgBuf msgBuf; // SSmlMsgBuf msgBuf;

View File

@ -30,6 +30,7 @@ static void indexMemUnRef(MemTable* tbl);
static void indexCacheTermDestroy(CacheTerm* ct); static void indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r); static int32_t indexCacheTermCompare(const void* l, const void* r);
static int32_t indexCacheJsonTermCompare(const void* l, const void* r);
static char* indexCacheTermGet(const void* pData); static char* indexCacheTermGet(const void* pData);
static MemTable* indexInternalCacheCreate(int8_t type); static MemTable* indexInternalCacheCreate(int8_t type);
@ -63,6 +64,7 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
// optime later
int32_t ret = func(a, b); int32_t ret = func(a, b);
switch (comType) { switch (comType) {
case QUERY_LESS_THAN: { case QUERY_LESS_THAN: {
@ -242,6 +244,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul
break; break;
} }
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (0 == strcmp(c->colVal, pCt->colVal)) { if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
@ -311,6 +314,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
} }
char* key = indexCacheTermGet(pCt); char* key = indexCacheTermGet(pCt);
// SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
@ -318,6 +322,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
break; break;
} }
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
printf("json val: %s\n", c->colVal);
if (0 != strncmp(c->colVal, term->colName, term->nColName)) {
continue;
}
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType); TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
if (cond == MATCH) { if (cond == MATCH) {
@ -598,24 +606,11 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result
indexMemRef(imm); indexMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
// SIndexTerm* term = query->term;
// EIndexQueryType qtype = query->qType;
// bool isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
// char* p = term->colVal;
// if (isJson) {
// p = indexPackJsonData(term);
//}
// CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
int ret = indexQueryMem(mem, query, result, s); int ret = indexQueryMem(mem, query, result, s);
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm // continue search in imm
ret = indexQueryMem(imm, query, result, s); ret = indexQueryMem(imm, query, result, s);
} }
// if (isJson) {
// taosMemoryFreeClear(p);
//}
indexMemUnRef(mem); indexMemUnRef(mem);
indexMemUnRef(imm); indexMemUnRef(imm);
@ -682,14 +677,52 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
return cmp; return cmp;
} }
static int indexFindCh(char* a, char c) {
char* p = a;
while (*p != 0 && *p++ != c) {
}
return p - a;
}
static int indexCacheJsonTermCompareImpl(char* a, char* b) {
int alen = indexFindCh(a, '&');
int blen = indexFindCh(b, '&');
int cmp = strncmp(a, b, MIN(alen, blen));
if (cmp == 0) {
cmp = alen - blen;
if (cmp != 0) {
return cmp;
}
cmp = *(a + alen) - *(b + blen);
if (cmp != 0) {
return cmp;
}
alen += 2;
blen += 2;
cmp = strcmp(a + alen, b + blen);
}
return cmp;
}
static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
// compare colVal
int cmp = indexCacheJsonTermCompareImpl(lt->colVal, rt->colVal);
if (cmp == 0) {
return rt->version - lt->version;
}
return cmp;
}
static MemTable* indexInternalCacheCreate(int8_t type) { static MemTable* indexInternalCacheCreate(int8_t type) {
type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
int32_t (*cmpFn)(const void* l, const void* r) =
INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare;
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable)); MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
indexMemRef(tbl); indexMemRef(tbl);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY, tbl->mem =
indexCacheTermGet); tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, indexCacheTermGet);
} }
return tbl; return tbl;
} }

View File

@ -129,7 +129,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000000; i++) { for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
} }
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
@ -148,4 +148,36 @@ TEST_F(JsonEnv, testWriteMillonData) {
assert(100 == taosArrayGetSize(result)); assert(100 == taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
} }
{
{
std::string colName("test");
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
assert(0 == taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
{
std::string colName("test");
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
assert(100 == taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}
}
} }

View File

@ -168,8 +168,6 @@ const char* nodesNodeName(ENodeType type) {
return "ShowConsumersStmt"; return "ShowConsumersStmt";
case QUERY_NODE_SHOW_SUBSCRIBES_STMT: case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
return "ShowSubscribesStmt"; return "ShowSubscribesStmt";
case QUERY_NODE_SHOW_TRANS_STMT:
return "ShowTransStmt";
case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_SMAS_STMT:
return "ShowSmasStmt"; return "ShowSmasStmt";
case QUERY_NODE_SHOW_CONFIGS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT:
@ -1972,10 +1970,10 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d);
break; break;
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
//cJSON only support utf-8 encoding. Convert memory content to hex string. // cJSON only support utf-8 encoding. Convert memory content to hex string.
char *buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char)); char* buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char));
code = taosHexEncode(varDataVal(pNode->datum.p), buf, varDataLen(pNode->datum.p)); code = taosHexEncode(varDataVal(pNode->datum.p), buf, varDataLen(pNode->datum.p));
if(code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf); taosMemoryFree(buf);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
@ -2086,7 +2084,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
} }
varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); varDataSetLen(pNode->datum.p, pNode->node.resType.bytes);
if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) { if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) {
char *buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1); char* buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1);
if (NULL == buf) { if (NULL == buf) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;

View File

@ -190,7 +190,6 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT: case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT: case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_TRANS_STMT:
case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:

View File

@ -1300,9 +1300,10 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const
} }
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) { SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
SNode* pStmt = nodesMakeNode(type); SKillStmt* pStmt = nodesMakeNode(type);
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
return pStmt; pStmt->targetId = strtol(pId->z, NULL, 10);
return (SNode*)pStmt;
} }
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) { SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {

View File

@ -23,12 +23,17 @@ typedef struct SAuthCxt {
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt); static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt);
static int32_t checkAuth(SParseContext* pCxt, const char* dbName, AUTH_TYPE type) { static int32_t checkAuth(SParseContext* pCxt, const char* pDbName, AUTH_TYPE type) {
if (pCxt->isSuperUser) { if (pCxt->isSuperUser) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SName name;
tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
bool pass = false; bool pass = false;
int32_t code = catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbName, type, &pass); int32_t code =
catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbFname, type, &pass);
return TSDB_CODE_SUCCESS == code ? (pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code; return TSDB_CODE_SUCCESS == code ? (pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code;
} }
@ -130,7 +135,6 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT: case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT: case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_TRANS_STMT:
case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:

View File

@ -1549,7 +1549,7 @@ typedef struct SmlExecHandle {
SQuery* pQuery; SQuery* pQuery;
} SSmlExecHandle; } SSmlExecHandle;
static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) { static int32_t smlBoundColumnData(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols; col_id_t nCols = pColList->numOfCols;
pColList->numOfBound = 0; pColList->numOfBound = 0;
@ -1620,7 +1620,7 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) { static int32_t smlBuildTagRow(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) {
if (tdInitKVRowBuilder(tagsBuilder) < 0) { if (tdInitKVRowBuilder(tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
@ -1642,20 +1642,20 @@ static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDat
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) { STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle; SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle;
SSchema* pTagsSchema = getTableTagSchema(pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta)); setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema); int ret = smlBoundColumnData(tags, &smlHandle->tags, pTagsSchema);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound tags error"); buildInvalidOperationMsg(&pBuf, "bound tags error");
return ret; return ret;
} }
SKVRow row = NULL; SKVRow row = NULL;
ret = smlBoundTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf); ret = smlBuildTagRow(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
return ret; return ret;
} }
@ -1673,21 +1673,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
SSchema* pSchema = getTableColumnSchema(pTableMeta); SSchema* pSchema = getTableColumnSchema(pTableMeta);
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
if(format){
ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema);
}else{
SArray *columns = taosArrayInit(16, POINTER_BYTES);
void **p1 = taosHashIterate(colsHash, NULL);
while (p1) {
SSmlKv* kv = *p1;
taosArrayPush(columns, &kv);
p1 = taosHashIterate(colsHash, p1);
}
ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema);
taosArrayDestroy(columns);
}
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound cols error"); buildInvalidOperationMsg(&pBuf, "bound cols error");
return ret; return ret;
@ -1712,14 +1698,16 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row); tdSRowResetBuf(pBuilder, row);
void *rowData = NULL; void *rowData = NULL;
size_t rowDataSize = 0;
if(format){ if(format){
rowData = taosArrayGetP(colsFormat, r); rowData = taosArrayGetP(colsFormat, r);
rowDataSize = taosArrayGetSize(rowData);
}else{ }else{
rowData = taosArrayGetP(cols, r); rowData = taosArrayGetP(cols, r);
} }
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) { for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
param.schema = pColSchema; param.schema = pColSchema;
@ -1727,23 +1715,27 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
SSmlKv *kv = NULL; SSmlKv *kv = NULL;
if(format){ if(format){
kv = taosArrayGetP(rowData, c); if(j < rowDataSize){
if (!kv){ kv = taosArrayGetP(rowData, j);
char msg[64] = {0}; if (rowDataSize != spd->numOfBound && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){
sprintf(msg, "cols num not the same like before:%d", r); kv = NULL;
return buildInvalidOperationMsg(&pBuf, msg); }else{
j++;
}
} }
}else{ }else{
void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
kv = *p; if(p) kv = *p;
} }
if (kv->length == 0) { if (!kv || kv->length == 0) {
MemRowAppend(&pBuf, NULL, 0, &param); MemRowAppend(&pBuf, NULL, 0, &param);
} else { } else {
int32_t colLen = pColSchema->bytes; int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) { if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = kv->length; colLen = kv->length;
} else if(pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
} }
MemRowAppend(&pBuf, &(kv->value), colLen, &param); MemRowAppend(&pBuf, &(kv->value), colLen, &param);

View File

@ -4131,6 +4131,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
code = rewriteShow(pCxt, pQuery); code = rewriteShow(pCxt, pQuery);
break; break;
case QUERY_NODE_CREATE_TABLE_STMT: case QUERY_NODE_CREATE_TABLE_STMT:

View File

@ -100,6 +100,14 @@ void generateInformationSchema(MockCatalogService* mcs) {
} }
} }
void generatePerformanceSchema(MockCatalogService* mcs) {
{
ITableBuilder& builder = mcs->createTableBuilder("performance_schema", "trans", TSDB_SYSTEM_TABLE, 1)
.addColumn("id", TSDB_DATA_TYPE_INT);
builder.done();
}
}
/* /*
* Table:t1 * Table:t1
* Field | Type | DataType | Bytes | * Field | Type | DataType | Bytes |
@ -244,6 +252,7 @@ void initMetaDataEnv() {
void generateMetaData() { void generateMetaData() {
generateInformationSchema(mockCatalogService.get()); generateInformationSchema(mockCatalogService.get());
generatePerformanceSchema(mockCatalogService.get());
generateTestT1(mockCatalogService.get()); generateTestT1(mockCatalogService.get());
generateTestST1(mockCatalogService.get()); generateTestST1(mockCatalogService.get());
mockCatalogService->showTables(); mockCatalogService->showTables();

View File

@ -709,10 +709,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
if (IS_VAR_DATA_TYPE(outputType)) {
outputLen += VARSTR_HEADER_SIZE;
}
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
char *output = outputBuf; char *output = outputBuf;