fix:add excluded msg for meta in tmq
This commit is contained in:
parent
036b549262
commit
6e13e4aa63
|
@ -207,9 +207,6 @@ typedef enum _mgmt_table {
|
||||||
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
|
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
|
||||||
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
||||||
|
|
||||||
#define TD_REQ_FROM_APP 0
|
|
||||||
#define TD_REQ_FROM_TAOX 1
|
|
||||||
|
|
||||||
typedef enum ENodeType {
|
typedef enum ENodeType {
|
||||||
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
|
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
|
||||||
// VALUE, OPERATOR, FUNCTION and so on.
|
// VALUE, OPERATOR, FUNCTION and so on.
|
||||||
|
@ -759,7 +756,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
int8_t source; // 1-taosX or 0-taosClient
|
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||||
int8_t reserved[6];
|
int8_t reserved[6];
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
int64_t delay1;
|
int64_t delay1;
|
||||||
|
@ -802,7 +799,7 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
int8_t source; // 1-taosX or 0-taosClient
|
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||||
int8_t reserved[6];
|
int8_t reserved[6];
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
|
@ -2661,6 +2658,7 @@ typedef struct SVCreateStbReq {
|
||||||
SRSmaParam rsmaParam;
|
SRSmaParam rsmaParam;
|
||||||
int32_t alterOriDataLen;
|
int32_t alterOriDataLen;
|
||||||
void* alterOriData;
|
void* alterOriData;
|
||||||
|
int8_t source;
|
||||||
} SVCreateStbReq;
|
} SVCreateStbReq;
|
||||||
|
|
||||||
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
|
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
|
||||||
|
@ -2730,6 +2728,7 @@ typedef struct {
|
||||||
SVCreateTbReq* pReqs;
|
SVCreateTbReq* pReqs;
|
||||||
SArray* pArray;
|
SArray* pArray;
|
||||||
};
|
};
|
||||||
|
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||||
} SVCreateTbBatchReq;
|
} SVCreateTbBatchReq;
|
||||||
|
|
||||||
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
|
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
|
||||||
|
@ -2822,6 +2821,7 @@ typedef struct {
|
||||||
int32_t newCommentLen;
|
int32_t newCommentLen;
|
||||||
char* newComment;
|
char* newComment;
|
||||||
int64_t ctimeMs; // fill by vnode
|
int64_t ctimeMs; // fill by vnode
|
||||||
|
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||||
} SVAlterTbReq;
|
} SVAlterTbReq;
|
||||||
|
|
||||||
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
|
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
|
||||||
|
@ -3919,12 +3919,13 @@ int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
|
||||||
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||||
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||||
|
|
||||||
|
#define TD_REQ_FROM_APP 0x0
|
||||||
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
||||||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||||
#define SUBMIT_REQ_FROM_FILE 0x4
|
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||||
|
#define TD_REQ_FROM_TAOX 0x8
|
||||||
|
|
||||||
#define SOURCE_NULL 0
|
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
|
||||||
#define SOURCE_TAOSX 1
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t flags;
|
int32_t flags;
|
||||||
|
@ -3937,7 +3938,6 @@ typedef struct {
|
||||||
SArray* aCol;
|
SArray* aCol;
|
||||||
};
|
};
|
||||||
int64_t ctimeMs;
|
int64_t ctimeMs;
|
||||||
int8_t source;
|
|
||||||
} SSubmitTbData;
|
} SSubmitTbData;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1001,6 +1001,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
|
|
||||||
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
|
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
|
||||||
taosArrayPush(tBatch.req.pArray, pCreateReq);
|
taosArrayPush(tBatch.req.pArray, pCreateReq);
|
||||||
|
tBatch.req.source = TD_REQ_FROM_TAOX;
|
||||||
|
|
||||||
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
|
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
|
||||||
} else { // add to the correct vgroup
|
} else { // add to the correct vgroup
|
||||||
|
@ -1276,7 +1277,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
SVAlterTbReq req = {0};
|
SVAlterTbReq req = {0};
|
||||||
SDecoder coder = {0};
|
SDecoder dcoder = {0};
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
|
@ -1297,8 +1298,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
// decode and process req
|
// decode and process req
|
||||||
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
|
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
|
||||||
int32_t len = metaLen - sizeof(SMsgHead);
|
int32_t len = metaLen - sizeof(SMsgHead);
|
||||||
tDecoderInit(&coder, data, len);
|
tDecoderInit(&dcoder, data, len);
|
||||||
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
|
if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
|
||||||
code = TSDB_CODE_INVALID_PARA;
|
code = TSDB_CODE_INVALID_PARA;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1340,14 +1341,36 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
pVgData->vg = pInfo;
|
pVgData->vg = pInfo;
|
||||||
pVgData->pData = taosMemoryMalloc(metaLen);
|
|
||||||
if (NULL == pVgData->pData) {
|
int tlen = 0;
|
||||||
|
req.source = TD_REQ_FROM_TAOX;
|
||||||
|
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
|
||||||
|
if(code != 0){
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
memcpy(pVgData->pData, meta, metaLen);
|
tlen += sizeof(SMsgHead);
|
||||||
((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
|
void* pMsg = taosMemoryMalloc(tlen);
|
||||||
pVgData->size = metaLen;
|
if (NULL == pMsg) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
|
||||||
|
((SMsgHead*)pMsg)->contLen = htonl(tlen);
|
||||||
|
void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
|
||||||
|
SEncoder coder = {0};
|
||||||
|
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
|
||||||
|
code = tEncodeSVAlterTbReq(&coder, &req);
|
||||||
|
if(code != 0){
|
||||||
|
tEncoderClear(&coder);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
tEncoderClear(&coder);
|
||||||
|
|
||||||
|
pVgData->pData = pMsg;
|
||||||
|
pVgData->size = tlen;
|
||||||
|
|
||||||
pVgData->numOfTables = 1;
|
pVgData->numOfTables = 1;
|
||||||
taosArrayPush(pArray, &pVgData);
|
taosArrayPush(pArray, &pVgData);
|
||||||
|
|
||||||
|
@ -1387,7 +1410,7 @@ end:
|
||||||
if (pVgData) taosMemoryFreeClear(pVgData->pData);
|
if (pVgData) taosMemoryFreeClear(pVgData->pData);
|
||||||
taosMemoryFreeClear(pVgData);
|
taosMemoryFreeClear(pVgData);
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&dcoder);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -389,7 +389,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (strcasecmp(key, "msg.consume.excluded") == 0) {
|
if (strcasecmp(key, "msg.consume.excluded") == 0) {
|
||||||
conf->sourceExcluded = taosStr2int64(value);
|
conf->sourceExcluded = (taosStr2int64(value) != 0) ? TD_REQ_FROM_TAOX : 0;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7513,6 +7513,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
|
||||||
if (pReq->alterOriDataLen > 0) {
|
if (pReq->alterOriDataLen > 0) {
|
||||||
if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1;
|
if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -7535,6 +7536,10 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
|
||||||
if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1;
|
if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!tDecodeIsEnd(pCoder)) {
|
||||||
|
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -7663,6 +7668,8 @@ int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq)
|
||||||
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
|
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -7677,6 +7684,10 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) {
|
||||||
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
|
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!tDecodeIsEnd(pCoder)) {
|
||||||
|
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -8034,6 +8045,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pReq->source) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -8094,6 +8106,9 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
|
||||||
if (!tDecodeIsEnd(pDecoder)) {
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->source) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -8670,7 +8685,6 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
|
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
|
||||||
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
|
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -8758,12 +8772,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!tDecodeIsEnd(pCoder)) {
|
|
||||||
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
|
|
||||||
|
|
|
@ -464,6 +464,7 @@ typedef struct {
|
||||||
char* pAst1;
|
char* pAst1;
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
|
int8_t source;
|
||||||
} SStbObj;
|
} SStbObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -458,6 +458,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
|
||||||
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
||||||
req.alterOriData = alterOriData;
|
req.alterOriData = alterOriData;
|
||||||
req.alterOriDataLen = alterOriDataLen;
|
req.alterOriDataLen = alterOriDataLen;
|
||||||
|
req.source = pStb->source;
|
||||||
// todo
|
// todo
|
||||||
req.schemaRow.nCols = pStb->numOfColumns;
|
req.schemaRow.nCols = pStb->numOfColumns;
|
||||||
req.schemaRow.version = pStb->colVer;
|
req.schemaRow.version = pStb->colVer;
|
||||||
|
@ -774,7 +775,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
pDst->createdTime = taosGetTimestampMs();
|
pDst->createdTime = taosGetTimestampMs();
|
||||||
pDst->updateTime = pDst->createdTime;
|
pDst->updateTime = pDst->createdTime;
|
||||||
pDst->uid =
|
pDst->uid =
|
||||||
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
(pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
|
||||||
|
? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
pDst->dbUid = pDb->uid;
|
pDst->dbUid = pDb->uid;
|
||||||
pDst->tagVer = 1;
|
pDst->tagVer = 1;
|
||||||
pDst->colVer = 1;
|
pDst->colVer = 1;
|
||||||
|
@ -790,6 +792,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
pDst->numOfFuncs = pCreate->numOfFuncs;
|
pDst->numOfFuncs = pCreate->numOfFuncs;
|
||||||
pDst->commentLen = pCreate->commentLen;
|
pDst->commentLen = pCreate->commentLen;
|
||||||
pDst->pFuncs = pCreate->pFuncs;
|
pDst->pFuncs = pCreate->pFuncs;
|
||||||
|
pDst->source = pCreate->source;
|
||||||
pCreate->pFuncs = NULL;
|
pCreate->pFuncs = NULL;
|
||||||
|
|
||||||
if (pDst->commentLen > 0) {
|
if (pDst->commentLen > 0) {
|
||||||
|
@ -1033,6 +1036,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
||||||
memcpy(pDst, pStb, sizeof(SStbObj));
|
memcpy(pDst, pStb, sizeof(SStbObj));
|
||||||
taosRUnLockLatch(&pStb->lock);
|
taosRUnLockLatch(&pStb->lock);
|
||||||
|
|
||||||
|
pDst->source = createReq->source;
|
||||||
pDst->updateTime = taosGetTimestampMs();
|
pDst->updateTime = taosGetTimestampMs();
|
||||||
pDst->numOfColumns = createReq->numOfColumns;
|
pDst->numOfColumns = createReq->numOfColumns;
|
||||||
pDst->numOfTags = createReq->numOfTags;
|
pDst->numOfTags = createReq->numOfTags;
|
||||||
|
@ -1141,7 +1145,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
} else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||||
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -2572,7 +2576,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dropReq.source == TD_REQ_FROM_TAOX && pStb->uid != dropReq.suid) {
|
if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) {
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL};
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
|
||||||
|
|
||||||
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
|
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
|
||||||
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
|
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
|
||||||
|
|
|
@ -392,7 +392,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
pReader->msg.ver);
|
pReader->msg.ver);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
if ((pSubmitTbData->source & sourceExcluded) != 0) {
|
if ((pSubmitTbData->flags & sourceExcluded) != 0) {
|
||||||
pReader->nextBlk += 1;
|
pReader->nextBlk += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -267,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
|
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
|
||||||
goto loop_table;
|
goto loop_table;
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
|
@ -335,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
|
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
|
||||||
goto loop_db;
|
goto loop_db;
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
|
|
|
@ -815,7 +815,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -860,7 +860,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
pTask->execInfo.sink.numOfBlocks += 1;
|
pTask->execInfo.sink.numOfBlocks += 1;
|
||||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||||
|
|
||||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||||
|
|
||||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||||
if (index == NULL) { // no data yet, append it
|
if (index == NULL) { // no data yet, append it
|
||||||
|
|
|
@ -171,6 +171,22 @@ end : {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \
|
||||||
|
SDecoder decoder = {0};\
|
||||||
|
TYPE req = {0}; \
|
||||||
|
void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \
|
||||||
|
int32_t len = pHead->bodyLen - sizeof(SMsgHead); \
|
||||||
|
tDecoderInit(&decoder, data, len); \
|
||||||
|
if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \
|
||||||
|
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \
|
||||||
|
fetchVer++; \
|
||||||
|
tDecoderClear(&decoder); \
|
||||||
|
continue; \
|
||||||
|
} \
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
|
||||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
@ -239,6 +255,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
|
||||||
|
if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
|
||||||
|
PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq)
|
||||||
|
} else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
|
||||||
|
PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq)
|
||||||
|
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
|
||||||
|
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
|
||||||
|
} else if (pHead->msgType == TDMT_VND_DELETE) {
|
||||||
|
fetchVer++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
|
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
metaRsp.resMsgType = pHead->msgType;
|
||||||
|
|
|
@ -285,7 +285,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
||||||
pTmp->suid = pSrc->suid;
|
pTmp->suid = pSrc->suid;
|
||||||
pTmp->uid = pSrc->uid;
|
pTmp->uid = pSrc->uid;
|
||||||
pTmp->sver = pSrc->sver;
|
pTmp->sver = pSrc->sver;
|
||||||
pTmp->source = pSrc->source;
|
|
||||||
pTmp->pCreateTbReq = NULL;
|
pTmp->pCreateTbReq = NULL;
|
||||||
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||||
if (pSrc->pCreateTbReq) {
|
if (pSrc->pCreateTbReq) {
|
||||||
|
@ -653,7 +652,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableCxt->pData->source = SOURCE_TAOSX;
|
pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
|
||||||
if(tmp == NULL){
|
if(tmp == NULL){
|
||||||
ret = initTableColSubmitData(pTableCxt);
|
ret = initTableColSubmitData(pTableCxt);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue