Merge remote-tracking branch 'origin/3.0' into feature/qnode

This commit is contained in:
dapan 2022-05-09 08:16:02 +08:00
commit 71dbed3ede
53 changed files with 3252 additions and 641 deletions

View File

@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
uint32_t numOfRow2);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
@ -232,7 +232,7 @@ void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);

View File

@ -190,6 +190,8 @@ typedef struct SRetention {
int8_t keepUnit;
} SRetention;
#define RETENTION_VALID(r) (((r)->freq > 0) && ((r)->keep > 0))
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta

View File

@ -164,7 +164,6 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_TOPICS_STMT,
QUERY_NODE_SHOW_CONSUMERS_STMT,
QUERY_NODE_SHOW_SUBSCRIBES_STMT,
QUERY_NODE_SHOW_TRANS_STMT,
QUERY_NODE_SHOW_SMAS_STMT,
QUERY_NODE_SHOW_CONFIGS_STMT,
QUERY_NODE_SHOW_CONNECTIONS_STMT,

View File

@ -105,7 +105,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery *pQuery);
void smlDestroyHandle(void *pHandle);
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen);
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus

View File

@ -25,6 +25,8 @@ extern "C" {
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
typedef struct SStreamTask SStreamTask;
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
@ -69,20 +71,24 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int8_t reserved;
int64_t stbUid;
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef struct {
int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
FSmaSink* smaSink;
} STaskSinkSma;
typedef struct {
@ -115,7 +121,7 @@ enum {
TASK_SINK__FETCH,
};
typedef struct {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t status;
@ -150,8 +156,7 @@ typedef struct {
// application storage
void* ahandle;
} SStreamTask;
};
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);

View File

@ -37,8 +37,8 @@ typedef enum {
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj *tags;
SHashObj *fields;
SArray *tags;
SArray *fields;
} SCreateSTableActionInfo;
typedef struct {
@ -78,14 +78,17 @@ typedef struct {
// colsFormat store cols formated, for quick parse, if info->formatData is true
SArray *colsFormat; // elements are SArray<SSmlKv*>
// cols & colsColumn store cols un formated
// cols store cols un formated
SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
SHashObj *columnsHash; // elements are <cols key string, 1>, just for judge if key exists quickly.
} SSmlTableInfo;
typedef struct {
SHashObj *tagHash;
SArray *tags; // save the origin order to create table
SHashObj *tagHash; // elements are <key, index in tags>
SArray *cols;
SHashObj *fieldHash;
STableMeta *tableMeta;
} SSmlSTableMeta;
@ -113,6 +116,8 @@ typedef struct {
int32_t affectedRows;
SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
SArray *colsContainer; // for cols parse, if is dataFormat == false
} SSmlHandle;
//=================================================================================================
@ -143,8 +148,8 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
}
static int smlCompareKv(const void* p1, const void* p2) {
SSmlKv* kv1 = (SSmlKv*)p1;
SSmlKv* kv2 = (SSmlKv*)p2;
SSmlKv* kv1 = *(SSmlKv**)p1;
SSmlKv* kv2 = *(SSmlKv**)p2;
int32_t kvLen1 = kv1->keyLen;
int32_t kvLen2 = kv2->keyLen;
int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
@ -174,8 +179,9 @@ static void smlBuildChildTableName(SSmlTableInfo *tags) {
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
tMD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
//uint64_t digest2 = *(uint64_t*)(context.digest + 8);
//snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1);
taosStringBuilderDestroy(&sb);
tags->uid = digest1;
}
@ -350,37 +356,26 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
char* pos = result + n; int freeBytes = capacity - n;
size_t size = taosHashGetSize(action->createSTable.fields);
SArray *cols = taosArrayInit(size, POINTER_BYTES);
SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL);
while(kv){
if(strncmp((*kv)->key, TS, strlen(TS)) == 0 && (*kv)->type == TSDB_DATA_TYPE_TIMESTAMP){
taosArrayInsert(cols, 0, kv);
}else{
taosArrayPush(cols, kv);
}
kv = taosHashIterate(action->createSTable.fields, kv);
}
SArray *cols = action->createSTable.fields;
for(int i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kvNew = taosArrayGetP(cols, i);
smlBuildColumnDescription(kvNew, pos, freeBytes, &outBytes);
SSmlKv *kv = taosArrayGetP(cols, i);
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
}
taosArrayDestroy(cols);
--pos; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ") tags (");
pos += outBytes; freeBytes -= outBytes;
kv = taosHashIterate(action->createSTable.tags, NULL);
while(kv){
smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes);
cols = action->createSTable.tags;
for(int i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i);
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
kv = taosHashIterate(action->createSTable.tags, kv);
}
pos--; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ")");
@ -419,7 +414,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
while (tableMetaSml) {
SSmlSTableMeta* cTablePoints = *tableMetaSml;
SSmlSTableMeta* sTableData = *tableMetaSml;
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
@ -436,8 +431,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = cTablePoints->tagHash;
schemaAction.createSTable.fields = cTablePoints->fieldHash;
schemaAction.createSTable.tags = sTableData->tags;
schemaAction.createSTable.fields = sTableData->cols;
code = smlApplySchemaAction(info, &schemaAction);
if (code != 0) {
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
@ -454,7 +449,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code;
}
cTablePoints->tableMeta = pTableMeta;
sTableData->tableMeta = pTableMeta;
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
}
@ -1034,7 +1029,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SSmlMsgBuf *msg){
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = TAG;
@ -1062,6 +1057,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SML_INVALID_DATA;
}
if(taosHashGet(dumplicateKey, key, keyLen)){
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_SML_INVALID_DATA;
}else{
taosHashPut(dumplicateKey, key, keyLen, key, CHAR_BYTES);
}
// parse value
i++;
const char *value = data + i;
@ -1295,14 +1297,19 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
SSmlKv *kv = taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(value){
uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(index){
SSmlKv **value = taosArrayGet(tableMeta->tags, *index);
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
*value = kv;
}
}else{
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
size_t tmp = taosArrayGetSize(tableMeta->tags);
ASSERT(tmp <= UINT8_MAX);
uint8_t size = tmp;
taosArrayPush(tableMeta->tags, &kv);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES);
}
}
}
@ -1310,8 +1317,10 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
if(cols){
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(value){
int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(index){
SSmlKv **value = taosArrayGet(tableMeta->cols, *index);
if(kv->type != (*value)->type){
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return false;
@ -1323,7 +1332,11 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
}
}
}else{
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
size_t tmp = taosArrayGetSize(tableMeta->cols);
ASSERT(tmp <= INT16_MAX);
int16_t size = tmp;
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
}
}
}
@ -1332,16 +1345,18 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
taosArrayPush(tableMeta->tags, &kv);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
}
}
if(cols){
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = taosArrayGetP(cols, i);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
}
}
}
@ -1364,12 +1379,6 @@ static SSmlTableInfo* smlBuildTableInfo(bool format){
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
tag->columnsHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (tag->columnsHash == NULL) {
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
}
tag->tags = taosArrayInit(16, POINTER_BYTES);
@ -1399,7 +1408,6 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
}
taosHashCleanup(kvHash);
}
taosHashCleanup(tag->columnsHash);
}
taosArrayDestroy(tag->tags);
taosMemoryFreeClear(tag);
@ -1408,7 +1416,9 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){
taosArrayPush(oneTable->colsFormat, &cols);
}else{
return TSDB_CODE_SUCCESS;
}
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!kvHash){
uError("SML:smlDealCols failed to allocate memory");
@ -1417,14 +1427,9 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
if(taosHashGet(oneTable->columnsHash, kv->key, kv->keyLen) != NULL){
continue;
}
taosHashPut(oneTable->columnsHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
taosArrayPush(oneTable->cols, &kvHash);
}
return TSDB_CODE_SUCCESS;
}
@ -1444,6 +1449,18 @@ static SSmlSTableMeta* smlBuildSTableMeta(){
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->tags = taosArrayInit(32, POINTER_BYTES);
if (meta->tags == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->cols = taosArrayInit(32, POINTER_BYTES);
if (meta->cols == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
return meta;
cleanup:
@ -1454,6 +1471,8 @@ cleanup:
static void smlDestroySTableMeta(SSmlSTableMeta *meta){
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->fieldHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta);
}
@ -1465,18 +1484,23 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return ret;
}
SArray *cols = taosArrayInit(16, POINTER_BYTES);
SArray *cols = NULL;
if(info->dataFormat){ // if dataFormat, cols need new memory to save data
cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) {
uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}else{ // if dataFormat is false, cols do not need to save data, there is another new memory to save data
cols = info->colsContainer;
}
ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
return ret;
}
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf);
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
return ret;
@ -1500,46 +1524,51 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return ret;
}
}else{
SSmlTableInfo *tag = smlBuildTableInfo(info->dataFormat);
if(!tag){
SSmlTableInfo *tinfo = smlBuildTableInfo(info->dataFormat);
if(!tinfo){
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ret = smlDealCols(tag, info->dataFormat, cols);
ret = smlDealCols(tinfo, info->dataFormat, cols);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
ret = smlParseCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf);
ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
return ret;
}
if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){
if(taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
tag->sTableName = elements.measure;
tag->sTableNameLen = elements.measureLen;
smlBuildChildTableName(tag);
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tag->childTableName);
tinfo->sTableName = elements.measure;
tinfo->sTableNameLen = elements.measureLen;
smlBuildChildTableName(tinfo);
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName);
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta
ret = smlUpdateMeta(*tableMeta, tag->tags, cols, &info->msgBuf);
ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf);
if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
}else{
SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, tag->tags, cols);
smlInsertMeta(meta, tinfo->tags, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
}
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tag, POINTER_BYTES);
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
}
if(!info->dataFormat){
taosArrayClear(info->colsContainer);
}
taosHashClear(info->dumplicateKey);
return TSDB_CODE_SUCCESS;
}
@ -1568,6 +1597,7 @@ static void smlDestroyInfo(SSmlHandle* info){
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey);
taosMemoryFreeClear(info);
}
@ -1614,8 +1644,17 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!dataFormat){
info->colsContainer = taosArrayInit(32, POINTER_BYTES);
if(NULL == info->colsContainer){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
}
if(NULL == info->exec || NULL == info->childTables
|| NULL == info->superTables || NULL == info->pVgHash){
|| NULL == info->superTables || NULL == info->pVgHash
|| NULL == info->dumplicateKey){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
@ -1651,7 +1690,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
(*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, tableData->columnsHash,
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, (*pMeta)->cols,
tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){
return code;
@ -1730,7 +1769,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL;
}
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, false);
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true);
if(!info){
return (TAOS_RES*)request;
}

View File

@ -190,17 +190,21 @@ TEST(testCase, smlParseCols_Error_Test) {
"c=-3.402823466e+39u64",
"c=-339u64",
"c=18446744073709551616u64",
"c=1,c=2"
};
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for(int i = 0; i < sizeof(data)/sizeof(data[0]); i++){
char msg[256] = {0};
SSmlMsgBuf msgBuf;
msgBuf.buf = msg;
msgBuf.len = 256;
int32_t len = strlen(data[i]);
int32_t ret = smlParseCols(data[i], len, NULL, false, &msgBuf);
int32_t ret = smlParseCols(data[i], len, NULL, false, dumplicateKey, &msgBuf);
ASSERT_NE(ret, TSDB_CODE_SUCCESS);
taosHashClear(dumplicateKey);
}
taosHashCleanup(dumplicateKey);
}
TEST(testCase, smlParseCols_tag_Test) {
@ -211,11 +215,12 @@ TEST(testCase, smlParseCols_tag_Test) {
SArray *cols = taosArrayInit(16, POINTER_BYTES);
ASSERT_NE(cols, NULL);
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data =
"cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, true, &msgBuf);
int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
int32_t size = taosArrayGetSize(cols);
ASSERT_EQ(size, 19);
@ -239,10 +244,14 @@ TEST(testCase, smlParseCols_tag_Test) {
taosMemoryFree(kv);
taosArrayClear(cols);
// test tag is null
data = "t=3e";
len = 0;
memset(msgBuf.buf, 0, msgBuf.len);
ret = smlParseCols(data, len, cols, true, &msgBuf);
taosHashClear(dumplicateKey);
ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
size = taosArrayGetSize(cols);
ASSERT_EQ(size, 1);
@ -255,6 +264,9 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(kv->valueLen, strlen(TAG));
ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0);
taosMemoryFree(kv);
taosArrayDestroy(cols);
taosHashCleanup(dumplicateKey);
}
TEST(testCase, smlParseCols_Test) {
@ -266,9 +278,11 @@ TEST(testCase, smlParseCols_Test) {
SArray *cols = taosArrayInit(16, POINTER_BYTES);
ASSERT_NE(cols, NULL);
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, false, &msgBuf);
int32_t ret = smlParseCols(data, len, cols, false, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
int32_t size = taosArrayGetSize(cols);
ASSERT_EQ(size, 19);
@ -450,6 +464,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
taosArrayDestroy(cols);
taosHashCleanup(dumplicateKey);
}
TEST(testCase, smlParseLine_Test) {
@ -468,17 +483,47 @@ TEST(testCase, smlParseLine_Test) {
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL);
const char *sql[3] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451606400000000000",
const char *sql[9] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619400000000000",
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000"
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
};
smlInsertLines(info, sql, 3);
smlInsertLines(info, sql, 9);
// for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]);
// }
}
TEST(testCase, smlParseLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, NULL);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, NULL);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL);
const char *sql[2] = {
"measure,t1=3 c1=8",
"measure,t2=3 c1=8u8"
};
int ret = smlInsertLines(info, sql, 2);
ASSERT_NE(ret, 0);
}
// TEST(testCase, smlParseTS_Test) {
// char msg[256] = {0};
// SSmlMsgBuf msgBuf;

View File

@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
return TSDB_CODE_SUCCESS;
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
int32_t vgId) {
SSubmitReq* ret = NULL;
// cal size
@ -1608,13 +1609,37 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
// assign data
ret = taosMemoryCalloc(1, cap);
ret = taosMemoryCalloc(1, cap + 46);
ret = POINTER_SHIFT(ret, 46);
ret->header.vgId = vgId;
ret->version = htonl(1);
ret->length = htonl(cap - sizeof(SSubmitReq));
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
@ -1623,11 +1648,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->schemaLen = 0;
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = 0;
blkHead->uid = htobe64(pDataBlock->info.uid);
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
@ -1635,7 +1660,35 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
STSRow* rowData = blockData;
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
tEncoderClear(&encoder);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
@ -1653,10 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
rowData = POINTER_SHIFT(rowData, rowLen);
blkHead->dataLen += rowLen;
}
int32_t len = blkHead->dataLen;
blkHead->dataLen = htonl(len);
blkHead = POINTER_SHIFT(blkHead, len);
int32_t dataLen = blkHead->dataLen;
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen);
/*submitBlk = blkHead;*/
}
ret->length = htonl(ret->length);
return ret;
}

View File

@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
ASSERT(pIter->len > 0);
}

View File

@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);

View File

@ -51,6 +51,7 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs;
pMgmt->state = pInfo->vstat;
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
taosArrayDestroy(vloads.pVloads);
}
@ -177,6 +178,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;

View File

@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;

View File

@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
pMsg->rpcMsg = *pRpc;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;
case QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
return code;
}
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE);
}
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
}

View File

@ -574,6 +574,7 @@ typedef struct {
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int64_t createTime;
int64_t updateTime;
int64_t uid;

View File

@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;

View File

@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
//
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
#endif

View File

@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER;
}
stbObj.uid = pStream->targetStbUid;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
return 0;
@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.dbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;

View File

@ -967,7 +967,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action);
}
}
@ -1043,7 +1043,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return errCode;
}
} else {
mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions);
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}

View File

@ -137,25 +137,15 @@ struct STsdbCfg {
int8_t update;
int8_t compression;
int8_t slLevel;
int32_t days;
int32_t minRows;
int32_t maxRows;
int32_t keep0;
int32_t keep1;
int32_t keep2;
// TODO: save to tsdb cfg file
int8_t type; // ETsdbType
int32_t days; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep0; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
SRetention retentions[TSDB_RETENTION_MAX];
};
typedef enum {
TSDB_TYPE_TSDB = 0, // TSDB
TSDB_TYPE_TSMA = 1, // TSMA
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
} ETsdbType;
struct SVnodeCfg {
int32_t vgId;
char dbname[TSDB_DB_NAME_LEN];

View File

@ -70,9 +70,10 @@ struct SSmaEnvs {
struct STsdb {
char *path;
SVnode *pVnode;
TdThreadMutex mutex;
bool repoLocked;
int8_t level; // retention level
TdThreadMutex mutex;
STsdbKeepCfg keepCfg;
STsdbMemTable *mem;
STsdbMemTable *imem;
SRtn rtn;
@ -185,6 +186,7 @@ struct STsdbFS {
#define REPO_ID(r) TD_VID((r)->pVnode)
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
#define REPO_KEEP_CFG(r) (&(r)->keepCfg)
#define REPO_LEVEL(r) ((r)->level)
#define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pVnode->pMeta)
@ -830,7 +832,7 @@ typedef struct {
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
void *tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo);

View File

@ -59,6 +59,7 @@ typedef struct SQWorker SQHandle;
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
#define VNODE_TSMA_DIR "tsma"
#define VNODE_RSMA0_DIR "tsdb"
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
@ -154,6 +155,22 @@ struct SVnodeInfo {
SVState state;
};
typedef enum {
TSDB_TYPE_TSDB = 0, // TSDB
TSDB_TYPE_TSMA = 1, // TSMA
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
} ETsdbType;
typedef struct {
int8_t precision; // precision always be used with below keep cfgs
int32_t days;
int32_t keep0;
int32_t keep1;
int32_t keep2;
} STsdbKeepCfg;
struct SVnode {
char* path;
SVnodeCfg config;
@ -180,6 +197,7 @@ struct SVnode {
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
struct STbUidStore {
tb_uid_t suid;

View File

@ -141,16 +141,19 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
goto _err;
}
// preprocess req
pReq->uid = tGenIdPI64();
pReq->ctime = taosGetTimestampMs();
// validate req
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
pReq->uid = mr.me.uid;
if (pReq->type == TSDB_CHILD_TABLE) {
pReq->ctb.suid = mr.me.ctbEntry.suid;
}
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
} else {
pReq->uid = tGenIdPI64();
pReq->ctime = taosGetTimestampMs();
}
metaReaderClear(&mr);

View File

@ -884,9 +884,26 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType == TASK_EXEC__NONE) return 0;
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = pReq,
.contLen = ntohl(pReq->length),
};
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
@ -902,6 +919,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor);
}
}
if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
}
return 0;
}
@ -925,7 +949,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
// sink
pTask->ahandle = pTq->pVnode;
if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle = smaHandleRes;
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

View File

@ -210,7 +210,7 @@ int tsdbCommit(STsdb *pRepo) {
}
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
@ -305,7 +305,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
int fid = TSDB_IVLD_FID;
for (int i = 0; i < pCommith->niters; i++) {
@ -338,7 +338,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid);

View File

@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
}
// ================== STsdbFS
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) {
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
int keep = pCfg->keep2;
int days = pCfg->days;
int maxFSet = TSDB_MAX_FSETS(keep, days);

View File

@ -76,6 +76,8 @@ struct SMemSkipListCurosr {
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
SMemTable *pMemTb = NULL;
@ -176,20 +178,19 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// do insert data to SMemData
SMemSkipListCurosr slc = {0};
const uint8_t *p = pSubmitBlk->pData;
const uint8_t *pt;
const STSRow *pRow;
uint64_t szRow;
uint32_t szRow;
SDecoder decoder = {0};
// tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData);
for (;;) {
// if (tDecodeIsEnd(&coder)) break;
if (tDecodeIsEnd(&decoder)) break;
if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
// check the row (todo)
// // move the cursor to position to write (todo)
@ -197,11 +198,16 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
// ASSERT(c);
// // encode row
// int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
// int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
// pSlNode = vnodeBufPoolMalloc(pPool, tsize);
// pSlNode->level = level;
// encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (0 /*todo*/);
SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
// uint8_t *pData = SL_NODE_DATA(pSlNode);
// *(int64_t *)pData = version;
@ -215,7 +221,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
}
// tCoderClear(&coder);
tDecoderClear(&decoder);
// tsdbMemSkipListCursorClose(&slc);
// update status
@ -229,3 +235,18 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
return 0;
}
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
int8_t tlevel;
const uint32_t factor = 4;
if (pSl->size) {
tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
level++;
}
}
return level;
}

View File

@ -15,7 +15,30 @@
#include "tsdb.h"
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
#define TSDB_OPEN_RSMA_IMPL(v, l) \
do { \
SRetention *r = VND_RETENTIONS(v)[0]; \
if (RETENTION_VALID(r)) { \
return tsdbOpenImpl((v), type, &VND_RSMA##l(v), VNODE_RSMA##l##_DIR, TSDB_RETENTION_L##l); \
} \
} while (0)
#define TSDB_SET_KEEP_CFG(l) \
do { \
SRetention *r = &pCfg->retentions[l]; \
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
pKeepCfg->keep0 = pKeepCfg->keep2; \
pKeepCfg->keep1 = pKeepCfg->keep2; \
pKeepCfg->days = tsdbEvalDays(r, pCfg->precision); \
} while (0)
#define RETENTION_DAYS_SPLIT_RATIO 10
#define RETENTION_DAYS_SPLIT_MIN 1
#define RETENTION_DAYS_SPLIT_MAX 30
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type);
static int32_t tsdbEvalDays(SRetention *r, int8_t precision);
static int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
int tsdbOpen(SVnode *pVnode, int8_t type) {
switch (type) {
@ -25,11 +48,63 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
TSDB_OPEN_RSMA_IMPL(pVnode, 0);
break;
case TSDB_TYPE_RSMA_L1:
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1);
TSDB_OPEN_RSMA_IMPL(pVnode, 1);
break;
case TSDB_TYPE_RSMA_L2:
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2);
TSDB_OPEN_RSMA_IMPL(pVnode, 2);
break;
default:
ASSERT(0);
break;
}
return 0;
}
static int32_t tsdbEvalDays(SRetention *r, int8_t precision) {
int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);
int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO;
if (days <= RETENTION_DAYS_SPLIT_MIN) {
days = RETENTION_DAYS_SPLIT_MIN;
if (days < freqDays) {
days = freqDays + 1;
}
} else {
if (days > RETENTION_DAYS_SPLIT_MAX) {
days = RETENTION_DAYS_SPLIT_MAX;
}
if (days < freqDays) {
days = freqDays + 1;
}
}
return days * 1440;
}
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type) {
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSDB:
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
break;
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
TSDB_SET_KEEP_CFG(0);
break;
case TSDB_TYPE_RSMA_L1:
TSDB_SET_KEEP_CFG(1);
break;
case TSDB_TYPE_RSMA_L2:
TSDB_SET_KEEP_CFG(2);
break;
default:
ASSERT(0);
break;
@ -47,7 +122,7 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
* @param level retention level
* @return int
*/
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
STsdb *pTsdb = NULL;
int slen = 0;
@ -62,13 +137,13 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, i
}
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
dir);
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
pTsdb->pVnode = pVnode;
pTsdb->level = level;
pTsdb->repoLocked = false;
taosThreadMutexInit(&pTsdb->mutex, NULL);
pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb));
tsdbSetKeepCfg(REPO_KEEP_CFG(pTsdb), REPO_CFG(pTsdb), type);
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir (TODO: use tfsMkdir)
taosMkDir(pTsdb->path);

View File

@ -320,7 +320,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
STsdbCfg* pCfg = REPO_CFG(pTsdb);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
@ -2425,7 +2425,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
@ -2531,7 +2531,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
// find the start data block in file
pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle);
@ -2632,7 +2632,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
// find the start data block in file
if (!pTsdbReadHandle->locateStart) {
pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle);

View File

@ -1013,7 +1013,7 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t
* @return int32_t
*/
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = REPO_CFG(pTsdb);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
int32_t daysPerFile = pCfg->days;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {

View File

@ -60,7 +60,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
STsdbCfg *pCfg = REPO_CFG(pTsdb);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;

View File

@ -15,8 +15,9 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
#include "tcommon.h"
#include <libs/function/function.h>
#include "tbuffer.h"
#include "tcommon.h"
#include "tpagedbuf.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
@ -56,9 +57,9 @@ typedef struct SResultRow {
bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo
STimeWindow win;
char *key; // start key of current result row
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
// char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowPosition {

View File

@ -711,7 +711,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
#if 0

View File

@ -157,8 +157,6 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
pResultRow->pageId = -1;
pResultRow->offset = -1;
pResultRow->closed = false;
taosMemoryFreeClear(pResultRow->key);
pResultRow->win = TSWINDOW_INITIALIZER;
}

View File

@ -388,6 +388,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// allocate a new buffer page
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
initResultRow(pResult);
@ -1152,7 +1153,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
// for simple column, the intermediate buffer needs to hold one element.
// for simple column, the result buffer needs to hold at least one element.
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
}
@ -1872,7 +1873,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
}
void initResultRow(SResultRow* pResultRow) {
pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
}
/*
@ -1884,7 +1885,7 @@ void initResultRow(SResultRow* pResultRow) {
* offset[0] offset[1] offset[2]
*/
// TODO refactor: some function move away
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) {
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) {
SqlFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
@ -1897,6 +1898,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
pTaskInfo, false, pSup);
ASSERT(pDataBlock->info.numOfCols == numOfExprs);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
cleanupResultRowEntry(pEntry);
@ -3624,7 +3626,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error;
}
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num, pTaskInfo);
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -4217,12 +4219,22 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
pAggSup->pResultRowHashTable == NULL) {
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/");
uint32_t defaultPgsz = 4096;
while(defaultPgsz < pAggSup->resultRowSize*4) {
defaultPgsz <<= 1u;
}
// at least four pages need to be in buffer
int32_t defaultBufsz = 4096 * 256;
if (defaultBufsz <= defaultPgsz) {
defaultBufsz = defaultPgsz * 4;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, "/tmp/");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -4362,6 +4374,10 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo(pInfo, numOfOutput);
}
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*) param;
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
@ -4425,7 +4441,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
@ -4938,7 +4954,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
@ -5511,7 +5527,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
return (pRes->info.rows > 0) ? pRes : NULL;
}
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
@ -5537,7 +5553,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -38,7 +38,8 @@
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info;
@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
"-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t* id = taosArrayGet(pColList, i);
taosArrayPush(pColIds, id);
SColMatchInfo* id = taosArrayGet(pColList, i);
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
}
pInfo->pColMatchInfo = pColList;
@ -701,7 +705,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
@ -1107,7 +1112,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
return p->info.rows;
}
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) {
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName) {
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t numOfRows = p->info.rows;
@ -1207,8 +1213,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
@ -1355,7 +1361,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, SArray* pColMatchInfo,
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {

View File

@ -1,7 +1,7 @@
#include "ttime.h"
#include "tdatablock.h"
#include "executorimpl.h"
#include "functionMgt.h"
#include "tdatablock.h"
#include "ttime.h"
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
@ -545,7 +545,6 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
}
}
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep,
int32_t order, bool timeWindowInterpo) {
@ -1038,7 +1037,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
ASSERT(pInfo->binfo.pRes->info.rows > 0);
// TODO: remove for stream
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
pOperator->status = OP_RES_TO_RETURN;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
@ -1432,8 +1432,8 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {

View File

@ -266,6 +266,20 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS;
}
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t paraLen = LIST_LENGTH(pFunc->pParameterList);
if (paraLen == 0 || paraLen > 2) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_NUMERIC_TYPE(p1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = p1->resType;
return TSDB_CODE_SUCCESS;
}
static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -617,7 +631,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "diff",
.type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateInOutNum,
.translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup,
.processFunc = diffFunction,

View File

@ -75,7 +75,7 @@ typedef struct SPercentileInfo {
typedef struct SDiffInfo {
bool hasPrev;
bool includeNull;
bool ignoreNegative;
bool ignoreNegative; // replace the ignore with case when
bool firstOutput;
union {
int64_t i64;
@ -1419,32 +1419,126 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->hasPrev = false;
pDiffInfo->prev.i64 = 0;
pDiffInfo->ignoreNegative = false; // TODO set correct param
pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
pDiffInfo->includeNull = false;
pDiffInfo->firstOutput = false;
return true;
}
static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
switch(type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
pDiffInfo->prev.i64 = *(int8_t*) pv; break;
case TSDB_DATA_TYPE_INT:
pDiffInfo->prev.i64 = *(int32_t*) pv; break;
case TSDB_DATA_TYPE_SMALLINT:
pDiffInfo->prev.i64 = *(int16_t*) pv; break;
case TSDB_DATA_TYPE_BIGINT:
pDiffInfo->prev.i64 = *(int64_t*) pv; break;
case TSDB_DATA_TYPE_FLOAT:
pDiffInfo->prev.d64 = *(float *) pv; break;
case TSDB_DATA_TYPE_DOUBLE:
pDiffInfo->prev.d64 = *(double*) pv; break;
default:
ASSERT(0);
}
}
static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int32_t order) {
int32_t factor = (order == TSDB_ORDER_ASC)? 1:-1;
switch (type) {
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)pv;
int32_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
pDiffInfo->prev.i64 = v;
break;
}
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)pv;
int8_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt8(pOutput, pos, &delta);
}
pDiffInfo->prev.i64 = v;
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)pv;
int16_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt16(pOutput, pos, &delta);
}
pDiffInfo->prev.i64 = v;
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)pv;
int64_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt64(pOutput, pos, &delta);
}
pDiffInfo->prev.i64 = v;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float v = *(float*)pv;
float delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendFloat(pOutput, pos, &delta);
}
pDiffInfo->prev.d64 = v;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double v = *(double*)pv;
double delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendDouble(pOutput, pos, &delta);
}
pDiffInfo->prev.d64 = v;
break;
}
default:
ASSERT(0);
}
}
int32_t diffFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
bool isFirstBlock = (pDiffInfo->hasPrev == false);
int32_t numOfElems = 0;
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t numOfElems = 0;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t startOffset = pCtx->offset;
switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
if (pCtx->order == TSDB_ORDER_ASC) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
int32_t pos = startOffset + numOfElems;
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
@ -1457,210 +1551,60 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
continue;
}
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
char* pv = colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems++;
} else {
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
}
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
int32_t pos = startOffset + numOfElems;
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1;
}
continue;
}
char* pv = colDataGetData(pInputCol, i);
// there is a row of previous data block to be handled in the first place.
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
}
pDiffInfo->hasPrev = false;
}
// it is not the last row of current block
if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);
int32_t delta = v - next; // direct previous may be null
colDataAppendInt32(pOutput, pos, &delta);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems++;
} else {
pDiffInfo->prev.i64 = v;
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
}
pDiffInfo->hasPrev = true;
if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
}
pDiffInfo->hasPrev = true;
}
numOfElems++;
}
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
int32_t v = 0;
if (pDiffInfo->hasPrev) {
v = *(int64_t*)colDataGetData(pInputCol, i);
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (pDiffInfo->ignoreNegative) {
continue;
}
// *(pOutput++) = delta;
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
//
// pOutput += 1;
// pTimestamp += 1;
}
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
#if 0
case TSDB_DATA_TYPE_DOUBLE: {
double *pData = (double *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->d64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
float *pOutput = (float *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->d64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
int16_t *pOutput = (int16_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->i64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *pData = (int8_t *)data;
int8_t *pOutput = (int8_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->i64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
#endif
default:
break;
// qError("error input type");
}
// initial value is not set yet
if (numOfElems <= 0) {
return 0;
} else {
return (isFirstBlock) ? numOfElems - 1 : numOfElems;
}
return numOfElems;
}
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {

View File

@ -19,7 +19,6 @@
#include "thash.h"
#include "ttypes.h"
//#include "tfill.h"
#include "function.h"
#include "taggfunction.h"
#include "tbuffer.h"
@ -27,7 +26,6 @@
#include "thistogram.h"
#include "tpercentile.h"
#include "ttszip.h"
//#include "queryLog.h"
#include "tdatablock.h"
#include "tudf.h"

View File

@ -168,8 +168,6 @@ const char* nodesNodeName(ENodeType type) {
return "ShowConsumersStmt";
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
return "ShowSubscribesStmt";
case QUERY_NODE_SHOW_TRANS_STMT:
return "ShowTransStmt";
case QUERY_NODE_SHOW_SMAS_STMT:
return "ShowSmasStmt";
case QUERY_NODE_SHOW_CONFIGS_STMT:

View File

@ -190,7 +190,6 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_TRANS_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:

View File

@ -1300,9 +1300,10 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const
}
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
SNode* pStmt = nodesMakeNode(type);
SKillStmt* pStmt = nodesMakeNode(type);
CHECK_OUT_OF_MEM(pStmt);
return pStmt;
pStmt->targetId = strtol(pId->z, NULL, 10);
return (SNode*)pStmt;
}
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {

View File

@ -23,12 +23,17 @@ typedef struct SAuthCxt {
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt);
static int32_t checkAuth(SParseContext* pCxt, const char* dbName, AUTH_TYPE type) {
static int32_t checkAuth(SParseContext* pCxt, const char* pDbName, AUTH_TYPE type) {
if (pCxt->isSuperUser) {
return TSDB_CODE_SUCCESS;
}
SName name;
tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
bool pass = false;
int32_t code = catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbName, type, &pass);
int32_t code =
catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbFname, type, &pass);
return TSDB_CODE_SUCCESS == code ? (pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code;
}
@ -130,7 +135,6 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_TRANS_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:

View File

@ -1550,7 +1550,7 @@ typedef struct SmlExecHandle {
SQuery* pQuery;
} SSmlExecHandle;
static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
static int32_t smlBoundColumnData(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols;
pColList->numOfBound = 0;
@ -1621,7 +1621,7 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche
return TSDB_CODE_SUCCESS;
}
static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) {
static int32_t smlBuildTagRow(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SKVRow *row, SMsgBuf *msg) {
if (tdInitKVRowBuilder(tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
@ -1643,20 +1643,20 @@ static int32_t smlBoundTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDat
return TSDB_CODE_SUCCESS;
}
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format,
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle;
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema);
int ret = smlBoundColumnData(tags, &smlHandle->tags, pTagsSchema);
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound tags error");
return ret;
}
SKVRow row = NULL;
ret = smlBoundTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf);
ret = smlBuildTagRow(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
@ -1674,21 +1674,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
SSchema* pSchema = getTableColumnSchema(pTableMeta);
if(format){
ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema);
}else{
SArray *columns = taosArrayInit(16, POINTER_BYTES);
void **p1 = taosHashIterate(colsHash, NULL);
while (p1) {
SSmlKv* kv = *p1;
taosArrayPush(columns, &kv);
p1 = taosHashIterate(colsHash, p1);
}
ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema);
taosArrayDestroy(columns);
}
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound cols error");
return ret;
@ -1713,14 +1699,16 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
void *rowData = NULL;
size_t rowDataSize = 0;
if(format){
rowData = taosArrayGetP(colsFormat, r);
rowDataSize = taosArrayGetSize(rowData);
}else{
rowData = taosArrayGetP(cols, r);
}
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
param.schema = pColSchema;
@ -1728,23 +1716,27 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *co
SSmlKv *kv = NULL;
if(format){
kv = taosArrayGetP(rowData, c);
if (!kv){
char msg[64] = {0};
sprintf(msg, "cols num not the same like before:%d", r);
return buildInvalidOperationMsg(&pBuf, msg);
if(j < rowDataSize){
kv = taosArrayGetP(rowData, j);
if (rowDataSize != spd->numOfBound && (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)){
kv = NULL;
}else{
j++;
}
}
}else{
void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
kv = *p;
if(p) kv = *p;
}
if (kv->length == 0) {
if (!kv || kv->length == 0) {
MemRowAppend(&pBuf, NULL, 0, &param);
} else {
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = kv->length;
} else if(pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
}
MemRowAppend(&pBuf, &(kv->value), colLen, &param);

View File

@ -3127,6 +3127,13 @@ static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionS
return code;
}
static int32_t translateDropFunction(STranslateContext* pCxt, SDropFunctionStmt* pStmt) {
SDropFuncReq req = {0};
strcpy(req.name, pStmt->funcName);
req.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_FUNC, (FSerializeFunc)tSerializeSDropFuncReq, &req);
}
static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
@ -3266,6 +3273,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_CREATE_FUNCTION_STMT:
code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode);
break;
case QUERY_NODE_DROP_FUNCTION_STMT:
code = translateDropFunction(pCxt, (SDropFunctionStmt*)pNode);
break;
case QUERY_NODE_GRANT_STMT:
code = translateGrant(pCxt, (SGrantStmt*)pNode);
break;
@ -4121,6 +4131,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
code = rewriteShow(pCxt, pQuery);
break;
case QUERY_NODE_CREATE_TABLE_STMT:

View File

@ -100,6 +100,14 @@ void generateInformationSchema(MockCatalogService* mcs) {
}
}
void generatePerformanceSchema(MockCatalogService* mcs) {
{
ITableBuilder& builder = mcs->createTableBuilder("performance_schema", "trans", TSDB_SYSTEM_TABLE, 1)
.addColumn("id", TSDB_DATA_TYPE_INT);
builder.done();
}
}
/*
* Table:t1
* Field | Type | DataType | Bytes |
@ -244,6 +252,7 @@ void initMetaDataEnv() {
void generateMetaData() {
generateInformationSchema(mockCatalogService.get());
generatePerformanceSchema(mockCatalogService.get());
generateTestT1(mockCatalogService.get());
generateTestST1(mockCatalogService.get());
mockCatalogService->showTables();

View File

@ -432,7 +432,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * (numOfRows - numOfNulls);
} else {
inputLen += pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE;
inputLen += (pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE) * factor;
}
}
@ -510,7 +510,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} else if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * (numOfRows - numOfNulls) * factor;
} else {
inputLen += pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE;
inputLen += (pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE) * factor;
}
}
@ -709,10 +709,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
if (IS_VAR_DATA_TYPE(outputType)) {
outputLen += VARSTR_HEADER_SIZE;
}
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
char *output = outputBuf;

View File

@ -150,14 +150,14 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
pRes = (SArray*)input;
}
if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0;
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
}
if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;

View File

@ -59,6 +59,10 @@
# ---- table
./test.sh -f tsim/table/basic1.sim
# ---- tstream
./test.sh -f tsim/tstream/basic0.sim
# ---- tmq
./test.sh -f tsim/tmq/basic1.sim
./test.sh -f tsim/tmq/basic2.sim

View File

@ -64,6 +64,47 @@ if $data00 != 1.414213562 then
return -1
endi
#sql drop function udf1;
#sql drop function udf2;
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
sql select udf1(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30
if $rows != 4 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
sql select udf2(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
endi
if $data00 != 2.645751311 then
return -1
endi
sql drop function udf1;
sql show functions;
if $rows != 1 then
return -1
endi
if $data00 != @udf2@ then
return -1
endi
sql drop function udf2;
sql show functions;
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGTERM

View File

@ -0,0 +1,139 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d0 vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use d0
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, k int) tags (a int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables
if $rows != 3 then
return -1
endi
sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
sql show stables
if $rows != 2 then
return -1
endi
print =============== insert data
sql insert into ct1 values('2022-05-08 03:42:00.000', 234)
sleep 100
#===================================================================
print =============== query data from child table
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data01 != 234 then
return -1
endi
if $data02 != 234 then
return -1
endi
if $data03 != 234 then
return -1
endi
#===================================================================
print =============== insert data
sql insert into ct1 values('2022-05-08 03:43:00.000', -111)
sleep 100
#===================================================================
print =============== query data from child table
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data01 != -111 then
return -1
endi
if $data02 != 234 then
return -1
endi
if $data03 != 123 then
return -1
endi
#===================================================================
print =============== insert data
sql insert into ct1 values('2022-05-08 03:53:00.000', 789)
sleep 100
#===================================================================
print =============== query data from child table
sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
if $rows != 2 then
return -1
endi
if $data01 != -111 then
return -1
endi
if $data02 != 234 then
return -1
endi
if $data03 != 123 then
return -1
endi
if $data11 != 789 then
return -1
endi
if $data12 != 789 then
return -1
endi
if $data13 != 789 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

File diff suppressed because it is too large Load Diff