From f1853ced26ca1307839ab904af14e381f0095614 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 22 Mar 2024 17:48:59 +0800 Subject: [PATCH] feat:[TS-4243] add logic for primark key in tmq --- include/common/tmsg.h | 18 +- include/libs/executor/storageapi.h | 7 +- include/util/tencode.h | 18 + source/client/inc/clientInt.h | 2 + source/client/src/clientMain.c | 16 +- source/client/src/clientRawBlockWrite.c | 6 +- source/client/src/clientTmq.c | 95 ++-- source/common/src/tmsg.c | 84 ++- source/dnode/vnode/inc/vnode.h | 6 + source/dnode/vnode/src/meta/metaSnapshot.c | 14 + source/dnode/vnode/src/tq/tq.c | 36 +- source/dnode/vnode/src/tq/tqOffset.c | 3 +- source/dnode/vnode/src/tq/tqRead.c | 14 + source/dnode/vnode/src/tq/tqScan.c | 6 +- source/dnode/vnode/src/tq/tqUtil.c | 45 +- source/dnode/vnode/src/vnd/vnodeInitApi.c | 4 + source/libs/executor/src/executor.c | 19 +- source/libs/executor/src/querytask.c | 5 +- source/libs/executor/src/scanoperator.c | 79 ++- utils/test/c/tmq_taosx_ci.c | 612 +++++++++++---------- 20 files changed, 659 insertions(+), 430 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index abaa1b1854..bcaf8ce238 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3307,6 +3307,8 @@ enum { ONLY_META = 2, }; +#define TQ_OFFSET_VERSION 1 + typedef struct { int8_t type; union { @@ -3314,6 +3316,7 @@ typedef struct { struct { int64_t uid; int64_t ts; + SValue primaryKey; }; // log struct { @@ -3322,10 +3325,14 @@ typedef struct { }; } STqOffsetVal; -static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { +static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts, SValue primaryKey) { pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; pOffsetVal->uid = uid; pOffsetVal->ts = ts; + if(IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){ + taosMemoryFree(pOffsetVal->primaryKey.pData); + } + pOffsetVal->primaryKey = primaryKey; } static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { @@ -3342,6 +3349,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal); int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal); int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal); bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight); +void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pOffsetVal); +void tOffsetDestroy(void* pVal); typedef struct { STqOffsetVal val; @@ -3648,6 +3657,7 @@ typedef struct { int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); +void tDestroySMqPollReq(SMqPollReq *pReq); typedef struct { int32_t vgId; @@ -3691,7 +3701,9 @@ typedef struct { int32_t tEncodeMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp); int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp); +void tDeleteMqMetaRsp(SMqMetaRsp *pRsp); +#define MQ_DATA_RSP_VERSION 100 typedef struct { SMqRspHead head; STqOffsetVal reqOffset; @@ -3707,7 +3719,7 @@ typedef struct { } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); -int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp, int8_t dataVersion); void tDeleteMqDataRsp(SMqDataRsp* pRsp); typedef struct { @@ -3728,7 +3740,7 @@ typedef struct { } STaosxRsp; int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp); -int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp); +int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp, int8_t dateVersion); void tDeleteSTaosxRsp(STaosxRsp* pRsp); typedef struct { diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 3433f98d38..440dd9b6b4 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -131,7 +131,7 @@ typedef struct SMetaTableInfo { } SMetaTableInfo; typedef struct SSnapContext { - SMeta* pMeta; // todo remove it + SMeta* pMeta; int64_t snapVersion; void* pCur; int64_t suid; @@ -142,6 +142,7 @@ typedef struct SSnapContext { int32_t index; int8_t withMeta; int8_t queryMeta; // true-get meta, false-get data + bool hasPrimaryKey; } SSnapContext; typedef struct { @@ -219,6 +220,8 @@ typedef struct SStoreTqReader { int32_t (*tqReaderAddTables)(); int32_t (*tqReaderRemoveTables)(); + void (*tqSetTablePrimaryKey)(); + bool (*tqGetTablePrimaryKey)(); bool (*tqReaderIsQueriedTable)(); bool (*tqReaderCurrentBlockConsumed)(); @@ -230,6 +233,8 @@ typedef struct SStoreTqReader { } SStoreTqReader; typedef struct SStoreSnapshotFn { + bool (*taosXGetTablePrimaryKey)(SSnapContext *ctx); + void (*taosXSetTablePrimaryKey)(SSnapContext *ctx, int64_t uid); int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid); int32_t (*destroySnapshot)(SSnapContext* ctx); SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); diff --git a/include/util/tencode.h b/include/util/tencode.h index d05d4914e3..0b523ddfa2 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -432,6 +432,24 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uin return 0; } +static FORCE_INLINE int32_t tDecodeBinaryAlloc32(SDecoder* pCoder, uint8_t** val, uint32_t* len) { + uint32_t length = 0; + if (tDecodeU32v(pCoder, &length) < 0) return -1; + if (length) { + if (len) *len = length; + + if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1; + *val = taosMemoryMalloc(length); + if (*val == NULL) return -1; + memcpy(*val, TD_CODER_CURRENT(pCoder), length); + + TD_CODER_MOVE_POS(pCoder, length); + } else { + *val = NULL; + } + return 0; +} + static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SDecoder* pCoder, char** val, uint64_t* len) { if (tDecodeBinaryAlloc(pCoder, (void**)val, len) < 0) return -1; (*len) -= 1; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 6c3603b4e0..6fb82bcd9e 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -225,6 +225,7 @@ typedef struct { } SMqRspObj; typedef struct { + int8_t version; int8_t resType; char topic[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; @@ -233,6 +234,7 @@ typedef struct { } SMqMetaRspObj; typedef struct { + int8_t version; int8_t resType; char topic[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a49d2091ac..9464baea52 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -342,27 +342,17 @@ void taos_free_result(TAOS_RES *res) { destroyRequest(pRequest); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; - taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->rsp.blockDataLen); - taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper); - // taosx - taosArrayDestroy(pRsp->rsp.createTableLen); - taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); - + tDeleteSTaosxRsp(&pRsp->rsp); doFreeReqResultInfo(&pRsp->resInfo); taosMemoryFree(pRsp); } else if (TD_RES_TMQ(res)) { SMqRspObj *pRsp = (SMqRspObj *)res; - taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->rsp.blockDataLen); - taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper); + tDeleteMqDataRsp(&pRsp->rsp); doFreeReqResultInfo(&pRsp->resInfo); taosMemoryFree(pRsp); } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; - taosMemoryFree(pRspObj->metaRsp.metaRsp); + tDeleteMqMetaRsp(&pRspObj->metaRsp); taosMemoryFree(pRspObj); } } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index adc8c361cd..d25a5332d7 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -64,6 +64,8 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch cJSON* cbytes = cJSON_CreateNumber(length); cJSON_AddItemToObject(column, "length", cbytes); } + cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); + cJSON_AddItemToObject(column, "isPrimarykey", isPk); cJSON_AddItemToArray(columns, column); } cJSON_AddItemToObject(json, "columns", columns); @@ -1625,7 +1627,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { rspObj.resType = RES_TYPE__TMQ; tDecoderInit(&decoder, data, dataLen); - code = tDecodeMqDataRsp(&decoder, &rspObj.rsp); + code = tDecodeMqDataRsp(&decoder, &rspObj.rsp, *(int8_t*)data); if (code != 0) { code = TSDB_CODE_INVALID_MSG; goto end; @@ -1754,7 +1756,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) rspObj.resType = RES_TYPE__TMQ_METADATA; tDecoderInit(&decoder, data, dataLen); - code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp); + code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp, *(int8_t*)data); if (code != 0) { code = TSDB_CODE_INVALID_MSG; goto end; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fb1882e472..5b3d8c7bb8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -925,26 +925,15 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); - taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->dataRsp.blockDataLen); - taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper); + tDeleteMqDataRsp(&pRsp->dataRsp); } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); - - taosMemoryFree(pRsp->metaRsp.metaRsp); + tDeleteMqMetaRsp(&pRsp->metaRsp); } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); - - taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->taosxRsp.blockDataLen); - taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper); - // taosx - taosArrayDestroy(pRsp->taosxRsp.createTableLen); - taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); + tDeleteSTaosxRsp(&pRsp->taosxRsp); } } @@ -1016,10 +1005,17 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { return rsp; } +static void freeClientVg(void* param){ + SMqClientVg* pVg = param; + tOffsetDestroy(&pVg->offsetInfo.endOffset); + tOffsetDestroy(&pVg->offsetInfo.beginOffset); + tOffsetDestroy(&pVg->offsetInfo.committedOffset); + +} static void freeClientVgImpl(void* param) { SMqClientTopic* pTopic = param; taosMemoryFreeClear(pTopic->schema.pSchema); - taosArrayDestroy(pTopic->vgs); + taosArrayDestroyEx(pTopic->vgs, freeClientVg); } void tmqFreeImpl(void* handle) { @@ -1381,7 +1377,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp); + if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1392,13 +1393,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp); + if(tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); + if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); } else { // invalid rspType @@ -1472,26 +1483,22 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .numOfRows = pInfo ? pInfo->numOfRows : 0, }; - clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew; - clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; - clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; - + if(pInfo) { + tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset); + tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset); + tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset); + }else{ + clientVg.offsetInfo.endOffset = offsetNew; + clientVg.offsetInfo.committedOffset = offsetNew; + clientVg.offsetInfo.beginOffset = offsetNew; + } taosArrayPush(pTopic->vgs, &clientVg); } } -static void freeClientVgInfo(void* param) { - SMqClientTopic* pTopic = param; - if (pTopic->schema.nCols) { - taosMemoryFreeClear(pTopic->schema.pSchema); - } - - taosArrayDestroy(pTopic->vgs); -} - static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; @@ -1558,7 +1565,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) // destroy current buffered existed topics info if (tmq->clientTopics) { - taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); + taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); } tmq->clientTopics = newTopics; taosWUnLockLatch(&tmq->lock); @@ -1813,8 +1820,10 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal int64_t consumerId, bool hasData) { if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId); - if (hasData) pVg->offsetInfo.beginOffset = *reqOffset; - pVg->offsetInfo.endOffset = *rspOffset; + if (hasData) { + tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset); + } + tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset); } else { tscDebug("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId); } @@ -1882,6 +1891,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return NULL; } @@ -1950,6 +1961,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return NULL; } @@ -1980,6 +1993,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return NULL; } @@ -2001,14 +2016,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } // build rsp - void* pRsp = NULL; int64_t numOfRows = 0; - if (pollRspWrapper->taosxRsp.createTableNum == 0) { - tscError("consumer:0x%" PRIx64 " createTableNum should > 0 if rsp type is data_meta", tmq->consumerId); - } else { - pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - } - + void* pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; @@ -2662,7 +2671,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { SMqDataRsp rsp; SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqDataRsp(&decoder, &rsp); + tDecodeMqDataRsp(&decoder, &rsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))); tDecoderClear(&decoder); SMqRspHead* pHead = pMsg->pData; @@ -2715,6 +2724,7 @@ static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len); if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) { + tOffsetDestroy(&pParam->vgOffset.offset); code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -2799,6 +2809,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) { code = pParam->vgOffset.offset.val.version; } else { + tOffsetDestroy(&pParam->vgOffset.offset); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9138d7c983..fbfea55910 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6593,22 +6593,6 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) return 0; } -int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) { - if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; - if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; - if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1; - - return 0; -} - -int32_t tDerializeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffset) { - if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1; - if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1; - if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1; - - return 0; -} - int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -6627,7 +6611,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tEncodeU64(&encoder, pReq->reqId) < 0) return -1; if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; - if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; + if (tEncodeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1; @@ -6648,10 +6632,6 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); - // SMsgHead *pHead = buf; - // pHead->vgId = pReq->head.vgId; - // pHead->contLen = pReq->head.contLen; - SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); @@ -6664,7 +6644,7 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeU64(&decoder, &pReq->reqId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; - if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; + if (tDecodeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; if (!tDecodeIsEnd(&decoder)) { if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1; @@ -6680,6 +6660,9 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { return 0; } +void tDestroySMqPollReq(SMqPollReq *pReq){ + tOffsetDestroy(&pReq->reqOffset); +} int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -8305,10 +8288,17 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) { } int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) { - if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1; + if (tEncodeI8(pEncoder, (TQ_OFFSET_VERSION << 4) | pOffsetVal->type) < 0) return -1; if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1; if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; + if (tEncodeI8(pEncoder, pOffsetVal->primaryKey.type) < 0) return -1; + if (IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){ + if (tEncodeBinary(pEncoder, pOffsetVal->primaryKey.pData, pOffsetVal->primaryKey.nData) < 0) return -1; + } else { + if (tEncodeI64(pEncoder, pOffsetVal->primaryKey.val) < 0) return -1; + } + } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; } else { @@ -8322,6 +8312,15 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; + if ((pOffsetVal->type >> 4) >= TQ_OFFSET_VERSION) { + pOffsetVal->type = pOffsetVal->type & 0x0F; + if (tDecodeI8(pDecoder, &pOffsetVal->primaryKey.type) < 0) return -1; + if (IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){ + if (tDecodeBinaryAlloc32(pDecoder, &pOffsetVal->primaryKey.pData, &pOffsetVal->primaryKey.nData) < 0) return -1; + } else { + if (tDecodeI64(pDecoder, &pOffsetVal->primaryKey.val) < 0) return -1; + } + } } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; } else { @@ -8353,6 +8352,10 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { if (pLeft->type == TMQ_OFFSET__LOG) { return pLeft->version == pRight->version; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (pLeft->primaryKey.type != 0) { + if(pLeft->primaryKey.type != pRight->primaryKey.type) return false; + if(tValueCompare(&pLeft->primaryKey, &pRight->primaryKey) != 0) return false; + } return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { return pLeft->uid == pRight->uid; @@ -8364,6 +8367,21 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { return false; } +void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight){ + tOffsetDestroy(pLeft); + *pLeft = *pRight; + if(IS_VAR_DATA_TYPE(pRight->primaryKey.type)){ + pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData); + memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData); + } +} + +void tOffsetDestroy(void* param){ + STqOffsetVal* pVal = (STqOffsetVal*)param; + if(IS_VAR_DATA_TYPE(pVal->primaryKey.type)){ + taosMemoryFreeClear(pVal->primaryKey.pData); + } +} int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1; if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; @@ -8476,6 +8494,10 @@ int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) { return 0; } +void tDeleteMqMetaRsp(SMqMetaRsp *pRsp) { + taosMemoryFree(pRsp->metaRsp); +} + int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; @@ -8502,6 +8524,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { } int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { + if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1; if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1; return 0; @@ -8553,7 +8576,10 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } -int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { +int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp, int8_t dataVersion) { + if (dataVersion >= MQ_DATA_RSP_VERSION){ + if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1; + } if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1; if (!tDecodeIsEnd(pDecoder)) { if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1; @@ -8570,9 +8596,12 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) { pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); pRsp->blockTbName = NULL; + tOffsetDestroy(&pRsp->reqOffset); + tOffsetDestroy(&pRsp->rspOffset); } int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { + if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1; if (tEncodeMqDataRspCommon(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; @@ -8586,7 +8615,10 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { return 0; } -int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { +int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion) { + if (dataVersion >= MQ_DATA_RSP_VERSION){ + if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1; + } if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; @@ -8618,6 +8650,8 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); pRsp->createTableReq = NULL; + tOffsetDestroy(&pRsp->reqOffset); + tOffsetDestroy(&pRsp->rspOffset); } int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 294e75602e..0b95c0211a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -209,11 +209,15 @@ typedef struct STqReader { SSchemaWrapper *pSchemaWrapper; SSDataBlock *pResBlock; int64_t lastTs; + bool hasPrimaryKey; } STqReader; STqReader *tqReaderOpen(SVnode *pVnode); void tqReaderClose(STqReader *); +bool tqGetTablePrimaryKey(STqReader* pReader); +void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid); + void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); @@ -248,6 +252,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); +bool taosXGetTablePrimaryKey(SSnapContext *ctx); +void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid); int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta, SSnapContext **ctxRet); int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index e86ed3b657..702e524e38 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -466,6 +466,20 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { return c; } +void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid){ + bool ret = false; + SSchemaWrapper *schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1); + if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){ + ret = true; + } + tDeleteSchemaWrapper(schema); + ctx->hasPrimaryKey = ret; +} + +bool taosXGetTablePrimaryKey(SSnapContext* ctx){ + return ctx->hasPrimaryKey; +} + int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) { int32_t ret = 0; void* pKey = NULL; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47900d540c..d98fa67d7b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -190,10 +190,12 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t SMqVgOffset vgOffset = {0}; int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { - return -1; + code = TSDB_CODE_INVALID_MSG; + goto end; } tDecoderClear(&decoder); @@ -208,22 +210,28 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t pOffset->val.version); } else { tqError("invalid commit offset type:%d", pOffset->val.type); - return -1; + code = TSDB_CODE_INVALID_MSG; + goto end; } STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); - return 0; // no need to update the offset value + goto end; // no need to update the offset value } // save the new offset value - if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { - return -1; + code = tqOffsetWrite(pTq->pOffsetStore, pOffset); + if(code != 0) { + code = TSDB_CODE_INVALID_MSG; + goto end; } return 0; +end: + tOffsetDestroy(&vgOffset.offset.val); + return code; } int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { @@ -329,11 +337,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; - int code = 0; - if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + int code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req); + if (code < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); terrno = TSDB_CODE_INVALID_MSG; - return -1; + goto END; } int64_t consumerId = req.consumerId; @@ -357,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; taosWUnLockLatch(&pTq->lock); - return -1; + code = -1; + goto END; } while (0); } @@ -368,7 +377,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); - return -1; + code = -1; + goto END; } bool exec = tqIsHandleExec(pHandle); @@ -405,6 +415,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + +END: + tDestroySMqPollReq(&req); return code; } @@ -423,8 +436,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); - STqOffset* pOffset = &vgOffset.offset; - STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); + STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, vgOffset.offset.subKey); if (pSavedOffset == NULL) { terrno = TSDB_CODE_TMQ_NO_COMMITTED; return terrno; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index f8bf61d4e7..7321e73d28 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -60,7 +60,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { return -1; } - STqOffset offset; + STqOffset offset = {0}; SDecoder decoder; tDecoderInit(&decoder, pMemBuf, size); if (tDecodeSTqOffset(&decoder, &offset) < 0) { @@ -108,6 +108,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { return NULL; } + taosHashSetFreeFp(pStore->pHash, tOffsetDestroy); char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { taosMemoryFree(fname); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a99caa3323..357b7216fb 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -247,6 +247,20 @@ END: return code; } +bool tqGetTablePrimaryKey(STqReader* pReader){ + return pReader->hasPrimaryKey; +} + +void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid){ + bool ret = false; + SSchemaWrapper *schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); + if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){ + ret = true; + } + tDeleteSchemaWrapper(schema); + pReader->hasPrimaryKey = ret; +} + STqReader* tqReaderOpen(SVnode* pVnode) { STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader)); if (pReader == NULL) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 103007eb57..a02f475e72 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -113,9 +113,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* STqOffsetVal offset = {0}; qStreamExtractOffset(task, &offset); pHandle->block = createOneDataBlock(pDataBlock, true); - // pHandle->block = createDataBlock(); - // copyDataBlock(pHandle->block, pDataBlock); pHandle->blockTime = offset.ts; + tOffsetDestroy(&offset); code = getDataBlock(task, pHandle, vgId, &pDataBlock); if (code != 0) { return code; @@ -139,6 +138,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* qStreamExtractOffset(task, &offset); pRsp->sleepTime = offset.ts - pHandle->blockTime; pHandle->blockTime = offset.ts; + tOffsetDestroy(&offset); } break; } else { @@ -215,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta continue; } else { rowCnt += pDataBlock->info.rows; - if (rowCnt <= 4096) continue; + if (rowCnt <= 1) continue; } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6029575e2c..746424be48 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -19,8 +19,8 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const const SMqMetaRsp* pRsp, int32_t vgId); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { - pRsp->reqOffset = pOffset; - pRsp->rspOffset = pOffset; + tOffsetCopy(&pRsp->reqOffset, &pOffset); + tOffsetCopy(&pRsp->rspOffset, &pOffset); pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -40,8 +40,8 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { } static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { - pRsp->reqOffset = pOffset; - pRsp->rspOffset = pOffset; + tOffsetCopy(&pRsp->reqOffset, &pOffset); + tOffsetCopy(&pRsp->rspOffset, &pOffset); pRsp->withTbName = 1; pRsp->withSchema = 1; @@ -81,7 +81,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand *pBlockReturned = false; // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. if (pOffset != NULL) { - *pOffsetVal = pOffset->val; + tOffsetCopy(pOffsetVal, &pOffset->val); char formatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); @@ -98,7 +98,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand if (pHandle->fetchMeta) { tqOffsetResetToMeta(pOffsetVal, 0); } else { - tqOffsetResetToData(pOffsetVal, 0, 0); + SValue val = {0}; + tqOffsetResetToData(pOffsetVal, 0, 0, val); } } else { walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); @@ -157,7 +158,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } - dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists + tOffsetCopy(&dataRsp.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { @@ -207,7 +208,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); - taosMemoryFree(metaRsp.metaRsp); + tDeleteMqMetaRsp(&metaRsp); goto end; } @@ -219,7 +220,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { - *offset = taosxRsp.rspOffset; + tOffsetCopy(offset, &taosxRsp.rspOffset); } } @@ -309,33 +310,37 @@ end: } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { - STqOffsetVal reqOffset = pRequest->reqOffset; + int32_t code = 0; + STqOffsetVal reqOffset = {0}; + tOffsetCopy(&reqOffset, &pRequest->reqOffset); - // 1. reset the offset if needed + // reset the offset if needed if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) { - // handle the reset offset cases, according to the consumer's choice. bool blockReturned = false; - int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned); + code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { - return code; + goto END; } // empty block returned, quit if (blockReturned) { - return 0; + goto END; } } else if (reqOffset.type == 0) { // use the consumer specified offset uError("req offset type is 0"); - return TSDB_CODE_TMQ_INVALID_MSG; + code = TSDB_CODE_TMQ_INVALID_MSG; + goto END; } - // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); + code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } else { - // for taosx - return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); + code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } + +END: + tOffsetDestroy(&reqOffset); + return code; } static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver, diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 16abe69def..48852dd159 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -115,6 +115,8 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderSeek = tqReaderSeek; pTq->tqRetrieveBlock = tqRetrieveDataBlock; + pTq->tqGetTablePrimaryKey = tqGetTablePrimaryKey; + pTq->tqSetTablePrimaryKey = tqSetTablePrimaryKey; pTq->tqReaderNextBlockInWal = tqNextBlockInWal; pTq->tqNextBlockImpl = tqNextBlockImpl; // todo remove it @@ -258,6 +260,8 @@ void initCacheFn(SStoreCacheReader* pCache) { } void initSnapshotFn(SStoreSnapshotFn* pSnapshot) { + pSnapshot->taosXGetTablePrimaryKey = taosXGetTablePrimaryKey; + pSnapshot->taosXSetTablePrimaryKey = taosXSetTablePrimaryKey; pSnapshot->setForSnapShot = setForSnapShot; pSnapshot->destroySnapshot = destroySnapContext; pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 831fd4e883..4cc3fac879 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1082,7 +1082,7 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal)); + tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); } int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) { @@ -1161,7 +1161,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { - // todo refactor: move away pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; @@ -1196,6 +1195,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT return -1; } } + pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid); qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); @@ -1220,7 +1220,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; // let's start from the next ts that returned to consumer. - pScanBaseInfo->cond.twindows.skey = ts + 1; + if(pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)){ + pScanBaseInfo->cond.twindows.skey = ts; + }else{ + pScanBaseInfo->cond.twindows.skey = ts + 1; + } pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { @@ -1276,8 +1280,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT goto end; // no data } + pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); - pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + if(pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)){ + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + }else{ + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1; + } tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); @@ -1312,7 +1321,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } end: - pTaskInfo->streamInfo.currentOffset = *pOffset; + tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset); return 0; } diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 9eb1c8d653..6f5c3aa65d 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -209,7 +209,10 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { return pqSw; } -static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); } +static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { + tDeleteSchemaWrapper(pStreamInfo->schema); + tOffsetDestroy(&pStreamInfo->currentOffset); +} static void freeBlock(void* pParam) { SSDataBlock* pBlock = *(SSDataBlock**)pParam; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 51edfcb42c..7b82b73579 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1910,6 +1910,45 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW } } +static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal *offset) { + if(pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0){ + return; + } + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); + + qDebug("doBlockDataWindowFilter primary key, ts:%" PRId64 " %"PRId64, offset->ts, offset->primaryKey.val); + ASSERT(pColPk->info.type == offset->primaryKey.type); + + __compar_fn_t func = getComparFunc(pColPk->info.type, 0); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pColTs, i); + void *data = colDataGetData(pColPk, i); + if(IS_VAR_DATA_TYPE(pColPk->info.type)){ + void *tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); + memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); + varDataLen(tmq) = offset->primaryKey.nData; + p[i] = (*ts >= offset->ts) && (func(data, tmq) > 0); + taosMemoryFree(tmq); + }else{ + p[i] = (*ts >= offset->ts) && (func(data, &offset->primaryKey.val) > 0); + } + + if (!p[i]) { + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); +} + // re-build the delete block, ONLY according to the split timestamp static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) { int32_t numOfRows = pBlock->info.rows; @@ -2031,6 +2070,25 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock return 0; } +static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal *offset){ + SValue val = {0}; + if (hasPrimaryKey){ + doBlockDataPrimaryKeyFilter(pBlock, offset); + SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); + + void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1); + val.type = pColPk->info.type; + if(IS_VAR_DATA_TYPE(pColPk->info.type)) { + val.pData = taosMemoryMalloc(varDataLen(tmp)); + val.nData = varDataLen(tmp); + memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); + }else{ + memcpy(&val.val, tmp, pColPk->info.bytes); + } + } + tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val); +} + static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -2046,8 +2104,11 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { - tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); + bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); + processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); return pResult; } @@ -2585,7 +2646,6 @@ static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { } static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { - // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -2616,9 +2676,12 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, terrno); } - qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); - tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey); - return pBlock; + if (pBlock && pBlock->info.rows > 0) { + bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext); + processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); + return pBlock; + } } SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext); @@ -2627,7 +2690,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { qDebug("tmqsnap read snapshot done, change to get data from wal"); tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1); } else { - tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN); + SValue val = {0}; + tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN, val); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); } qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); @@ -2647,7 +2711,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { if (!sContext->queryMeta) { // change to get data next poll request STqOffsetVal offset = {0}; - tqOffsetResetToData(&offset, 0, INT64_MIN); + SValue val = {0}; + tqOffsetResetToData(&offset, 0, INT64_MIN, val); qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); } else { tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 5012e50bab..1e12824c0c 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -80,308 +80,330 @@ static void msg_process(TAOS_RES* msg) { } int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { - /* test for TD-20612 start*/ - pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 int, c2 int)"); + /* test for primary key start*/ + pRes = taos_query(pConn, "create table if not exists pk (ts timestamp, c1 int primary key, c2 int)"); if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into tb1 (ts, c1) values(1669092069069, 0)"); + pRes = taos_query(pConn, "insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1)"); if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into tb1 (ts, c2) values(1669092069069, 1)"); + pRes = taos_query(pConn, "insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1)"); if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - /* test for TD-20612 end*/ - - pRes = taos_query(pConn, - "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query( - pConn, - "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, " - "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, - "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) " - "(1626006833609, 51, 62, 'c333', 940)"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 select * from ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); - if (taos_errno(pRes) != 0) { - printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - if (g_conf.dropTable) { - pRes = taos_query(pConn, "drop table ct3, ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - } - - pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); - if (taos_errno(pRes) != 0) { - printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 comment 'hello'"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 drop column c1"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - if (g_conf.dropTable) { - pRes = taos_query(pConn, "drop table n1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - } - - pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt2 using jt tags('')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into jt1 values(now, 1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into jt2 values(now, 11)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - if (g_conf.dropTable) { - pRes = taos_query(pConn, - "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - } - - pRes = taos_query(pConn, - "create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, - "create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query( - pConn, - "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) " - "stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = - taos_query(pConn, - "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + " - "1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " - "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb " - "tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// /* test for TD-20612 start*/ +// pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 int, c2 int)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into tb1 (ts, c1) values(1669092069069, 0)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into tb1 (ts, c2) values(1669092069069, 1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// /* test for TD-20612 end*/ +// +// pRes = taos_query(pConn, +// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " +// "nchar(8), t4 bool)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query( +// pConn, +// "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, " +// "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, +// "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) " +// "(1626006833609, 51, 62, 'c333', 940)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into ct3 select * from ct1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); +// if (taos_errno(pRes) != 0) { +// printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// if (g_conf.dropTable) { +// pRes = taos_query(pConn, "drop table ct3, ct1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// } +// +// pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table n1 comment 'hello'"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "alter table n1 drop column c1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// if (g_conf.dropTable) { +// pRes = taos_query(pConn, "drop table n1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// } +// +// pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table jt2 using jt tags('')"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into jt1 values(now, 1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into jt2 values(now, 11)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// if (g_conf.dropTable) { +// pRes = taos_query(pConn, +// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " +// "nchar(8), t4 bool)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// } +// +// pRes = taos_query(pConn, +// "create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " +// "nchar(8), t4 bool)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, +// "create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " +// "nchar(8), t4 bool)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query( +// pConn, +// "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) " +// "stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = +// taos_query(pConn, +// "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + " +// "1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " +// "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb " +// "tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); return 0; } @@ -490,12 +512,12 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "drop database if exists abc1"); - if (taos_errno(pRes) != 0) { - printf("error in drop db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "drop database if exists abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in drop db, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); snprintf(sql, 128, "create database if not exists abc1 vgroups %d wal_retention_period 3600", g_conf.srcVgroups); pRes = taos_query(pConn, sql); @@ -1034,7 +1056,7 @@ int main(int argc, char* argv[]) { basic_consume_loop(tmq, topic_list); tmq_list_destroy(topic_list); - testConsumeExcluded(1); - testConsumeExcluded(2); +// testConsumeExcluded(1); +// testConsumeExcluded(2); taosCloseFile(&g_fp); }