commit
4913ce5897
|
@ -56,6 +56,7 @@ typedef struct STableMeta {
|
||||||
STableComInfo tableInfo;
|
STableComInfo tableInfo;
|
||||||
uint8_t tableType;
|
uint8_t tableType;
|
||||||
int16_t sversion;
|
int16_t sversion;
|
||||||
|
int16_t tversion;
|
||||||
SCMVgroupInfo vgroupInfo;
|
SCMVgroupInfo vgroupInfo;
|
||||||
int32_t sid; // the index of one table in a virtual node
|
int32_t sid; // the index of one table in a virtual node
|
||||||
uint64_t uid; // unique id of a table
|
uint64_t uid; // unique id of a table
|
||||||
|
|
|
@ -4402,7 +4402,9 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
|
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
|
||||||
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
|
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
|
||||||
// Note: update can only be applied to table not super table.
|
// Note: update can only be applied to table not super table.
|
||||||
// the following is handle display tags value for meters created according to super table
|
// the following is used to handle tags value for table created according to super table
|
||||||
|
pCmd->command = TSDB_SQL_UPDATE_TAGS_VAL;
|
||||||
|
|
||||||
tVariantList* pVarList = pAlterSQL->varList;
|
tVariantList* pVarList = pAlterSQL->varList;
|
||||||
tVariant* pTagName = &pVarList->a[0].pVar;
|
tVariant* pTagName = &pVarList->a[0].pVar;
|
||||||
|
|
||||||
|
@ -4425,15 +4427,38 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
// validate the length of binary
|
// validate the length of binary
|
||||||
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
|
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
|
||||||
pVarList->a[1].pVar.nLen > pTagsSchema->bytes) {
|
(pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) {
|
||||||
return invalidSqlErrMsg(pQueryInfo->msg, msg14);
|
return invalidSqlErrMsg(pQueryInfo->msg, msg14);
|
||||||
}
|
}
|
||||||
|
|
||||||
char name1[128] = {0};
|
int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + TSDB_EXTRA_PAYLOAD_SIZE;
|
||||||
strncpy(name1, pTagName->pz, pTagName->nLen);
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||||
|
tscError("%p failed to malloc for alter table msg", pSql);
|
||||||
|
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
|
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);
|
||||||
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
|
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
|
||||||
|
pUpdateMsg->tid = htonl(pTableMeta->sid);
|
||||||
|
pUpdateMsg->uid = htobe64(pTableMeta->uid);
|
||||||
|
pUpdateMsg->colId = htons(pTagsSchema->colId);
|
||||||
|
pUpdateMsg->type = htons(pTagsSchema->type);
|
||||||
|
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
|
||||||
|
pUpdateMsg->tversion = htons(pTableMeta->tversion);
|
||||||
|
|
||||||
|
tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data, pTagsSchema->type, true);
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
len = tDataTypeDesc[pTagsSchema->type].nSize;
|
||||||
|
} else {
|
||||||
|
len = varDataLen(pUpdateMsg->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
|
||||||
|
|
||||||
|
int32_t total = sizeof(SUpdateTableTagValMsg) + len;
|
||||||
|
pUpdateMsg->head.contLen = htonl(total);
|
||||||
|
|
||||||
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
||||||
tFieldList* pFieldList = pAlterSQL->pAddColumns;
|
tFieldList* pFieldList = pAlterSQL->pAddColumns;
|
||||||
|
|
|
@ -168,6 +168,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
||||||
pTableMeta->sid = pTableMetaMsg->sid;
|
pTableMeta->sid = pTableMetaMsg->sid;
|
||||||
pTableMeta->uid = pTableMetaMsg->uid;
|
pTableMeta->uid = pTableMetaMsg->uid;
|
||||||
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
||||||
|
pTableMeta->sversion = pTableMetaMsg->sversion;
|
||||||
|
pTableMeta->tversion = pTableMetaMsg->tversion;
|
||||||
|
|
||||||
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
|
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
|
||||||
|
|
||||||
|
|
|
@ -1260,28 +1260,24 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SCMAlterTableMsg *pAlterTableMsg;
|
|
||||||
char *pMsg;
|
char *pMsg;
|
||||||
int msgLen = 0;
|
int msgLen = 0;
|
||||||
int size = 0;
|
|
||||||
|
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
|
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
|
||||||
size = tscEstimateAlterTableMsgLength(pCmd);
|
SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
|
||||||
|
int size = tscEstimateAlterTableMsgLength(pCmd);
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||||
tscError("%p failed to malloc for alter table msg", pSql);
|
tscError("%p failed to malloc for alter table msg", pSql);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
|
SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
|
||||||
|
|
||||||
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
|
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
|
||||||
|
|
||||||
SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
|
|
||||||
|
|
||||||
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
|
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
|
||||||
pAlterTableMsg->type = htons(pAlterInfo->type);
|
pAlterTableMsg->type = htons(pAlterInfo->type);
|
||||||
|
|
||||||
|
@ -1302,6 +1298,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pMsg += pAlterInfo->tagData.dataLen;
|
pMsg += pAlterInfo->tagData.dataLen;
|
||||||
|
|
||||||
msgLen = pMsg - (char*)pAlterTableMsg;
|
msgLen = pMsg - (char*)pAlterTableMsg;
|
||||||
|
|
||||||
pCmd->payloadLen = msgLen;
|
pCmd->payloadLen = msgLen;
|
||||||
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
|
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
|
||||||
|
|
||||||
|
@ -1310,6 +1307,16 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
|
||||||
|
|
||||||
|
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);
|
||||||
|
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
pCmd->payloadLen = sizeof(SCMAlterDbMsg);
|
pCmd->payloadLen = sizeof(SCMAlterDbMsg);
|
||||||
|
@ -1804,7 +1811,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
pMetaMsg->sid = htonl(pMetaMsg->sid);
|
pMetaMsg->sid = htonl(pMetaMsg->sid);
|
||||||
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
||||||
|
pMetaMsg->tversion = htons(pMetaMsg->tversion);
|
||||||
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
||||||
|
|
||||||
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
||||||
|
@ -2552,6 +2559,7 @@ void tscInitMsgsFp() {
|
||||||
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
|
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
|
||||||
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
|
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
|
||||||
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
|
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
|
||||||
|
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
|
||||||
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
|
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
|
||||||
|
|
||||||
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
||||||
|
|
|
@ -651,6 +651,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
SDataRow trow = (SDataRow)pDataBlock;
|
SDataRow trow = (SDataRow)pDataBlock;
|
||||||
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
|
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
|
||||||
|
dataRowSetVersion(trow, pTableMeta->sversion);
|
||||||
|
|
||||||
int toffset = 0;
|
int toffset = 0;
|
||||||
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
|
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
|
||||||
|
|
|
@ -35,6 +35,7 @@ enum {
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
|
||||||
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
|
||||||
|
|
||||||
// the SQL below is for mgmt node
|
// the SQL below is for mgmt node
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
|
||||||
|
|
|
@ -119,22 +119,24 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
|
||||||
// ----------------- Data row structure
|
// ----------------- Data row structure
|
||||||
|
|
||||||
/* A data row, the format is like below:
|
/* A data row, the format is like below:
|
||||||
* |<------------------------------------- len ---------------------------------->|
|
* |<--------------------+--------------------------- len ---------------------------------->|
|
||||||
* |<--Head ->|<--------- flen -------------->| |
|
* |<-- Head -->|<--------- flen -------------->| |
|
||||||
* +----------+---------------------------------+---------------------------------+
|
* +---------------------+---------------------------------+---------------------------------+
|
||||||
* | int32_t | | |
|
* | int16_t | int16_t | | |
|
||||||
* +----------+---------------------------------+---------------------------------+
|
* +----------+----------+---------------------------------+---------------------------------+
|
||||||
* | len | First part | Second part |
|
* | len | sversion | First part | Second part |
|
||||||
* +----------+---------------------------------+---------------------------------+
|
* +----------+----------+---------------------------------+---------------------------------+
|
||||||
*/
|
*/
|
||||||
typedef void *SDataRow;
|
typedef void *SDataRow;
|
||||||
|
|
||||||
#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t)
|
#define TD_DATA_ROW_HEAD_SIZE sizeof(int16_t)*2
|
||||||
|
|
||||||
#define dataRowLen(r) (*(int32_t *)(r))
|
#define dataRowLen(r) (*(int16_t *)(r))
|
||||||
|
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
|
||||||
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
|
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
|
||||||
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
|
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
|
||||||
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
||||||
|
#define dataRowSetVersion(r, v) (dataRowVersion(r) = (v))
|
||||||
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
|
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
|
||||||
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
|
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
|
||||||
|
|
||||||
|
@ -246,7 +248,7 @@ void tdResetDataCols(SDataCols *pCols);
|
||||||
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||||
void tdFreeDataCols(SDataCols *pCols);
|
void tdFreeDataCols(SDataCols *pCols);
|
||||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
|
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
||||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
||||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
|
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
|
||||||
|
@ -278,9 +280,10 @@ typedef struct {
|
||||||
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
|
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
|
||||||
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
|
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
|
||||||
#define kvRowFree(r) tfree(r)
|
#define kvRowFree(r) tfree(r)
|
||||||
|
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
|
||||||
|
|
||||||
SKVRow tdKVRowDup(SKVRow row);
|
SKVRow tdKVRowDup(SKVRow row);
|
||||||
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value);
|
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
|
||||||
void * tdEncodeKVRow(void *buf, SKVRow row);
|
void * tdEncodeKVRow(void *buf, SKVRow row);
|
||||||
void * tdDecodeKVRow(void *buf, SKVRow *row);
|
void * tdDecodeKVRow(void *buf, SKVRow *row);
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,10 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
|
||||||
/**
|
/**
|
||||||
* Initialize a data row
|
* Initialize a data row
|
||||||
*/
|
*/
|
||||||
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
|
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
|
||||||
|
dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
|
||||||
|
dataRowSetVersion(row, schemaVersion(pSchema));
|
||||||
|
}
|
||||||
|
|
||||||
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
|
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
|
||||||
int32_t size = dataRowMaxBytesFromSchema(pSchema);
|
int32_t size = dataRowMaxBytesFromSchema(pSchema);
|
||||||
|
@ -262,25 +265,29 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
void dataColSetNullAt(SDataCol *pCol, int index) {
|
||||||
char *ptr = NULL;
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
switch (pCol->type) {
|
pCol->dataOff[index] = pCol->len;
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
pCol->len = 0;
|
|
||||||
for (int i = 0; i < nEle; i++) {
|
|
||||||
pCol->dataOff[i] = pCol->len;
|
|
||||||
ptr = (char *)pCol->pData + pCol->len;
|
|
||||||
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
|
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
|
||||||
setNull(ptr + sizeof(VarDataLenT), pCol->type, pCol->bytes);
|
setNull(varDataVal(ptr), pCol->type, pCol->bytes);
|
||||||
pCol->len += varDataTLen(ptr);
|
pCol->len += varDataTLen(ptr);
|
||||||
|
} else {
|
||||||
|
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
|
||||||
|
pCol->len += TYPE_BYTES[pCol->type];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
||||||
default:
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
pCol->len = 0;
|
||||||
|
for (int i = 0; i < nEle; i++) {
|
||||||
|
dataColSetNullAt(pCol, i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
||||||
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,14 +384,32 @@ void tdResetDataCols(SDataCols *pCols) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
|
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
|
||||||
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));
|
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));
|
||||||
|
|
||||||
for (int i = 0; i < pCols->numOfCols; i++) {
|
int rcol = 0;
|
||||||
SDataCol *pCol = pCols->cols + i;
|
int dcol = 0;
|
||||||
void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset);
|
|
||||||
|
|
||||||
dataColAppendVal(pCol, value, pCols->numOfRows, pCols->maxPoints);
|
while (dcol < pCols->numOfCols) {
|
||||||
|
SDataCol *pDataCol = &(pCols->cols[dcol]);
|
||||||
|
if (rcol >= schemaNCols(pSchema)) {
|
||||||
|
dataColSetNullAt(pDataCol, pCols->numOfRows);
|
||||||
|
dcol++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
STColumn *pRowCol = schemaColAt(pSchema, rcol);
|
||||||
|
if (pRowCol->colId == pDataCol->colId) {
|
||||||
|
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE);
|
||||||
|
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
|
||||||
|
dcol++;
|
||||||
|
rcol++;
|
||||||
|
} else if (pRowCol->colId < pDataCol->colId) {
|
||||||
|
rcol++;
|
||||||
|
} else {
|
||||||
|
dataColSetNullAt(pDataCol, pCols->numOfRows);
|
||||||
|
dcol++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pCols->numOfRows++;
|
pCols->numOfRows++;
|
||||||
}
|
}
|
||||||
|
@ -477,69 +502,103 @@ SKVRow tdKVRowDup(SKVRow row) {
|
||||||
return trow;
|
return trow;
|
||||||
}
|
}
|
||||||
|
|
||||||
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value) {
|
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
|
||||||
// TODO
|
SColIdx *pColIdx = NULL;
|
||||||
return NULL;
|
SKVRow row = *orow;
|
||||||
// SColIdx *pColIdx = NULL;
|
SKVRow nrow = NULL;
|
||||||
// SKVRow rrow = row;
|
void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
|
||||||
// SKVRow nrow = NULL;
|
|
||||||
// void *ptr = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
|
|
||||||
|
|
||||||
// if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row
|
if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row
|
||||||
// int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) :
|
int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
|
||||||
// TYPE_BYTES[type]); nrow = malloc(tlen); if (nrow == NULL) return NULL;
|
nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
|
||||||
|
if (nrow == NULL) return -1;
|
||||||
|
|
||||||
// kvDataRowSetNCols(nrow, kvDataRowNCols(row)+1);
|
kvRowSetLen(nrow, kvRowLen(row) + sizeof(SColIdx) + diff);
|
||||||
// kvDataRowSetLen(nrow, tlen);
|
kvRowSetNCols(nrow, kvRowNCols(row) + 1);
|
||||||
|
|
||||||
// if (ptr == NULL) ptr = kvDataRowValues(row);
|
if (ptr == NULL) {
|
||||||
|
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * kvRowNCols(row));
|
||||||
|
memcpy(kvRowValues(nrow), kvRowValues(row), POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row)));
|
||||||
|
int colIdx = kvRowNCols(nrow) - 1;
|
||||||
|
kvRowColIdxAt(nrow, colIdx)->colId = colId;
|
||||||
|
kvRowColIdxAt(nrow, colIdx)->offset = POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row));
|
||||||
|
memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);
|
||||||
|
} else {
|
||||||
|
int16_t tlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
|
||||||
|
if (tlen > 0) {
|
||||||
|
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), tlen);
|
||||||
|
memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
|
||||||
|
}
|
||||||
|
|
||||||
// // Copy the columns before the col
|
int colIdx = tlen / sizeof(SColIdx);
|
||||||
// if (POINTER_DISTANCE(ptr, kvDataRowColIdx(row)) > 0) {
|
kvRowColIdxAt(nrow, colIdx)->colId = colId;
|
||||||
// memcpy(kvDataRowColIdx(nrow), kvDataRowColIdx(row), POINTER_DISTANCE(ptr, kvDataRowColIdx(row)));
|
kvRowColIdxAt(nrow, colIdx)->offset = ((SColIdx *)ptr)->offset;
|
||||||
// memcpy(kvDataRowValues(nrow), kvDataRowValues(row), ((SColIdx *)ptr)->offset); // TODO: here is not correct
|
memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);
|
||||||
// }
|
|
||||||
|
|
||||||
// // Set the new col value
|
for (int i = colIdx; i < kvRowNCols(row); i++) {
|
||||||
// pColIdx = (SColIdx *)POINTER_SHIFT(nrow, POINTER_DISTANCE(ptr, row));
|
kvRowColIdxAt(nrow, i + 1)->colId = kvRowColIdxAt(row, i)->colId;
|
||||||
// pColIdx->colId = colId;
|
kvRowColIdxAt(nrow, i + 1)->offset = kvRowColIdxAt(row, i)->offset + diff;
|
||||||
// pColIdx->offset = ((SColIdx *)ptr)->offset; // TODO: here is not correct
|
}
|
||||||
|
memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx)),
|
||||||
|
POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx)))
|
||||||
|
|
||||||
// if (IS_VAR_DATA_TYPE(type)) {
|
);
|
||||||
// memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, varDataLen(value));
|
}
|
||||||
// } else {
|
|
||||||
// memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, TYPE_BYTES[type]);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Copy the columns after the col
|
*orow = nrow;
|
||||||
// if (POINTER_DISTANCE(kvDataRowValues(row), ptr) > 0) {
|
free(row);
|
||||||
// // TODO: memcpy();
|
} else {
|
||||||
// }
|
ASSERT(((SColIdx *)ptr)->colId == colId);
|
||||||
// } else {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
// // TODO
|
void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);
|
||||||
// ASSERT(((SColIdx *)ptr)->colId == colId);
|
|
||||||
// if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
// void *pOldVal = kvDataRowColVal(row, (SColIdx *)ptr);
|
|
||||||
|
|
||||||
// if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
|
if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
|
||||||
// memcpy(pOldVal, value, varDataTLen(value));
|
memcpy(pOldVal, value, varDataTLen(value));
|
||||||
// } else { // enlarge the memory
|
} else { // need to reallocate the memory
|
||||||
// // rrow = realloc(rrow, kvDataRowLen(rrow) + varDataTLen(value) - varDataTLen(pOldVal));
|
int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
|
||||||
// // if (rrow == NULL) return NULL;
|
int16_t nlen = kvRowLen(row) + diff;
|
||||||
// // memmove();
|
ASSERT(nlen > 0);
|
||||||
// // for () {
|
nrow = malloc(nlen);
|
||||||
// // ((SColIdx *)ptr)->offset += balabala;
|
if (nrow == NULL) return -1;
|
||||||
// // }
|
|
||||||
|
|
||||||
// // kvDataRowSetLen();
|
kvRowSetLen(nrow, nlen);
|
||||||
|
kvRowSetNCols(nrow, kvRowNCols(row));
|
||||||
|
|
||||||
// }
|
// Copy part ahead
|
||||||
// } else {
|
nlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
|
||||||
// memcpy(kvDataRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
|
ASSERT(nlen % sizeof(SColIdx) == 0);
|
||||||
// }
|
if (nlen > 0) {
|
||||||
// }
|
ASSERT(((SColIdx *)ptr)->offset > 0);
|
||||||
|
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), nlen);
|
||||||
|
memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
|
||||||
|
}
|
||||||
|
|
||||||
// return rrow;
|
// Construct current column value
|
||||||
|
int colIdx = nlen / sizeof(SColIdx);
|
||||||
|
pColIdx = kvRowColIdxAt(nrow, colIdx);
|
||||||
|
pColIdx->colId = ((SColIdx *)ptr)->colId;
|
||||||
|
pColIdx->offset = ((SColIdx *)ptr)->offset;
|
||||||
|
memcpy(kvRowColVal(nrow, pColIdx), value, varDataTLen(value));
|
||||||
|
|
||||||
|
// Construct columns after
|
||||||
|
if (kvRowNCols(nrow) - colIdx - 1 > 0) {
|
||||||
|
for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
|
||||||
|
kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
|
||||||
|
kvRowColIdxAt(nrow, i)->offset += diff;
|
||||||
|
}
|
||||||
|
memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
|
||||||
|
POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
|
||||||
|
}
|
||||||
|
|
||||||
|
*orow = nrow;
|
||||||
|
free(row);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tdEncodeKVRow(void *buf, SKVRow row) {
|
void *tdEncodeKVRow(void *buf, SKVRow row) {
|
||||||
|
|
|
@ -364,7 +364,7 @@ char tTokenTypeSwitcher[13] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
bool isValidDataType(int32_t type, int32_t length) {
|
bool isValidDataType(int32_t type, int32_t length) {
|
||||||
if (type < TSDB_DATA_TYPE_BOOL || type > TSDB_DATA_TYPE_NCHAR) {
|
if (type < TSDB_DATA_TYPE_NULL || type > TSDB_DATA_TYPE_NCHAR) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ int32_t dnodeInitShell() {
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
|
||||||
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
|
||||||
|
|
||||||
// the following message shall be treated as mnode write
|
// the following message shall be treated as mnode write
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
|
||||||
|
|
|
@ -172,6 +172,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 0x0500, "invalid file
|
||||||
|
|
||||||
// TSDB
|
// TSDB
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 0x0580, "invalid TSDB configuration")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 0x0580, "invalid TSDB configuration")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_VER_OUT_OF_DATE, 0, 0x0581, "tag version is out of date")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_SCHEMA_VERSION, 0, 0x0582, "invalid table schema version from client")
|
||||||
|
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
|
|
|
@ -43,7 +43,7 @@ enum {
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
||||||
|
@ -276,6 +276,18 @@ typedef struct {
|
||||||
// char tagVal[];
|
// char tagVal[];
|
||||||
} SCMAlterTableMsg;
|
} SCMAlterTableMsg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SMsgHead head;
|
||||||
|
int64_t uid;
|
||||||
|
int32_t tid;
|
||||||
|
int16_t tversion;
|
||||||
|
int16_t colId;
|
||||||
|
int16_t type;
|
||||||
|
int16_t bytes;
|
||||||
|
int32_t tagValLen;
|
||||||
|
char data[];
|
||||||
|
} SUpdateTableTagValMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char clientVersion[TSDB_VERSION_LEN];
|
char clientVersion[TSDB_VERSION_LEN];
|
||||||
char msgVersion[TSDB_VERSION_LEN];
|
char msgVersion[TSDB_VERSION_LEN];
|
||||||
|
|
|
@ -45,6 +45,7 @@ typedef struct {
|
||||||
int (*eventCallBack)(void *);
|
int (*eventCallBack)(void *);
|
||||||
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema);
|
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema);
|
||||||
void (*cqDropFunc)(void *handle);
|
void (*cqDropFunc)(void *handle);
|
||||||
|
void *(*configFunc)(int32_t vgId, int32_t sid);
|
||||||
} STsdbAppH;
|
} STsdbAppH;
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
||||||
|
@ -108,13 +109,14 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
|
||||||
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
|
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
|
||||||
void tsdbClearTableCfg(STableCfg *config);
|
void tsdbClearTableCfg(STableCfg *config);
|
||||||
|
|
||||||
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
|
void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes);
|
||||||
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes);
|
char* tsdbGetTableName(TsdbRepoT *repo, const STableId *id);
|
||||||
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
||||||
|
|
||||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
|
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
|
||||||
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
|
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
|
||||||
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
|
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
|
||||||
|
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg);
|
||||||
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
|
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
|
||||||
|
|
||||||
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
|
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
|
||||||
|
|
|
@ -1352,11 +1352,14 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
|
||||||
int32_t tagDataLen = 0;
|
int32_t tagDataLen = 0;
|
||||||
int32_t totalCols = 0;
|
int32_t totalCols = 0;
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) {
|
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||||
pTagData = (STagData*)pMsg->schema;
|
|
||||||
tagDataLen = ntohl(pTagData->dataLen);
|
|
||||||
totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
|
totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
|
||||||
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen + pTable->sqlLen;
|
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen + pTable->sqlLen;
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
pTagData = (STagData *)pMsg->schema;
|
||||||
|
tagDataLen = ntohl(pTagData->dataLen);
|
||||||
|
contLen += tagDataLen;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
totalCols = pTable->numOfColumns;
|
totalCols = pTable->numOfColumns;
|
||||||
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
|
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
|
||||||
|
@ -1410,7 +1413,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
|
||||||
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData->data, tagDataLen);
|
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData->data, tagDataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTable->info.type == TSDB_STREAM_TABLE && pMsg != NULL) {
|
if (pTable->info.type == TSDB_STREAM_TABLE) {
|
||||||
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
|
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2224,24 +2224,26 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
* set tag value in SQLFunctionCtx
|
* set tag value in SQLFunctionCtx
|
||||||
* e.g.,tag information into input buffer
|
* e.g.,tag information into input buffer
|
||||||
*/
|
*/
|
||||||
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *param) {
|
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type,
|
||||||
tVariantDestroy(param);
|
int16_t bytes) {
|
||||||
|
tVariantDestroy(tag);
|
||||||
char * val = NULL;
|
|
||||||
int16_t bytes = 0;
|
|
||||||
int16_t type = 0;
|
|
||||||
|
|
||||||
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
val = tsdbGetTableName(tsdb, pTableId, &bytes);
|
char* val = tsdbGetTableName(tsdb, pTableId);
|
||||||
type = TSDB_DATA_TYPE_BINARY;
|
assert(val != NULL);
|
||||||
tVariantCreateFromBinary(param, varDataVal(val), varDataLen(val), type);
|
|
||||||
|
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), TSDB_DATA_TYPE_BINARY);
|
||||||
} else {
|
} else {
|
||||||
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val);
|
char* val = tsdbGetTableTagVal(tsdb, pTableId, tagColId, type, bytes);
|
||||||
|
if (val == NULL) {
|
||||||
|
tag->nType = TSDB_DATA_TYPE_NULL;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
tVariantCreateFromBinary(param, varDataVal(val), varDataLen(val), type);
|
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
|
||||||
} else {
|
} else {
|
||||||
tVariantCreateFromBinary(param, val, bytes, type);
|
tVariantCreateFromBinary(tag, val, bytes, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2249,25 +2251,29 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
|
||||||
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) {
|
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base;
|
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
|
||||||
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
|
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
|
||||||
assert(pFuncMsg->numOfParams == 1);
|
|
||||||
doSetTagValueInParam(tsdb, pTableId, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
|
assert(pExprInfo->base.numOfParams == 1);
|
||||||
|
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag,
|
||||||
|
pExprInfo->type, pExprInfo->bytes);
|
||||||
} else {
|
} else {
|
||||||
// set tag value, by which the results are aggregated.
|
// set tag value, by which the results are aggregated.
|
||||||
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
||||||
SColIndex *pCol = &pQuery->pSelectExpr[idx].base.colInfo;
|
SExprInfo* pExprInfo = &pQuery->pSelectExpr[idx];
|
||||||
|
|
||||||
// ts_comp column required the tag value for join filter
|
// ts_comp column required the tag value for join filter
|
||||||
if (!TSDB_COL_IS_TAG(pCol->flag)) {
|
if (!TSDB_COL_IS_TAG(pExprInfo->base.colInfo.flag)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo use tag column index to optimize performance
|
// todo use tag column index to optimize performance
|
||||||
doSetTagValueInParam(tsdb, pTableId, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
|
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag,
|
||||||
|
pExprInfo->type, pExprInfo->bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the join tag for first column
|
// set the join tag for first column
|
||||||
|
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
|
||||||
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
|
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
|
||||||
pRuntimeEnv->pTSBuf != NULL) {
|
pRuntimeEnv->pTSBuf != NULL) {
|
||||||
assert(pFuncMsg->numOfParams == 1);
|
assert(pFuncMsg->numOfParams == 1);
|
||||||
|
@ -6004,7 +6010,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
num = taosArrayGetSize(pa);
|
num = taosArrayGetSize(pa);
|
||||||
|
|
||||||
assert(num == pQInfo->groupInfo.numOfTables);
|
assert(num == pQInfo->groupInfo.numOfTables);
|
||||||
int16_t type, bytes;
|
// int16_t type, bytes;
|
||||||
|
|
||||||
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
|
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
|
||||||
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
|
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
|
||||||
|
@ -6012,7 +6018,6 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
|
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
|
||||||
|
|
||||||
int32_t rsize = pExprInfo->bytes;
|
int32_t rsize = pExprInfo->bytes;
|
||||||
char* data = NULL;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
SGroupItem* item = taosArrayGet(pa, i);
|
SGroupItem* item = taosArrayGet(pa, i);
|
||||||
|
@ -6030,8 +6035,25 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
*(int32_t*) output = pQInfo->vgId;
|
*(int32_t*) output = pQInfo->vgId;
|
||||||
output += sizeof(pQInfo->vgId);
|
output += sizeof(pQInfo->vgId);
|
||||||
|
|
||||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data);
|
int16_t bytes = pExprInfo->bytes;
|
||||||
memcpy(output, data, bytes);
|
int16_t type = pExprInfo->type;
|
||||||
|
|
||||||
|
char* val = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, type, bytes);
|
||||||
|
|
||||||
|
// todo refactor
|
||||||
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
if (val == NULL) {
|
||||||
|
setVardataNull(output, type);
|
||||||
|
} else {
|
||||||
|
memcpy(output, val, varDataTLen(val));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (val == NULL) {
|
||||||
|
setNull(output, type, bytes);
|
||||||
|
} else {
|
||||||
|
memcpy(output, val, bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
|
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
|
||||||
|
@ -6040,23 +6062,32 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
||||||
SGroupItem* item = taosArrayGet(pa, i);
|
SGroupItem* item = taosArrayGet(pa, i);
|
||||||
|
|
||||||
char* data = NULL;
|
|
||||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
// todo check the return value, refactor codes
|
// todo check the return value, refactor codes
|
||||||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
data = tsdbGetTableName(pQInfo->tsdb, &item->id, &bytes);
|
char* data = tsdbGetTableName(pQInfo->tsdb, &item->id);
|
||||||
|
|
||||||
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||||
memcpy(dst, data, varDataTLen(data));
|
memcpy(dst, data, varDataTLen(data));
|
||||||
} else {// todo refactor, return the true length of binary|nchar data
|
} else {// todo refactor
|
||||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
int16_t type = pExprInfo[j].type;
|
||||||
assert(bytes <= pExprInfo[j].bytes && type == pExprInfo[j].type);
|
int16_t bytes = pExprInfo[j].bytes;
|
||||||
|
|
||||||
|
char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes);
|
||||||
|
|
||||||
char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes;
|
char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes;
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
memcpy(dst, data, varDataTLen(data));
|
if (data == NULL) {
|
||||||
|
setVardataNull(dst, type);
|
||||||
} else {
|
} else {
|
||||||
memcpy(dst, data, bytes);
|
memcpy(dst, data, varDataTLen(data));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (data == NULL) {
|
||||||
|
setNull(dst, type, bytes);
|
||||||
|
} else {
|
||||||
|
memcpy(dst, data, pExprInfo[j].bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ int main(int argc, char *argv[]) {
|
||||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
ipSet.port[0] = atoi(argv[++i]);
|
ipSet.port[0] = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
||||||
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn));
|
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0]));
|
||||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||||
|
|
|
@ -69,12 +69,13 @@ typedef struct {
|
||||||
} SMemTable;
|
} SMemTable;
|
||||||
|
|
||||||
// ---------- TSDB TABLE DEFINITION
|
// ---------- TSDB TABLE DEFINITION
|
||||||
|
#define TSDB_MAX_TABLE_SCHEMAS 16
|
||||||
typedef struct STable {
|
typedef struct STable {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
STableId tableId;
|
STableId tableId;
|
||||||
uint64_t superUid; // Super table UID
|
uint64_t superUid; // Super table UID
|
||||||
int32_t sversion;
|
int16_t numOfSchemas;
|
||||||
STSchema * schema;
|
STSchema ** schema;
|
||||||
STSchema * tagSchema;
|
STSchema * tagSchema;
|
||||||
SKVRow tagVal;
|
SKVRow tagVal;
|
||||||
SMemTable * mem;
|
SMemTable * mem;
|
||||||
|
@ -122,7 +123,6 @@ typedef struct STableIndexElem {
|
||||||
|
|
||||||
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo);
|
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo);
|
||||||
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
|
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
|
||||||
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
|
|
||||||
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
|
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
|
||||||
|
|
||||||
// ---- Operation on STable
|
// ---- Operation on STable
|
||||||
|
@ -507,6 +507,13 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||||
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
|
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
|
||||||
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
|
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
|
||||||
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
|
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
|
||||||
|
int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg);
|
||||||
|
int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
|
int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
|
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version);
|
||||||
|
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
|
||||||
|
|
||||||
|
#define DEFAULT_TAG_INDEX_COLUMN 0 // skip list built based on the first column of tags
|
||||||
|
|
||||||
int compFGroupKey(const void *key, const void *fgroup);
|
int compFGroupKey(const void *key, const void *fgroup);
|
||||||
|
|
||||||
|
|
|
@ -410,6 +410,61 @@ int tsdbAlterTable(TsdbRepoT *pRepo, STableCfg *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
int16_t tversion = htons(pMsg->tversion);
|
||||||
|
|
||||||
|
STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid));
|
||||||
|
if (pTable == NULL) return TSDB_CODE_INVALID_TABLE_ID;
|
||||||
|
if (pTable->tableId.tid != htonl(pMsg->tid)) return TSDB_CODE_INVALID_TABLE_ID;
|
||||||
|
|
||||||
|
if (pTable->type != TSDB_CHILD_TABLE) {
|
||||||
|
tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", pRepo->config.tsdbId,
|
||||||
|
varDataVal(pTable->name), pTable->type);
|
||||||
|
return TSDB_CODE_INVALID_TABLE_TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)) < tversion) {
|
||||||
|
tsdbTrace("vgId:%d server tag version %d is older than client tag version %d, try to config", pRepo->config.tsdbId,
|
||||||
|
schemaVersion(tsdbGetTableTagSchema(pMeta, pTable)), tversion);
|
||||||
|
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid));
|
||||||
|
if (msg == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
// Deal with error her
|
||||||
|
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
|
||||||
|
STable *super = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
|
||||||
|
ASSERT(super != NULL);
|
||||||
|
|
||||||
|
int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
tsdbClearTableCfg(pTableCfg);
|
||||||
|
rpcFreeCont(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema *pTagSchema = tsdbGetTableTagSchema(pMeta, pTable);
|
||||||
|
|
||||||
|
if (schemaVersion(pTagSchema) > tversion) {
|
||||||
|
tsdbError(
|
||||||
|
"vgId:%d failed to update tag value of table %s since version out of date, client tag version:%d server tag "
|
||||||
|
"version:%d",
|
||||||
|
pRepo->config.tsdbId, varDataVal(pTable->name), tversion, schemaVersion(pTable->tagSchema));
|
||||||
|
return TSDB_CODE_TAG_VER_OUT_OF_DATE;
|
||||||
|
}
|
||||||
|
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
|
||||||
|
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||||
|
}
|
||||||
|
// TODO: remove table from index if it is the first column of tag
|
||||||
|
tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
|
||||||
|
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) {
|
||||||
|
tsdbAddTableIntoIndex(pMeta, pTable);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
|
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
|
||||||
|
@ -559,12 +614,15 @@ int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbClearTableCfg(STableCfg *config) {
|
void tsdbClearTableCfg(STableCfg *config) {
|
||||||
|
if (config) {
|
||||||
if (config->schema) tdFreeSchema(config->schema);
|
if (config->schema) tdFreeSchema(config->schema);
|
||||||
if (config->tagSchema) tdFreeSchema(config->tagSchema);
|
if (config->tagSchema) tdFreeSchema(config->tagSchema);
|
||||||
if (config->tagValues) kvRowFree(config->tagValues);
|
if (config->tagValues) kvRowFree(config->tagValues);
|
||||||
tfree(config->name);
|
tfree(config->name);
|
||||||
tfree(config->sname);
|
tfree(config->sname);
|
||||||
tfree(config->sql);
|
tfree(config->sql);
|
||||||
|
free(config);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||||
|
@ -883,6 +941,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
||||||
|
|
||||||
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
|
||||||
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
||||||
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
||||||
|
@ -892,6 +951,39 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
|
||||||
return TSDB_CODE_INVALID_TABLE_ID;
|
return TSDB_CODE_INVALID_TABLE_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check schema version
|
||||||
|
int32_t tversion = pBlock->sversion;
|
||||||
|
int16_t nversion = schemaVersion(tsdbGetTableSchema(pMeta, pTable));
|
||||||
|
if (tversion > nversion) {
|
||||||
|
tsdbTrace("vgId:%d table:%s tid:%d server schema version %d is older than clien version %d, try to config.",
|
||||||
|
pRepo->config.tsdbId, varDataVal(pTable->name), pTable->tableId.tid, nversion, tversion);
|
||||||
|
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pTable->tableId.tid);
|
||||||
|
if (msg == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
// Deal with error her
|
||||||
|
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
|
||||||
|
STable *pTableUpdate = NULL;
|
||||||
|
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
|
pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
|
||||||
|
} else {
|
||||||
|
pTableUpdate = pTable;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
tsdbClearTableCfg(pTableCfg);
|
||||||
|
rpcFreeCont(msg);
|
||||||
|
} else {
|
||||||
|
if (tsdbGetTableSchemaByVersion(pMeta, pTable, tversion) == NULL) {
|
||||||
|
tsdbError("vgId:%d table:%s tid:%d invalid schema version %d from client", pRepo->config.tsdbId,
|
||||||
|
varDataVal(pTable->name), pTable->tableId.tid, tversion);
|
||||||
|
return TSDB_CODE_TABLE_SCHEMA_VERSION;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
SDataRow row = NULL;
|
SDataRow row = NULL;
|
||||||
|
|
||||||
|
@ -916,9 +1008,10 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||||
ASSERT(maxRowsToRead > 0);
|
ASSERT(maxRowsToRead > 0);
|
||||||
if (pIter == NULL) return 0;
|
if (pIter == NULL) return 0;
|
||||||
|
STSchema *pSchema = NULL;
|
||||||
|
|
||||||
int numOfRows = 0;
|
int numOfRows = 0;
|
||||||
|
|
||||||
|
@ -931,7 +1024,15 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
if (dataRowKey(row) > maxKey) break;
|
if (dataRowKey(row) > maxKey) break;
|
||||||
|
|
||||||
tdAppendDataRowToDataCol(row, pCols);
|
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||||
|
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
|
||||||
|
if (pSchema == NULL) {
|
||||||
|
// TODO: deal with the error here
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
} while (tSkipListIterNext(pIter));
|
} while (tSkipListIterNext(pIter));
|
||||||
|
|
||||||
|
@ -1081,7 +1182,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
||||||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||||
int nLoop = 0;
|
int nLoop = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
|
int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols);
|
||||||
assert(rowsRead >= 0);
|
assert(rowsRead >= 0);
|
||||||
if (pDataCols->numOfRows == 0) break;
|
if (pDataCols->numOfRows == 0) break;
|
||||||
nLoop++;
|
nLoop++;
|
||||||
|
|
|
@ -8,13 +8,10 @@
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
|
||||||
// #define TSDB_META_FILE_NAME "META"
|
// #define TSDB_META_FILE_NAME "META"
|
||||||
|
|
||||||
const int32_t DEFAULT_TAG_INDEX_COLUMN = 0; // skip list built based on the first column of tags
|
|
||||||
|
|
||||||
static int tsdbFreeTable(STable *pTable);
|
static int tsdbFreeTable(STable *pTable);
|
||||||
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
|
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
|
||||||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
|
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
|
||||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
|
||||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
|
||||||
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
|
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,15 +38,20 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
||||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
|
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
|
||||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
|
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
|
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, sversion);
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
ptr = tdEncodeSchema(ptr, pTable->schema);
|
T_APPEND_MEMBER(ptr, pTable, STable, numOfSchemas);
|
||||||
|
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||||
|
ptr = tdEncodeSchema(ptr, pTable->schema[i]);
|
||||||
|
}
|
||||||
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
|
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
ptr = tdEncodeKVRow(ptr, pTable->tagVal);
|
ptr = tdEncodeKVRow(ptr, pTable->tagVal);
|
||||||
} else {
|
} else {
|
||||||
ptr = tdEncodeSchema(ptr, pTable->schema);
|
T_APPEND_MEMBER(ptr, pTable, STable, numOfSchemas);
|
||||||
|
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||||
|
ptr = tdEncodeSchema(ptr, pTable->schema[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||||
|
@ -72,6 +74,11 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
||||||
STable *tsdbDecodeTable(void *cont, int contLen) {
|
STable *tsdbDecodeTable(void *cont, int contLen) {
|
||||||
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
||||||
if (pTable == NULL) return NULL;
|
if (pTable == NULL) return NULL;
|
||||||
|
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
|
||||||
|
if (pTable->schema == NULL) {
|
||||||
|
free(pTable);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void *ptr = cont;
|
void *ptr = cont;
|
||||||
T_READ_MEMBER(ptr, int8_t, pTable->type);
|
T_READ_MEMBER(ptr, int8_t, pTable->type);
|
||||||
|
@ -87,15 +94,20 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
|
||||||
T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
|
T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
|
||||||
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
|
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
|
||||||
T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
|
T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
|
||||||
T_READ_MEMBER(ptr, int32_t, pTable->sversion);
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
pTable->schema = tdDecodeSchema(&ptr);
|
T_READ_MEMBER(ptr, int16_t, pTable->numOfSchemas);
|
||||||
|
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||||
|
pTable->schema[i] = tdDecodeSchema(&ptr);
|
||||||
|
}
|
||||||
pTable->tagSchema = tdDecodeSchema(&ptr);
|
pTable->tagSchema = tdDecodeSchema(&ptr);
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
ptr = tdDecodeKVRow(ptr, &pTable->tagVal);
|
ptr = tdDecodeKVRow(ptr, &pTable->tagVal);
|
||||||
} else {
|
} else {
|
||||||
pTable->schema = tdDecodeSchema(&ptr);
|
T_READ_MEMBER(ptr, int16_t, pTable->numOfSchemas);
|
||||||
|
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||||
|
pTable->schema[i] = tdDecodeSchema(&ptr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||||
|
@ -223,18 +235,45 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the newest table schema
|
||||||
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
||||||
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
||||||
return pTable->schema;
|
return pTable->schema[pTable->numOfSchemas - 1];
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||||
if (pSuper == NULL) return NULL;
|
if (pSuper == NULL) return NULL;
|
||||||
return pSuper->schema;
|
return pSuper->schema[pSuper->numOfSchemas-1];
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
|
if (*(int16_t *)key1 < (*(STSchema **)key2)->version) {
|
||||||
|
return -1;
|
||||||
|
} else if (*(int16_t *)key1 > (*(STSchema **)key2)->version) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version) {
|
||||||
|
STable *pSearchTable = NULL;
|
||||||
|
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
|
pSearchTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||||
|
} else {
|
||||||
|
pSearchTable = pTable;
|
||||||
|
}
|
||||||
|
ASSERT(pSearchTable != NULL);
|
||||||
|
|
||||||
|
void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *),
|
||||||
|
tsdbCompareSchemaVersion, TD_EQ);
|
||||||
|
if (ptr == NULL) return NULL;
|
||||||
|
|
||||||
|
return *(STSchema **)ptr;
|
||||||
|
}
|
||||||
|
|
||||||
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
|
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
return pTable->tagSchema;
|
return pTable->tagSchema;
|
||||||
|
@ -247,45 +286,33 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
|
void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes) {
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||||
|
|
||||||
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
|
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
|
||||||
STColumn *pCol = tdGetColOfID(pSchema, colId);
|
STColumn *pCol = tdGetColOfID(pSchema, colId);
|
||||||
if (pCol == NULL) {
|
if (pCol == NULL) {
|
||||||
return -1; // No matched tag volumn
|
return NULL; // No matched tag volumn
|
||||||
}
|
}
|
||||||
|
|
||||||
*val = tdGetKVRowValOfCol(pTable->tagVal, colId);
|
char* val = tdGetKVRowValOfCol(pTable->tagVal, colId);
|
||||||
*type = pCol->type;
|
assert(type == pCol->type && bytes == pCol->bytes);
|
||||||
|
|
||||||
if (*val != NULL) {
|
if (val != NULL && IS_VAR_DATA_TYPE(type)) {
|
||||||
if (IS_VAR_DATA_TYPE(*type)) {
|
assert(varDataLen(val) < pCol->bytes);
|
||||||
*bytes = varDataLen(*val);
|
|
||||||
} else {
|
|
||||||
*bytes = TYPE_BYTES[*type];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return val;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) {
|
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id) {
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||||
|
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
if (bytes != NULL) {
|
|
||||||
*bytes = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
if (bytes != NULL) {
|
|
||||||
*bytes = varDataLen(pTable->name);
|
|
||||||
}
|
|
||||||
|
|
||||||
return (char*) pTable->name;
|
return (char*) pTable->name;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,13 +328,16 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTable->type = pCfg->type;
|
pTable->type = pCfg->type;
|
||||||
|
pTable->numOfSchemas = 0;
|
||||||
|
|
||||||
if (isSuper) {
|
if (isSuper) {
|
||||||
pTable->type = TSDB_SUPER_TABLE;
|
pTable->type = TSDB_SUPER_TABLE;
|
||||||
pTable->tableId.uid = pCfg->superUid;
|
pTable->tableId.uid = pCfg->superUid;
|
||||||
pTable->tableId.tid = -1;
|
pTable->tableId.tid = -1;
|
||||||
pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||||
pTable->schema = tdDupSchema(pCfg->schema);
|
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
|
||||||
|
pTable->numOfSchemas = 1;
|
||||||
|
pTable->schema[0] = tdDupSchema(pCfg->schema);
|
||||||
pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
|
pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
|
||||||
|
|
||||||
tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
|
tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
|
||||||
|
@ -342,16 +372,20 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
|
||||||
if (pCfg->type == TSDB_CHILD_TABLE) {
|
if (pCfg->type == TSDB_CHILD_TABLE) {
|
||||||
pTable->superUid = pCfg->superUid;
|
pTable->superUid = pCfg->superUid;
|
||||||
pTable->tagVal = tdKVRowDup(pCfg->tagValues);
|
pTable->tagVal = tdKVRowDup(pCfg->tagValues);
|
||||||
} else if (pCfg->type == TSDB_NORMAL_TABLE) {
|
} else {
|
||||||
|
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
|
||||||
|
pTable->numOfSchemas = 1;
|
||||||
|
pTable->schema[0] = tdDupSchema(pCfg->schema);
|
||||||
|
|
||||||
|
if (pCfg->type == TSDB_NORMAL_TABLE) {
|
||||||
pTable->superUid = -1;
|
pTable->superUid = -1;
|
||||||
pTable->schema = tdDupSchema(pCfg->schema);
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pCfg->type == TSDB_STREAM_TABLE);
|
ASSERT(pCfg->type == TSDB_STREAM_TABLE);
|
||||||
pTable->superUid = -1;
|
pTable->superUid = -1;
|
||||||
pTable->schema = tdDupSchema(pCfg->schema);
|
|
||||||
pTable->sql = strdup(pCfg->sql);
|
pTable->sql = strdup(pCfg->sql);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return pTable;
|
return pTable;
|
||||||
|
|
||||||
|
@ -360,6 +394,56 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) {
|
||||||
|
ASSERT(pTable->type == TSDB_SUPER_TABLE);
|
||||||
|
ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema));
|
||||||
|
STSchema *pOldSchema = pTable->tagSchema;
|
||||||
|
STSchema *pNewSchema = tdDupSchema(newSchema);
|
||||||
|
if (pNewSchema == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
pTable->tagSchema = pNewSchema;
|
||||||
|
tdFreeSchema(pOldSchema);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
|
||||||
|
ASSERT(pTable->type != TSDB_CHILD_TABLE);
|
||||||
|
bool isChanged = false;
|
||||||
|
|
||||||
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
|
if (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema)) {
|
||||||
|
int32_t code = tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
|
}
|
||||||
|
isChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema *pTSchema = tsdbGetTableSchema(pMeta, pTable);
|
||||||
|
if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) {
|
||||||
|
if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
|
||||||
|
pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema);
|
||||||
|
} else {
|
||||||
|
ASSERT(pTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
|
||||||
|
STSchema *tSchema = tdDupSchema(pCfg->schema);
|
||||||
|
tdFreeSchema(pTable->schema[0]);
|
||||||
|
memmove(pTable->schema, pTable->schema+1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
|
||||||
|
pTable->schema[pTable->numOfSchemas-1] = tSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
isChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isChanged) {
|
||||||
|
char *buf = malloc(1024 * 1024);
|
||||||
|
int bufLen = 0;
|
||||||
|
tsdbEncodeTable(pTable, buf, &bufLen);
|
||||||
|
tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen);
|
||||||
|
free(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
@ -384,6 +468,8 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
||||||
if (super == NULL) return -1;
|
if (super == NULL) return -1;
|
||||||
} else {
|
} else {
|
||||||
if (super->type != TSDB_SUPER_TABLE) return -1;
|
if (super->type != TSDB_SUPER_TABLE) return -1;
|
||||||
|
if (super->tableId.uid != pCfg->superUid) return -1;
|
||||||
|
tsdbUpdateTable(pMeta, super, pCfg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,24 +544,31 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
||||||
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
|
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
|
||||||
|
|
||||||
if (numOfTags > 0) {
|
if (numOfTags > 0) {
|
||||||
int accBytes = 0;
|
// Decode tag schema
|
||||||
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
|
||||||
|
|
||||||
SKVRowBuilder kvRowBuilder = {0};
|
|
||||||
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
|
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
|
||||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
|
|
||||||
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||||
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
|
|
||||||
accBytes += htons(pSchema[i].bytes);
|
|
||||||
}
|
}
|
||||||
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
|
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
|
||||||
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
|
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
|
||||||
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
|
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
|
||||||
|
|
||||||
|
// Decode tag values
|
||||||
|
if (pMsg->tagDataLen) {
|
||||||
|
int accBytes = 0;
|
||||||
|
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||||
|
|
||||||
|
SKVRowBuilder kvRowBuilder = {0};
|
||||||
|
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
|
||||||
|
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||||
|
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
|
||||||
|
accBytes += htons(pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
|
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
|
||||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsg->tableType == TSDB_STREAM_TABLE) {
|
if (pMsg->tableType == TSDB_STREAM_TABLE) {
|
||||||
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||||
|
@ -535,7 +628,7 @@ static int tsdbFreeTable(STable *pTable) {
|
||||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
kvRowFree(pTable->tagVal);
|
kvRowFree(pTable->tagVal);
|
||||||
} else {
|
} else {
|
||||||
tdFreeSchema(pTable->schema);
|
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||||
|
@ -597,9 +690,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the pMeta->maxCols and pMeta->maxRowBytes
|
// Update the pMeta->maxCols and pMeta->maxRowBytes
|
||||||
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
||||||
if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema);
|
if (schemaNCols(pTable->schema[pTable->numOfSchemas - 1]) > pMeta->maxCols)
|
||||||
int bytes = dataRowMaxBytesFromSchema(pTable->schema);
|
pMeta->maxCols = schemaNCols(pTable->schema[pTable->numOfSchemas - 1]);
|
||||||
|
int bytes = dataRowMaxBytesFromSchema(pTable->schema[pTable->numOfSchemas - 1]);
|
||||||
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
|
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -648,7 +742,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
|
int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
||||||
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||||
assert(pSTable != NULL);
|
assert(pSTable != NULL);
|
||||||
|
@ -673,7 +767,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
||||||
|
|
||||||
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||||
|
|
|
@ -289,8 +289,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
||||||
|
|
||||||
pHelper->tableInfo.tid = pTable->tableId.tid;
|
pHelper->tableInfo.tid = pTable->tableId.tid;
|
||||||
pHelper->tableInfo.uid = pTable->tableId.uid;
|
pHelper->tableInfo.uid = pTable->tableId.uid;
|
||||||
pHelper->tableInfo.sversion = pTable->sversion;
|
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable);
|
STSchema *pSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable);
|
||||||
|
pHelper->tableInfo.sversion = schemaVersion(pSchema);
|
||||||
|
|
||||||
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
||||||
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
||||||
|
|
|
@ -224,6 +224,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
appH.cqH = pVnode->cq;
|
appH.cqH = pVnode->cq;
|
||||||
appH.cqCreateFunc = cqCreate;
|
appH.cqCreateFunc = cqCreate;
|
||||||
appH.cqDropFunc = cqDrop;
|
appH.cqDropFunc = cqDrop;
|
||||||
|
appH.configFunc = dnodeSendCfgTableToRecv;
|
||||||
sprintf(temp, "%s/tsdb", rootDir);
|
sprintf(temp, "%s/tsdb", rootDir);
|
||||||
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||||
if (pVnode->tsdb == NULL) {
|
if (pVnode->tsdb == NULL) {
|
||||||
|
@ -473,6 +474,7 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
|
||||||
appH.cqH = pVnode->cq;
|
appH.cqH = pVnode->cq;
|
||||||
appH.cqCreateFunc = cqCreate;
|
appH.cqCreateFunc = cqCreate;
|
||||||
appH.cqDropFunc = cqDrop;
|
appH.cqDropFunc = cqDrop;
|
||||||
|
appH.configFunc = dnodeSendCfgTableToRecv;
|
||||||
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRe
|
||||||
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet);
|
||||||
|
|
||||||
void vnodeInitWriteFp(void) {
|
void vnodeInitWriteFp(void) {
|
||||||
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
|
||||||
|
@ -41,6 +42,7 @@ void vnodeInitWriteFp(void) {
|
||||||
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
|
||||||
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
|
||||||
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
|
||||||
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
|
@ -110,7 +112,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg);
|
int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg);
|
||||||
|
|
||||||
tsdbClearTableCfg(pCfg);
|
tsdbClearTableCfg(pCfg);
|
||||||
free(pCfg);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +135,6 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
if (pCfg == NULL) return terrno;
|
if (pCfg == NULL) return terrno;
|
||||||
int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg);
|
int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg);
|
||||||
tsdbClearTableCfg(pCfg);
|
tsdbClearTableCfg(pCfg);
|
||||||
free(pCfg);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,6 +156,10 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
return tsdbUpdateTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont);
|
||||||
|
}
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, void *data, int type) {
|
int vnodeWriteToQueue(void *param, void *data, int type) {
|
||||||
SVnodeObj *pVnode = param;
|
SVnodeObj *pVnode = param;
|
||||||
SWalHead *pHead = data;
|
SWalHead *pHead = data;
|
||||||
|
|
|
@ -95,7 +95,7 @@ int main(int argc, char *argv[]) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/TDinternal/community/sim/tsim/cfg");
|
taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg");
|
||||||
|
|
||||||
// init TAOS
|
// init TAOS
|
||||||
taos_init();
|
taos_init();
|
||||||
|
@ -108,7 +108,8 @@ int main(int argc, char *argv[]) {
|
||||||
printf("success to connect to server\n");
|
printf("success to connect to server\n");
|
||||||
|
|
||||||
// multiThreadTest(1, taos);
|
// multiThreadTest(1, taos);
|
||||||
doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1, -2) group by t1 limit 2 offset 10;");
|
doQuery(taos, "use test");
|
||||||
|
doQuery(taos, "alter table tm99 set tag a=99");
|
||||||
// for(int32_t i = 0; i < 100000; ++i) {
|
// for(int32_t i = 0; i < 100000; ++i) {
|
||||||
// doQuery(taos, "insert into t1 values(now, 2)");
|
// doQuery(taos, "insert into t1 values(now, 2)");
|
||||||
// }
|
// }
|
||||||
|
|
Loading…
Reference in New Issue