Merge branch '3.0' of github.com:taosdata/TDengine into szhou/tms-wc/save-row-simplebuf
This commit is contained in:
commit
ca6d10cdda
|
@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo {
|
|||
SColumnInfoData* pColData;
|
||||
} SBlockOrderInfo;
|
||||
|
||||
#define BLOCK_VERSION_1 1
|
||||
#define BLOCK_VERSION_2 2
|
||||
|
||||
#define NBIT (3u)
|
||||
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
|
||||
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
|
||||
|
|
|
@ -207,9 +207,6 @@ typedef enum _mgmt_table {
|
|||
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
|
||||
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
||||
|
||||
#define TD_REQ_FROM_APP 0
|
||||
#define TD_REQ_FROM_TAOX 1
|
||||
|
||||
typedef enum ENodeType {
|
||||
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
|
||||
// VALUE, OPERATOR, FUNCTION and so on.
|
||||
|
@ -759,7 +756,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
|
|||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
int8_t source; // 1-taosX or 0-taosClient
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
int8_t reserved[6];
|
||||
tb_uid_t suid;
|
||||
int64_t delay1;
|
||||
|
@ -802,7 +799,7 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
|
|||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igNotExists;
|
||||
int8_t source; // 1-taosX or 0-taosClient
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
int8_t reserved[6];
|
||||
tb_uid_t suid;
|
||||
int32_t sqlLen;
|
||||
|
@ -2661,6 +2658,7 @@ typedef struct SVCreateStbReq {
|
|||
SRSmaParam rsmaParam;
|
||||
int32_t alterOriDataLen;
|
||||
void* alterOriData;
|
||||
int8_t source;
|
||||
} SVCreateStbReq;
|
||||
|
||||
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
|
||||
|
@ -2730,6 +2728,7 @@ typedef struct {
|
|||
SVCreateTbReq* pReqs;
|
||||
SArray* pArray;
|
||||
};
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
} SVCreateTbBatchReq;
|
||||
|
||||
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
|
||||
|
@ -2822,6 +2821,7 @@ typedef struct {
|
|||
int32_t newCommentLen;
|
||||
char* newComment;
|
||||
int64_t ctimeMs; // fill by vnode
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
} SVAlterTbReq;
|
||||
|
||||
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
|
||||
|
@ -3919,12 +3919,13 @@ int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
|
|||
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
|
||||
#define TD_REQ_FROM_APP 0x0
|
||||
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
||||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||
#define TD_REQ_FROM_TAOX 0x8
|
||||
|
||||
#define SOURCE_NULL 0
|
||||
#define SOURCE_TAOSX 1
|
||||
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
|
||||
|
||||
typedef struct {
|
||||
int32_t flags;
|
||||
|
@ -3937,7 +3938,6 @@ typedef struct {
|
|||
SArray* aCol;
|
||||
};
|
||||
int64_t ctimeMs;
|
||||
int8_t source;
|
||||
} SSubmitTbData;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1850,7 +1850,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
|
|||
|
||||
char* pStart = p + len;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i];
|
||||
int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
|
||||
|
||||
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t* offset = (int32_t*)pStart;
|
||||
|
@ -1949,8 +1949,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
char* pStart = p;
|
||||
char* pStart1 = p1;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i];
|
||||
int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i];
|
||||
int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
|
||||
int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
|
||||
if (ASSERT(colLen < dataLen)) {
|
||||
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
|
@ -2009,7 +2009,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
}
|
||||
colLen1 = len;
|
||||
totalLen += colLen1;
|
||||
colLength1[i] = (blockVersion == 1) ? htonl(len) : len;
|
||||
colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
|
||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
len = numOfRows * sizeof(int32_t);
|
||||
memcpy(pStart1, pStart, len);
|
||||
|
@ -2098,7 +2098,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
|||
|
||||
char* pStart = p;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
if(blockVersion == 1){
|
||||
if(blockVersion == BLOCK_VERSION_1){
|
||||
colLength[i] = htonl(colLength[i]);
|
||||
}
|
||||
if (colLength[i] >= dataLen) {
|
||||
|
|
|
@ -1001,6 +1001,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
|
||||
taosArrayPush(tBatch.req.pArray, pCreateReq);
|
||||
tBatch.req.source = TD_REQ_FROM_TAOX;
|
||||
|
||||
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
|
||||
} else { // add to the correct vgroup
|
||||
|
@ -1276,7 +1277,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
return terrno;
|
||||
}
|
||||
SVAlterTbReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
SDecoder dcoder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
|
@ -1297,8 +1298,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
// decode and process req
|
||||
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
|
||||
int32_t len = metaLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&coder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
|
||||
tDecoderInit(&dcoder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
|
@ -1340,14 +1341,36 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
pVgData->vg = pInfo;
|
||||
pVgData->pData = taosMemoryMalloc(metaLen);
|
||||
if (NULL == pVgData->pData) {
|
||||
|
||||
int tlen = 0;
|
||||
req.source = TD_REQ_FROM_TAOX;
|
||||
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
|
||||
if(code != 0){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
memcpy(pVgData->pData, meta, metaLen);
|
||||
((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
|
||||
pVgData->size = metaLen;
|
||||
tlen += sizeof(SMsgHead);
|
||||
void* pMsg = taosMemoryMalloc(tlen);
|
||||
if (NULL == pMsg) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
|
||||
((SMsgHead*)pMsg)->contLen = htonl(tlen);
|
||||
void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
|
||||
SEncoder coder = {0};
|
||||
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
|
||||
code = tEncodeSVAlterTbReq(&coder, &req);
|
||||
if(code != 0){
|
||||
tEncoderClear(&coder);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
tEncoderClear(&coder);
|
||||
|
||||
pVgData->pData = pMsg;
|
||||
pVgData->size = tlen;
|
||||
|
||||
pVgData->numOfTables = 1;
|
||||
taosArrayPush(pArray, &pVgData);
|
||||
|
||||
|
@ -1387,7 +1410,7 @@ end:
|
|||
if (pVgData) taosMemoryFreeClear(pVgData->pData);
|
||||
taosMemoryFreeClear(pVgData);
|
||||
destroyRequest(pRequest);
|
||||
tDecoderClear(&coder);
|
||||
tDecoderClear(&dcoder);
|
||||
qDestroyQuery(pQuery);
|
||||
terrno = code;
|
||||
return code;
|
||||
|
|
|
@ -389,7 +389,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
}
|
||||
}
|
||||
if (strcasecmp(key, "msg.consume.excluded") == 0) {
|
||||
conf->sourceExcluded = taosStr2int64(value);
|
||||
conf->sourceExcluded = (taosStr2int64(value) != 0) ? TD_REQ_FROM_TAOX : 0;
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
|
@ -1611,17 +1611,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
return pRspObj;
|
||||
}
|
||||
|
||||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ;
|
||||
|
||||
void changeByteEndian(char* pData){
|
||||
char* p = pData;
|
||||
|
||||
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||
// version:
|
||||
int32_t blockVersion = *(int32_t*)p;
|
||||
ASSERT(blockVersion == BLOCK_VERSION_1);
|
||||
*(int32_t*)p = BLOCK_VERSION_2;
|
||||
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
int32_t cols = *(int32_t*)p;
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(uint64_t);
|
||||
// check fields
|
||||
p += cols * (sizeof(int8_t) + sizeof(int32_t));
|
||||
|
||||
int32_t* colLength = (int32_t*)p;
|
||||
for (int32_t i = 0; i < cols; ++i) {
|
||||
colLength[i] = htonl(colLength[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
|
||||
(*numOfRows) = 0;
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
||||
|
||||
pRspObj->resInfo.totalRows = 0;
|
||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
|
@ -1633,41 +1655,44 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
|
|||
}
|
||||
// extract the rows in this data packet
|
||||
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||
void* rawData = NULL;
|
||||
int64_t rows = 0;
|
||||
// deal with compatibility
|
||||
if(*(int64_t*)pRetrieve == 0){
|
||||
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
|
||||
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
|
||||
}else if(*(int64_t*)pRetrieve == 1){
|
||||
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
|
||||
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
|
||||
}
|
||||
|
||||
pVg->numOfRows += rows;
|
||||
(*numOfRows) += rows;
|
||||
|
||||
if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
|
||||
SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
|
||||
if (schema) {
|
||||
changeByteEndian(rawData);
|
||||
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
|
||||
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
|
||||
if(schema){
|
||||
taosArrayPush(pRspObj->rsp.blockSchema, &schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
||||
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj);
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
||||
|
||||
pRspObj->resInfo.totalRows = 0;
|
||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
|
||||
// extract the rows in this data packet
|
||||
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||
pVg->numOfRows += rows;
|
||||
(*numOfRows) += rows;
|
||||
}
|
||||
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj);
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
|
|
|
@ -2196,7 +2196,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
|
||||
// todo extract method
|
||||
int32_t* version = (int32_t*)data;
|
||||
*version = 2;
|
||||
*version = BLOCK_VERSION_1;
|
||||
data += sizeof(int32_t);
|
||||
|
||||
int32_t* actualLen = (int32_t*)data;
|
||||
|
@ -2277,7 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
data += colSizes[col];
|
||||
}
|
||||
|
||||
// colSizes[col] = htonl(colSizes[col]);
|
||||
colSizes[col] = htonl(colSizes[col]);
|
||||
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
|
||||
// htonl(colSizes[col]), colSizes[col]);
|
||||
}
|
||||
|
@ -2342,9 +2342,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
|||
pStart += sizeof(int32_t) * numOfCols;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
if(version == 1){
|
||||
colLen[i] = htonl(colLen[i]);
|
||||
}
|
||||
colLen[i] = htonl(colLen[i]);
|
||||
ASSERT(colLen[i] >= 0);
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
|
|
@ -7513,6 +7513,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
|
|||
if (pReq->alterOriDataLen > 0) {
|
||||
if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -7535,6 +7536,10 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
|
|||
if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1;
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -7663,6 +7668,8 @@ int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq)
|
|||
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -7677,6 +7684,10 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) {
|
|||
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -8034,6 +8045,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
|
|||
break;
|
||||
}
|
||||
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pReq->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return 0;
|
||||
|
@ -8094,6 +8106,9 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
|
|||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (tDecodeI8(pDecoder, &pReq->source) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
|
@ -8670,7 +8685,6 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
|
|||
}
|
||||
}
|
||||
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -8758,12 +8772,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
|
|||
goto _exit;
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
|
||||
|
|
|
@ -464,6 +464,7 @@ typedef struct {
|
|||
char* pAst1;
|
||||
char* pAst2;
|
||||
SRWLatch lock;
|
||||
int8_t source;
|
||||
} SStbObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -458,6 +458,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
|
|||
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
||||
req.alterOriData = alterOriData;
|
||||
req.alterOriDataLen = alterOriDataLen;
|
||||
req.source = pStb->source;
|
||||
// todo
|
||||
req.schemaRow.nCols = pStb->numOfColumns;
|
||||
req.schemaRow.version = pStb->colVer;
|
||||
|
@ -774,7 +775,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
|||
pDst->createdTime = taosGetTimestampMs();
|
||||
pDst->updateTime = pDst->createdTime;
|
||||
pDst->uid =
|
||||
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
(pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
|
||||
? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->dbUid = pDb->uid;
|
||||
pDst->tagVer = 1;
|
||||
pDst->colVer = 1;
|
||||
|
@ -790,6 +792,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
|||
pDst->numOfFuncs = pCreate->numOfFuncs;
|
||||
pDst->commentLen = pCreate->commentLen;
|
||||
pDst->pFuncs = pCreate->pFuncs;
|
||||
pDst->source = pCreate->source;
|
||||
pCreate->pFuncs = NULL;
|
||||
|
||||
if (pDst->commentLen > 0) {
|
||||
|
@ -1033,6 +1036,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
|||
memcpy(pDst, pStb, sizeof(SStbObj));
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
|
||||
pDst->source = createReq->source;
|
||||
pDst->updateTime = taosGetTimestampMs();
|
||||
pDst->numOfColumns = createReq->numOfColumns;
|
||||
pDst->numOfTags = createReq->numOfTags;
|
||||
|
@ -1141,7 +1145,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||
goto _OVER;
|
||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||
} else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
|
@ -2572,7 +2576,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (dropReq.source == TD_REQ_FROM_TAOX && pStb->uid != dropReq.suid) {
|
||||
if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) {
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -2484,7 +2484,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
|
||||
|
||||
char objName[20] = {0};
|
||||
char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);
|
||||
|
|
|
@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
continue;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
|
||||
|
||||
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
|
||||
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
|
||||
|
|
|
@ -392,7 +392,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
|||
pReader->msg.ver);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||
if ((pSubmitTbData->source & sourceExcluded) != 0) {
|
||||
if ((pSubmitTbData->flags & sourceExcluded) != 0) {
|
||||
pReader->nextBlk += 1;
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -267,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
|
||||
}
|
||||
|
||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
|
||||
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
|
||||
goto loop_table;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
|
@ -335,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
|
||||
}
|
||||
|
||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
|
||||
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
|
||||
goto loop_db;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
|
|
|
@ -815,7 +815,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
return;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
|
@ -860,7 +860,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
|
||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||
if (index == NULL) { // no data yet, append it
|
||||
|
|
|
@ -171,6 +171,22 @@ end : {
|
|||
}
|
||||
}
|
||||
|
||||
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \
|
||||
SDecoder decoder = {0};\
|
||||
TYPE req = {0}; \
|
||||
void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \
|
||||
int32_t len = pHead->bodyLen - sizeof(SMsgHead); \
|
||||
tDecoderInit(&decoder, data, len); \
|
||||
if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \
|
||||
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \
|
||||
fetchVer++; \
|
||||
tDecoderClear(&decoder); \
|
||||
continue; \
|
||||
} \
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
|
||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||
int code = 0;
|
||||
|
@ -239,6 +255,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
goto end;
|
||||
}
|
||||
|
||||
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
|
||||
if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
|
||||
PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq)
|
||||
} else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
|
||||
PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq)
|
||||
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
|
||||
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
|
||||
} else if (pHead->msgType == TDMT_VND_DELETE) {
|
||||
fetchVer++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
|
||||
metaRsp.resMsgType = pHead->msgType;
|
||||
|
|
|
@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
}
|
||||
}
|
||||
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
|
||||
pBuffInfo->rebuildWindow = true;
|
||||
}
|
||||
} else {
|
||||
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(
|
||||
pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
|
|
|
@ -1543,10 +1543,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
if (pBlk != NULL) {
|
||||
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
|
||||
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||
if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||
(pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||
continue;
|
||||
}
|
||||
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||
if (bExtractedBlock) {
|
||||
blockDataDestroy(pBlk);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlk != NULL) {
|
||||
|
@ -1572,10 +1575,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tSimpleHashCleanup(mUidBlk);
|
||||
taosArrayDestroy(aBlkSort);
|
||||
taosArrayDestroy(aExtSrc);
|
||||
return code;
|
||||
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
|
||||
blockDataDestroy(taosArrayGetP(aBlkSort, i));
|
||||
}
|
||||
taosArrayClear(aBlkSort);
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
|
@ -1588,6 +1592,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
szSort = 0;
|
||||
qDebug("source %zu created", taosArrayGetSize(aExtSrc));
|
||||
}
|
||||
|
||||
if (pBlk == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -1603,6 +1608,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
tSimpleHashCleanup(mUidBlk);
|
||||
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
|
||||
blockDataDestroy(taosArrayGetP(aBlkSort, i));
|
||||
}
|
||||
taosArrayDestroy(aBlkSort);
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
if (!tsortIsClosed(pHandle)) {
|
||||
|
@ -1614,7 +1622,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
freeExtRowMemFileWriteBuf(pHandle);
|
||||
}
|
||||
pHandle->type = SORT_SINGLESOURCE_SORT;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||
|
|
|
@ -175,4 +175,8 @@ IF(NOT TD_DARWIN)
|
|||
NAME idxFstUT
|
||||
COMMAND idxFstUT
|
||||
)
|
||||
add_test(
|
||||
NAME idxFstTest
|
||||
COMMAND idxFstTest
|
||||
)
|
||||
ENDIF ()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
|
@ -14,6 +15,12 @@
|
|||
#include "tutil.h"
|
||||
void* callback(void* s) { return s; }
|
||||
|
||||
class FstEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {}
|
||||
virtual void TearDown() {}
|
||||
};
|
||||
|
||||
static std::string fileName = TD_TMP_DIR_PATH "tindex.tindex";
|
||||
class FstWriter {
|
||||
public:
|
||||
|
@ -154,7 +161,7 @@ class FstReadMemory {
|
|||
int32_t _size;
|
||||
};
|
||||
|
||||
#define L 100
|
||||
#define L 200
|
||||
#define M 100
|
||||
#define N 100
|
||||
|
||||
|
@ -200,7 +207,7 @@ void checkMillonWriteAndReadOfFst() {
|
|||
FstWriter* fw = new FstWriter;
|
||||
Performance_fstWriteRecords(fw);
|
||||
delete fw;
|
||||
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024);
|
||||
FstReadMemory* fr = new FstReadMemory(1024 * 8 * 1024);
|
||||
|
||||
if (fr->init()) {
|
||||
printf("success to init fst read");
|
||||
|
@ -637,23 +644,31 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
|
|||
tfileIteratorDestroy(iter);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
// tool to check all kind of fst test
|
||||
// if (argc > 1) { validateTFile(argv[1]); }
|
||||
// if (argc > 4) {
|
||||
// path suid colName ver
|
||||
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
|
||||
//}
|
||||
checkFstCheckIterator1();
|
||||
// checkFstCheckIterator2();
|
||||
// checkFstCheckIteratorPrefix();
|
||||
// checkFstCheckIteratorRange1();
|
||||
// checkFstCheckIteratorRange2();
|
||||
// checkFstCheckIteratorRange3();
|
||||
// checkFstLongTerm();
|
||||
// checkFstPrefixSearch();
|
||||
// int main(int argc, char* argv[]) {
|
||||
// // tool to check all kind of fst test
|
||||
// // if (argc > 1) { validateTFile(argv[1]); }
|
||||
// // if (argc > 4) {
|
||||
// // path suid colName ver
|
||||
// // iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
|
||||
// //}
|
||||
// checkFstCheckIterator1();
|
||||
// // checkFstCheckIterator2();
|
||||
// // checkFstCheckIteratorPrefix();
|
||||
// // checkFstCheckIteratorRange1();
|
||||
// // checkFstCheckIteratorRange2();
|
||||
// // checkFstCheckIteratorRange3();
|
||||
// // checkFstLongTerm();
|
||||
// // checkFstPrefixSearch();
|
||||
|
||||
// checkMillonWriteAndReadOfFst();
|
||||
// // checkMillonWriteAndReadOfFst();
|
||||
|
||||
return 1;
|
||||
}
|
||||
// return 1;
|
||||
// }
|
||||
TEST_F(FstEnv, checkIterator1) { checkFstCheckIterator1(); }
|
||||
TEST_F(FstEnv, checkItertor2) { checkFstCheckIterator2(); }
|
||||
TEST_F(FstEnv, checkPrefix) { checkFstCheckIteratorPrefix(); }
|
||||
TEST_F(FstEnv, checkRange1) { checkFstCheckIteratorRange1(); }
|
||||
TEST_F(FstEnv, checkRange2) { checkFstCheckIteratorRange2(); }
|
||||
TEST_F(FstEnv, checkRange3) { checkFstCheckIteratorRange3(); }
|
||||
TEST_F(FstEnv, checkLongTerm) { checkFstLongTerm(); }
|
||||
TEST_F(FstEnv, checkMillonWriteData) { checkMillonWriteAndReadOfFst(); }
|
||||
|
|
|
@ -285,7 +285,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
|||
pTmp->suid = pSrc->suid;
|
||||
pTmp->uid = pSrc->uid;
|
||||
pTmp->sver = pSrc->sver;
|
||||
pTmp->source = pSrc->source;
|
||||
pTmp->pCreateTbReq = NULL;
|
||||
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
if (pSrc->pCreateTbReq) {
|
||||
|
@ -653,7 +652,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
|
||||
pTableCxt->pData->source = SOURCE_TAOSX;
|
||||
pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
|
||||
if(tmp == NULL){
|
||||
ret = initTableColSubmitData(pTableCxt);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -721,7 +720,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
if (needChangeLength && version == 1) {
|
||||
if (needChangeLength && version == BLOCK_VERSION_1) {
|
||||
pStart += htonl(colLength[j]);
|
||||
} else {
|
||||
pStart += colLength[j];
|
||||
|
@ -752,7 +751,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
if (needChangeLength && version == 1) {
|
||||
if (needChangeLength && version == BLOCK_VERSION_1) {
|
||||
pStart += htonl(colLength[i]);
|
||||
} else {
|
||||
pStart += colLength[i];
|
||||
|
|
|
@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
|
|||
void* pFileStore = getStateFileStore(pFileState);
|
||||
SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
|
||||
if (pCur) {
|
||||
pCur->pStreamFileState = pFileState;
|
||||
SSessionKey key = {0};
|
||||
void* pVal = NULL;
|
||||
int len = 0;
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#include "tutil.h"
|
||||
|
||||
static int32_t initForwardBackwardPtr(SSkipList *pSkipList);
|
||||
static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
|
||||
static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
|
||||
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||
static void tSkipListCorrectLevel(SSkipList *pSkipList);
|
||||
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
|
||||
|
@ -131,12 +131,14 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
|
|||
return pNode;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
|
||||
void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) {
|
||||
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
bool hasDup = false;
|
||||
char *pKey = NULL;
|
||||
char *pDataKey = NULL;
|
||||
char * pKey = NULL;
|
||||
char * pDataKey = NULL;
|
||||
int32_t compare = 0;
|
||||
|
||||
tSkipListWLock(pSkipList);
|
||||
|
@ -260,6 +262,7 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
|
|||
tSkipListCorrectLevel(pSkipList);
|
||||
tSkipListUnlock(pSkipList);
|
||||
}
|
||||
#endif
|
||||
|
||||
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
|
||||
if (pSkipList == NULL) return NULL;
|
||||
|
@ -350,6 +353,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
||||
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
|
||||
return;
|
||||
|
@ -358,7 +362,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
|||
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
|
||||
|
||||
int32_t id = 1;
|
||||
char *prev = NULL;
|
||||
char * prev = NULL;
|
||||
|
||||
while (p != pSkipList->pTail) {
|
||||
char *key = SL_GET_NODE_KEY(pSkipList, p);
|
||||
|
@ -392,6 +396,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
|||
p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
|
||||
for (int32_t i = 0; i < pNode->level; ++i) {
|
||||
|
@ -460,7 +465,7 @@ static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) {
|
|||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
|
||||
int32_t compare = 0;
|
||||
bool hasDupKey = false;
|
||||
char *pDataKey = pSkipList->keyFn(pData);
|
||||
char * pDataKey = pSkipList->keyFn(pData);
|
||||
|
||||
if (pSkipList->size == 0) {
|
||||
for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
|
||||
|
@ -516,6 +521,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
|
|||
return hasDupKey;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) {
|
||||
int32_t level = pNode->level;
|
||||
uint8_t dupMode = SL_DUP_MODE(pSkipList);
|
||||
|
@ -540,6 +546,7 @@ static void tSkipListCorrectLevel(SSkipList *pSkipList) {
|
|||
pSkipList->level -= 1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList,
|
||||
int32_t level) { // record link count in each level
|
||||
|
|
|
@ -49,6 +49,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tms_memleak.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import tdDnodes
|
||||
from math import inf
|
||||
|
||||
class TDTestCase:
|
||||
def caseDescription(self):
|
||||
'''
|
||||
case1<shenglian zhou>: [TD-]
|
||||
'''
|
||||
return
|
||||
|
||||
def init(self, conn, logSql, replicaVer=1):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
self.conn = conn
|
||||
|
||||
def restartTaosd(self, index=1, dbname="db"):
|
||||
tdDnodes.stop(index)
|
||||
tdDnodes.startWithoutSleep(index)
|
||||
tdSql.execute(f"use tms_memleak")
|
||||
|
||||
def run(self):
|
||||
print("running {}".format(__file__))
|
||||
tdSql.execute("drop database if exists tms_memleak")
|
||||
tdSql.execute("create database if not exists tms_memleak")
|
||||
tdSql.execute('use tms_memleak')
|
||||
|
||||
tdSql.execute('create table st(ts timestamp, f int) tags (t int);')
|
||||
|
||||
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1)('2021-04-19 00:00:02', 2)('2021-04-19 00:00:03', 3)('2021-04-19 00:00:04', 4)")
|
||||
|
||||
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-20 00:00:01', 5)('2021-04-20 00:00:02', 6)('2021-04-20 00:00:03', 7)('2021-04-20 00:00:04', 8)")
|
||||
|
||||
tdSql.execute("insert into ct3 using st tags(3) values('2021-04-21 00:00:01', 5)('2021-04-21 00:00:02', 6)('2021-04-21 00:00:03', 7)('2021-04-21 00:00:04', 8)")
|
||||
|
||||
tdSql.execute("insert into ct4 using st tags(4) values('2021-04-22 00:00:01', 5)('2021-04-22 00:00:02', 6)('2021-04-22 00:00:03', 7)('2021-04-22 00:00:04', 8)")
|
||||
|
||||
tdSql.query("select * from st order by ts limit 1 ");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 1);
|
||||
|
||||
tdSql.execute('drop database tms_memleak')
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -323,7 +323,7 @@ class TDTestCase:
|
|||
tdSql.query("select * from st")
|
||||
tdSql.checkRows(8)
|
||||
|
||||
tdSql.execute(f'create topic topic_excluded with meta as database d1')
|
||||
tdSql.execute(f'create topic topic_all with meta as database d1')
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
|
@ -333,7 +333,7 @@ class TDTestCase:
|
|||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["topic_excluded"])
|
||||
consumer.subscribe(["topic_all"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include "types.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
static int running = 1;
|
||||
TdFilePtr g_fp = NULL;
|
||||
|
@ -966,7 +967,14 @@ void testConsumeExcluded(int topic_type){
|
|||
tmq_raw_data raw = {0};
|
||||
tmq_get_raw(msg, &raw);
|
||||
if(topic_type == 1){
|
||||
assert(raw.raw_type != 2 && raw.raw_type != 4);
|
||||
assert(raw.raw_type != 2 && raw.raw_type != 4 &&
|
||||
raw.raw_type != TDMT_VND_CREATE_STB &&
|
||||
raw.raw_type != TDMT_VND_ALTER_STB &&
|
||||
raw.raw_type != TDMT_VND_CREATE_TABLE &&
|
||||
raw.raw_type != TDMT_VND_ALTER_TABLE &&
|
||||
raw.raw_type != TDMT_VND_DELETE);
|
||||
assert(raw.raw_type == TDMT_VND_DROP_STB ||
|
||||
raw.raw_type == TDMT_VND_DROP_TABLE);
|
||||
}else if(topic_type == 2){
|
||||
assert(0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue