feat:modify get meta logic for schemaless
This commit is contained in:
parent
2ba6ac5fba
commit
788c063acc
|
@ -271,8 +271,6 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
|||
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
|
||||
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
|
||||
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname);
|
||||
|
||||
|
||||
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
||||
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
|
||||
|
|
|
@ -84,32 +84,11 @@
|
|||
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
||||
|
||||
typedef enum {
|
||||
SCHEMA_ACTION_CREATE_STABLE,
|
||||
SCHEMA_ACTION_ADD_COLUMN,
|
||||
SCHEMA_ACTION_ADD_TAG,
|
||||
SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
|
||||
SCHEMA_ACTION_CHANGE_TAG_SIZE,
|
||||
SCHEMA_ACTION_NULL,
|
||||
SCHEMA_ACTION_COLUMN,
|
||||
SCHEMA_ACTION_TAG
|
||||
} ESchemaAction;
|
||||
|
||||
typedef struct {
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
SArray *tags;
|
||||
SArray *fields;
|
||||
} SCreateSTableActionInfo;
|
||||
|
||||
typedef struct {
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
SSmlKv *field;
|
||||
} SAlterSTableActionInfo;
|
||||
|
||||
typedef struct {
|
||||
ESchemaAction action;
|
||||
union {
|
||||
SCreateSTableActionInfo createSTable;
|
||||
SAlterSTableActionInfo alterSTable;
|
||||
};
|
||||
} SSchemaAction;
|
||||
|
||||
typedef struct {
|
||||
const char *measure;
|
||||
const char *tags;
|
||||
|
@ -226,18 +205,20 @@ static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashOb
|
|||
}
|
||||
|
||||
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
|
||||
memset(pBuf->buf, 0, pBuf->len);
|
||||
if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
|
||||
int32_t left = pBuf->len - strlen(pBuf->buf);
|
||||
if (left > 2 && msg2) {
|
||||
strncat(pBuf->buf, ":", left - 1);
|
||||
strncat(pBuf->buf, msg2, left - 2);
|
||||
if(pBuf->buf){
|
||||
memset(pBuf->buf, 0, pBuf->len);
|
||||
if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
|
||||
int32_t left = pBuf->len - strlen(pBuf->buf);
|
||||
if (left > 2 && msg2) {
|
||||
strncat(pBuf->buf, ":", left - 1);
|
||||
strncat(pBuf->buf, msg2, left - 2);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
|
||||
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
|
||||
SSchemaAction *action, bool *actionNeeded, SSmlHandle *info) {
|
||||
ESchemaAction *action, SSmlHandle *info) {
|
||||
uint16_t *index = (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen);
|
||||
if (index) {
|
||||
if (colField[*index].type != kv->type) {
|
||||
|
@ -251,25 +232,17 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
|
|||
(colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
|
||||
((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
|
||||
if (isTag) {
|
||||
action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
|
||||
*action = SCHEMA_ACTION_TAG;
|
||||
} else {
|
||||
action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
|
||||
*action = SCHEMA_ACTION_COLUMN;
|
||||
}
|
||||
action->alterSTable.field = kv;
|
||||
*actionNeeded = true;
|
||||
}
|
||||
} else {
|
||||
if (isTag) {
|
||||
action->action = SCHEMA_ACTION_ADD_TAG;
|
||||
*action = SCHEMA_ACTION_TAG;
|
||||
} else {
|
||||
action->action = SCHEMA_ACTION_ADD_COLUMN;
|
||||
*action = SCHEMA_ACTION_COLUMN;
|
||||
}
|
||||
action->alterSTable.field = kv;
|
||||
*actionNeeded = true;
|
||||
}
|
||||
if (*actionNeeded) {
|
||||
uDebug("SML:0x%" PRIx64 " generate schema action. kv->name: %s, action: %d", info->id, kv->key,
|
||||
action->action);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -284,171 +257,25 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
|
|||
} else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
|
||||
result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||
}
|
||||
|
||||
if (type == TSDB_DATA_TYPE_NCHAR){
|
||||
result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
|
||||
}else if (type == TSDB_DATA_TYPE_BINARY){
|
||||
result = result + VARSTR_HEADER_SIZE;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static int32_t smlBuildColumnDescription(SSmlKv *field, char *buf, int32_t bufSize, int32_t *outBytes) {
|
||||
uint8_t type = field->type;
|
||||
char tname[TSDB_TABLE_NAME_LEN] = {0};
|
||||
memcpy(tname, field->key, field->keyLen);
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
int32_t bytes = smlFindNearestPowerOf2(field->length, type);
|
||||
int out = snprintf(buf, bufSize, "`%s` %s(%d)", tname, tDataTypes[field->type].name, bytes);
|
||||
*outBytes = out;
|
||||
} else {
|
||||
int out = snprintf(buf, bufSize, "`%s` %s", tname, tDataTypes[type].name);
|
||||
*outBytes = out;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
||||
int32_t code = 0;
|
||||
int32_t outBytes = 0;
|
||||
char *result = (char *)taosMemoryCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN);
|
||||
int32_t capacity = TSDB_MAX_ALLOWED_SQL_LEN;
|
||||
|
||||
uDebug("SML:0x%" PRIx64 " apply schema action. action: %d", info->id, action->action);
|
||||
switch (action->action) {
|
||||
case SCHEMA_ACTION_ADD_COLUMN: {
|
||||
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
const char *errStr = taos_errstr(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error: %s", info->id, errStr);
|
||||
taosMsleep(100);
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
break;
|
||||
}
|
||||
case SCHEMA_ACTION_ADD_TAG: {
|
||||
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
const char *errStr = taos_errstr(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
taosMsleep(100);
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
break;
|
||||
}
|
||||
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
|
||||
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
taosMsleep(100);
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
break;
|
||||
}
|
||||
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
|
||||
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
taosMsleep(100);
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
break;
|
||||
}
|
||||
case SCHEMA_ACTION_CREATE_STABLE: {
|
||||
int n = sprintf(result, "create stable `%s` (", action->createSTable.sTableName);
|
||||
char *pos = result + n;
|
||||
int freeBytes = capacity - n;
|
||||
|
||||
SArray *cols = action->createSTable.fields;
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(cols); i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
|
||||
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
|
||||
pos += outBytes;
|
||||
freeBytes -= outBytes;
|
||||
*pos = ',';
|
||||
++pos;
|
||||
--freeBytes;
|
||||
}
|
||||
|
||||
--pos;
|
||||
++freeBytes;
|
||||
|
||||
outBytes = snprintf(pos, freeBytes, ") tags (");
|
||||
pos += outBytes;
|
||||
freeBytes -= outBytes;
|
||||
|
||||
cols = action->createSTable.tags;
|
||||
for (int i = 0; i < taosArrayGetSize(cols); i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
|
||||
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
|
||||
pos += outBytes;
|
||||
freeBytes -= outBytes;
|
||||
*pos = ',';
|
||||
++pos;
|
||||
--freeBytes;
|
||||
}
|
||||
if (taosArrayGetSize(cols) == 0) {
|
||||
outBytes = snprintf(pos, freeBytes, "`%s` %s(%d)", tsSmlTagName, tDataTypes[TSDB_DATA_TYPE_NCHAR].name, 1);
|
||||
pos += outBytes;
|
||||
freeBytes -= outBytes;
|
||||
*pos = ',';
|
||||
++pos;
|
||||
--freeBytes;
|
||||
}
|
||||
pos--;
|
||||
++freeBytes;
|
||||
outBytes = snprintf(pos, freeBytes, ")");
|
||||
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result);
|
||||
code = taos_errno(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
taosMsleep(100);
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(result);
|
||||
if (code != 0) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
|
||||
SSchemaAction *action, bool isTag) {
|
||||
ESchemaAction *action, bool isTag) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
|
||||
if(j == 0 && !isTag) continue;
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
|
||||
bool actionNeeded = false;
|
||||
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info);
|
||||
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
if (actionNeeded) {
|
||||
code = smlApplySchemaAction(info, action);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -475,6 +302,103 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t getBytes(uint8_t type, int32_t length){
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
return smlFindNearestPowerOf2(length, type);
|
||||
} else {
|
||||
return tDataTypes[type].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
|
||||
int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
|
||||
SRequestObj* pRequest = NULL;
|
||||
SMCreateStbReq pReq = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCmdMsgInfo pCmdMsg = {0};
|
||||
|
||||
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
}
|
||||
|
||||
pReq.colVer = colVer;
|
||||
pReq.tagVer = tagVer;
|
||||
pReq.source = source;
|
||||
pReq.commentLen = -1;
|
||||
pReq.igExists = true;
|
||||
pReq.suid = suid;
|
||||
tNameExtractFullName(pName, pReq.name);
|
||||
|
||||
pReq.numOfColumns = taosArrayGetSize(sTableData->cols);
|
||||
pReq.numOfTags = taosArrayGetSize(sTableData->tags);
|
||||
|
||||
pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SField));
|
||||
for (int i = 0; i < pReq.numOfColumns; i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(sTableData->cols, i);
|
||||
SField field = {0};
|
||||
field.type = kv->type;
|
||||
field.bytes = getBytes(kv->type, kv->length);
|
||||
memcpy(field.name, kv->key, kv->keyLen);
|
||||
taosArrayPush(pReq.pColumns, &field);
|
||||
}
|
||||
|
||||
if (pReq.numOfTags == 0){
|
||||
pReq.numOfTags = 1;
|
||||
pReq.pTags = taosArrayInit(pReq.numOfTags, sizeof(SField));
|
||||
SField field = {0};
|
||||
field.type = TSDB_DATA_TYPE_NCHAR;
|
||||
field.bytes = 1;
|
||||
strcpy(field.name, tsSmlTagName);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}else{
|
||||
pReq.pTags = taosArrayInit(pReq.numOfTags, sizeof(SField));
|
||||
for (int i = 0; i < pReq.numOfTags; i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(sTableData->tags, i);
|
||||
SField field = {0};
|
||||
field.type = kv->type;
|
||||
field.bytes = getBytes(kv->type, kv->length);
|
||||
memcpy(field.name, kv->key, kv->keyLen);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}
|
||||
}
|
||||
|
||||
pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
||||
pCmdMsg.msgType = TDMT_MND_CREATE_STB;
|
||||
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
|
||||
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
|
||||
if (NULL == pCmdMsg.pMsg) {
|
||||
tFreeSMCreateStbReq(&pReq);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
|
||||
|
||||
SQuery pQuery;
|
||||
pQuery.execMode = QUERY_EXEC_MODE_RPC;
|
||||
pQuery.pCmdMsg = &pCmdMsg;
|
||||
pQuery.msgType = pQuery.pCmdMsg->msgType;
|
||||
pQuery.stableQuery = true;
|
||||
|
||||
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
||||
|
||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
||||
catalogRemoveTableMeta(info->pCatalog, pName);
|
||||
}
|
||||
code = pRequest->code;
|
||||
taosMemoryFree(pCmdMsg.pMsg);
|
||||
|
||||
end:
|
||||
destroyRequest(pRequest);
|
||||
tFreeSMCreateStbReq(&pReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||
int32_t code = 0;
|
||||
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
|
||||
|
@ -500,16 +424,9 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||
SSchemaAction schemaAction;
|
||||
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
||||
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
|
||||
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
||||
schemaAction.createSTable.tags = sTableData->tags;
|
||||
schemaAction.createSTable.fields = sTableData->cols;
|
||||
code = smlApplySchemaAction(info, &schemaAction);
|
||||
code = smlSendMetaMsg(info, &pName, sTableData, 1, 1, TD_REQ_FROM_APP, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlApplySchemaAction failed. can not create %s", info->id,
|
||||
schemaAction.createSTable.sTableName);
|
||||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
|
||||
goto end;
|
||||
}
|
||||
info->cost.numOfCreateSTables++;
|
||||
|
@ -521,24 +438,42 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
|
||||
}
|
||||
|
||||
SSchemaAction schemaAction;
|
||||
memset(&schemaAction, 0, sizeof(SSchemaAction));
|
||||
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true);
|
||||
ESchemaAction action = SCHEMA_ACTION_NULL;
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosHashCleanup(hashTmp);
|
||||
goto end;
|
||||
}
|
||||
if (action == SCHEMA_ACTION_TAG){
|
||||
code = smlSendMetaMsg(info, &pName, sTableData, pTableMeta->sversion, pTableMeta->tversion + 1, TD_REQ_FROM_TAOX, pTableMeta->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
taosHashClear(hashTmp);
|
||||
for (uint16_t i = 1; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
|
||||
}
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false);
|
||||
action = SCHEMA_ACTION_NULL;
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
|
||||
taosHashCleanup(hashTmp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
if (action == SCHEMA_ACTION_COLUMN){
|
||||
code = smlSendMetaMsg(info, &pName, sTableData, pTableMeta->sversion + 1, pTableMeta->tversion, TD_REQ_FROM_TAOX, pTableMeta->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1504,11 +1439,13 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr
|
|||
}
|
||||
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
info->taos = pTscObj;
|
||||
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
|
||||
goto cleanup;
|
||||
if (pTscObj){
|
||||
info->taos = pTscObj;
|
||||
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
info->precision = precision;
|
||||
|
@ -1518,9 +1455,12 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr
|
|||
} else {
|
||||
info->dataFormat = true;
|
||||
}
|
||||
info->pRequest = request;
|
||||
info->msgBuf.buf = info->pRequest->msgBuf;
|
||||
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||
|
||||
if(request){
|
||||
info->pRequest = request;
|
||||
info->msgBuf.buf = info->pRequest->msgBuf;
|
||||
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||
}
|
||||
|
||||
info->exec = smlInitHandle(info->pQuery);
|
||||
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,98 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def checkFileContent(self):
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cmdStr = '%s/build/bin/sml_test'%(buildPath)
|
||||
tdLog.info(cmdStr)
|
||||
ret = os.system(cmdStr)
|
||||
if ret != 0:
|
||||
tdLog.exit("sml_test failed")
|
||||
|
||||
tdSql.execute('use sml_db')
|
||||
tdSql.query("select * from t_b7d815c9222ca64cdf2614c61de8f211")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.checkData(0, 0, '2016-01-01 08:00:07.000')
|
||||
tdSql.checkData(0, 1, 2000)
|
||||
tdSql.checkData(0, 2, 200)
|
||||
tdSql.checkData(0, 3, 15)
|
||||
tdSql.checkData(0, 4, 24.5208)
|
||||
tdSql.checkData(0, 5, 28.09377)
|
||||
tdSql.checkData(0, 6, 428)
|
||||
tdSql.checkData(0, 7, 0)
|
||||
tdSql.checkData(0, 8, 304)
|
||||
tdSql.checkData(0, 9, 0)
|
||||
tdSql.checkData(0, 10, 25)
|
||||
|
||||
tdSql.query("select * from readings")
|
||||
tdSql.checkRows(9)
|
||||
|
||||
tdSql.query("select distinct tbname from readings")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from t_0799064f5487946e5d22164a822acfc8 order by _ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 3, "kk")
|
||||
tdSql.checkData(1, 3, None)
|
||||
|
||||
|
||||
tdSql.query("select distinct tbname from `sys.if.bytes.out`")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.query("select * from t_fc70dec6677d4277c5d9799c4da806da order by _ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 1.300000000)
|
||||
tdSql.checkData(1, 1,13.000000000)
|
||||
|
||||
tdSql.query("select * from `sys.procs.running`")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 42.000000000)
|
||||
tdSql.checkData(0, 2, "web01")
|
||||
|
||||
tdSql.query("select distinct tbname from `sys.cpu.nice`")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.query("select * from `sys.cpu.nice` order by _ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 9.000000000)
|
||||
tdSql.checkData(0, 2, "lga")
|
||||
tdSql.checkData(0, 3, "web02")
|
||||
tdSql.checkData(0, 4, None)
|
||||
tdSql.checkData(1, 1, 18.000000000)
|
||||
tdSql.checkData(1, 2, "lga")
|
||||
tdSql.checkData(1, 3, "web01")
|
||||
tdSql.checkData(1, 4, "t1")
|
||||
|
||||
return
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.checkFileContent()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -45,7 +45,7 @@ class TDTestCase:
|
|||
break
|
||||
|
||||
tdSql.execute('use db_taosx')
|
||||
tdSql.query("select * from ct3")
|
||||
tdSql.query("select * from ct3 order by c1 desc")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 51)
|
||||
tdSql.checkData(0, 4, 940)
|
||||
|
@ -58,17 +58,17 @@ class TDTestCase:
|
|||
tdSql.query("select * from ct2")
|
||||
tdSql.checkRows(0)
|
||||
|
||||
tdSql.query("select * from ct0")
|
||||
tdSql.query("select * from ct0 order by c1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 3, "a")
|
||||
tdSql.checkData(1, 4, None)
|
||||
|
||||
tdSql.query("select * from n1")
|
||||
tdSql.query("select * from n1 order by cc3 desc")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, "eeee")
|
||||
tdSql.checkData(1, 2, 940)
|
||||
|
||||
tdSql.query("select * from jt")
|
||||
tdSql.query("select * from jt order by i desc")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 11)
|
||||
tdSql.checkData(0, 2, None)
|
||||
|
@ -85,7 +85,5 @@ class TDTestCase:
|
|||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
|
|
|
@ -2,6 +2,7 @@ add_executable(tmq_demo tmqDemo.c)
|
|||
add_executable(tmq_sim tmqSim.c)
|
||||
add_executable(create_table createTable.c)
|
||||
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
|
||||
add_executable(sml_test sml_test.c)
|
||||
target_link_libraries(
|
||||
create_table
|
||||
PUBLIC taos_static
|
||||
|
@ -31,6 +32,14 @@ target_link_libraries(
|
|||
PUBLIC os
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
sml_test
|
||||
PUBLIC taos_static
|
||||
PUBLIC util
|
||||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
|
||||
add_executable(sdbDump sdbDump.c)
|
||||
target_link_libraries(
|
||||
sdbDump
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue