diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 5d7f1bbe70..c6248a62c1 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -30,10 +30,25 @@ static void msg_process(TAOS_RES* msg) { if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { tmq_raw_data *raw = tmq_get_raw_meta(msg); if(raw){ - TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", "abc1", 0); + TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0); if (pConn == NULL) { return; } + + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return; + } + taos_free_result(pRes); + int32_t ret = taos_write_raw_meta(pConn, raw); printf("write raw data: %s\n", tmq_err2str(ret)); free(raw); @@ -132,12 +147,12 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); if (taos_errno(pRes) != 0) { @@ -160,19 +175,19 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "drop table ct3 ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "drop table ct3 ct1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); if (taos_errno(pRes) != 0) { @@ -209,12 +224,12 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "drop table n1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "drop table n1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); if (taos_errno(pRes) != 0) { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b67f220e1c..01f8ad99fd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2843,8 +2843,8 @@ typedef struct { static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { int32_t tlen = 0; - // tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); - // tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); + tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); + tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI16(buf, pRsp->resMsgType); tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen); tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen); @@ -2852,8 +2852,7 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp } static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { - // buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); - // buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); diff --git a/include/util/tlog.h b/include/util/tlog.h index a8c9eeabde..d186c32841 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons #define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); } // clang-format on -#define BUF_PAGE_DEBUG +//#define BUF_PAGE_DEBUG #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 2117ef24d9..3a7b137f9f 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -708,7 +708,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN); pDst->createdTime = taosGetTimestampMs(); pDst->updateTime = pDst->createdTime; - pDst->uid = (pCreate->source == 1) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->tagVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->tagVer : 1; pDst->colVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->colVer : 1; @@ -883,9 +883,9 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_STABLE_UID_NOT_MATCH; goto _OVER; } else if (createReq.tagVer > 0 || createReq.colVer > 0) { - int32_t tagDelta = pStb->tagVer - createReq.tagVer; - int32_t colDelta = pStb->colVer - createReq.colVer; - int32_t verDelta = tagDelta + verDelta; + int32_t tagDelta = createReq.tagVer - pStb->tagVer; + int32_t colDelta = createReq.colVer - pStb->colVer; + int32_t verDelta = tagDelta + colDelta; mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d", createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer); if (tagDelta <= 0 && colDelta <= 0) { @@ -936,7 +936,47 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { if (isAlter) { bool needRsp = false; - code = mndAlterStbImp(pMnode, pReq, pDb, pStb, needRsp, NULL, 0); + SStbObj pDst = {0}; + taosRLockLatch(&pStb->lock); + memcpy(&pDst, pStb, sizeof(SStbObj)); + taosRUnLockLatch(&pStb->lock); + + pDst.updateTime = taosGetTimestampMs(); + pDst.nextColId = 1; + pDst.numOfColumns = createReq.numOfColumns; + pDst.numOfTags = createReq.numOfTags; + pDst.pColumns = taosMemoryCalloc(1, pDst.numOfColumns * sizeof(SSchema)); + pDst.pTags = taosMemoryCalloc(1, pDst.numOfTags * sizeof(SSchema)); + if (pDst.pColumns == NULL || pDst.pTags == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + for (int32_t i = 0; i < pDst.numOfColumns; ++i) { + SField *pField = taosArrayGet(createReq.pColumns, i); + SSchema *pSchema = &pDst.pColumns[i]; + pSchema->type = pField->type; + pSchema->bytes = pField->bytes; + pSchema->flags = pField->flags; + memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); + pSchema->colId = pDst.nextColId; + pDst.nextColId++; + } + + for (int32_t i = 0; i < pDst.numOfTags; ++i) { + SField *pField = taosArrayGet(createReq.pTags, i); + SSchema *pSchema = &pDst.pTags[i]; + pSchema->type = pField->type; + pSchema->bytes = pField->bytes; + memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); + pSchema->colId = pDst.nextColId; + pDst.nextColId++; + } + pDst.tagVer = createReq.tagVer; + pDst.colVer = createReq.colVer; + code = mndAlterStbImp(pMnode, pReq, pDb, &pDst, needRsp, NULL, 0); + taosMemoryFreeClear(pDst.pTags); + taosMemoryFreeClear(pDst.pColumns); } else { code = mndCreateStb(pMnode, pReq, &createReq, pDb); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 52949838b9..d97a958888 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -406,7 +406,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqInfo("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; /*metaRsp.reqOffset = pReq->reqOffset.version;*/ - /*metaRsp.rspOffset = fetchVer;*/ + metaRsp.rspOffset = fetchVer; /*metaRsp.rspOffsetNew.version = fetchVer;*/ tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version); tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);