[td-225] update the updateTagVal msg
This commit is contained in:
parent
a9ef9ff97e
commit
5ac1c1bb67
|
@ -4452,6 +4452,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
tVariantList* pVarList = pAlterSQL->varList;
|
tVariantList* pVarList = pAlterSQL->varList;
|
||||||
tVariant* pTagName = &pVarList->a[0].pVar;
|
tVariant* pTagName = &pVarList->a[0].pVar;
|
||||||
|
int16_t numOfTags = tscGetNumOfTags(pTableMeta);
|
||||||
|
|
||||||
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
|
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
|
||||||
SSQLToken name = {.type = TK_STRING, .z = pTagName->pz, .n = pTagName->nLen};
|
SSQLToken name = {.type = TK_STRING, .z = pTagName->pz, .n = pTagName->nLen};
|
||||||
|
@ -4475,8 +4476,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
(pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) {
|
(pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) {
|
||||||
return invalidSqlErrMsg(pQueryInfo->msg, msg14);
|
return invalidSqlErrMsg(pQueryInfo->msg, msg14);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + TSDB_EXTRA_PAYLOAD_SIZE;
|
int32_t schemaLen = sizeof(STColumn) * numOfTags;
|
||||||
|
int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + schemaLen + TSDB_EXTRA_PAYLOAD_SIZE;
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||||
tscError("%p failed to malloc for alter table msg", pSql);
|
tscError("%p failed to malloc for alter table msg", pSql);
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
@ -4487,11 +4490,25 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
pUpdateMsg->tid = htonl(pTableMeta->sid);
|
pUpdateMsg->tid = htonl(pTableMeta->sid);
|
||||||
pUpdateMsg->uid = htobe64(pTableMeta->uid);
|
pUpdateMsg->uid = htobe64(pTableMeta->uid);
|
||||||
pUpdateMsg->colId = htons(pTagsSchema->colId);
|
pUpdateMsg->colId = htons(pTagsSchema->colId);
|
||||||
pUpdateMsg->type = htons(pTagsSchema->type);
|
|
||||||
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
|
|
||||||
pUpdateMsg->tversion = htons(pTableMeta->tversion);
|
pUpdateMsg->tversion = htons(pTableMeta->tversion);
|
||||||
|
pUpdateMsg->numOfTags = htons(numOfTags);
|
||||||
tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data, pTagsSchema->type, true);
|
pUpdateMsg->schemaLen = htonl(schemaLen);
|
||||||
|
|
||||||
|
// the schema is located after the msg body, then followed by true tag value
|
||||||
|
char* d = pUpdateMsg->data;
|
||||||
|
SSchema* pTagCols = tscGetTableTagSchema(pTableMeta);
|
||||||
|
for (int i = 0; i < numOfTags; ++i) {
|
||||||
|
STColumn* pCol = (STColumn*) d;
|
||||||
|
pCol->colId = htons(pTagCols[i].colId);
|
||||||
|
pCol->bytes = htons(pTagCols[i].bytes);
|
||||||
|
pCol->type = pTagCols[i].type;
|
||||||
|
pCol->offset = 0;
|
||||||
|
|
||||||
|
d += sizeof(STColumn);
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy the tag value to msg body
|
||||||
|
tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data + schemaLen, pTagsSchema->type, true);
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
|
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
@ -4502,7 +4519,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
|
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
|
||||||
|
|
||||||
int32_t total = sizeof(SUpdateTableTagValMsg) + len;
|
int32_t total = sizeof(SUpdateTableTagValMsg) + len + schemaLen;
|
||||||
pUpdateMsg->head.contLen = htonl(total);
|
pUpdateMsg->head.contLen = htonl(total);
|
||||||
|
|
||||||
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
||||||
|
|
|
@ -285,9 +285,9 @@ typedef struct {
|
||||||
int32_t tid;
|
int32_t tid;
|
||||||
int16_t tversion;
|
int16_t tversion;
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
int16_t type;
|
|
||||||
int16_t bytes;
|
|
||||||
int32_t tagValLen;
|
int32_t tagValLen;
|
||||||
|
int16_t numOfTags;
|
||||||
|
int32_t schemaLen;
|
||||||
char data[];
|
char data[];
|
||||||
} SUpdateTableTagValMsg;
|
} SUpdateTableTagValMsg;
|
||||||
|
|
||||||
|
|
|
@ -310,7 +310,9 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
tsdbRemoveTableFromIndex(pMeta, pTable);
|
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||||
}
|
}
|
||||||
// TODO: remove table from index if it is the first column of tag
|
// TODO: remove table from index if it is the first column of tag
|
||||||
tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
|
// TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
|
||||||
|
|
||||||
|
// tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
|
||||||
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
|
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
|
||||||
tsdbAddTableIntoIndex(pMeta, pTable);
|
tsdbAddTableIntoIndex(pMeta, pTable);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue