Merge branch 'feature/2.0tsdb' of github.com:taosdata/TDengine into feature/2.0tsdb
This commit is contained in:
commit
5b0534b4f4
|
@ -364,7 +364,7 @@ char tTokenTypeSwitcher[13] = {
|
|||
};
|
||||
|
||||
bool isValidDataType(int32_t type, int32_t length) {
|
||||
if (type < TSDB_DATA_TYPE_BOOL || type > TSDB_DATA_TYPE_NCHAR) {
|
||||
if (type < TSDB_DATA_TYPE_NULL || type > TSDB_DATA_TYPE_NCHAR) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -109,8 +109,8 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
|
|||
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
|
||||
void tsdbClearTableCfg(STableCfg *config);
|
||||
|
||||
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
|
||||
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes);
|
||||
void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes);
|
||||
char* tsdbGetTableName(TsdbRepoT *repo, const STableId *id);
|
||||
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
||||
|
||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
|
||||
|
|
|
@ -2224,24 +2224,26 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
* set tag value in SQLFunctionCtx
|
||||
* e.g.,tag information into input buffer
|
||||
*/
|
||||
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *param) {
|
||||
tVariantDestroy(param);
|
||||
|
||||
char * val = NULL;
|
||||
int16_t bytes = 0;
|
||||
int16_t type = 0;
|
||||
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type,
|
||||
int16_t bytes) {
|
||||
tVariantDestroy(tag);
|
||||
|
||||
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
val = tsdbGetTableName(tsdb, pTableId, &bytes);
|
||||
type = TSDB_DATA_TYPE_BINARY;
|
||||
tVariantCreateFromBinary(param, varDataVal(val), varDataLen(val), type);
|
||||
char* val = tsdbGetTableName(tsdb, pTableId);
|
||||
assert(val != NULL);
|
||||
|
||||
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), TSDB_DATA_TYPE_BINARY);
|
||||
} else {
|
||||
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val);
|
||||
char* val = tsdbGetTableTagVal(tsdb, pTableId, tagColId, type, bytes);
|
||||
if (val == NULL) {
|
||||
tag->nType = TSDB_DATA_TYPE_NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
tVariantCreateFromBinary(param, varDataVal(val), varDataLen(val), type);
|
||||
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
|
||||
} else {
|
||||
tVariantCreateFromBinary(param, val, bytes, type);
|
||||
tVariantCreateFromBinary(tag, val, bytes, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2249,25 +2251,29 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
|
|||
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base;
|
||||
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
|
||||
assert(pFuncMsg->numOfParams == 1);
|
||||
doSetTagValueInParam(tsdb, pTableId, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
|
||||
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
|
||||
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
|
||||
|
||||
assert(pExprInfo->base.numOfParams == 1);
|
||||
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag,
|
||||
pExprInfo->type, pExprInfo->bytes);
|
||||
} else {
|
||||
// set tag value, by which the results are aggregated.
|
||||
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
||||
SColIndex *pCol = &pQuery->pSelectExpr[idx].base.colInfo;
|
||||
|
||||
SExprInfo* pExprInfo = &pQuery->pSelectExpr[idx];
|
||||
|
||||
// ts_comp column required the tag value for join filter
|
||||
if (!TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
if (!TSDB_COL_IS_TAG(pExprInfo->base.colInfo.flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo use tag column index to optimize performance
|
||||
doSetTagValueInParam(tsdb, pTableId, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
|
||||
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag,
|
||||
pExprInfo->type, pExprInfo->bytes);
|
||||
}
|
||||
|
||||
// set the join tag for first column
|
||||
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
|
||||
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
|
||||
pRuntimeEnv->pTSBuf != NULL) {
|
||||
assert(pFuncMsg->numOfParams == 1);
|
||||
|
@ -6005,7 +6011,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
num = taosArrayGetSize(pa);
|
||||
|
||||
assert(num == pQInfo->groupInfo.numOfTables);
|
||||
int16_t type, bytes;
|
||||
// int16_t type, bytes;
|
||||
|
||||
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
|
||||
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
|
||||
|
@ -6013,7 +6019,6 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
|
||||
|
||||
int32_t rsize = pExprInfo->bytes;
|
||||
char* data = NULL;
|
||||
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SGroupItem* item = taosArrayGet(pa, i);
|
||||
|
@ -6031,8 +6036,25 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
*(int32_t*) output = pQInfo->vgId;
|
||||
output += sizeof(pQInfo->vgId);
|
||||
|
||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data);
|
||||
memcpy(output, data, bytes);
|
||||
int16_t bytes = pExprInfo->bytes;
|
||||
int16_t type = pExprInfo->type;
|
||||
|
||||
char* val = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, type, bytes);
|
||||
|
||||
// todo refactor
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (val == NULL) {
|
||||
setVardataNull(output, type);
|
||||
} else {
|
||||
memcpy(output, val, varDataTLen(val));
|
||||
}
|
||||
} else {
|
||||
if (val == NULL) {
|
||||
setNull(output, type, bytes);
|
||||
} else {
|
||||
memcpy(output, val, bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
|
||||
|
@ -6041,17 +6063,18 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
||||
SGroupItem* item = taosArrayGet(pa, i);
|
||||
|
||||
char* data = NULL;
|
||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||
// todo check the return value, refactor codes
|
||||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
data = tsdbGetTableName(pQInfo->tsdb, &item->id, &bytes);
|
||||
char* data = tsdbGetTableName(pQInfo->tsdb, &item->id);
|
||||
|
||||
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||
memcpy(dst, data, varDataTLen(data));
|
||||
} else {// todo refactor, return the true length of binary|nchar data
|
||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
||||
assert(bytes <= pExprInfo[j].bytes && type == pExprInfo[j].type);
|
||||
} else {// todo refactor
|
||||
int16_t type = pExprInfo[j].type;
|
||||
int16_t bytes = pExprInfo[j].bytes;
|
||||
|
||||
char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes);
|
||||
|
||||
char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
|
|
|
@ -260,45 +260,33 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
|
||||
void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes) {
|
||||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||
|
||||
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
|
||||
STColumn *pCol = tdGetColOfID(pSchema, colId);
|
||||
if (pCol == NULL) {
|
||||
return -1; // No matched tag volumn
|
||||
return NULL; // No matched tag volumn
|
||||
}
|
||||
|
||||
*val = tdGetKVRowValOfCol(pTable->tagVal, colId);
|
||||
*type = pCol->type;
|
||||
char* val = tdGetKVRowValOfCol(pTable->tagVal, colId);
|
||||
assert(type == pCol->type && bytes == pCol->bytes);
|
||||
|
||||
if (*val != NULL) {
|
||||
if (IS_VAR_DATA_TYPE(*type)) {
|
||||
*bytes = varDataLen(*val);
|
||||
} else {
|
||||
*bytes = TYPE_BYTES[*type];
|
||||
}
|
||||
if (val != NULL && IS_VAR_DATA_TYPE(type)) {
|
||||
assert(varDataLen(val) < pCol->bytes);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return val;
|
||||
}
|
||||
|
||||
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) {
|
||||
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id) {
|
||||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||
|
||||
if (pTable == NULL) {
|
||||
if (bytes != NULL) {
|
||||
*bytes = 0;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
} else {
|
||||
if (bytes != NULL) {
|
||||
*bytes = varDataLen(pTable->name);
|
||||
}
|
||||
|
||||
return (char*) pTable->name;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue