diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 67daaf2f2a..34603fdb64 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -58,7 +58,6 @@ typedef struct SParseContext { bool isSuperUser; bool enableSysInfo; bool async; - int8_t schemalessType; const char* svrVer; bool nodeOffline; SArray* pTableMetaPos; // sql table pos => catalog data pos diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index ea76f726ea..095858e945 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -149,7 +149,7 @@ typedef struct STscObj { int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; SHashObj* pRequests; - int8_t schemalessType; // todo remove it, this attribute should be move to request + void* smlHandle; } STscObj; typedef struct SResultColumn { @@ -323,6 +323,7 @@ void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid); void destroyAppInst(SAppInstInfo* pAppInfo); +void smlDestroyInfo(void *data); uint64_t generateRequestId(); diff --git a/source/client/inc/clientLog.h b/source/client/inc/clientLog.h index 0cb36ff61d..c29f495201 100644 --- a/source/client/inc/clientLog.h +++ b/source/client/inc/clientLog.h @@ -30,7 +30,7 @@ extern "C" { #define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) -//#define tscPerf(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", DEBUG_INFO, cDebugFlag, __VA_ARGS__); }} while(0) +#define tscPerf(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); }} while(0) // clang-format on #ifdef __cplusplus diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 93398d337d..4a20d4fa8b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -77,11 +77,11 @@ static void deregisterRequest(SRequestObj *pRequest) { pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { - // tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 - // "us, exec:%" PRId64 "us", - // duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, - // pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - - // pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd); + tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 + "us, exec:%" PRId64 "us", + duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, + pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - + pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { // tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 @@ -237,6 +237,7 @@ void destroyTscObj(void *pObj) { } taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFree(pTscObj); + smlDestroyInfo(pTscObj->smlHandle); tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); } @@ -266,7 +267,6 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c taosThreadMutexInit(&pObj->mutex, NULL); pObj->id = taosAddRef(clientConnRefPool, pObj); - pObj->schemalessType = 1; atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 897931d5b2..299c08745e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -232,7 +232,6 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC .pTransporter = pTscObj->pAppInfo->pTransporter, .pStmtCb = pStmtCb, .pUser = pTscObj->user, - .schemalessType = pTscObj->schemalessType, .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .enableSysInfo = pTscObj->sysInfo, .svrVer = pTscObj->sVer, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9f3c78aba2..f3c8caecd7 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -862,7 +862,6 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { .pTransporter = pTscObj->pAppInfo->pTransporter, .pStmtCb = NULL, .pUser = pTscObj->user, - .schemalessType = pTscObj->schemalessType, .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .enableSysInfo = pTscObj->sysInfo, .async = true, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 2459aed729..14d0534e18 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -49,22 +49,23 @@ break; \ } // comma , -#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH) +//#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH) #define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH) // space -#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH) +//#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH) #define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH) // equal = -#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH) +//#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH) #define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH) // quote " -#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH) +//#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH) #define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH) // SLASH -#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH) +//#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH) #define IS_SLASH_LETTER(sql) \ - (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql)) + (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || *(sql) == SLASH)) \ +// (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql)) #define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len)) @@ -104,6 +105,70 @@ typedef enum { SCHEMA_ACTION_CHANGE_TAG_SIZE, } ESchemaAction; +/*********************** list start *********************************/ +typedef struct { + const void *key; + int32_t keyLen; + void *value; + bool used; +}Node; + +typedef struct NodeList{ + Node data; + struct NodeList* next; +}NodeList; + +static void* nodeListGet(NodeList* list, const void *key, int32_t len){ + NodeList *tmp = list; + while(tmp){ + if(tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) { + return tmp->data.value; + } + tmp = tmp->next; + } + return NULL; +} + +static int nodeListSet(NodeList** list, const void *key, int32_t len, void* value){ + NodeList *tmp = *list; + while (tmp){ + if(!tmp->data.used) break; + if(tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) { + return -1; + } + tmp = tmp->next; + } + if(tmp){ + tmp->data.key = key; + tmp->data.keyLen = len; + tmp->data.value = value; + tmp->data.used = true; + }else{ + NodeList *newNode = taosMemoryCalloc(1, sizeof(NodeList)); + if(newNode == NULL){ + return -1; + } + newNode->data.key = key; + newNode->data.keyLen = len; + newNode->data.value = value; + newNode->data.used = true; + newNode->next = *list; + *list = newNode; + } + return 0; +} + +static int nodeListSize(NodeList* list){ + int cnt = 0; + while(list){ + if(list->data.used) cnt++; + else break; + list = list->next; + } + return cnt; +} +/*********************** list end *********************************/ + typedef struct { const char *measure; const char *tags; @@ -164,17 +229,8 @@ typedef struct { int64_t endTime; } SSmlCostInfo; -typedef struct { - SRequestObj *request; - tsem_t sem; - int32_t cnt; - int32_t total; - TdThreadSpinlock lock; -} Params; - typedef struct { int64_t id; - Params *params; SMLProtocolType protocol; int8_t precision; @@ -183,8 +239,8 @@ typedef struct { bool isRawLine; int32_t ttl; - SHashObj *childTables; - SHashObj *superTables; + NodeList *childTables; + NodeList *superTables; SHashObj *pVgHash; STscObj *taos; @@ -193,16 +249,15 @@ typedef struct { SQuery *pQuery; SSmlCostInfo cost; - int32_t affectedRows; + int32_t lineNum; SSmlMsgBuf msgBuf; - SHashObj *dumplicateKey; // for dumplicate key cJSON *root; // for parse json - SArray *lines; // element is SSmlLineInfo + SSmlLineInfo *lines; // element is SSmlLineInfo // - SHashObj *superTableTagKeyStr; - SHashObj *superTableColKeyStr; + NodeList *superTableTagKeyStr; + NodeList *superTableColKeyStr; void *currentLineTagKeys; void *preLineTagKeys; void *currentLineColKeys; @@ -214,6 +269,7 @@ typedef struct { SSmlLineInfo preLine; STableMeta *currSTableMeta; STableDataCxt *currTableDataCtx; + bool needModifySchema; } SSmlHandle; //================================================================================================= @@ -461,6 +517,9 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, } static int32_t smlModifyDBSchemas(SSmlHandle *info) { + if(info->dataFormat && !info->needModifySchema){ + return TSDB_CODE_SUCCESS; + } int32_t code = 0; SHashObj *hashTmp = NULL; STableMeta *pTableMeta = NULL; @@ -474,13 +533,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { conn.requestObjRefId = info->pRequest->self; conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp); - SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL); - while (tableMetaSml) { - SSmlSTableMeta *sTableData = *tableMetaSml; + NodeList *tmp = info->superTables; + while (tmp) { + SSmlSTableMeta *sTableData = tmp->data.value; bool needCheckMeta = false; // for multi thread - size_t superTableLen = 0; - void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); + size_t superTableLen = (size_t)tmp->data.keyLen; + const void *superTable = tmp->data.key; memset(pName.tname, 0, TSDB_TABLE_NAME_LEN); memcpy(pName.tname, superTable, superTableLen); @@ -629,7 +688,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { sTableData->tableMeta = pTableMeta; - tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml); + tmp = tmp->next; } return 0; @@ -779,7 +838,7 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { if (len < 3) { return false; } - if ((pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"') { + if (pVal[1] == '"' && pVal[len - 1] == '"' && (pVal[0] == 'l' || pVal[0] == 'L')) { return true; } return false; @@ -787,6 +846,10 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { /******************************* parse basic type function end **********************/ /******************************* time function **********************/ +static int8_t precisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES, + TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO, + TSDB_TIME_PRECISION_NANO}; + static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { char *endPtr = NULL; int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10); @@ -837,32 +900,12 @@ static int8_t smlGetTsTypeByLen(int32_t len) { } } -static int8_t smlGetTsTypeByPrecision(int8_t precision) { - switch (precision) { - case TSDB_SML_TIMESTAMP_HOURS: - return TSDB_TIME_PRECISION_HOURS; - case TSDB_SML_TIMESTAMP_MILLI_SECONDS: - return TSDB_TIME_PRECISION_MILLI; - case TSDB_SML_TIMESTAMP_NANO_SECONDS: - case TSDB_SML_TIMESTAMP_NOT_CONFIGURED: - return TSDB_TIME_PRECISION_NANO; - case TSDB_SML_TIMESTAMP_MICRO_SECONDS: - return TSDB_TIME_PRECISION_MICRO; - case TSDB_SML_TIMESTAMP_SECONDS: - return TSDB_TIME_PRECISION_SECONDS; - case TSDB_SML_TIMESTAMP_MINUTES: - return TSDB_TIME_PRECISION_MINUTES; - default: - return -1; - } -} - static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { if (len == 0 || (len == 1 && data[0] == '0')) { return taosGetTimestampNs(); } - int8_t tsType = smlGetTsTypeByPrecision(info->precision); + int8_t tsType = precisionConvert[info->precision]; if (tsType == -1) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL); return -1; @@ -1063,21 +1106,20 @@ static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){ } } -bool smlFormatJudge(SHashObj* superTableKeyStr, void* preLineKeys, void* currentLineKeys, - SSmlLineInfo *currElements, SSmlLineInfo *preElements, int32_t len){ +bool smlFormatJudge(NodeList **superTableKeyStr, void* preLineKeys, void* currentLineKeys, + SSmlLineInfo *currElements, bool isSameMeasure, int32_t len){ // same measure - if(preElements->measureLen == currElements->measureLen - && memcmp(preElements->measure, currElements->measure, currElements->measureLen) == 0){ + if(isSameMeasure){ if(varDataTLen(preLineKeys) != varDataTLen(currentLineKeys) || memcmp(preLineKeys, currentLineKeys, varDataTLen(preLineKeys)) != 0){ return false; } }else{ // diff measure - void *keyStr = taosHashGet(superTableKeyStr, currElements->measure, currElements->measureLen); + void *keyStr = nodeListGet(*superTableKeyStr, currElements->measure, currElements->measureLen); if(unlikely(keyStr == NULL)){ keyStr = taosMemoryMalloc(len); varDataCopy(keyStr, currentLineKeys); - taosHashPut(superTableKeyStr, currElements->measure, currElements->measureLen, &keyStr, POINTER_BYTES); + nodeListSet(superTableKeyStr, currElements->measure, currElements->measureLen, keyStr); }else{ if(varDataTLen(keyStr) != varDataTLen(currentLineKeys) && memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) != 0){ @@ -1166,27 +1208,40 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd bool isSuperKVInit = false; SArray *superKV = NULL; if(info->dataFormat){ - if(currElement->measureLen == info->preLine.measureLen - && memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){ + if(currElement->measureTagsLen == info->preLine.measureTagsLen + && memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){ + isSameCTable = true; + if(isTag) return TSDB_CODE_SUCCESS; + }else if(!isTag){ + SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen); + if (unlikely(oneTable == NULL)) { + smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure); + return TSDB_CODE_SML_INVALID_DATA; + } + info->currTableDataCtx = oneTable->tableDataCtx; + } + + if(isSameCTable){ + isSameMeasure = true; + }else if(currElement->measureLen == info->preLine.measureLen + && memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){ isSameMeasure = true; } + if(!isSameMeasure){ - SSmlSTableMeta *sMeta = NULL; - SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); - if(tableMeta == NULL){ - SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); + SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen); + + if(sMeta == NULL){ + sMeta = smlBuildSTableMeta(info->dataFormat); STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); - meta->tableMeta = pTableMeta; + sMeta->tableMeta = pTableMeta; if(pTableMeta == NULL){ info->dataFormat = false; info->reRun = true; return TSDB_CODE_SUCCESS; } - taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &meta, POINTER_BYTES); - sMeta = meta; - }else{ - sMeta = *tableMeta; + nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta); } info->currSTableMeta = sMeta->tableMeta; @@ -1200,19 +1255,6 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd } } - if(currElement->measureTagsLen == info->preLine.measureTagsLen - && memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){ - isSameCTable = true; - if(isTag) return TSDB_CODE_SUCCESS; - }else if(!isTag){ - SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); - if (unlikely(oneTable == NULL)) { - smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure); - return TSDB_CODE_SML_INVALID_DATA; - } - info->currTableDataCtx = (*oneTable)->tableDataCtx; - } - if(isTag){ // prepare for judging if tag or col is the same for each line if(unlikely(info->currentLineTagKeys == NULL)){ // sml todo size need remalloc @@ -1296,16 +1338,17 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd (*sql)++; continue; } - if (!isInQuote && IS_SPACE(*sql)) { - break; - } - if (!isInQuote && IS_COMMA(*sql)) { - break; - } - if (!isInQuote && IS_EQUAL(*sql)) { - smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); - return TSDB_CODE_SML_INVALID_DATA; + if (!isInQuote){ + if (IS_SPACE(*sql)) { + break; + }else if (IS_COMMA(*sql)) { + break; + }else if (IS_EQUAL(*sql)) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); + return TSDB_CODE_SML_INVALID_DATA; + } } + (*sql)++; } valueLen = *sql - value; @@ -1368,19 +1411,20 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){ preKV->length = kv.length; - SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); + SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen); if(tableMeta == NULL){ smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value); return TSDB_CODE_SML_INVALID_DATA; } if(isTag){ - superKV = (*tableMeta)->tags; + superKV = tableMeta->tags; }else{ - superKV = (*tableMeta)->cols; + superKV = tableMeta->cols; } SSmlKv *oldKV = taosArrayGet(superKV, cnt); oldKV->length = kv.length; + info->needModifySchema = true; } }else{ if(isSuperKVInit){ @@ -1404,6 +1448,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd }else{ kv.length = preKV->length; } + info->needModifySchema = true; } } taosArrayPush(preLineKV, &kv); @@ -1435,8 +1480,8 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd if(info->dataFormat){ if(isTag){ - info->dataFormat = smlFormatJudge(info->superTableTagKeyStr, info->preLineTagKeys, - info->currentLineTagKeys, currElement, &info->preLine, sqlEnd - currElement->tags); + info->dataFormat = smlFormatJudge(&info->superTableTagKeyStr, info->preLineTagKeys, + info->currentLineTagKeys, currElement, isSameMeasure, sqlEnd - currElement->tags); if(!info->dataFormat) { info->reRun = true; return TSDB_CODE_SUCCESS; @@ -1447,7 +1492,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd return TSDB_CODE_PAR_INVALID_TAGS_NUM; } - void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); + void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen); if (unlikely(oneTable == NULL)) { SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen); if (!tinfo) { @@ -1463,21 +1508,21 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); return TSDB_CODE_SML_INVALID_DATA; } - taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); + nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo); } } }else{ - info->dataFormat = smlFormatJudge(info->superTableColKeyStr, info->preLineColKeys, - info->currentLineColKeys, currElement, &info->preLine, sqlEnd - currElement->cols); + info->dataFormat = smlFormatJudge(&info->superTableColKeyStr, info->preLineColKeys, + info->currentLineColKeys, currElement, isSameMeasure, sqlEnd - currElement->cols); if(!info->dataFormat) { info->reRun = true; return TSDB_CODE_SUCCESS; } } }else{ - void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); + void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen); if (unlikely(oneTable == NULL)) { - SSmlTableInfo *tinfo = smlBuildTableInfo(info->affectedRows / 2, currElement->measure, currElement->measureLen); + SSmlTableInfo *tinfo = smlBuildTableInfo(info->lineNum / 2, currElement->measure, currElement->measureLen); if (!tinfo) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1485,7 +1530,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i)); } smlSetCTableName(tinfo); - taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); + nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo); } taosArrayDestroy(preLineKV); // smltodo } @@ -1737,8 +1782,9 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const cha return ret; } - // parse tags - ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); + // parse tags sml todo + ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, NULL, &info->msgBuf); +// ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return ret; @@ -1780,7 +1826,7 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols return TSDB_CODE_SUCCESS; } -static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) { +static void smlDestroyTableInfo(SSmlTableInfo *tag) { for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); taosHashCleanup(kvHash); @@ -1806,43 +1852,56 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { return TSDB_CODE_SUCCESS; } -static void smlDestroyInfo(SSmlHandle *info) { - if (!info) return; +void smlDestroyInfo(void *data) { + if (!data) return; + SSmlHandle *info = (SSmlHandle *)data; qDestroyQuery(info->pQuery); // destroy info->childTables - void **p1 = (void **)taosHashIterate(info->childTables, NULL); - while (p1) { - smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1)); - p1 = (void **)taosHashIterate(info->childTables, p1); + NodeList* tmp = info->childTables; + while (tmp) { + if(tmp->data.used) { + smlDestroyTableInfo(tmp->data.value); + } + NodeList* t = tmp->next; + taosMemoryFree(tmp); + tmp = t; } - taosHashCleanup(info->childTables); // destroy info->superTables - p1 = (void **)taosHashIterate(info->superTables, NULL); - while (p1) { - smlDestroySTableMeta((SSmlSTableMeta *)(*p1)); - p1 = (void **)taosHashIterate(info->superTables, p1); + tmp = info->superTables; + while (tmp) { + if(tmp->data.used) { + smlDestroySTableMeta(tmp->data.value); + } + NodeList* t = tmp->next; + taosMemoryFree(tmp); + tmp = t; } - taosHashCleanup(info->superTables); // destroy info->pVgHash taosHashCleanup(info->pVgHash); destroyRequest(info->pRequest); - p1 = (void **)taosHashIterate(info->superTableTagKeyStr, NULL); - while (p1) { - taosMemoryFree(*p1); - p1 = (void **)taosHashIterate(info->superTableTagKeyStr, p1); + tmp = info->superTableTagKeyStr; + while (tmp) { + if(tmp->data.used) { + taosMemoryFree(tmp->data.value); + } + NodeList* t = tmp->next; + taosMemoryFree(tmp); + tmp = tmp->next; } - taosHashCleanup(info->superTableTagKeyStr); - p1 = (void **)taosHashIterate(info->superTableColKeyStr, NULL); - while (p1) { - taosMemoryFree(*p1); - p1 = (void **)taosHashIterate(info->superTableColKeyStr, p1); + tmp = info->superTableColKeyStr; + while (tmp) { + if(tmp->data.used) { + taosMemoryFree(tmp->data.value); + } + NodeList* t = tmp->next; + taosMemoryFree(tmp); + tmp = tmp->next; } - taosHashCleanup(info->superTableColKeyStr); taosMemoryFree(info->currentLineTagKeys); taosMemoryFree(info->preLineTagKeys); @@ -1851,61 +1910,103 @@ static void smlDestroyInfo(SSmlHandle *info) { taosArrayDestroy(info->preLineTagKV); taosArrayDestroy(info->preLineColKV); - for(int i = 0; i < taosArrayGetSize(info->lines); i++){ - taosArrayDestroy(((SSmlLineInfo*)taosArrayGet(info->lines, i))->colArray); + for(int i = 0; i < info->lineNum; i++){ + taosArrayDestroy(info->lines[i].colArray); } - taosArrayDestroy(info->lines); + taosMemoryFree(info->lines); cJSON_Delete(info->root); taosMemoryFreeClear(info); } -static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision, int32_t perBatch) { +int32_t smlInitInfo(void *data, SRequestObj *request, bool isRawLine, int32_t ttl, + SMLProtocolType protocol, int8_t precision, int32_t lineNum) { + if (!data) return TSDB_CODE_SML_INVALID_DATA; + SSmlHandle *info = (SSmlHandle *)data; + info->id = smlGenId(); + info->pRequest = request; + info->isRawLine = isRawLine; + info->ttl = ttl; + info->precision = precision; + info->protocol = protocol; + info->msgBuf.buf = info->pRequest->msgBuf; + info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; + info->pQuery = smlInitHandle(); + + if(lineNum > info->lineNum && !info->dataFormat){ + void *tmp = taosMemoryRealloc(info->lines, lineNum * sizeof(SSmlLineInfo)); + if(tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + info->lines = tmp; + } + info->lineNum = lineNum; + + return TSDB_CODE_SUCCESS; +} + +void smlClearInfo(void *data) { + if (!data) return; + SSmlHandle *info = (SSmlHandle *)data; + + // clear info->childTables + NodeList *tmp = info->childTables; + while (tmp) { + if(tmp->data.used){ + smlDestroySTableMeta((SSmlSTableMeta *)(tmp->data.value)); + tmp->data.used = false; + } + tmp = tmp->next; + } + +// tmp = info->superTableTagKeyStr; +// while (tmp) { +// taosMemoryFree(tmp->data.value); +// tmp->data.used = false; +// tmp = tmp->next; +// } +// +// tmp = info->superTableColKeyStr; +// while (tmp) { +// taosMemoryFree(tmp->data.value); +// tmp->data.used = false; +// tmp = tmp->next; +// } + + + if(!info->dataFormat){ + for(int i = 0; i < info->lineNum; i++){ + taosArrayDestroy(info->lines[i].colArray); + } + memset(info->lines, 0, info->lineNum * sizeof(SSmlLineInfo)); + } + cJSON_Delete(info->root); + qDestroyQuery(info->pQuery); + info->pQuery = NULL; +} + +static SSmlHandle *smlBuildSmlInfo(TAOS *taos) { int32_t code = TSDB_CODE_SUCCESS; SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { return NULL; } - info->id = smlGenId(); - - info->pQuery = smlInitHandle(); - - if (pTscObj) { - info->taos = pTscObj; - code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code); - goto cleanup; - } + info->taos = acquireTscObj(*(int64_t *)taos); + code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code); + goto cleanup; } - - info->precision = precision; - info->protocol = protocol; info->dataFormat = true; - info->affectedRows = perBatch; - - if (request) { - info->pRequest = request; - info->msgBuf.buf = info->pRequest->msgBuf; - info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; - } - - info->lines = taosArrayInit(perBatch, sizeof(SSmlLineInfo)); - taosArraySetSize(info->lines, perBatch); - info->superTableTagKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - info->superTableColKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->superTableTagKeyStr - || NULL == info->superTableColKeyStr || NULL == info->pVgHash) { - uError("SML:0x%" PRIx64 " create info failed", info->id); + if (NULL == info->pVgHash) { + uError("create SSmlHandle failed"); goto cleanup; } return info; - cleanup: + +cleanup: smlDestroyInfo(info); return NULL; } @@ -2369,8 +2470,9 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * } uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id); - // Parse tags - ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); + // Parse tags sml todo + ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, NULL, &info->msgBuf); +// ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); if (ret) { uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); return ret; @@ -2383,18 +2485,15 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseLineBottom(SSmlHandle *info) { if(info->dataFormat) return TSDB_CODE_SUCCESS; - for(int32_t i = 0; i < taosArrayGetSize(info->lines); i ++){ - SSmlLineInfo* elements = taosArrayGet(info->lines, i); - bool hasTable = true; - SSmlTableInfo *tinfo = NULL; - SSmlTableInfo **oneTable = - (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen); - if(oneTable == NULL){ + for(int32_t i = 0; i < info->lineNum; i ++){ + SSmlLineInfo* elements = info->lines + i; + SSmlTableInfo *tinfo = + (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen); + if(tinfo == NULL){ uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i); smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure); return TSDB_CODE_SML_INVALID_DATA; } - tinfo = *oneTable; if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) { smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); @@ -2406,11 +2505,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { return ret; } - SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); + SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen); if (tableMeta) { // update meta - ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); + ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf); if (ret == TSDB_CODE_SUCCESS) { - ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); + ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf); } if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); @@ -2426,7 +2525,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); smlInsertMeta(meta->colHash, meta->cols, elements->colArray); - taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); + nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta); } } @@ -2455,18 +2554,17 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { } if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id); - smlDestroyTableInfo(info, tinfo); + smlDestroyTableInfo(tinfo); taosArrayDestroy(cols); return ret; } if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL); - smlDestroyTableInfo(info, tinfo); + smlDestroyTableInfo(tinfo); taosArrayDestroy(cols); return TSDB_CODE_PAR_INVALID_TAGS_NUM; } - taosHashClear(info->dumplicateKey); if (strlen(tinfo->childTableName) == 0) { RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0}; @@ -2477,23 +2575,23 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { } bool hasTable = true; - SSmlTableInfo **oneTable = - (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); + SSmlTableInfo *oneTable = + (SSmlTableInfo *)nodeListGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); if (!oneTable) { - taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES); - oneTable = &tinfo; + nodeListSet(&info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), tinfo); + oneTable = tinfo; hasTable = false; } else { - smlDestroyTableInfo(info, tinfo); + smlDestroyTableInfo(tinfo); } - taosArrayPush((*oneTable)->cols, &cols); - SSmlSTableMeta **tableMeta = - (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); + taosArrayPush(oneTable->cols, &cols); + SSmlSTableMeta *tableMeta = + (SSmlSTableMeta *)nodeListGet(info->superTables, oneTable->sTableName, oneTable->sTableNameLen); if (tableMeta) { // update meta - ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, false, &info->msgBuf); + ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, cols, false, &info->msgBuf); if (!hasTable && ret == TSDB_CODE_SUCCESS) { - ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, true, &info->msgBuf); + ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, oneTable->tags, true, &info->msgBuf); } if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); @@ -2501,9 +2599,9 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { } } else { SSmlSTableMeta *meta = smlBuildSTableMeta(false); - smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags); + smlInsertMeta(meta->tagHash, meta->tags, oneTable->tags); smlInsertMeta(meta->colHash, meta->cols, cols); - taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES); + nodeListSet(&info->superTables, oneTable->sTableName, oneTable->sTableNameLen, meta); } return TSDB_CODE_SUCCESS; @@ -2550,9 +2648,9 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) { static int32_t smlInsertData(SSmlHandle *info) { int32_t code = TSDB_CODE_SUCCESS; - SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); - while (oneTable) { - SSmlTableInfo *tableData = *oneTable; + NodeList* tmp = info->childTables; + while (tmp) { + SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value; SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); @@ -2572,22 +2670,22 @@ static int32_t smlInsertData(SSmlHandle *info) { } taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); - SSmlSTableMeta **pMeta = - (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen); - ASSERT(NULL != pMeta && NULL != *pMeta); + SSmlSTableMeta *pMeta = + (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen); + ASSERT(NULL != pMeta); // use tablemeta of stable to save vgid and uid of child table - (*pMeta)->tableMeta->vgId = vg.vgId; - (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid + pMeta->tableMeta->vgId = vg.vgId; + pMeta->tableMeta->uid = tableData->uid; // one table merge data block together according uid - code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, - (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, + code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols, + pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl, info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlBindData failed", info->id); return code; } - oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable); + tmp = tmp->next; } code = smlBuildOutput(info->pQuery, info->pVgHash); @@ -2597,20 +2695,11 @@ static int32_t smlInsertData(SSmlHandle *info) { } info->cost.insertRpcTime = taosGetTimestampUs(); - // launchQueryImpl(info->pRequest, info->pQuery, false, NULL); - // info->affectedRows = taos_affected_rows(info->pRequest); - // return info->pRequest->code; - SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary; atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); - SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); - if (pWrapper == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pWrapper->pRequest = info->pRequest; - launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper); - return TSDB_CODE_SUCCESS; + launchQueryImpl(info->pRequest, info->pQuery, true, NULL); + return info->pRequest->code; } static void smlPrintStatisticInfo(SSmlHandle *info) { @@ -2664,7 +2753,12 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp)); if (info->protocol == TSDB_SML_LINE_PROTOCOL) { - code = smlParseInfluxString(info, tmp, tmp + len, taosArrayGet(info->lines, i)); + if(info->dataFormat){ + SSmlLineInfo element = {0}; + code = smlParseInfluxString(info, tmp, tmp + len, &element); + }else{ + code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i); + } } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { code = smlParseTelnetLine(info, tmp, len); } else { @@ -2678,20 +2772,31 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char i = 0; info->reRun = false; // clear info->childTables - void **p1 = (void **)taosHashIterate(info->childTables, NULL); - while (p1) { - smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1)); - p1 = (void **)taosHashIterate(info->childTables, p1); + NodeList* pList = info->childTables; + while (pList) { + if(pList->data.used) { + smlDestroyTableInfo(pList->data.value); + pList->data.used = false; + } + pList = pList->next; } - taosHashClear(info->childTables); // clear info->superTables - p1 = (void **)taosHashIterate(info->superTables, NULL); - while (p1) { - smlDestroySTableMeta((SSmlSTableMeta *)(*p1)); - p1 = (void **)taosHashIterate(info->superTables, p1); + pList = info->superTables; + while (pList) { + if(pList->data.used) { + smlDestroySTableMeta(pList->data.value); + pList->data.used = false; + } + pList = pList->next; } - taosHashClear(info->superTables); + + if(info->lines != NULL){ + uError("SML:0x%" PRIx64 " info->lines != NULL", info->id); + return TSDB_CODE_SML_INVALID_DATA; + } + info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo)); + continue; } i++; @@ -2719,15 +2824,15 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL } info->cost.lineNum = numLines; - info->cost.numOfSTables = taosHashGetSize(info->superTables); - info->cost.numOfCTables = taosHashGetSize(info->childTables); + info->cost.numOfSTables = nodeListSize(info->superTables); + info->cost.numOfCTables = nodeListSize(info->childTables); info->cost.schemaTime = taosGetTimestampUs(); do { code = smlModifyDBSchemas(info); if (code == 0) break; - } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); + } while (retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES); if (code != 0) { uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code)); @@ -2744,79 +2849,46 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL return code; } -static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) { - // SCatalog *catalog = NULL; - // int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog); - // if (code != TSDB_CODE_SUCCESS) { - // uError("SML get catalog error %d", code); - // return code; - // } - // - // SName name; - // tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db)); - // char dbFname[TSDB_DB_FNAME_LEN] = {0}; - // tNameGetFullDbName(&name, dbFname); - // SDbCfgInfo pInfo = {0}; - // - // SRequestConnInfo conn = {0}; - // conn.pTrans = taos->pAppInfo->pTransporter; - // conn.requestId = request->requestId; - // conn.requestObjRefId = request->self; - // conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp); - // - // code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo); - // if (code != TSDB_CODE_SUCCESS) { - // return code; - // } - // taosArrayDestroy(pInfo.pRetensions); - // - // if (!pInfo.schemaless) { - // return TSDB_CODE_SML_INVALID_DB_CONF; - // } - return TSDB_CODE_SUCCESS; -} - -static void smlInsertCallback(void *param, void *res, int32_t code) { - SRequestObj *pRequest = (SRequestObj *)res; - SSmlHandle *info = (SSmlHandle *)param; - int32_t rows = taos_affected_rows(pRequest); - - uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); - Params *pParam = info->params; - // lock - taosThreadSpinLock(&pParam->lock); - pParam->cnt++; - if (code != TSDB_CODE_SUCCESS) { - pParam->request->code = code; - pParam->request->body.resInfo.numOfRows += rows; - } else { - pParam->request->body.resInfo.numOfRows += info->affectedRows; +TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, + int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) { + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; } - // unlock - taosThreadSpinUnlock(&pParam->lock); - if (pParam->cnt == pParam->total) { - tsem_post(&pParam->sem); + STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); + if (pTscObj == NULL) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); + if (request == NULL) { + uError("SML:taos_schemaless_insert error request is null"); + return NULL; } - uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows); - info->cost.endTime = taosGetTimestampUs(); - info->cost.code = code; - smlPrintStatisticInfo(info); - smlDestroyInfo(info); -} -TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, - int numLines, int protocol, int precision, int32_t ttl) { - int batchs = 0; - STscObj *pTscObj = request->pTscObj; - - pTscObj->schemalessType = 1; + SSmlHandle *info = NULL; + if(pTscObj->smlHandle == NULL){ + info = smlBuildSmlInfo(taos); + if (info == NULL) { + request->code = TSDB_CODE_OUT_OF_MEMORY; + uError("SML:taos_schemaless_insert error SSmlHandle is null"); + goto end; + } + pTscObj->smlHandle = info; + }else{ + info = (SSmlHandle *)(pTscObj->smlHandle); + smlClearInfo(info); + } SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - Params params = {0}; - params.request = request; - tsem_init(¶ms.sem, 0, 0); - taosThreadSpinInit(&(params.lock), 0); + int ret = smlInitInfo(info, request, rawLine != NULL, + ttl, protocol, precision, numLines); + if(ret != TSDB_CODE_SUCCESS){ + request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + smlBuildInvalidDataMsg(&msg, "smlInitInfo error", NULL); + goto end; + } if (request->pDb == NULL) { request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -2824,12 +2896,6 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char goto end; } - if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) { - request->code = TSDB_CODE_SML_INVALID_DB_CONF; - smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); - goto end; - } - if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) { request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL); @@ -2851,64 +2917,13 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char goto end; } - batchs = ceil(((double)numLines) / tsSmlBatchSize); - params.total = batchs; - for (int i = 0; i < batchs; ++i) { - SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0); - if (!req) { - request->code = TSDB_CODE_OUT_OF_MEMORY; - uError("SML:taos_schemaless_insert error request is null"); - goto end; - } + int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, numLines); + request->code = code; + info->cost.endTime = taosGetTimestampUs(); + info->cost.code = code; + smlPrintStatisticInfo(info); - int32_t perBatch = tsSmlBatchSize; - - if (numLines > perBatch) { - numLines -= perBatch; - } else { - perBatch = numLines; - numLines = 0; - } - - SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision, perBatch); - if (!info) { - request->code = TSDB_CODE_OUT_OF_MEMORY; - uError("SML:taos_schemaless_insert error SSmlHandle is null"); - goto end; - } - - info->isRawLine = (rawLine == NULL); - info->ttl = ttl; - - info->params = ¶ms; - info->pRequest->body.queryFp = smlInsertCallback; - info->pRequest->body.param = info; - int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch); - if (lines) { - lines += perBatch; - } - if (rawLine) { - int num = 0; - while (rawLine < rawLineEnd) { - if (*(rawLine++) == '\n') { - num++; - } - if (num == perBatch) { - break; - } - } - } - if (code != TSDB_CODE_SUCCESS) { - info->pRequest->body.queryFp(info, req, code); - } - } - tsem_wait(¶ms.sem); - - end: - taosThreadSpinDestroy(¶ms.lock); - tsem_destroy(¶ms.sem); - // ((STscObj *)taos)->schemalessType = 0; - pTscObj->schemalessType = 1; +end: uDebug("resultend:%s", request->msgBuf); return (TAOS_RES *)request; } @@ -2934,25 +2949,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) { - if (NULL == taos) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); - if (!request) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; - } - - if (!lines) { - SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "lines is null", NULL); - return (TAOS_RES *)request; - } - - return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl); + return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid); } TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { @@ -2969,24 +2966,6 @@ TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLi TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl, int64_t reqid) { - if (NULL == taos) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); - if (!request) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; - } - - if (!lines || len <= 0) { - SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "lines is null", NULL); - return (TAOS_RES *)request; - } - int numLines = 0; *totalRows = 0; char *tmp = lines; @@ -2999,7 +2978,7 @@ TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int tmp = lines + i + 1; } } - return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl); + return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid); } TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) { diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 685d4586be..47b7adbf18 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1142,8 +1142,8 @@ int sml_ttl_Test() { int main(int argc, char *argv[]) { int ret = 0; -// ret = sml_ttl_Test(); -// ASSERT(!ret); + ret = sml_ttl_Test(); + ASSERT(!ret); ret = sml_ts2164_Test(); ASSERT(!ret); ret = smlProcess_influx_Test();