feat:get meta ok

This commit is contained in:
wangmm0220 2022-07-11 16:12:07 +08:00
parent 3d2e129dd6
commit 0a193cf811
5 changed files with 91 additions and 37 deletions

View File

@ -30,10 +30,25 @@ static void msg_process(TAOS_RES* msg) {
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
tmq_raw_data *raw = tmq_get_raw_meta(msg);
if(raw){
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", "abc1", 0);
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return;
}
taos_free_result(pRes);
int32_t ret = taos_write_raw_meta(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
free(raw);
@ -132,12 +147,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
@ -160,19 +175,19 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table ct3 ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
@ -209,12 +224,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {

View File

@ -2843,8 +2843,8 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0;
// tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
// tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
@ -2852,8 +2852,7 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
// buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
// buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);

View File

@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
// clang-format on
#define BUF_PAGE_DEBUG
//#define BUF_PAGE_DEBUG
#ifdef __cplusplus
}
#endif

View File

@ -708,7 +708,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
pDst->createdTime = taosGetTimestampMs();
pDst->updateTime = pDst->createdTime;
pDst->uid = (pCreate->source == 1) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->dbUid = pDb->uid;
pDst->tagVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->tagVer : 1;
pDst->colVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->colVer : 1;
@ -883,9 +883,9 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_STABLE_UID_NOT_MATCH;
goto _OVER;
} else if (createReq.tagVer > 0 || createReq.colVer > 0) {
int32_t tagDelta = pStb->tagVer - createReq.tagVer;
int32_t colDelta = pStb->colVer - createReq.colVer;
int32_t verDelta = tagDelta + verDelta;
int32_t tagDelta = createReq.tagVer - pStb->tagVer;
int32_t colDelta = createReq.colVer - pStb->colVer;
int32_t verDelta = tagDelta + colDelta;
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
if (tagDelta <= 0 && colDelta <= 0) {
@ -936,7 +936,47 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
if (isAlter) {
bool needRsp = false;
code = mndAlterStbImp(pMnode, pReq, pDb, pStb, needRsp, NULL, 0);
SStbObj pDst = {0};
taosRLockLatch(&pStb->lock);
memcpy(&pDst, pStb, sizeof(SStbObj));
taosRUnLockLatch(&pStb->lock);
pDst.updateTime = taosGetTimestampMs();
pDst.nextColId = 1;
pDst.numOfColumns = createReq.numOfColumns;
pDst.numOfTags = createReq.numOfTags;
pDst.pColumns = taosMemoryCalloc(1, pDst.numOfColumns * sizeof(SSchema));
pDst.pTags = taosMemoryCalloc(1, pDst.numOfTags * sizeof(SSchema));
if (pDst.pColumns == NULL || pDst.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
for (int32_t i = 0; i < pDst.numOfColumns; ++i) {
SField *pField = taosArrayGet(createReq.pColumns, i);
SSchema *pSchema = &pDst.pColumns[i];
pSchema->type = pField->type;
pSchema->bytes = pField->bytes;
pSchema->flags = pField->flags;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
pSchema->colId = pDst.nextColId;
pDst.nextColId++;
}
for (int32_t i = 0; i < pDst.numOfTags; ++i) {
SField *pField = taosArrayGet(createReq.pTags, i);
SSchema *pSchema = &pDst.pTags[i];
pSchema->type = pField->type;
pSchema->bytes = pField->bytes;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
pSchema->colId = pDst.nextColId;
pDst.nextColId++;
}
pDst.tagVer = createReq.tagVer;
pDst.colVer = createReq.colVer;
code = mndAlterStbImp(pMnode, pReq, pDb, &pDst, needRsp, NULL, 0);
taosMemoryFreeClear(pDst.pTags);
taosMemoryFreeClear(pDst.pColumns);
} else {
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
}

View File

@ -406,7 +406,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqInfo("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/
metaRsp.rspOffset = fetchVer;
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);