[td-225] append the tag schema info to the msg of updating the tag value of table
This commit is contained in:
parent
c9c3834449
commit
db3b9089a0
|
@ -255,17 +255,46 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t colIdCompar(const void* left, const void* right) {
|
||||||
|
int16_t colId = *(int16_t*) left;
|
||||||
|
STColumn* p2 = (STColumn*) right;
|
||||||
|
|
||||||
|
if (colId == p2->colId) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (colId < p2->colId)? -1:1;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
int16_t tversion = htons(pMsg->tversion);
|
|
||||||
|
|
||||||
STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid));
|
pMsg->uid = htobe64(pMsg->uid);
|
||||||
|
pMsg->tid = htonl(pMsg->tid);
|
||||||
|
pMsg->tversion = htons(pMsg->tversion);
|
||||||
|
pMsg->colId = htons(pMsg->colId);
|
||||||
|
pMsg->tagValLen = htonl(pMsg->tagValLen);
|
||||||
|
pMsg->numOfTags = htons(pMsg->numOfTags);
|
||||||
|
pMsg->schemaLen = htonl(pMsg->schemaLen);
|
||||||
|
assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags);
|
||||||
|
|
||||||
|
char* d = pMsg->data;
|
||||||
|
for(int32_t i = 0; i < pMsg->numOfTags; ++i) {
|
||||||
|
STColumn* pCol = (STColumn*) d;
|
||||||
|
pCol->colId = htons(pCol->colId);
|
||||||
|
pCol->bytes = htonl(pCol->bytes);
|
||||||
|
assert(pCol->offset == 0);
|
||||||
|
|
||||||
|
d += sizeof(STColumn);
|
||||||
|
}
|
||||||
|
|
||||||
|
STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (TABLE_TID(pTable) != htonl(pMsg->tid)) {
|
if (TABLE_TID(pTable) != pMsg->tid) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) {
|
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) {
|
||||||
tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo),
|
tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo),
|
||||||
schemaVersion(tsdbGetTableTagSchema(pTable)), tversion);
|
schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion);
|
||||||
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid));
|
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid);
|
||||||
if (msg == NULL) return -1;
|
if (msg == NULL) return -1;
|
||||||
|
|
||||||
// Deal with error her
|
// Deal with error her
|
||||||
|
@ -299,21 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
|
|
||||||
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable);
|
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable);
|
||||||
|
|
||||||
if (schemaVersion(pTagSchema) > tversion) {
|
if (schemaVersion(pTagSchema) > pMsg->tversion) {
|
||||||
tsdbError(
|
tsdbError(
|
||||||
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
|
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
|
||||||
"version %d",
|
"version %d",
|
||||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tversion, schemaVersion(pTable->tagSchema));
|
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
|
||||||
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
|
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
|
||||||
}
|
}
|
||||||
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
|
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
|
||||||
tsdbRemoveTableFromIndex(pMeta, pTable);
|
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||||
}
|
}
|
||||||
// TODO: remove table from index if it is the first column of tag
|
// TODO: remove table from index if it is the first column of tag
|
||||||
// TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
|
|
||||||
|
|
||||||
// tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
|
// TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
|
||||||
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
|
STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
|
||||||
|
assert(res != NULL);
|
||||||
|
|
||||||
|
tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen);
|
||||||
|
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
|
||||||
tsdbAddTableIntoIndex(pMeta, pTable);
|
tsdbAddTableIntoIndex(pMeta, pTable);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue