Merge branch '3.0' into fix/TD-29750-3.0

This commit is contained in:
kailixu 2024-04-24 08:44:43 +08:00
commit c173be667c
28 changed files with 741 additions and 477 deletions

View File

@ -832,7 +832,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);

View File

@ -54,6 +54,12 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void
*/
void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn);
/**
* Non-recursive quick sort.
*
*/
void taosqsort_r(void *src, int64_t nelem, int64_t size, const void *arg, __ext_compar_fn_t cmp);
/**
* merge sort, with the compare function requiring additional parameters support
*

View File

@ -236,35 +236,25 @@ typedef struct {
__data_compress_init initFn;
__data_compress_l1_fn_t comprFn;
__data_decompress_l1_fn_t decomprFn;
} TCompressL1FnSet;
} TCmprL1FnSet;
typedef struct {
char *name;
__data_compress_init initFn;
__data_compress_l2_fn_t comprFn;
__data_decompress_l2_fn_t decomprFn;
} TCompressL2FnSet;
} TCmprL2FnSet;
typedef struct {
int8_t type;
int8_t level;
__data_compress_init initFn;
__data_compress_l1_fn_t l1CmprFn;
__data_decompress_l1_fn_t l1DecmprFn;
__data_compress_l2_fn_t l2CmprFn;
__data_decompress_l2_fn_t l2DecmprFn;
} TCompressPara;
typedef enum L1Compress {
typedef enum {
L1_UNKNOWN = 0,
L1_SIMPLE_8B,
L1_XOR,
L1_RLE,
L1_DELTAD,
L1_DISABLED = 0xFF,
} EL1CompressFuncType;
} TCmprL1Type;
typedef enum L2Compress {
typedef enum {
L2_UNKNOWN = 0,
L2_LZ4,
L2_ZLIB,
@ -272,7 +262,20 @@ typedef enum L2Compress {
L2_TSZ,
L2_XZ,
L2_DISABLED = 0xFF,
} EL2ComressFuncType;
} TCmprL2Type;
typedef enum {
L2_LVL_NOCHANGE = 0,
L2_LVL_LOW,
L2_LVL_MEDIUM,
L2_LVL_HIGH,
L2_LVL_DISABLED = 0xFF,
} TCmprLvlType;
typedef struct {
char *name;
uint8_t lvl[3]; // l[0] = 'low', l[1] = 'mid', l[2] = 'high'
} TCmprLvlSet;
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level);

View File

@ -538,7 +538,6 @@ void stopAllQueries(SRequestObj *pRequest) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
@ -547,6 +546,7 @@ void stopAllQueries(SRequestObj *pRequest) {
for (int32_t i = reqIdx; i >= 0; i--) {
taosStopQueryImpl(pReqList[i]);
releaseRequest(pReqList[i]->self);
}
taosStopQueryImpl(pRequest);

View File

@ -16,6 +16,8 @@
#include "cJSON.h"
#include "clientInt.h"
#include "parser.h"
#include "tcol.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
@ -27,7 +29,7 @@
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
int8_t t, SColCmprWrapper* pColCmprRow) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
@ -67,6 +69,23 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
cJSON_AddItemToObject(column, "isPrimarykey", isPk);
cJSON_AddItemToArray(columns, column);
if (pColCmprRow == NULL || pColCmprRow->nCols <= i) {
continue;
}
SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(pColCmpr->alg));
const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(pColCmpr->alg));
const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pColCmpr->alg));
cJSON* encodeJson = cJSON_CreateString(encode);
cJSON_AddItemToObject(column, "encode", encodeJson);
cJSON* compressJson = cJSON_CreateString(compress);
cJSON_AddItemToObject(column, "compress", compressJson);
cJSON* levelJson = cJSON_CreateString(level);
cJSON_AddItemToObject(column, "level", levelJson);
}
cJSON_AddItemToObject(json, "columns", columns);
@ -205,7 +224,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err;
}
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr);
_err:
uDebug("create stable return, sql json:%s", string);
tDecoderClear(&coder);
@ -373,8 +392,8 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson(req.pReqs, req.nReqs);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string =
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid,
TSDB_NORMAL_TABLE, &pCreateReq->colCmpr);
}
}
@ -714,8 +733,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
// build create stable
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
SSchema* pSchema = req.schemaRow.pSchema + i;
SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
SSchema* pSchema = req.schemaRow.pSchema + i;
SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
// todo get active compress param
setDefaultOptionsForField(&field);
@ -1346,10 +1365,10 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
}
pVgData->vg = pInfo;
int tlen = 0;
int tlen = 0;
req.source = TD_REQ_FROM_TAOX;
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
if(code != 0){
if (code != 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
@ -1365,7 +1384,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
SEncoder coder = {0};
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
code = tEncodeSVAlterTbReq(&coder, &req);
if(code != 0){
if (code != 0) {
tEncoderClear(&coder);
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
@ -1631,7 +1650,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
rspObj.common.resType = RES_TYPE__TMQ;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION){
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
@ -1777,7 +1796,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
rspObj.common.resType = RES_TYPE__TMQ_METADATA;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION){
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
@ -1913,7 +1932,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
char err[ERR_MSG_LEN] = {0};
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
@ -1982,9 +2001,9 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
static int32_t getOffSetLen(const void *rsp){
const SMqDataRspCommon *pRsp = rsp;
SEncoder coder = {0};
static int32_t getOffSetLen(const void* rsp) {
const SMqDataRspCommon* pRsp = rsp;
SEncoder coder = {0};
tEncoderInit(&coder, NULL, 0);
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
@ -1993,13 +2012,13 @@ static int32_t getOffSetLen(const void *rsp){
return pos;
}
typedef int32_t __encode_func__(SEncoder *pEncoder, const void *pRsp);
typedef int32_t __encode_func__(SEncoder* pEncoder, const void* pRsp);
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw){
int32_t len = 0;
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
void* buf = NULL;
void* buf = NULL;
tEncodeSize(encodeFunc, rspObj, len, code);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -2007,17 +2026,17 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra
}
len += sizeof(int8_t) + sizeof(int32_t);
buf = taosMemoryCalloc(1, len);
if(buf == NULL){
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAILED;
}
tEncoderInit(&encoder, buf, len);
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
int32_t offsetLen = getOffSetLen(rspObj);
if(offsetLen <= 0){
if (offsetLen <= 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
@ -2025,7 +2044,7 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if(encodeFunc(&encoder, rspObj) < 0){
if (encodeFunc(&encoder, rspObj) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
@ -2053,7 +2072,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
uDebug("tmq get raw type meta:%p", raw);
} else if (TD_RES_TMQ(res)) {
SMqRspObj* rspObj = ((SMqRspObj*)res);
if(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0){
if (encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0) {
uError("tmq get raw type error:%d", terrno);
return terrno;
}
@ -2062,7 +2081,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
if(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0){
if (encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0) {
uError("tmq get raw type error:%d", terrno);
return terrno;
}

View File

@ -190,30 +190,29 @@ static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t cur
}
}
size_t start = 1;
int32_t t = 0;
int32_t count = log(numOfRows) / log(2);
uint32_t startOffset = (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
size_t start = 1;
int32_t t = 0;
int32_t count = log(numOfRows) / log(2);
uint32_t startOffset =
(IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
// the first item
memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
while (t < count) {
int32_t xlen = 1 << t;
memcpy(pColumnInfoData->pData + start * itemLen + startOffset,
pColumnInfoData->pData + startOffset,
memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
xlen * itemLen);
t += 1;
start += xlen;
}
// the tail part
if (numOfRows > start) {
memcpy(pColumnInfoData->pData + start * itemLen + startOffset,
pColumnInfoData->pData + startOffset,
memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
(numOfRows - start) * itemLen);
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for (int32_t i = 0; i < numOfRows; ++i) {
pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
@ -233,7 +232,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
len = getJsonValueLen(pData);
} else {
len = varDataTLen(pData);
}
}
if (pColumnInfoData->varmeta.allocLen < (numOfRows * len + pColumnInfoData->varmeta.length)) {
int32_t code = colDataReserve(pColumnInfoData, (numOfRows * len + pColumnInfoData->varmeta.length));
if (code != TSDB_CODE_SUCCESS) {
@ -247,7 +246,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
pColumnInfoData->hasNull = true;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
} else {
@ -268,7 +267,7 @@ void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
int32_t bytes = (numOfRows - i) / 8;
memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, bytes);
i += bytes * 8;
for (; i < numOfRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
}
@ -491,7 +490,8 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
return 0;
}
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) {
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
int32_t numOfRows) {
if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
return TSDB_CODE_FAILED;
}
@ -502,7 +502,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
int32_t allLen = 0;
void* srcAddr = NULL;
void* srcAddr = NULL;
if (pSrc->hasNull) {
for (int32_t i = 0; i < numOfRows; ++i) {
if (colDataIsNull_var(pSrc, srcIdx + i)) {
@ -526,7 +526,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
char* pData = colDataGetVarData(pSrc, srcIdx + i);
char* pData = colDataGetVarData(pSrc, srcIdx + i);
int32_t dataLen = 0;
if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
@ -545,7 +545,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pDst->pData = tmp;
pDst->varmeta.allocLen = pDst->varmeta.length + allLen;
}
@ -585,17 +585,17 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
}
}
}
}
}
if (pSrc->pData != NULL) {
memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, pDst->info.bytes * numOfRows);
memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
pDst->info.bytes * numOfRows);
}
}
return 0;
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); }
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
@ -636,7 +636,7 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
return -1;
}
SDataBlockInfo* pInfo = &pDataBlock->info;
SDataBlockInfo* pInfo = &pDataBlock->info;
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
return 0;
@ -732,7 +732,7 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
int32_t bytes = (pBlock->info.rows - i) / 8;
memset(&BMCharPos(pCol->nullbitmap, i), 0, bytes);
i += bytes * 8;
for (; i < pBlock->info.rows; ++i) {
colDataClearNull_f(pCol->nullbitmap, i);
}
@ -742,7 +742,6 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
pBlock->info.rows -= numOfRows;
}
size_t blockDataGetSize(const SSDataBlock* pBlock) {
size_t total = 0;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -827,19 +826,16 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
return NULL;
}
blockDataEnsureCapacity(pDst, rowCount);
/* may have disorder varchar data, TODO
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
/* may have disorder varchar data, TODO
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount);
}
*/
colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount);
}
*/
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
@ -1322,7 +1318,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
}
terrno = 0;
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
if (terrno) return terrno;
int64_t p1 = taosGetTimestampUs();
@ -1400,14 +1396,13 @@ void blockDataReset(SSDataBlock* pDataBlock) {
pInfo->id.groupId = 0;
}
/*
* NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
* the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
* any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
*/
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
bool clearPayload) {
bool clearPayload) {
if (numOfRows <= 0 || pBlockInfo && numOfRows <= pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS;
}
@ -2402,12 +2397,12 @@ _end:
return TSDB_CODE_SUCCESS;
}
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) {
char tmp[TSDB_TABLE_NAME_LEN] = {0};
if (stbName == NULL){
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
}else{
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId);
if (stbName == NULL) {
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%" PRIu64, groupId);
} else {
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName, groupId);
}
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
strcat(ctbName, tmp);

View File

@ -1526,6 +1526,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
int32_t code = -1;
STransAction createStreamRedoAction = {0};
STransAction createStreamUndoAction = {0};
STransAction dropStbUndoAction = {0};
SMDropStbReq dropStbReq = {0};
STrans *pTrans =
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
if (!pTrans) {
@ -1556,7 +1558,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
createStreamUndoAction.epSet = createStreamRedoAction.epSet;
createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
createStreamUndoAction.actionType = TDMT_STREAM_DROP;
createStreamUndoAction.msgType = TDMT_STREAM_DROP;
createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
if (!createStreamUndoAction.pCont) {
@ -1569,6 +1571,24 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
goto _OVER;
}
dropStbReq.igNotExists = true;
strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
dropStbUndoAction.epSet = createStreamRedoAction.epSet;
dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
if (!dropStbUndoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
SDbObj newDb = {0};
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
newDb.tsmaVersion++;
@ -1579,6 +1599,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &dropStbUndoAction) != 0) goto _OVER;
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;

View File

@ -1109,7 +1109,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
goto _OVER;
}
int32_t actionNum = taosArrayGetSize(pTrans->redoActions);
int32_t actionNum = taosArrayGetSize(pArray);
if (action < 0 || action >= actionNum) {
mError("trans:%d, invalid action:%d", transId, action);
goto _OVER;

View File

@ -989,7 +989,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
streamTaskPause(pMeta, pTask);
streamTaskPause(pTask);
SStreamTask* pHistoryTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -1006,7 +1006,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pMeta, pHistoryTask);
streamTaskPause(pHistoryTask);
streamMetaReleaseTask(pMeta, pHistoryTask);
}

View File

@ -1637,8 +1637,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
reallocVarDataVal(&lastCol.rowKey.pks[j]);
}
reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol);
@ -1667,8 +1667,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
reallocVarDataVal(&lastCol.rowKey.pks[j]);
}
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);

View File

@ -1610,11 +1610,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
doUnpinSttBlock(pSttBlockReader);
if (hasVal) {
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
if (IS_VAR_DATA_TYPE(pSttKey->pks[0].type)) {
tsdbInfo("current pk:%s, next pk:%s", pSttKey->pks[0].pData, pNext->pks[0].pData);
}
if (pkCompEx(pSttKey, pNext) != 0) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
*copied = (code == TSDB_CODE_SUCCESS);

View File

@ -944,6 +944,7 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
void destroyFlusedPos(void* pRes);
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -217,6 +217,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
@ -237,8 +238,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pBufferedRes);
taosArrayClear(pInfo->pUidList);
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes,
pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -307,10 +308,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
if (NULL == pInfo->pLastrowReader) {
code = pInfo->readHandle.api.cacheFn.openReader(
pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList),
pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList,
&pInfo->pkCol, pInfo->numOfPks);
code = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
&pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol,
pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
@ -318,13 +319,13 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
continue;
}
} else {
pInfo->readHandle.api.cacheFn.reuseReader(pInfo->pLastrowReader, pList, num);
pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
}
taosArrayClear(pInfo->pUidList);
code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList);
code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -357,7 +358,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
}
pInfo->pLastrowReader = pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
pInfo->pLastrowReader = pReaderFn->closeReader(pInfo->pLastrowReader);
setOperatorCompleted(pOperator);
return NULL;
}

View File

@ -287,7 +287,19 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
SMetaReader* mr = (SMetaReader*)pContext;
bool isTagCol = false, isTbname = false;
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)*pNode;
if (pCol->colType == COLUMN_TYPE_TBNAME)
isTbname = true;
else
isTagCol = true;
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (pFunc->funcType == FUNCTION_TYPE_TBNAME)
isTbname = true;
}
if (isTagCol) {
SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
@ -316,24 +328,21 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFuncNode = *(SFunctionNode**)pNode;
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = pFuncNode->node.resType;
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), mr->me.name, len);
varDataSetLen(res->datum.p, len);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
} else if (isTbname) {
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = ((SExprNode*)(*pNode))->resType;
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), mr->me.name, len);
varDataSetLen(res->datum.p, len);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
}
return DEAL_RES_CONTINUE;

View File

@ -50,12 +50,13 @@ void destroyStreamCountAggOperatorInfo(void* param) {
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);

View File

@ -46,6 +46,9 @@ void destroyStreamEventOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -60,7 +63,6 @@ void destroyStreamEventOperatorInfo(void* param) {
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pAllUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);

View File

@ -262,7 +262,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
if (chIds) {
int32_t childId = getChildIndex(pBlock);
if (pInvalidWins) {
qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
}
@ -388,15 +388,20 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
}
}
void destroyFlusedPos(void* pRes) {
SRowBuffPos* pPos = (SRowBuffPos*) pRes;
if (!pPos->needFree && !pPos->pRowBuff) {
taosMemoryFreeClear(pPos->pKey);
taosMemoryFree(pPos);
}
}
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
if (pGroupResInfo->freeItem) {
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
for (int32_t i = pGroupResInfo->index; i < size; i++) {
SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
if (!pPos->needFree && !pPos->pRowBuff) {
taosMemoryFreeClear(pPos->pKey);
taosMemoryFree(pPos);
}
void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
destroyFlusedPos(pPos);
}
pGroupResInfo->freeItem = false;
}
@ -409,6 +414,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
// it should be empty.
void* pIte = NULL;
@ -437,7 +444,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
cleanupExprSupp(&pInfo->scalarSupp);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pDeletedMap);
blockDataDestroy(pInfo->pCheckpointRes);
@ -654,11 +660,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
.calWin.skey = nextWin.skey,
.calWin.ekey = nextWin.skey};
// add pull data request
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
if (IS_MID_INTERVAL_OP(pOperator)) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
taosArrayPush(pInfo->pMidPullDatas, &winRes);
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
taosArrayPush(pInfo->pDelWins, &winRes);
addPullWindow(pMap, &winRes, numOfCh);
if (pInfo->destHasPrimaryKey) {
tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0);
@ -1328,7 +1335,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL;
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap);
if (IS_FINAL_INTERVAL_OP(pOperator)) {
int32_t chId = getChildIndex(pBlock);
addRetriveWindow(delWins, pInfo, chId);
@ -1662,6 +1670,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -1677,7 +1687,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pWinBlock);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);
@ -3248,6 +3257,9 @@ void destroyStreamStateOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -3261,7 +3273,6 @@ void destroyStreamStateOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);

View File

@ -2517,7 +2517,7 @@ static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR};
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}

View File

@ -6332,6 +6332,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
if (code == TSDB_CODE_SUCCESS) {
code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true);
}
pTsmaOptCtx->pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
if (pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo && pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo->size > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pTsmaOptCtx->pScan->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, i);

View File

@ -195,7 +195,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
@ -218,8 +218,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // for sink task, set it ready directly.
@ -395,7 +396,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
}
if (pRsp->status == TASK_DOWNSTREAM_READY) {
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
if (left == 0) {
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
@ -405,7 +409,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else { // not ready, wait for 100ms and retry
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS; // return success in any cases.
}
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64

View File

@ -912,7 +912,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
return TSDB_CODE_SUCCESS;
}
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
void streamTaskPause(SStreamTask* pTask) {
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
}
@ -983,19 +983,27 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf
return TSDB_CODE_SUCCESS;
}
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
return p;
}
}
return NULL;
}
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->taskId == taskId) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pInfo->pList, &info);
@ -1008,28 +1016,34 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
int32_t* pNotReady, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->taskId == taskId) {
ASSERT(reqId == p->reqId);
// count down one, since it is ready now
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
} else {
*pNotReady = pInfo->notReadyTasks;
}
p->status = status;
p->rspTs = rspTs;
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
" expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_FAILED;
}
// subtract one not-ready-task, since it is ready now
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
} else {
*pNotReady = pInfo->notReadyTasks;
}
p->status = status;
p->rspTs = rspTs;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId);
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId,
reqId);
return TSDB_CODE_FAILED;
}
@ -1077,7 +1091,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
req.reqId = p->reqId;
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64,
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
@ -1093,8 +1107,10 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
p->reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
}
@ -1104,6 +1120,31 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
}
}
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->status == TASK_DOWNSTREAM_READY) {
(*numOfReady) += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
p->taskId);
(*numOfFault) += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
}
} else {
taosArrayPush(pNotReadyList, &p->taskId);
}
}
}
}
static void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
@ -1119,7 +1160,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
stDebug("s-task:%s start to do check downstream rsp check", id);
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
@ -1130,17 +1171,20 @@ static void rspMonitorFn(void* param, void* tmrId) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
return;
}
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return;
}
@ -1159,27 +1203,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->status == TASK_DOWNSTREAM_READY) {
numOfReady += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
p->taskId);
numOfFault += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
numOfNotRsp += 1; // do nothing and continue waiting for their rsp
}
} else {
taosArrayPush(pNotReadyList, &p->taskId);
}
}
}
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
@ -1189,18 +1213,18 @@ static void rspMonitorFn(void* param, void* tmrId) {
// fault tasks detected, not try anymore
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
if ((numOfNotRsp == 0) && (numOfFault > 0)) {
if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
streamTaskCompleteCheckRsp(pInfo, id);
return;
}
@ -1221,6 +1245,9 @@ static void rspMonitorFn(void* param, void* tmrId) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
return;
}
@ -1231,13 +1258,11 @@ static void rspMonitorFn(void* param, void* tmrId) {
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
@ -1251,13 +1276,10 @@ static void rspMonitorFn(void* param, void* tmrId) {
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
break;
}
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}

View File

@ -153,6 +153,130 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __
taosMemoryFreeClear(buf);
}
#define DOSWAP(a, b, size) \
do { \
size_t __size = (size); \
char *__a = (a), *__b = (b); \
do { \
char __tmp = *__a; \
*__a++ = *__b; \
*__b++ = __tmp; \
} while (--__size > 0); \
} while (0)
typedef struct {
char *lo;
char *hi;
} stack_node;
#define STACK_SIZE (CHAR_BIT * sizeof(size_t))
#define PUSH(low, high) ((void)((top->lo = (low)), (top->hi = (high)), ++top))
#define POP(low, high) ((void)(--top, (low = top->lo), (high = top->hi)))
#define STACK_NOT_EMPTY (stack < top)
void taosqsort_r(void *src, int64_t nelem, int64_t size, const void *arg, __ext_compar_fn_t cmp) {
const int32_t MAX_THRESH = 6;
char *base_ptr = (char *)src;
const size_t max_thresh = MAX_THRESH * size;
if (nelem == 0) return;
if (nelem > MAX_THRESH) {
char *lo = base_ptr;
char *hi = &lo[size * (nelem - 1)];
stack_node stack[STACK_SIZE];
stack_node *top = stack;
PUSH(NULL, NULL);
while (STACK_NOT_EMPTY) {
char *left_ptr;
char *right_ptr;
char *mid = lo + size * ((hi - lo) / size >> 1);
if ((*cmp)((void *)mid, (void *)lo, arg) < 0) DOSWAP(mid, lo, size);
if ((*cmp)((void *)hi, (void *)mid, arg) < 0)
DOSWAP(mid, hi, size);
else
goto jump_over;
if ((*cmp)((void *)mid, (void *)lo, arg) < 0) DOSWAP(mid, lo, size);
jump_over:;
left_ptr = lo + size;
right_ptr = hi - size;
do {
while ((*cmp)((void *)left_ptr, (void *)mid, arg) < 0) left_ptr += size;
while ((*cmp)((void *)mid, (void *)right_ptr, arg) < 0) right_ptr -= size;
if (left_ptr < right_ptr) {
DOSWAP(left_ptr, right_ptr, size);
if (mid == left_ptr)
mid = right_ptr;
else if (mid == right_ptr)
mid = left_ptr;
left_ptr += size;
right_ptr -= size;
} else if (left_ptr == right_ptr) {
left_ptr += size;
right_ptr -= size;
break;
}
} while (left_ptr <= right_ptr);
if ((size_t)(right_ptr - lo) <= max_thresh) {
if ((size_t)(hi - left_ptr) <= max_thresh)
POP(lo, hi);
else
lo = left_ptr;
} else if ((size_t)(hi - left_ptr) <= max_thresh)
hi = right_ptr;
else if ((right_ptr - lo) > (hi - left_ptr)) {
PUSH(lo, right_ptr);
lo = left_ptr;
} else {
PUSH(left_ptr, hi);
hi = right_ptr;
}
}
}
#define min(x, y) ((x) < (y) ? (x) : (y))
{
char *const end_ptr = &base_ptr[size * (nelem - 1)];
char *tmp_ptr = base_ptr;
char *thresh = min(end_ptr, base_ptr + max_thresh);
char *run_ptr;
for (run_ptr = tmp_ptr + size; run_ptr <= thresh; run_ptr += size)
if ((*cmp)((void *)run_ptr, (void *)tmp_ptr, arg) < 0) tmp_ptr = run_ptr;
if (tmp_ptr != base_ptr) DOSWAP(tmp_ptr, base_ptr, size);
run_ptr = base_ptr + size;
while ((run_ptr += size) <= end_ptr) {
tmp_ptr = run_ptr - size;
while ((*cmp)((void *)run_ptr, (void *)tmp_ptr, arg) < 0) tmp_ptr -= size;
tmp_ptr += size;
if (tmp_ptr != run_ptr) {
char *trav;
trav = run_ptr + size;
while (--trav >= run_ptr) {
char c = *trav;
char *hi, *lo;
for (hi = lo = trav; (lo -= size) >= tmp_ptr; hi = lo) *hi = *lo;
*hi = c;
}
}
}
}
}
void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size, __compar_fn_t compar, int32_t flags) {
uint8_t *p;
int32_t lidx;
@ -275,7 +399,7 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar,
}
static void taosMerge(void *src, int32_t start, int32_t leftend, int32_t end, int64_t size, const void *param,
__ext_compar_fn_t comparFn, void *tmp) {
__ext_compar_fn_t comparFn, void *tmp) {
int32_t leftSize = leftend - start + 1;
int32_t rightSize = end - leftend;
@ -326,7 +450,7 @@ static int32_t taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, c
if (numOfElem > THRESHOLD_SIZE) {
int32_t currSize;
void *tmp = taosMemoryMalloc(numOfElem * size);
void *tmp = taosMemoryMalloc(numOfElem * size);
if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (currSize = THRESHOLD_SIZE; currSize <= numOfElem - 1; currSize = 2 * currSize) {
@ -351,7 +475,6 @@ int32_t msortHelper(const void *p1, const void *p2, const void *param) {
return comparFn(p1, p2);
}
int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) {
void *param = comparFn;
return taosMergeSortHelper(src, numOfElem, size, param, msortHelper);

View File

@ -179,6 +179,7 @@ int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, c
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
// do nothing
#else
int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
return 0;
@ -226,7 +227,7 @@ int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPre
int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) {
size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, ZSTD_CLEVEL_DEFAULT);
size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
@ -253,7 +254,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci
}
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) {
size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, 0);
size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
@ -274,14 +275,19 @@ int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSiz
}
#endif
TCompressL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2},
{"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2},
{"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
{"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
{"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}};
TCmprL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2},
{"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2},
{"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
{"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
{"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}};
TCmprLvlSet compressL2LevelDict[] = {
{"unknown", .lvl = {1, 2, 3}}, {"lz4", .lvl = {1, 2, 3}}, {"zlib", .lvl = {1, 6, 9}},
{"zstd", .lvl = {1, 11, 22}}, {"tsz", .lvl = {1, 2, 3}}, {"xz", .lvl = {1, 6, 9}},
};
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
TCompressL2FnSet compressL2Dict[] = {
TCmprL2FnSet compressL2Dict[] = {
{"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{"zlib", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
@ -289,7 +295,7 @@ TCompressL2FnSet compressL2Dict[] = {
{"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
{"xz", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}};
#else
TCompressL2FnSet compressL2Dict[] = {
TCmprL2FnSet compressL2Dict[] = {
{"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
{"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
{"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
@ -299,6 +305,17 @@ TCompressL2FnSet compressL2Dict[] = {
#endif
int8_t tsGetCompressL2Level(uint8_t alg, uint8_t lvl) {
if (lvl == L2_LVL_LOW) {
return compressL2LevelDict[alg].lvl[0];
} else if (lvl == L2_LVL_MEDIUM) {
return compressL2LevelDict[alg].lvl[1];
} else if (lvl == L2_LVL_HIGH) {
return compressL2LevelDict[alg].lvl[2];
}
return 1;
}
static const int32_t TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
@ -2704,7 +2721,8 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, \
lvl, tDataTypes[type].name, l1); \
int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, lvl); \
int8_t alvl = tsGetCompressL2Level(l2, lvl); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, alvl); \
} else { \
uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \
tDataTypes[type].name); \
@ -2715,7 +2733,8 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
if (compress) { \
uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \
tDataTypes[type].name); \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, lvl); \
int8_t alvl = tsGetCompressL2Level(l2, lvl); \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl); \
} else { \
uTrace("dencode:%s, dcompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \
tDataTypes[type].name); \
@ -2913,127 +2932,6 @@ int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
}
// int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);
// int32_t tsCompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
// void *pBuf, int32_t nBuf) {
// TCompressL1FnSet fn1;
// TCompressL2FnSet fn2;
// if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2)) return -1;
// int32_t len = 0;
// uint8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg);
// uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg);
// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
// if (l2 == L2_DISABLED) {
// len = fn1.comprFn(pIn, nEle, pOut, type);
// } else {
// len = fn1.comprFn(pIn, nEle, pBuf, type);
// len = fn2.comprFn(pBuf, len, pOut, nOut, type, lvl);
// }
// return len;
// }
// int32_t tsDecompressImpl(int8_t type, void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t
// cmprAlg,
// void *pBuf, int32_t nBuf) {
// TCompressL1FnSet fn1;
// TCompressL2FnSet fn2;
// if (tsFindCompressAlg(type, cmprAlg, &fn1, &fn2) != 0) return -1;
// uint8_t l1 = COMPRESS_L1_TYPE_U8(cmprAlg);
// uint8_t l2 = COMPRESS_L2_TYPE_U8(cmprAlg);
// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(cmprAlg);
// uint32_t len = 0;
// if (l2 == L2_DISABLED) {
// len = fn1.decomprFn(pIn, nEle, pOut, type);
// } else {
// len = fn2.decomprFn(pIn, nIn, pBuf, nBuf, type);
// if (len < 0) return -1;
// len = fn1.decomprFn(pBuf, nEle, pOut, type);
// }
// return len;
// }
// int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn) {
// uint8_t l1 = COMPRESS_L1_TYPE_U8(compress);
// uint8_t l2 = COMPRESS_L2_TYPE_U8(compress);
// uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress);
// static int32_t l1Sz = sizeof(compressL1Dict) / sizeof(compressL1Dict[0]);
// if (l1 >= l1Sz) return -1;
// static int32_t l2Sz = sizeof(compressL2Dict) / sizeof(compressL2Dict[0]);
// if (l2 >= l2Sz) return -1;
// *l1Fn = compressL1Dict[l1];
// *l2Fn = compressL2Dict[l2];
// return 0;
// }
// typedef struct {
// int8_t dtype;
// SArray *l1Set;
// SArray *l2Set;
// } TCompressCompatible;
// SHashObj *algSet = NULL;
// int32_t tsCompressSetInit() {
// algSet = taosHashInit(24, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
// for (int i = TSDB_DATA_TYPE_NULL; i < TSDB_DATA_TYPE_MAX; i++) {
// TCompressCompatible p;
// p.dtype = i;
// p.l1Set = taosArrayInit(4, sizeof(int8_t));
// p.l2Set = taosArrayInit(4, sizeof(int8_t));
// for (int8_t j = L1_DISABLED; j < L1_MAX; j++) {
// taosArrayPush(p.l1Set, &j);
// }
// for (int8_t j = L2_DISABLED; j < L2_MAX; j++) {
// taosArrayPush(p.l2Set, &j);
// }
// taosHashPut(algSet, &i, sizeof(i), &p, sizeof(TCompressCompatible));
// }
// return 0;
// }
// int32_t tsCompressSetDestroy() {
// void *p = taosHashIterate(algSet, NULL);
// while (p) {
// TCompressCompatible *v = p;
// taosArrayDestroy(v->l1Set);
// taosArrayDestroy(v->l2Set);
// taosHashIterate(algSet, p);
// }
// return 0;
// }
// int32_t tsValidCompressAlgByDataTypes(int8_t type, int8_t compress) {
// // compress alg
// int8_t l1 = COMPRESS_L1_TYPE_U8(compress);
// int8_t l2 = COMPRESS_L2_TYPE_U8(compress);
// int8_t lvl = COMPRESS_L2_TYPE_LEVEL_U8(compress);
// TCompressCompatible *p = taosHashGet(algSet, &type, sizeof(type));
// if (p == NULL) return -1;
// if (p->dtype != type) return -1;
// if (taosArraySearch(p->l1Set, &l1, compareInt8Val, 0) == NULL) {
// return -1;
// }
// if (taosArraySearch(p->l2Set, &l2, compareInt8Val, 0) == NULL) {
// return -1;
// }
// return 0;
// }
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) {
DEFINE_VAR(cmprAlg)
*l1Alg = l1;

View File

@ -1030,7 +1030,7 @@ endi
sql_error select a.ts from sta a left window join sta b on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 having(count(a.ts) > 1) order by b.col1;
sql_error select a.ts from sta a left window join sta b on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 having(count(a.ts) > 1) order by b.tbname;
sql select count(b.col1) from sta a left window join sta b on a.t1 = b.t1 window_offset(-1s, 1s) where (a.col1 < 3 or a.col1 > 4) and b.col1 is not null;
sql select count(b.col1) from sta a left window join sta b on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 order by a.t1;
sql select count(b.col1),a.t1,a.ts from sta a left window join sta b on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 order by 2,3;
if $rows != 5 then
return -1
endi

View File

@ -725,7 +725,7 @@ endi
sql_error select a.ts from sta b right window join sta a on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 having(count(a.ts) > 1) order by b.col1;
sql_error select a.ts from sta b right window join sta a on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 having(count(a.ts) > 1) order by b.tbname;
sql select count(b.col1) from sta b right window join sta a on a.t1 = b.t1 window_offset(-1s, 1s) where (a.col1 < 3 or a.col1 > 4) and b.col1 is not null;
sql select count(b.col1) from sta b right window join sta a on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 order by a.t1;
sql select count(b.col1),a.t1,a.ts from sta b right window join sta a on a.t1 = b.t1 window_offset(-1s, 1s) where a.col1 < 3 or a.col1 > 4 order by 2,3;
if $rows != 5 then
return -1
endi

View File

@ -88,7 +88,7 @@ class TDTestCase:
tdSql.checkData(1, 1, 8)
tdSql.checkData(1, 2, 8)
tdSql.query('select ts,diff(f) from d1.st partition by t order by t;')
tdSql.query('select ts,diff(f),t from d1.st partition by t order by 3,1;')
tdSql.checkRows(4)
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
tdSql.checkData(0, 1, 0)
@ -104,7 +104,7 @@ class TDTestCase:
tdSql.checkData(0, 0, 4.0)
tdSql.checkData(1, 0, 5.0)
tdSql.query('select ts,derivative(f, 1s, 0) from d1.st partition by t order by t;')
tdSql.query('select ts,derivative(f, 1s, 0),t from d1.st partition by t order by 3,1;')
tdSql.checkRows(4)
tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1))
tdSql.checkData(0, 1, 0.0)

View File

@ -1209,7 +1209,7 @@ class TDTestCase:
self.test_ddl()
self.test_query_with_tsma()
# bug to fix
# self.test_flush_query()
self.test_flush_query()
#cluster test
cluster_dnode_list = tdSql.get_cluseter_dnodes()
@ -1231,14 +1231,22 @@ class TDTestCase:
# self.test_drop_ctable()
self.test_drop_db()
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float):
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float, is_expect_row = None):
timeout = timeout_in_seconds
tdSql.query(sql)
while timeout > 0 and tdSql.getRows() != expected_row_num:
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}')
rows: int = 0
for row in tdSql.queryResult:
if is_expect_row is None or is_expect_row(row):
rows = rows + 1
while timeout > 0 and rows != expected_row_num:
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {str(tdSql.queryResult)} useful rows: {rows}, remain: {timeout_in_seconds - timeout}')
time.sleep(1)
timeout = timeout - 1
tdSql.query(sql)
rows = 0
for row in tdSql.queryResult:
if is_expect_row is None or is_expect_row(row):
rows = rows + 1
if timeout <= 0:
tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s')
else:
@ -1255,7 +1263,7 @@ class TDTestCase:
tdSql.error('drop tsma test.tsma1', -2147482491)
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1296,7 +1304,7 @@ class TDTestCase:
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database nsdb')
# drop norm table
@ -1323,7 +1331,7 @@ class TDTestCase:
# test drop stream
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1424,7 +1432,7 @@ class TDTestCase:
tdSql.error(
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self):
@ -1569,7 +1577,7 @@ class TDTestCase:
tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406)
def test_flush_query(self):
tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
tdSql.execute('insert into test.norm_tb (ts,c1,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1),avg(c2) from test.norm_tb interval(10m);select avg(c1),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
tdSql.execute('flush database test', queryTimes=1)
tdSql.query('select count(*) from test.meters', queryTimes=1)
tdSql.checkData(0,0,100000)

View File

@ -19,8 +19,8 @@
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
#include "tmsg.h"
#include "types.h"
static int running = 1;
TdFilePtr g_fp = NULL;
@ -412,7 +412,8 @@ int buildStable(TAOS* pConn, TAOS_RES* pRes) {
#ifdef WINDOWS
pRes = taos_query(pConn,
"CREATE STABLE `meters_summary` (`_wstart` TIMESTAMP, `current` FLOAT, `groupid` INT, `location` VARCHAR(16)) TAGS (`group_id` BIGINT UNSIGNED)");
"CREATE STABLE `meters_summary` (`_wstart` TIMESTAMP, `current` FLOAT, `groupid` INT, `location` "
"VARCHAR(16)) TAGS (`group_id` BIGINT UNSIGNED)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes));
return -1;
@ -420,7 +421,8 @@ int buildStable(TAOS* pConn, TAOS_RES* pRes) {
taos_free_result(pRes);
pRes = taos_query(pConn,
" CREATE TABLE `t_d2a450ee819dcf7576f0282d9ac22dbc` USING `meters_summary` (`group_id`) TAGS (13135550082773579308)");
" CREATE TABLE `t_d2a450ee819dcf7576f0282d9ac22dbc` USING `meters_summary` (`group_id`) TAGS "
"(13135550082773579308)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes));
return -1;
@ -435,7 +437,8 @@ int buildStable(TAOS* pConn, TAOS_RES* pRes) {
taos_free_result(pRes);
#else
pRes = taos_query(pConn,
"create stream meters_summary_s trigger at_once IGNORE EXPIRED 0 into meters_summary as select _wstart, max(current) as current, "
"create stream meters_summary_s trigger at_once IGNORE EXPIRED 0 into meters_summary as select "
"_wstart, max(current) as current, "
"groupid, location from meters partition by groupid, location interval(10m)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes));
@ -539,7 +542,8 @@ int32_t create_topic() {
if (g_conf.subTable) {
char topic[128] = {0};
sprintf(topic, "create topic meters_summary_t1 %s as stable meters_summary", g_conf.meta == 0 ? "with meta" : "only meta");
sprintf(topic, "create topic meters_summary_t1 %s as stable meters_summary",
g_conf.meta == 0 ? "with meta" : "only meta");
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic meters_summary_t1, reason:%s\n", taos_errstr(pRes));
@ -653,8 +657,15 @@ void initLogFile() {
if (g_conf.subTable) {
char* result[] = {
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"meters_summary\",\"columns\":[{\"name\":\"_"
"wstart\",\"type\":9,\"isPrimarykey\":false},{\"name\":\"current\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"groupid\",\"type\":4,\"isPrimarykey\":false},{\"name\":"
"\"location\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],\"tags\":[{\"name\":\"group_id\",\"type\":14}]}",
"wstart\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"current\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{"
"\"name\":\"groupid\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":"
"\"location\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\","
"\"level\":\"medium\"}],\"tags\":[{\"name\":\"group_id\","
"\"type\":14}"
"]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\",\"using\":"
"\"meters_summary\",\"tagNum\":1,\"tags\":[{\"name\":\"group_id\",\"type\":14,\"value\":1.313555008277358e+"
"19}],\"createList\":[]}"};
@ -664,56 +675,107 @@ void initLogFile() {
}
} else {
char* result[] = {
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":4,\"isPrimarykey\":false}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":64,\"isPrimarykey\":false},{"
"\"name\":\"c4\",\"type\":5,\"isPrimarykey\":false}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":"
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\","
"\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c1\","
"\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c2\",\"type\":"
"4,"
"\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\","
"\"type\":9,"
"\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\","
"\"level\":\"medium\"}"
",{"
"\"name\":\"c3\",\"type\":8,\"length\":64,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":"
"\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c4\",\"type\":5,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":"
"\"t3\","
"\"type\":10,\"length\":"
"8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":4,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":4,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":4,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":4,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":4,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":4,"
"\"tags\":["
"],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":4,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":4,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":5000}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c2\",\"type\":10,\"length\":8,\"isPrimarykey\":false},{\"name\":\"cc3\",\"type\":5,\"isPrimarykey\":false}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false},"
"{\"name\":\"i\",\"type\":4,\"isPrimarykey\":false}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{"
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\","
"\"type\":9,"
"\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c2\","
"\"type\":10,\"length\":8,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"cc3\",\"type\":5,"
"\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\","
"\"type\":9,"
"\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},"
"{\"name\":\"i\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,"
"\"tags\":[{"
"\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,"
"\"tags\":[]"
",\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\","
"\"type\":9,"
"\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":"
"\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":"
"false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":"
"\"lz4\",\"level\":\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\","
"\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c1\","
"\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c2\",\"type\":6,"
"\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c3\","
"\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb1\",\"using\":\"sttb\",\"tagNum\":3,"
"\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb1\\\"\"}"
"\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":"
"\"\\\"sttb1\\\"\"}"
",{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt2\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt2\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":43},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt2\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":0}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb2\",\"using\":\"sttb\",\"tagNum\":3,"
"\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":"
"\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb3\",\"using\":\"sttb\",\"tagNum\":3,"
"\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb3\\\"\"}"
"\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":"
"\"\\\"sttb3\\\"\"}"
",{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt4\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt4\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":433},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt4\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":0}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb4\",\"using\":\"sttb\",\"tagNum\":3,"
@ -726,24 +788,37 @@ void initLogFile() {
}
} else {
if (g_conf.meta) {
if (g_conf.subTable){
}else{
if (g_conf.subTable) {
} else {
char* result[] = {
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":4,\"isPrimarykey\":false}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\","
"\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c1\",\"type\":4,\"isPrimarykey\":false,"
"\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c2\",\"type\":4,"
"\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\","
"\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":"
"\"lz4\",\"level\":\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\","
"\"colType\":5}",
@ -755,8 +830,11 @@ void initLogFile() {
"\"colValue\":\"5000\",\"colValueNull\":false}",
"{\"type\":\"drop\",\"tableNameList\":[\"ct3\",\"ct1\"]}",
"{\"type\":\"drop\",\"tableType\":\"super\",\"tableName\":\"st1\"}",
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":10,\"length\":4,\"isPrimarykey\":false}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"c2\",\"type\":10,\"length\":4,"
"\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":\"medium\"}],\"tags\":[]}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":5,\"colName\":\"c3\","
"\"colType\":5}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":7,\"colName\":\"c2\","
@ -766,25 +844,48 @@ void initLogFile() {
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":9}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":6,\"colName\":\"c1\"}",
"{\"type\":\"drop\",\"tableNameList\":[\"n1\"]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false},"
"{\"name\":\"i\",\"type\":4,\"isPrimarykey\":false}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},"
"{\"name\":\"i\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":"
"[{"
"\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":"
"[]"
",\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false},"
"{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},"
"{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":"
"false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\",\"type\":8,"
"\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":1}]}",
"{\"type\":\"drop\",\"tableType\":\"super\",\"tableName\":\"st1\"}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":"
"false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":"
"\"lz4\",\"level\":\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\","
"\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"c2\",\"type\":6,"
"\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\","
"\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt1\",\"using\":\"stt\","
"\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":"
@ -795,7 +896,8 @@ void initLogFile() {
"\"\\\"stt2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}]},{\"tableName\":\"sttb2\",\"using\":\"sttb\","
"\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":"
"\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt3\",\"using\":\"stt\","
"\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":"
@ -816,9 +918,17 @@ void initLogFile() {
if (g_conf.subTable) {
char* result[] = {
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"meters_summary\",\"columns\":[{\"name\":\"_"
"wstart\",\"type\":9,\"isPrimarykey\":false},{\"name\":\"current\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"groupid\",\"type\":4,\"isPrimarykey\":false},{\"name\":"
"\"location\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],\"tags\":[{\"name\":\"group_id\",\"type\":14}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\",\"using\":"
"wstart\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"current\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":"
"\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"groupid\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":"
"\"location\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\","
"\"level\":\"medium\"}],\"tags\":[{\"name\":\"group_id\","
"\"type\":"
"14}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\","
"\"using\":"
"\"meters_summary\",\"tagNum\":1,\"tags\":[{\"name\":\"group_id\",\"type\":14,\"value\":1.313555008277358e+"
"19}],\"createList\":[]}"};
@ -826,23 +936,36 @@ void initLogFile() {
taosFprintfFile(pFile2, result[i]);
taosFprintfFile(pFile2, "\n");
}
}
else {
} else {
char* result[] = {
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":4,\"isPrimarykey\":false}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\","
"\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c1\",\"type\":4,"
"\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c2\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":"
"false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":"
"\"lz4\",\"level\":\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":["
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,"
"\"tags\":["
"{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\","
"\"colType\":5}",
@ -853,8 +976,11 @@ void initLogFile() {
"{\"type\":\"alter\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"alterType\":4,\"colName\":\"t1\","
"\"colValue\":\"5000\",\"colValueNull\":false}",
"{\"type\":\"delete\",\"sql\":\"delete from `ct3` where `ts` >= 1626006833600 and `ts` <= 1626006833605\"}",
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":10,\"length\":4,\"isPrimarykey\":false}],\"tags\":[]}",
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":\"c2\",\"type\":10,\"length\":4,"
"\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":\"medium\"}],\"tags\":[]}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":5,\"colName\":\"c3\","
"\"colType\":5}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":7,\"colName\":\"c2\","
@ -863,21 +989,39 @@ void initLogFile() {
"\"colNewName\":\"cc3\"}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":9}",
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":6,\"colName\":\"c1\"}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false},"
"{\"name\":\"i\",\"type\":4,\"isPrimarykey\":false}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},"
"{\"name\":\"i\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":"
"[{"
"\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":"
"[]"
",\"createList\":[]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"}"
",{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\","
"\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":"
"false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{"
"\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":"
"\"lz4\",\"level\":\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":"
"9,\"isPrimarykey\":false},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":"
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\","
"\"type\":"
"9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":"
"\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":"
"\"medium\"},{\"name\":\"c2\",\"type\":6,"
"\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\","
"\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"lz4\",\"level\":"
"\"medium\"}],"
"\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\","
"\"type\":"
"1}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt1\",\"using\":\"stt\","
"\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":"
@ -888,7 +1032,8 @@ void initLogFile() {
"\"\\\"stt2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}]},{\"tableName\":\"sttb2\",\"using\":\"sttb\","
"\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":"
"\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}",
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":"
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,"
"\"tags\":"
"[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{"
"\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt3\",\"using\":\"stt\","
"\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":"
@ -911,12 +1056,12 @@ void initLogFile() {
taosCloseFile(&pFile2);
}
void testConsumeExcluded(int topic_type){
TAOS* pConn = use_db();
TAOS_RES *pRes = NULL;
void testConsumeExcluded(int topic_type) {
TAOS* pConn = use_db();
TAOS_RES* pRes = NULL;
if(topic_type == 1){
char *topic = "create topic topic_excluded with meta as database db_taosx";
if (topic_type == 1) {
char* topic = "create topic topic_excluded with meta as database db_taosx";
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
@ -924,8 +1069,8 @@ void testConsumeExcluded(int topic_type){
return;
}
taos_free_result(pRes);
}else if(topic_type == 2){
char *topic = "create topic topic_excluded as select * from stt";
} else if (topic_type == 2) {
char* topic = "create topic topic_excluded as select * from stt";
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
@ -946,7 +1091,6 @@ void testConsumeExcluded(int topic_type){
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
@ -967,19 +1111,15 @@ void testConsumeExcluded(int topic_type){
if (msg) {
tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw);
if(topic_type == 1){
assert(raw.raw_type != 2 && raw.raw_type != 4 &&
raw.raw_type != TDMT_VND_CREATE_STB &&
raw.raw_type != TDMT_VND_ALTER_STB &&
raw.raw_type != TDMT_VND_CREATE_TABLE &&
raw.raw_type != TDMT_VND_ALTER_TABLE &&
raw.raw_type != TDMT_VND_DELETE);
assert(raw.raw_type == TDMT_VND_DROP_STB ||
raw.raw_type == TDMT_VND_DROP_TABLE);
}else if(topic_type == 2){
if (topic_type == 1) {
assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB &&
raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE &&
raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE);
assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE);
} else if (topic_type == 2) {
assert(0);
}
// printf("write raw data type: %d\n", raw.raw_type);
// printf("write raw data type: %d\n", raw.raw_type);
tmq_free_raw(raw);
taos_free_result(msg);
@ -1001,13 +1141,13 @@ void testConsumeExcluded(int topic_type){
taos_free_result(pRes);
}
void testDetailError(){
void testDetailError() {
tmq_raw_data raw = {0};
raw.raw_type = 2;
int32_t code = tmq_write_raw((TAOS *)1, raw);
int32_t code = tmq_write_raw((TAOS*)1, raw);
ASSERT(code);
const char *err = tmq_err2str(code);
char* tmp = strstr(err, "Invalid parameters,detail:taos:0x1 or data");
const char* err = tmq_err2str(code);
char* tmp = strstr(err, "Invalid parameters,detail:taos:0x1 or data");
ASSERT(tmp != NULL);
}