Merge branch '3.0' into fix/nullcheck

This commit is contained in:
Haojun Liao 2024-02-24 01:46:15 +08:00
commit 2800454c35
50 changed files with 604 additions and 254 deletions

View File

@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo {
SColumnInfoData* pColData; SColumnInfoData* pColData;
} SBlockOrderInfo; } SBlockOrderInfo;
#define BLOCK_VERSION_1 1
#define BLOCK_VERSION_2 2
#define NBIT (3u) #define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])

View File

@ -207,9 +207,6 @@ typedef enum _mgmt_table {
#define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
#define TD_REQ_FROM_APP 0
#define TD_REQ_FROM_TAOX 1
typedef enum ENodeType { typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
// VALUE, OPERATOR, FUNCTION and so on. // VALUE, OPERATOR, FUNCTION and so on.
@ -759,7 +756,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
int8_t source; // 1-taosX or 0-taosClient int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
int8_t reserved[6]; int8_t reserved[6];
tb_uid_t suid; tb_uid_t suid;
int64_t delay1; int64_t delay1;
@ -802,7 +799,7 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists; int8_t igNotExists;
int8_t source; // 1-taosX or 0-taosClient int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
int8_t reserved[6]; int8_t reserved[6];
tb_uid_t suid; tb_uid_t suid;
int32_t sqlLen; int32_t sqlLen;
@ -1613,7 +1610,6 @@ typedef struct {
SEp ep; SEp ep;
char active[TSDB_ACTIVE_KEY_LEN]; char active[TSDB_ACTIVE_KEY_LEN];
char connActive[TSDB_CONN_ACTIVE_KEY_LEN]; char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
char machineId[TSDB_MACHINE_ID_LEN + 1];
} SDnodeInfo; } SDnodeInfo;
typedef struct { typedef struct {
@ -2661,6 +2657,7 @@ typedef struct SVCreateStbReq {
SRSmaParam rsmaParam; SRSmaParam rsmaParam;
int32_t alterOriDataLen; int32_t alterOriDataLen;
void* alterOriData; void* alterOriData;
int8_t source;
} SVCreateStbReq; } SVCreateStbReq;
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq); int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
@ -2730,6 +2727,7 @@ typedef struct {
SVCreateTbReq* pReqs; SVCreateTbReq* pReqs;
SArray* pArray; SArray* pArray;
}; };
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
@ -2822,6 +2820,7 @@ typedef struct {
int32_t newCommentLen; int32_t newCommentLen;
char* newComment; char* newComment;
int64_t ctimeMs; // fill by vnode int64_t ctimeMs; // fill by vnode
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVAlterTbReq; } SVAlterTbReq;
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
@ -3919,12 +3918,13 @@ int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define TD_REQ_FROM_APP 0x0
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
#define SUBMIT_REQ_FROM_FILE 0x4 #define SUBMIT_REQ_FROM_FILE 0x4
#define TD_REQ_FROM_TAOX 0x8
#define SOURCE_NULL 0 #define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
#define SOURCE_TAOSX 1
typedef struct { typedef struct {
int32_t flags; int32_t flags;
@ -3937,7 +3937,6 @@ typedef struct {
SArray* aCol; SArray* aCol;
}; };
int64_t ctimeMs; int64_t ctimeMs;
int8_t source;
} SSubmitTbData; } SSubmitTbData;
typedef struct { typedef struct {

View File

@ -22,6 +22,9 @@
extern "C" { extern "C" {
#endif #endif
#define TBASE_MAX_ILEN 4096
#define TBASE_MAX_OLEN 5653
uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen); uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen);
char *base58_encode(const uint8_t *value, int32_t vlen); char *base58_encode(const uint8_t *value, int32_t vlen);

View File

@ -1850,7 +1850,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
char* pStart = p + len; char* pStart = p + len;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
int32_t* offset = (int32_t*)pStart; int32_t* offset = (int32_t*)pStart;
@ -1949,8 +1949,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* pStart = p; char* pStart = p;
char* pStart1 = p1; char* pStart1 = p1;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
if (ASSERT(colLen < dataLen)) { if (ASSERT(colLen < dataLen)) {
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
@ -2009,7 +2009,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
} }
colLen1 = len; colLen1 = len;
totalLen += colLen1; totalLen += colLen1;
colLength1[i] = (blockVersion == 1) ? htonl(len) : len; colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
len = numOfRows * sizeof(int32_t); len = numOfRows * sizeof(int32_t);
memcpy(pStart1, pStart, len); memcpy(pStart1, pStart, len);
@ -2098,7 +2098,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
char* pStart = p; char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
if(blockVersion == 1){ if(blockVersion == BLOCK_VERSION_1){
colLength[i] = htonl(colLength[i]); colLength[i] = htonl(colLength[i]);
} }
if (colLength[i] >= dataLen) { if (colLength[i] >= dataLen) {

View File

@ -1001,6 +1001,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, pCreateReq); taosArrayPush(tBatch.req.pArray, pCreateReq);
tBatch.req.source = TD_REQ_FROM_TAOX;
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)); taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup } else { // add to the correct vgroup
@ -1276,7 +1277,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
return terrno; return terrno;
} }
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
SDecoder coder = {0}; SDecoder dcoder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
@ -1297,8 +1298,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&dcoder, data, len);
if (tDecodeSVAlterTbReq(&coder, &req) < 0) { if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
@ -1340,14 +1341,36 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
pVgData->vg = pInfo; pVgData->vg = pInfo;
pVgData->pData = taosMemoryMalloc(metaLen);
if (NULL == pVgData->pData) { int tlen = 0;
req.source = TD_REQ_FROM_TAOX;
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
if(code != 0){
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
memcpy(pVgData->pData, meta, metaLen); tlen += sizeof(SMsgHead);
((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId); void* pMsg = taosMemoryMalloc(tlen);
pVgData->size = metaLen; if (NULL == pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
((SMsgHead*)pMsg)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
SEncoder coder = {0};
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
code = tEncodeSVAlterTbReq(&coder, &req);
if(code != 0){
tEncoderClear(&coder);
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tEncoderClear(&coder);
pVgData->pData = pMsg;
pVgData->size = tlen;
pVgData->numOfTables = 1; pVgData->numOfTables = 1;
taosArrayPush(pArray, &pVgData); taosArrayPush(pArray, &pVgData);
@ -1387,7 +1410,7 @@ end:
if (pVgData) taosMemoryFreeClear(pVgData->pData); if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData); taosMemoryFreeClear(pVgData);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&dcoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
terrno = code; terrno = code;
return code; return code;

View File

@ -389,7 +389,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
} }
if (strcasecmp(key, "msg.consume.excluded") == 0) { if (strcasecmp(key, "msg.consume.excluded") == 0) {
conf->sourceExcluded = taosStr2int64(value); conf->sourceExcluded = (taosStr2int64(value) != 0) ? TD_REQ_FROM_TAOX : 0;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -1611,17 +1611,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj; return pRspObj;
} }
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
void changeByteEndian(char* pData){
char* p = pData;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
// version:
int32_t blockVersion = *(int32_t*)p;
ASSERT(blockVersion == BLOCK_VERSION_1);
*(int32_t*)p = BLOCK_VERSION_2;
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(int32_t);
int32_t cols = *(int32_t*)p;
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(uint64_t);
// check fields
p += cols * (sizeof(int8_t) + sizeof(int32_t));
int32_t* colLength = (int32_t*)p;
for (int32_t i = 0; i < cols; ++i) {
colLength[i] = htonl(colLength[i]);
}
}
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
(*numOfRows) = 0; (*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1; pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
@ -1633,41 +1655,44 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
} }
// extract the rows in this data packet // extract the rows in this data packet
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows); void* rawData = NULL;
int64_t rows = 0;
// deal with compatibility
if(*(int64_t*)pRetrieve == 0){
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
}else if(*(int64_t*)pRetrieve == 1){
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
}
pVg->numOfRows += rows; pVg->numOfRows += rows;
(*numOfRows) += rows; (*numOfRows) += rows;
changeByteEndian(rawData);
if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if (schema) { if(schema){
taosArrayPush(pRspObj->rsp.blockSchema, &schema); taosArrayPush(pRspObj->rsp.blockSchema, &schema);
} }
} }
} }
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj);
return pRspObj; return pRspObj;
} }
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
pRspObj->resType = RES_TYPE__TMQ_METADATA; pRspObj->resType = RES_TYPE__TMQ_METADATA;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
pRspObj->resInfo.totalRows = 0; tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj);
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
// extract the rows in this data packet
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows);
pVg->numOfRows += rows;
(*numOfRows) += rows;
}
return pRspObj; return pRspObj;
} }

View File

@ -2196,7 +2196,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
// todo extract method // todo extract method
int32_t* version = (int32_t*)data; int32_t* version = (int32_t*)data;
*version = 2; *version = BLOCK_VERSION_1;
data += sizeof(int32_t); data += sizeof(int32_t);
int32_t* actualLen = (int32_t*)data; int32_t* actualLen = (int32_t*)data;
@ -2277,7 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
data += colSizes[col]; data += colSizes[col];
} }
// colSizes[col] = htonl(colSizes[col]); colSizes[col] = htonl(colSizes[col]);
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
// htonl(colSizes[col]), colSizes[col]); // htonl(colSizes[col]), colSizes[col]);
} }
@ -2342,9 +2342,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(int32_t) * numOfCols; pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
if(version == 1){ colLen[i] = htonl(colLen[i]);
colLen[i] = htonl(colLen[i]);
}
ASSERT(colLen[i] >= 0); ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

View File

@ -7513,6 +7513,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
if (pReq->alterOriDataLen > 0) { if (pReq->alterOriDataLen > 0) {
if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1; if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1;
} }
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
@ -7535,6 +7536,10 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1; if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1;
} }
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
}
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
} }
@ -7663,6 +7668,8 @@ int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq)
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1; if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
} }
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
} }
@ -7677,6 +7684,10 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) {
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
} }
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
}
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
} }
@ -8034,6 +8045,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
break; break;
} }
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->source) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return 0; return 0;
@ -8094,6 +8106,9 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (!tDecodeIsEnd(pDecoder)) { if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
} }
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI8(pDecoder, &pReq->source) < 0) return -1;
}
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
@ -8670,7 +8685,6 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
} }
} }
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1; if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
@ -8758,12 +8772,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
goto _exit; goto _exit;
} }
} }
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder); tEndDecode(pCoder);

View File

@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmInt.h"
#include "systable.h" #include "systable.h"
#include "tgrant.h"
extern SConfig *tsCfg; extern SConfig *tsCfg;
@ -118,11 +117,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.memTotal = tsTotalMemoryKB * 1024; req.memTotal = tsTotalMemoryKB * 1024;
req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024; req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024;
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
char *machine = tGetMachineId(); tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
if (machine) {
tstrncpy(req.machineId, machine, TSDB_MACHINE_ID_LEN + 1);
taosMemoryFreeClear(machine);
}
req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0; req.clusterCfg.checkTime = 0;

View File

@ -22,6 +22,7 @@
#ifdef TD_TSZ #ifdef TD_TSZ
#include "tcompression.h" #include "tcompression.h"
#include "tglobal.h" #include "tglobal.h"
#include "tgrant.h"
#endif #endif
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
@ -137,6 +138,16 @@ int32_t dmInitVars(SDnode *pDnode) {
pData->rebootTime = taosGetTimestampMs(); pData->rebootTime = taosGetTimestampMs();
pData->dropped = 0; pData->dropped = 0;
pData->stopped = 0; pData->stopped = 0;
char *machineId = tGetMachineId();
if (machineId) {
tstrncpy(pData->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
taosMemoryFreeClear(machineId);
} else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
return -1;
#endif
}
pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pData->dnodeHash == NULL) { if (pData->dnodeHash == NULL) {

View File

@ -109,6 +109,7 @@ typedef struct {
SMsgCb msgCb; SMsgCb msgCb;
bool validMnodeEps; bool validMnodeEps;
int64_t ipWhiteVer; int64_t ipWhiteVer;
char machineId[TSDB_MACHINE_ID_LEN + 1];
} SDnodeData; } SDnodeData;
typedef struct { typedef struct {

View File

@ -464,6 +464,7 @@ typedef struct {
char* pAst1; char* pAst1;
char* pAst2; char* pAst2;
SRWLatch lock; SRWLatch lock;
int8_t source;
} SStbObj; } SStbObj;
typedef struct { typedef struct {

View File

@ -19,16 +19,16 @@ int32_t mndGetTableIdx(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq); int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq);
int32_t mndSetCreateIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx); int32_t mndSetCreateIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx); int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetDropIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx); int32_t mndSetDropIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx); int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetAlterIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx); int32_t mndSetAlterIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx); int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_MND_IDX_H_*/ #endif /*_TD_MND_IDX_H_*/

View File

@ -41,7 +41,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
@ -256,17 +256,29 @@ _OVER:
return pRow; return pRow;
} }
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
SDbObj *pNewDb = pObj; SSdb *pSdb = pMnode->pSdb;
SSdbRow *pRow = NULL;
SDbObj *pNewDb = NULL;
int code = -1;
pRow = mndDbActionDecode(pRaw);
if (pRow == NULL) goto _OVER;
pNewDb = sdbGetRowObj(pRow);
if (pNewDb == NULL) goto _OVER;
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
if (pOldDb != NULL) { if (pOldDb != NULL) {
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
sdbRelease(pMnode->pSdb, pOldDb); sdbRelease(pMnode->pSdb, pOldDb);
return -1; goto _OVER;
} }
return 0; code = 0;
_OVER:
if (pNewDb) mndDbActionDelete(pSdb, pNewDb);
taosMemoryFreeClear(pRow);
return code;
} }
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
@ -884,10 +896,10 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
return terrno; return terrno;
} }
static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { static int32_t mndSetAlterDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld); SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw); sdbFreeRaw(pRedoRaw);
return -1; return -1;
} }
@ -943,7 +955,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
mndTransSetDbName(pTrans, pOld->name, NULL); mndTransSetDbName(pTrans, pOld->name, NULL);
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
@ -1120,10 +1132,10 @@ _OVER:
return code; return code;
} }
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { static int32_t mndSetDropDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0; return 0;
@ -1257,7 +1269,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
goto _OVER; goto _OVER;
} }
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbPrepareLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ /*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ /*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/

View File

@ -141,7 +141,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN); memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
taosMemoryFreeClear(machineId); taosMemoryFreeClear(machineId);
} else { } else {
#ifdef TD_ENTERPRISE #if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE; terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
goto _OVER; goto _OVER;
#endif #endif
@ -410,9 +410,6 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
dInfo.ep.port = pDnode->port; dInfo.ep.port = pDnode->port;
dInfo.offlineReason = pDnode->offlineReason; dInfo.offlineReason = pDnode->offlineReason;
tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
tstrncpy(dInfo.active, pDnode->active, TSDB_ACTIVE_KEY_LEN);
tstrncpy(dInfo.connActive, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
tstrncpy(dInfo.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
if (mndIsMnode(pMnode, pDnode->id)) { if (mndIsMnode(pMnode, pDnode->id)) {
dInfo.isMnode = 1; dInfo.isMnode = 1;

View File

@ -331,10 +331,10 @@ SDbObj *mndAcquireDbByIdx(SMnode *pMnode, const char *idxName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
int32_t mndSetCreateIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) { int32_t mndSetCreateIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx); SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
return 0; return 0;
@ -349,10 +349,10 @@ int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx)
return 0; return 0;
} }
int32_t mndSetAlterIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) { int32_t mndSetAlterIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx); SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw); sdbFreeRaw(pRedoRaw);
return -1; return -1;
} }
@ -482,10 +482,10 @@ _OVER:
return code; return code;
} }
int32_t mndSetDropIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) { int32_t mndSetDropIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx); SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0; return 0;
@ -652,7 +652,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
if (mndSetCreateIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetCreateIdxPrepareLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1) != 0) goto _OVER; if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1) != 0) goto _OVER;
@ -771,7 +771,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetDropIdxPrepareLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0) != 0) goto _OVER; if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0) != 0) goto _OVER;

View File

@ -257,6 +257,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId); mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto _OVER; if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto _OVER;
@ -380,6 +381,7 @@ static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id); mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER; if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;

View File

@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) {
} }
} }
static bool countWindowStreamTask(SSubplan* pPlan) { static bool isCountWindowStreamTask(SSubplan* pPlan) {
SPhysiNode* pNode = pPlan->pNode; return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
return hasCountWindowNode(pNode);
} }
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
} }
} }
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) { static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
bool hasCountWindowNode = countWindowStreamTask(pPlan); bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0);
if (hasCountWindowNode && isRelStreamTask) { if (hasCountWindowNode && (!isFillhistoryTask)) {
SStreamStatus* pStatus = &pTask->status; SStreamStatus* pStatus = &pTask->status;
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set", mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT)); pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
pStatus->taskStatus = TASK_STATUS__HALT; pStatus->taskStatus = TASK_STATUS__HALT;
} }
} }
@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) {
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
// new stream task
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam); SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
haltInitialTaskStatus(pTask, plan); if (pStream->conf.fillHistory) {
haltInitialTaskStatus(pTask, plan, isFillhistory);
}
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
terrno = code; terrno = code;
return terrno; return terrno;
} }
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }

View File

@ -257,6 +257,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-snode"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-snode");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId); mInfo("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
@ -383,6 +384,7 @@ static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-snode"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-snode");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); mInfo("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER; if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;

View File

@ -458,6 +458,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.rollup = pStb->ast1Len > 0 ? 1 : 0;
req.alterOriData = alterOriData; req.alterOriData = alterOriData;
req.alterOriDataLen = alterOriDataLen; req.alterOriDataLen = alterOriDataLen;
req.source = pStb->source;
// todo // todo
req.schemaRow.nCols = pStb->numOfColumns; req.schemaRow.nCols = pStb->numOfColumns;
req.schemaRow.version = pStb->colVer; req.schemaRow.version = pStb->colVer;
@ -616,10 +617,10 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
return 0; return 0;
} }
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { static int32_t mndSetCreateStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw); sdbFreeRaw(pRedoRaw);
return -1; return -1;
} }
@ -628,18 +629,6 @@ static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0; return 0;
} }
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
sdbFreeRaw(pUndoRaw);
return -1;
}
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
@ -774,7 +763,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->createdTime = taosGetTimestampMs(); pDst->createdTime = taosGetTimestampMs();
pDst->updateTime = pDst->createdTime; pDst->updateTime = pDst->createdTime;
pDst->uid = pDst->uid =
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->dbUid = pDb->uid; pDst->dbUid = pDb->uid;
pDst->tagVer = 1; pDst->tagVer = 1;
pDst->colVer = 1; pDst->colVer = 1;
@ -790,6 +780,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->numOfFuncs = pCreate->numOfFuncs; pDst->numOfFuncs = pCreate->numOfFuncs;
pDst->commentLen = pCreate->commentLen; pDst->commentLen = pCreate->commentLen;
pDst->pFuncs = pCreate->pFuncs; pDst->pFuncs = pCreate->pFuncs;
pDst->source = pCreate->source;
pCreate->pFuncs = NULL; pCreate->pFuncs = NULL;
if (pDst->commentLen > 0) { if (pDst->commentLen > 0) {
@ -910,8 +901,6 @@ _OVER:
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTransCheckConflict(pMnode, pTrans) != 0) return -1; if (mndTransCheckConflict(pMnode, pTrans) != 0) return -1;
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
@ -1033,6 +1022,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
memcpy(pDst, pStb, sizeof(SStbObj)); memcpy(pDst, pStb, sizeof(SStbObj));
taosRUnLockLatch(&pStb->lock); taosRUnLockLatch(&pStb->lock);
pDst->source = createReq->source;
pDst->updateTime = taosGetTimestampMs(); pDst->updateTime = taosGetTimestampMs();
pDst->numOfColumns = createReq->numOfColumns; pDst->numOfColumns = createReq->numOfColumns;
pDst->numOfTags = createReq->numOfTags; pDst->numOfTags = createReq->numOfTags;
@ -1141,7 +1131,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
} }
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
goto _OVER; goto _OVER;
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) { } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) {
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
code = 0; code = 0;
goto _OVER; goto _OVER;
@ -1742,10 +1732,10 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
return 0; return 0;
} }
static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { static int32_t mndSetAlterStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw); sdbFreeRaw(pRedoRaw);
return -1; return -1;
} }
@ -2151,7 +2141,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
mndTransSetRpcRsp(pTrans, pCont, contLen); mndTransSetRpcRsp(pTrans, pCont, contLen);
} }
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER; if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
@ -2188,11 +2178,11 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) { if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
exist = true; exist = true;
} }
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (exist == true) { if (exist == true) {
if (mndSetDropIdxRedoLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER; if (mndSetDropIdxPrepareLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
if (mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER; if (mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
} }
@ -2211,13 +2201,13 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
exist = true; exist = true;
} }
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (exist == true) { if (exist == true) {
memcpy(idxObj.colName, nTagName, strlen(nTagName)); memcpy(idxObj.colName, nTagName, strlen(nTagName));
idxObj.colName[strlen(nTagName)] = 0; idxObj.colName[strlen(nTagName)] = 0;
if (mndSetAlterIdxRedoLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER; if (mndSetAlterIdxPrepareLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
if (mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER; if (mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
} }
@ -2350,10 +2340,10 @@ _OVER:
return code; return code;
} }
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { static int32_t mndSetDropStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw); sdbFreeRaw(pRedoRaw);
return -1; return -1;
} }
@ -2423,7 +2413,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbPrepareLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndDropIdxsByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndDropIdxsByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
@ -2572,7 +2562,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
} }
} }
if (dropReq.source == TD_REQ_FROM_TAOX && pStb->uid != dropReq.suid) { if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) {
code = 0; code = 0;
goto _OVER; goto _OVER;
} }
@ -3543,7 +3533,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER; if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;

View File

@ -77,29 +77,16 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SSdbRaw *pRaw = pAction->pRaw; SSdbRaw *pRaw = pAction->pRaw;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SSdbRow *pRow = NULL; int code = 0;
void *pObj = NULL;
int code = -1;
if (pRaw->status != SDB_STATUS_CREATING) goto _OUT; if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
pRow = (pSdb->decodeFps[pRaw->type])(pRaw);
if (pRow == NULL) goto _OUT;
pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto _OUT;
SdbValidateFp validateFp = pSdb->validateFps[pRaw->type]; SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
code = 0;
if (validateFp) { if (validateFp) {
code = validateFp(pMnode, pTrans, pObj); code = validateFp(pMnode, pTrans, pRaw);
} }
_OUT: _OUT:
if (pRow) {
SdbDeleteFp deleteFp = pSdb->deleteFps[pRaw->type];
if (deleteFp) (*deleteFp)(pSdb, pRow->pObj, false);
taosMemoryFreeClear(pRow);
}
return code; return code;
} }

View File

@ -1419,7 +1419,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
pTrans->stage = TRN_STAGE_COMMIT; pTrans->stage = TRN_STAGE_COMMIT;
mInfo("trans:%d, stage from redoAction to commit", pTrans->id); mInfo("trans:%d, stage from redoAction to commit", pTrans->id);
continueExec = true; continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mInfo("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); mInfo("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
continueExec = false; continueExec = false;
} else { } else {

View File

@ -2484,7 +2484,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false); colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
char objName[20] = {0}; char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false); colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);

View File

@ -31,10 +31,10 @@
#define VGROUP_VER_NUMBER 1 #define VGROUP_VER_NUMBER 1
#define VGROUP_RESERVE_SIZE 64 #define VGROUP_RESERVE_SIZE 64
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
@ -181,15 +181,28 @@ _OVER:
return pRow; return pRow;
} }
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
SVgObj *pVgroup = pObj; SSdb *pSdb = pMnode->pSdb;
SSdbRow *pRow = NULL;
SVgObj *pVgroup = NULL;
int code = -1;
pRow = mndVgroupActionDecode(pRaw);
if (pRow == NULL) goto _OVER;
pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OVER;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) { if (maxVgId > pVgroup->vgId) {
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
return -1; goto _OVER;
} }
return 0;
code = 0;
_OVER:
if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
taosMemoryFreeClear(pRow);
return code;
} }
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {

View File

@ -106,7 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj); typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, SSdbRaw *pRaw);
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);

View File

@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue; continue;
} }
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true); tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);

View File

@ -392,7 +392,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
pReader->msg.ver); pReader->msg.ver);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if ((pSubmitTbData->source & sourceExcluded) != 0) { if ((pSubmitTbData->flags & sourceExcluded) != 0) {
pReader->nextBlk += 1; pReader->nextBlk += 1;
continue; continue;
} }

View File

@ -267,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
} }
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) { if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
goto loop_table; goto loop_table;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
@ -335,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
} }
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) { if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
goto loop_db; goto loop_db;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {

View File

@ -816,7 +816,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return; return;
} }
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
@ -861,7 +861,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
pTask->execInfo.sink.numOfBlocks += 1; pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
if (index == NULL) { // no data yet, append it if (index == NULL) { // no data yet, append it

View File

@ -171,6 +171,22 @@ end : {
} }
} }
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \
SDecoder decoder = {0};\
TYPE req = {0}; \
void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \
int32_t len = pHead->bodyLen - sizeof(SMsgHead); \
tDecoderInit(&decoder, data, len); \
if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \
fetchVer++; \
tDecoderClear(&decoder); \
continue; \
} \
tDecoderClear(&decoder);
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) { SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0; int code = 0;
@ -239,6 +255,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end; goto end;
} }
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq)
} else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq)
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
} else if (pHead->msgType == TDMT_VND_DELETE) {
fetchVer++;
continue;
}
}
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;

View File

@ -2543,7 +2543,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
while (1) { while (1) {
// only check here, since the iterate data in memory is very fast. // only check here, since the iterate data in memory is very fast.
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
return pReader->code; return pReader->code;
} }
@ -2694,7 +2694,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
while (1) { while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
return pReader->code; return pReader->code;
} }
@ -2909,7 +2909,7 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_
while (1) { while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
return pReader->code; return pReader->code;
} }
@ -2951,7 +2951,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
while (1) { while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
return pReader->code; return pReader->code;
} }

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "vnd.h"
#include "tsdb.h" #include "tsdb.h"
#include "vnd.h"
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ #define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
do { \ do { \
@ -49,7 +49,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
// decode req // decode req
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) { if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit4;
} }
metaRsp.dbId = pVnode->config.dbId; metaRsp.dbId = pVnode->config.dbId;
@ -59,7 +59,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
code = vnodeValidateTableHash(pVnode, tableFName); code = vnodeValidateTableHash(pVnode, tableFName);
if (code) { if (code) {
goto _exit; goto _exit4;
} }
// query meta // query meta
@ -67,7 +67,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) { if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
code = terrno; code = terrno;
goto _exit; goto _exit3;
} }
metaRsp.tableType = mer1.me.type; metaRsp.tableType = mer1.me.type;
@ -81,7 +81,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaRsp.suid = mer1.me.uid; metaRsp.suid = mer1.me.uid;
} else if (mer1.me.type == TSDB_CHILD_TABLE) { } else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;
strcpy(metaRsp.stbName, mer2.me.name); strcpy(metaRsp.stbName, mer2.me.name);
metaRsp.suid = mer2.me.uid; metaRsp.suid = mer2.me.uid;
@ -125,6 +125,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
_exit: _exit:
taosMemoryFree(metaRsp.pSchemas);
_exit2:
metaReaderClear(&mer2);
_exit3:
metaReaderClear(&mer1);
_exit4:
rpcMsg.info = pMsg->info; rpcMsg.info = pMsg->info;
rpcMsg.pCont = pRsp; rpcMsg.pCont = pRsp;
rpcMsg.contLen = rspLen; rpcMsg.contLen = rspLen;
@ -141,9 +147,6 @@ _exit:
*pMsg = rpcMsg; *pMsg = rpcMsg;
} }
taosMemoryFree(metaRsp.pSchemas);
metaReaderClear(&mer2);
metaReaderClear(&mer1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -706,5 +709,5 @@ void *vnodeGetIvtIdx(void *pVnode) {
} }
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) { int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) {
return tsdbGetTableSchema(((SVnode*)pVnode)->pMeta, uid, pSchema, suid); return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
} }

View File

@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
(void**)&pCurWin->winInfo.pStatePos, &size); (void**)&pCurWin->winInfo.pStatePos, &size);
} }
} }
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
pBuffInfo->rebuildWindow = true;
}
} else { } else {
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist( code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(
pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
@ -115,8 +118,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
} }
} }
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
SSHashObj* pStDeleted, bool* pRebuild) { SSessionKey key = {0};
getSessionHashKey(pKey, &key);
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
}
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
SSHashObj* pStDeleted, bool* pRebuild) {
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
int32_t num = 0; int32_t num = 0;
for (int32_t i = start; i < rows; i++) { for (int32_t i = start; i < rows; i++) {
@ -148,6 +159,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
if (needDelState) { if (needDelState) {
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
if (pWinInfo->winInfo.pStatePos->needFree) { if (pWinInfo->winInfo.pStatePos->needFree) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey); pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
} }
@ -242,7 +254,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo); setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
slidingRows = *curWin.pWindowCount; slidingRows = *curWin.pWindowCount;
if (!buffInfo.rebuildWindow) { if (!buffInfo.rebuildWindow) {
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow); winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
pStDeleted, &buffInfo.rebuildWindow);
} }
if (buffInfo.rebuildWindow) { if (buffInfo.rebuildWindow) {
SSessionKey range = {0}; SSessionKey range = {0};

View File

@ -1125,8 +1125,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t firstRowTs = *(int64_t*)tsCol->pData; int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { (pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
continue; if (bExtractedBlock) {
} blockDataDestroy(pBlk);
}
continue;
}
} }
if (pBlk != NULL) { if (pBlk != NULL) {
@ -1149,10 +1152,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
tSimpleHashClear(mUidBlk); tSimpleHashClear(mUidBlk);
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk); for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
taosArrayDestroy(aBlkSort); blockDataDestroy(taosArrayGetP(aBlkSort, i));
taosArrayDestroy(aExtSrc); }
return code; taosArrayClear(aBlkSort);
break;
} }
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
@ -1165,6 +1169,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
szSort = 0; szSort = 0;
qDebug("source %zu created", taosArrayGetSize(aExtSrc)); qDebug("source %zu created", taosArrayGetSize(aExtSrc));
} }
if (pBlk == NULL) { if (pBlk == NULL) {
break; break;
} }
@ -1180,6 +1185,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
} }
tSimpleHashCleanup(mUidBlk); tSimpleHashCleanup(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayDestroy(aBlkSort); taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
if (!tsortIsClosed(pHandle)) { if (!tsortIsClosed(pHandle)) {
@ -1188,7 +1196,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
taosArrayDestroy(aExtSrc); taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows); tSimpleHashCleanup(mTableNumRows);
pHandle->type = SORT_SINGLESOURCE_SORT; pHandle->type = SORT_SINGLESOURCE_SORT;
return TSDB_CODE_SUCCESS; return code;
} }
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {

View File

@ -175,4 +175,8 @@ IF(NOT TD_DARWIN)
NAME idxFstUT NAME idxFstUT
COMMAND idxFstUT COMMAND idxFstUT
) )
add_test(
NAME idxFstTest
COMMAND idxFstTest
)
ENDIF () ENDIF ()

View File

@ -1,4 +1,5 @@
#include <gtest/gtest.h>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
#include <string> #include <string>
@ -14,6 +15,12 @@
#include "tutil.h" #include "tutil.h"
void* callback(void* s) { return s; } void* callback(void* s) { return s; }
class FstEnv : public ::testing::Test {
protected:
virtual void SetUp() {}
virtual void TearDown() {}
};
static std::string fileName = TD_TMP_DIR_PATH "tindex.tindex"; static std::string fileName = TD_TMP_DIR_PATH "tindex.tindex";
class FstWriter { class FstWriter {
public: public:
@ -154,7 +161,7 @@ class FstReadMemory {
int32_t _size; int32_t _size;
}; };
#define L 100 #define L 200
#define M 100 #define M 100
#define N 100 #define N 100
@ -200,7 +207,7 @@ void checkMillonWriteAndReadOfFst() {
FstWriter* fw = new FstWriter; FstWriter* fw = new FstWriter;
Performance_fstWriteRecords(fw); Performance_fstWriteRecords(fw);
delete fw; delete fw;
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024); FstReadMemory* fr = new FstReadMemory(1024 * 8 * 1024);
if (fr->init()) { if (fr->init()) {
printf("success to init fst read"); printf("success to init fst read");
@ -637,23 +644,31 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
tfileIteratorDestroy(iter); tfileIteratorDestroy(iter);
} }
int main(int argc, char* argv[]) { // int main(int argc, char* argv[]) {
// tool to check all kind of fst test // // tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); } // // if (argc > 1) { validateTFile(argv[1]); }
// if (argc > 4) { // // if (argc > 4) {
// path suid colName ver // // path suid colName ver
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]); // // iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
//} // //}
checkFstCheckIterator1(); // checkFstCheckIterator1();
// checkFstCheckIterator2(); // // checkFstCheckIterator2();
// checkFstCheckIteratorPrefix(); // // checkFstCheckIteratorPrefix();
// checkFstCheckIteratorRange1(); // // checkFstCheckIteratorRange1();
// checkFstCheckIteratorRange2(); // // checkFstCheckIteratorRange2();
// checkFstCheckIteratorRange3(); // // checkFstCheckIteratorRange3();
// checkFstLongTerm(); // // checkFstLongTerm();
// checkFstPrefixSearch(); // // checkFstPrefixSearch();
// checkMillonWriteAndReadOfFst(); // // checkMillonWriteAndReadOfFst();
return 1; // return 1;
} // }
TEST_F(FstEnv, checkIterator1) { checkFstCheckIterator1(); }
TEST_F(FstEnv, checkItertor2) { checkFstCheckIterator2(); }
TEST_F(FstEnv, checkPrefix) { checkFstCheckIteratorPrefix(); }
TEST_F(FstEnv, checkRange1) { checkFstCheckIteratorRange1(); }
TEST_F(FstEnv, checkRange2) { checkFstCheckIteratorRange2(); }
TEST_F(FstEnv, checkRange3) { checkFstCheckIteratorRange3(); }
TEST_F(FstEnv, checkLongTerm) { checkFstLongTerm(); }
TEST_F(FstEnv, checkMillonWriteData) { checkMillonWriteAndReadOfFst(); }

View File

@ -285,7 +285,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
pTmp->suid = pSrc->suid; pTmp->suid = pSrc->suid;
pTmp->uid = pSrc->uid; pTmp->uid = pSrc->uid;
pTmp->sver = pSrc->sver; pTmp->sver = pSrc->sver;
pTmp->source = pSrc->source;
pTmp->pCreateTbReq = NULL; pTmp->pCreateTbReq = NULL;
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if (pSrc->pCreateTbReq) { if (pSrc->pCreateTbReq) {
@ -653,7 +652,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
pTableCxt->pData->source = SOURCE_TAOSX; pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
if(tmp == NULL){ if(tmp == NULL){
ret = initTableColSubmitData(pTableCxt); ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -721,7 +720,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == 1) { if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[j]); pStart += htonl(colLength[j]);
} else { } else {
pStart += colLength[j]; pStart += colLength[j];
@ -752,7 +751,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == 1) { if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[i]); pStart += htonl(colLength[i]);
} else { } else {
pStart += colLength[i]; pStart += colLength[i];

View File

@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey); SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
if (pCur) { if (pCur) {
pCur->pStreamFileState = pFileState;
SSessionKey key = {0}; SSessionKey key = {0};
void* pVal = NULL; void* pVal = NULL;
int len = 0; int len = 0;
@ -736,6 +737,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
void* pRockVal = NULL; void* pRockVal = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -743,7 +745,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
streamStateFreeCur(pCur);
goto _end; goto _end;
} }
} }
@ -751,7 +752,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
pWinKey->win.ekey = endTs; pWinKey->win.ekey = endTs;
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
taosMemoryFree(pRockVal); taosMemoryFree(pRockVal);
streamStateFreeCur(pCur);
} else { } else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;

View File

@ -1329,7 +1329,6 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
char *data = taosMemoryMalloc(compressSize); char *data = taosMemoryMalloc(compressSize);
gzFile dstFp = NULL; gzFile dstFp = NULL;
TdFilePtr pFile = NULL;
TdFilePtr pSrcFile = NULL; TdFilePtr pSrcFile = NULL;
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
@ -1369,8 +1368,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
} }
cmp_end: cmp_end:
if (pFile) { if (fd >= 0) {
taosCloseFile(&pFile); close(fd);
} }
if (pSrcFile) { if (pSrcFile) {
taosCloseFile(&pSrcFile); taosCloseFile(&pSrcFile);

View File

@ -18,24 +18,29 @@
#include <math.h> #include <math.h>
#include <stdbool.h> #include <stdbool.h>
#define BASE_BUF_SIZE 256 #define TBASE_BUF_SIZE 256
static const char *basis_58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"; static const char *basis_58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz";
char *base58_encode(const uint8_t *value, int32_t vlen) { char *base58_encode(const uint8_t *value, int32_t vlen) {
const uint8_t *pb = value; const uint8_t *pb = value;
const uint8_t *pe = pb + vlen; const uint8_t *pe = pb + vlen;
uint8_t buf[BASE_BUF_SIZE] = {0}; uint8_t buf[TBASE_BUF_SIZE] = {0};
uint8_t *pbuf = &buf[0]; uint8_t *pbuf = &buf[0];
bool bfree = false; bool bfree = false;
int32_t nz = 0, size = 0, len = 0; int32_t nz = 0, size = 0, len = 0;
if (vlen > TBASE_MAX_ILEN) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
while (pb != pe && *pb == 0) { while (pb != pe && *pb == 0) {
++pb; ++pb;
++nz; ++nz;
} }
size = (pe - pb) * 69 / 50 + 1; size = (pe - pb) * 69 / 50 + 1;
if (size > BASE_BUF_SIZE) { if (size > TBASE_BUF_SIZE) {
if (!(pbuf = taosMemoryCalloc(1, size))) { if (!(pbuf = taosMemoryCalloc(1, size))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -47,7 +52,7 @@ char *base58_encode(const uint8_t *value, int32_t vlen) {
int32_t num = *pb; int32_t num = *pb;
int32_t i = 0; int32_t i = 0;
for (int32_t j = (int32_t)size - 1; (num != 0 || i < len) && j >= 0; --j, ++i) { for (int32_t j = (int32_t)size - 1; (num != 0 || i < len) && j >= 0; --j, ++i) {
num += ((int32_t)buf[j]) << 8; num += ((int32_t)pbuf[j]) << 8;
pbuf[j] = num % 58; pbuf[j] = num % 58;
num /= 58; num /= 58;
} }
@ -57,7 +62,7 @@ char *base58_encode(const uint8_t *value, int32_t vlen) {
const uint8_t *pi = pbuf + (size - len); const uint8_t *pi = pbuf + (size - len);
while (pi != pbuf + size && *pi == 0) ++pi; while (pi != pbuf + size && *pi == 0) ++pi;
uint8_t *result = taosMemoryCalloc(1, size + 1); uint8_t *result = taosMemoryCalloc(1, nz + (pbuf + size - pi) + 1);
if (!result) { if (!result) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
if (bfree) taosMemoryFree(pbuf); if (bfree) taosMemoryFree(pbuf);
@ -82,20 +87,35 @@ static const signed char index_58[256] = {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}; -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) { uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) {
const char *pb = value;
const char *pe = value + inlen; const char *pe = value + inlen;
uint8_t buf[BASE_BUF_SIZE] = {0}; uint8_t buf[TBASE_BUF_SIZE] = {0};
uint8_t *pbuf = &buf[0]; uint8_t *pbuf = &buf[0];
bool bfree = false; bool bfree = false;
int32_t nz = 0, size = 0, len = 0; int32_t nz = 0, size = 0, len = 0;
while (*value && isspace(*value)) ++value; if (inlen > TBASE_MAX_OLEN) {
while (*value == '1') { terrno = TSDB_CODE_INVALID_PARA;
++nz; return NULL;
++value;
} }
size = (int32_t)(pe - value) * 733 / 1000 + 1; while (pb != pe) {
if (size > BASE_BUF_SIZE) { if (*pb == 0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
++pb;
}
pb = value;
while (pb != pe && *pb && isspace(*pb)) ++pb;
while (pb != pe && *pb == '1') {
++nz;
++pb;
}
size = (int32_t)(pe - pb) * 733 / 1000 + 1;
if (size > TBASE_BUF_SIZE) {
if (!(pbuf = taosMemoryCalloc(1, size))) { if (!(pbuf = taosMemoryCalloc(1, size))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -103,9 +123,10 @@ uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) {
bfree = true; bfree = true;
} }
while (*value && !isspace(*value)) { while (pb != pe && *pb && !isspace(*pb)) {
int32_t num = index_58[(uint8_t)*value]; int32_t num = index_58[(uint8_t)*pb];
if (num == -1) { if (num == -1) {
terrno = TSDB_CODE_INVALID_PARA;
if (bfree) taosMemoryFree(pbuf); if (bfree) taosMemoryFree(pbuf);
return NULL; return NULL;
} }
@ -116,18 +137,18 @@ uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) {
num >>= 8; num >>= 8;
} }
len = i; len = i;
++value; ++pb;
} }
while (isspace(*value)) ++value; while (pb != pe && isspace(*pb)) ++pb;
if (*value != 0) { if (*pb != 0) {
if (bfree) taosMemoryFree(pbuf); if (bfree) taosMemoryFree(pbuf);
return NULL; return NULL;
} }
const uint8_t *it = pbuf + (size - len); const uint8_t *it = pbuf + (size - len);
while (it != pbuf + size && *it == 0) ++it; while (it != pbuf + size && *it == 0) ++it;
uint8_t *result = taosMemoryCalloc(1, size + 1); uint8_t *result = taosMemoryCalloc(1, nz + (pbuf + size - it) + 1);
if (!result) { if (!result) {
if (bfree) taosMemoryFree(pbuf); if (bfree) taosMemoryFree(pbuf);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -21,7 +21,7 @@
#include "tutil.h" #include "tutil.h"
static int32_t initForwardBackwardPtr(SSkipList *pSkipList); static int32_t initForwardBackwardPtr(SSkipList *pSkipList);
static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur); static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
static void tSkipListCorrectLevel(SSkipList *pSkipList); static void tSkipListCorrectLevel(SSkipList *pSkipList);
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
@ -131,12 +131,14 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
return pNode; return pNode;
} }
#ifdef BUILD_NO_CALL
void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) { void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) {
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool hasDup = false; bool hasDup = false;
char *pKey = NULL; char * pKey = NULL;
char *pDataKey = NULL; char * pDataKey = NULL;
int32_t compare = 0; int32_t compare = 0;
tSkipListWLock(pSkipList); tSkipListWLock(pSkipList);
@ -260,6 +262,7 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
tSkipListCorrectLevel(pSkipList); tSkipListCorrectLevel(pSkipList);
tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
} }
#endif
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) { SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
if (pSkipList == NULL) return NULL; if (pSkipList == NULL) return NULL;
@ -350,6 +353,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) {
return NULL; return NULL;
} }
#ifdef BUILD_NO_CALL
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
return; return;
@ -358,7 +362,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1; int32_t id = 1;
char *prev = NULL; char * prev = NULL;
while (p != pSkipList->pTail) { while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p); char *key = SL_GET_NODE_KEY(pSkipList, p);
@ -392,6 +396,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1); p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1);
} }
} }
#endif
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) { static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
for (int32_t i = 0; i < pNode->level; ++i) { for (int32_t i = 0; i < pNode->level; ++i) {
@ -460,7 +465,7 @@ static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) {
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) { static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
int32_t compare = 0; int32_t compare = 0;
bool hasDupKey = false; bool hasDupKey = false;
char *pDataKey = pSkipList->keyFn(pData); char * pDataKey = pSkipList->keyFn(pData);
if (pSkipList->size == 0) { if (pSkipList->size == 0) {
for (int32_t i = 0; i < pSkipList->maxLevel; i++) { for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
@ -516,6 +521,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
return hasDupKey; return hasDupKey;
} }
#ifdef BUILD_NO_CALL
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) {
int32_t level = pNode->level; int32_t level = pNode->level;
uint8_t dupMode = SL_DUP_MODE(pSkipList); uint8_t dupMode = SL_DUP_MODE(pSkipList);
@ -540,6 +546,7 @@ static void tSkipListCorrectLevel(SSkipList *pSkipList) {
pSkipList->level -= 1; pSkipList->level -= 1;
} }
} }
#endif
UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList,
int32_t level) { // record link count in each level int32_t level) { // record link count in each level

View File

@ -100,3 +100,11 @@ add_test(
NAME talgoTest NAME talgoTest
COMMAND talgoTest COMMAND talgoTest
) )
# tbaseCodecTest
add_executable(tbaseCodecTest "tbaseCodecTest.cpp")
target_link_libraries(tbaseCodecTest os util common gtest_main)
add_test(
NAME tbaseCodecTest
COMMAND tbaseCodecTest
)

View File

@ -0,0 +1,83 @@
#include <gtest/gtest.h>
#include <cassert>
#include <iostream>
#include "os.h"
#include "osTime.h"
#include "taos.h"
#include "taoserror.h"
#include "tbase58.h"
#include "tglobal.h"
using namespace std;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) {
int64_t start = taosGetTimestampUs();
char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);
ASSERT_NE(nullptr, pEnc);
int32_t encLen = strlen(pEnc);
int64_t endOfEnc = taosGetTimestampUs();
std::cout << "index:" << index << ", encLen is " << encLen << ", cost:" << endOfEnc - start << " us" << std::endl;
int32_t decLen = 0;
char *pDec = (char *)base58_decode((const char *)pEnc, encLen, &decLen);
std::cout << "index:" << index << ", decLen is " << decLen << ", cost:" << taosGetTimestampUs() - endOfEnc << " us"
<< std::endl;
ASSERT_NE(nullptr, pDec);
ASSERT_EQ(rawLen, decLen);
ASSERT_LE(rawLen, encLen);
ASSERT_EQ(0, strncmp((char *)pRaw, pDec, rawLen));
taosMemoryFreeClear(pDec);
taosMemoryFreeClear(pEnc);
}
TEST(TD_BASE_CODEC_TEST, tbase58_test) {
const int32_t TEST_LEN_MAX = TBASE_MAX_ILEN;
const int32_t TEST_LEN_STEP = 10;
int32_t rawLen = 0;
uint8_t *pRaw = NULL;
pRaw = (uint8_t *)taosMemoryCalloc(1, TEST_LEN_MAX);
ASSERT_NE(nullptr, pRaw);
// 1. normal case
// string blend with char and '\0'
rawLen = TEST_LEN_MAX;
for (int32_t i = 0; i < TEST_LEN_MAX; i += 500) {
checkBase58Codec(pRaw, rawLen, i);
pRaw[i] = i & 127;
}
// string without '\0'
for (int32_t i = 0; i < TEST_LEN_MAX; ++i) {
pRaw[i] = i & 127;
}
checkBase58Codec(pRaw, TEST_LEN_MAX, 0);
for (int32_t i = 0; i < TEST_LEN_MAX; i += 500) {
rawLen = i;
checkBase58Codec(pRaw, rawLen, i);
}
taosMemoryFreeClear(pRaw);
ASSERT_EQ(nullptr, pRaw);
// 2. overflow case
char tmp[1];
char *pEnc = base58_encode((const uint8_t *)tmp, TBASE_MAX_ILEN + 1);
ASSERT_EQ(nullptr, pEnc);
char *pDec = (char *)base58_decode((const char *)tmp, TBASE_MAX_OLEN + 1, NULL);
ASSERT_EQ(nullptr, pDec);
taosMemoryFreeClear(pRaw);
ASSERT_EQ(nullptr, pRaw);
}

View File

@ -25,6 +25,7 @@ from frame.cases import *
from frame.sql import * from frame.sql import *
from frame.caseBase import * from frame.caseBase import *
from frame import * from frame import *
from frame.srvCtl import *
class TDTestCase(TBase): class TDTestCase(TBase):
@ -65,6 +66,21 @@ class TDTestCase(TBase):
sql = f"select avg(dc) from {self.db}.{self.stb}" sql = f"select avg(dc) from {self.db}.{self.stb}"
tdSql.checkFirstValue(sql, 200) tdSql.checkFirstValue(sql, 200)
def alterReplica3(self):
sql = f"alter database {self.db} replica 3"
tdSql.execute(sql, show=True)
time.sleep(2)
sc.dnodeStop(2)
sc.dnodeStop(3)
time.sleep(5)
sc.dnodeStart(2)
sc.dnodeStart(3)
if self.waitTransactionZero() is False:
tdLog.exit(f"{sql} transaction not finished")
return False
return True
def doAction(self): def doAction(self):
tdLog.info(f"do action.") tdLog.info(f"do action.")
self.flushDb() self.flushDb()
@ -81,7 +97,7 @@ class TDTestCase(TBase):
self.alterReplica(1) self.alterReplica(1)
self.checkAggCorrect() self.checkAggCorrect()
self.compactDb() self.compactDb()
self.alterReplica(3) self.alterReplica3()
vgids = self.getVGroup(self.db) vgids = self.getVGroup(self.db)
selid = random.choice(vgids) selid = random.choice(vgids)

View File

@ -28,12 +28,12 @@ from frame import *
from frame.eos import * from frame.eos import *
# #
# 192.168.1.52 MINIO S3 API KEY: MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa # 192.168.1.52 MINIO S3
# #
''' '''
s3EndPoint http://192.168.1.52:9000 s3EndPoint http://192.168.1.52:9000
s3AccessKey MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
s3BucketName ci-bucket s3BucketName ci-bucket
s3UploadDelaySec 60 s3UploadDelaySec 60
''' '''
@ -42,7 +42,7 @@ s3UploadDelaySec 60
class TDTestCase(TBase): class TDTestCase(TBase):
updatecfgDict = { updatecfgDict = {
's3EndPoint': 'http://192.168.1.52:9000', 's3EndPoint': 'http://192.168.1.52:9000',
's3AccessKey': 'MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa', 's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
's3BucketName': 'ci-bucket', 's3BucketName': 'ci-bucket',
's3BlockSize': '10240', 's3BlockSize': '10240',
's3BlockCacheSize': '320', 's3BlockCacheSize': '320',
@ -78,14 +78,27 @@ class TDTestCase(TBase):
self.trimDb(True) self.trimDb(True)
rootPath = sc.clusterRootPath() rootPath = sc.clusterRootPath()
cmd = f"ls {rootPath}/dnode1/data20/vnode/vnode*/tsdb/*.data" cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data"
tdLog.info(cmd) tdLog.info(cmd)
loop = 0 loop = 0
while len(eos.runRetList(cmd)) > 0 and loop < 40: rets = []
time.sleep(5) while loop < 180:
time.sleep(3)
rets = eos.runRetList(cmd)
cnt = len(rets)
if cnt == 0:
tdLog.info("All data file upload to server over.")
break
self.trimDb(True) self.trimDb(True)
tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...")
if loop == 0:
sc.dnodeStop(1)
time.sleep(2)
sc.dnodeStart(1)
loop += 1 loop += 1
tdLog.info(f"loop={loop} wait 5s...")
if len(rets) > 0:
tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}")
def checkStreamCorrect(self): def checkStreamCorrect(self):
sql = f"select count(*) from {self.db}.stm1" sql = f"select count(*) from {self.db}.stm1"

View File

@ -33,14 +33,14 @@ class srvCtl:
# control server # control server
# #
# start # start idx base is 1
def dnodeStart(self, idx): def dnodeStart(self, idx):
if clusterDnodes.getModel() == 'cluster': if clusterDnodes.getModel() == 'cluster':
return clusterDnodes.starttaosd(idx) return clusterDnodes.starttaosd(idx)
return tdDnodes.starttaosd(idx) return tdDnodes.starttaosd(idx)
# stop # stop idx base is 1
def dnodeStop(self, idx): def dnodeStop(self, idx):
if clusterDnodes.getModel() == 'cluster': if clusterDnodes.getModel() == 'cluster':
return clusterDnodes.stoptaosd(idx) return clusterDnodes.stoptaosd(idx)

View File

@ -49,6 +49,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tms_memleak.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3

View File

@ -0,0 +1,51 @@
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from math import inf
class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TD-]
'''
return
def init(self, conn, logSql, replicaVer=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
self.conn = conn
def restartTaosd(self, index=1, dbname="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use tms_memleak")
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists tms_memleak")
tdSql.execute("create database if not exists tms_memleak")
tdSql.execute('use tms_memleak')
tdSql.execute('create table st(ts timestamp, f int) tags (t int);')
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1)('2021-04-19 00:00:02', 2)('2021-04-19 00:00:03', 3)('2021-04-19 00:00:04', 4)")
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-20 00:00:01', 5)('2021-04-20 00:00:02', 6)('2021-04-20 00:00:03', 7)('2021-04-20 00:00:04', 8)")
tdSql.execute("insert into ct3 using st tags(3) values('2021-04-21 00:00:01', 5)('2021-04-21 00:00:02', 6)('2021-04-21 00:00:03', 7)('2021-04-21 00:00:04', 8)")
tdSql.execute("insert into ct4 using st tags(4) values('2021-04-22 00:00:01', 5)('2021-04-22 00:00:02', 6)('2021-04-22 00:00:03', 7)('2021-04-22 00:00:04', 8)")
tdSql.query("select * from st order by ts limit 1 ");
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1);
tdSql.execute('drop database tms_memleak')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -323,7 +323,7 @@ class TDTestCase:
tdSql.query("select * from st") tdSql.query("select * from st")
tdSql.checkRows(8) tdSql.checkRows(8)
tdSql.execute(f'create topic topic_excluded with meta as database d1') tdSql.execute(f'create topic topic_all with meta as database d1')
consumer_dict = { consumer_dict = {
"group.id": "g1", "group.id": "g1",
"td.connect.user": "root", "td.connect.user": "root",
@ -333,7 +333,7 @@ class TDTestCase:
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
try: try:
consumer.subscribe(["topic_excluded"]) consumer.subscribe(["topic_all"])
except TmqError: except TmqError:
tdLog.exit(f"subscribe error") tdLog.exit(f"subscribe error")

View File

@ -20,6 +20,7 @@
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
#include "types.h" #include "types.h"
#include "tmsg.h"
static int running = 1; static int running = 1;
TdFilePtr g_fp = NULL; TdFilePtr g_fp = NULL;
@ -966,7 +967,14 @@ void testConsumeExcluded(int topic_type){
tmq_raw_data raw = {0}; tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw); tmq_get_raw(msg, &raw);
if(topic_type == 1){ if(topic_type == 1){
assert(raw.raw_type != 2 && raw.raw_type != 4); assert(raw.raw_type != 2 && raw.raw_type != 4 &&
raw.raw_type != TDMT_VND_CREATE_STB &&
raw.raw_type != TDMT_VND_ALTER_STB &&
raw.raw_type != TDMT_VND_CREATE_TABLE &&
raw.raw_type != TDMT_VND_ALTER_TABLE &&
raw.raw_type != TDMT_VND_DELETE);
assert(raw.raw_type == TDMT_VND_DROP_STB ||
raw.raw_type == TDMT_VND_DROP_TABLE);
}else if(topic_type == 2){ }else if(topic_type == 2){
assert(0); assert(0);
} }