fix:modify influxdb parse logic for sml
This commit is contained in:
parent
6d788b6b95
commit
517c8b1d5a
|
|
@ -108,7 +108,11 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
|
||||||
void qDestroyBoundColInfo(void* pInfo);
|
void qDestroyBoundColInfo(void* pInfo);
|
||||||
|
|
||||||
SQuery* smlInitHandle();
|
SQuery* smlInitHandle();
|
||||||
int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
int32_t smlBuildRow(STableDataCxt* pTableCxt);
|
||||||
|
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *kv, int32_t index);
|
||||||
|
STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta);
|
||||||
|
|
||||||
|
int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta,
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
||||||
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
||||||
|
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -56,7 +56,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
||||||
SSmlKv* kv = taosArrayGetP(cols, i);
|
SSmlKv* kv = taosArrayGet(cols, i);
|
||||||
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
||||||
col_id_t t = lastColIdx + 1;
|
col_id_t t = lastColIdx + 1;
|
||||||
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
||||||
|
|
@ -111,7 +111,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < tags->numOfBound; ++i) {
|
for (int i = 0; i < tags->numOfBound; ++i) {
|
||||||
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
||||||
SSmlKv* kv = taosArrayGetP(cols, i);
|
SSmlKv* kv = taosArrayGet(cols, i);
|
||||||
|
|
||||||
taosArrayPush(*tagName, pTagSchema->name);
|
taosArrayPush(*tagName, pTagSchema->name);
|
||||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||||
|
|
@ -158,7 +158,66 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta){
|
||||||
|
STableDataCxt* pTableCxt = NULL;
|
||||||
|
int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||||
|
pTableMeta, NULL, &pTableCxt, false);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = initTableColSubmitData(pTableCxt);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pTableCxt;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t smlBuildRow(STableDataCxt* pTableCxt){
|
||||||
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
|
int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
|
if (TSDB_CODE_SUCCESS != ret) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *data, int32_t index){
|
||||||
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
SSchema* pColSchema = schema + index;
|
||||||
|
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
|
||||||
|
SSmlKv* kv = (SSmlKv*)data;
|
||||||
|
if (kv->type == TSDB_DATA_TYPE_NCHAR){
|
||||||
|
int32_t len = 0;
|
||||||
|
char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
|
||||||
|
if (NULL == pUcs4) {
|
||||||
|
ret = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
||||||
|
if (errno == E2BIG) {
|
||||||
|
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
ret = TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pVal->value.pData = pUcs4;
|
||||||
|
pVal->value.nData = len;
|
||||||
|
} else if(kv->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
pVal->value.nData = kv->length;
|
||||||
|
pVal->value.pData = (uint8_t *)kv->value;
|
||||||
|
} else {
|
||||||
|
memcpy(&pVal->value.val, &(kv->value), kv->length);
|
||||||
|
}
|
||||||
|
pVal->flag = CV_FLAG_VALUE;
|
||||||
|
|
||||||
|
end:
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta,
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
|
||||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
|
|
||||||
|
|
@ -192,6 +251,18 @@ int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* col
|
||||||
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
|
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
|
||||||
memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
||||||
|
|
||||||
|
if(dataFormat){
|
||||||
|
STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
|
||||||
|
if (NULL == pTableCxt) {
|
||||||
|
ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
(*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
|
(*pTableCxt)->pData->pCreateTbReq = pCreateTblReq;
|
||||||
|
pCreateTblReq = NULL;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
STableDataCxt* pTableCxt = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||||
pTableMeta, &pCreateTblReq, &pTableCxt, false);
|
pTableMeta, &pCreateTblReq, &pTableCxt, false);
|
||||||
|
|
@ -226,15 +297,10 @@ int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* col
|
||||||
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
|
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
|
||||||
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
|
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
|
||||||
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
|
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
|
||||||
SSmlKv* kv = NULL;
|
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
||||||
if (!format){
|
if (p == NULL) continue;
|
||||||
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
SSmlKv *kv = *(SSmlKv **)p;
|
||||||
if (p) kv = *p;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (kv == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue