diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 7a7f19c3af..0ace66b773 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -199,6 +199,7 @@ extern char tsSmlTsDefaultName[]; // extern int32_t tsSmlBatchSize; extern int32_t tmqMaxTopicNum; +extern int32_t tmqRowSize; // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 33e77aee2b..ff6e4de128 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3410,6 +3410,8 @@ enum { ONLY_META = 2, }; +#define TQ_OFFSET_VERSION 1 + typedef struct { int8_t type; union { @@ -3417,6 +3419,7 @@ typedef struct { struct { int64_t uid; int64_t ts; + SValue primaryKey; }; // log struct { @@ -3425,10 +3428,14 @@ typedef struct { }; } STqOffsetVal; -static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { +static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts, SValue primaryKey) { pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; pOffsetVal->uid = uid; pOffsetVal->ts = ts; + if(IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){ + taosMemoryFree(pOffsetVal->primaryKey.pData); + } + pOffsetVal->primaryKey = primaryKey; } static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { @@ -3445,6 +3452,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal); int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal); int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal); bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight); +void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pOffsetVal); +void tOffsetDestroy(void* pVal); typedef struct { STqOffsetVal val; @@ -3751,6 +3760,7 @@ typedef struct { int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); +void tDestroySMqPollReq(SMqPollReq *pReq); typedef struct { int32_t vgId; @@ -3794,7 +3804,9 @@ typedef struct { int32_t tEncodeMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp); int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp); +void tDeleteMqMetaRsp(SMqMetaRsp *pRsp); +#define MQ_DATA_RSP_VERSION 100 typedef struct { SMqRspHead head; STqOffsetVal reqOffset; @@ -3810,7 +3822,7 @@ typedef struct { } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); -int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp, int8_t dataVersion); void tDeleteMqDataRsp(SMqDataRsp* pRsp); typedef struct { @@ -3831,7 +3843,7 @@ typedef struct { } STaosxRsp; int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp); -int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp); +int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp, int8_t dateVersion); void tDeleteSTaosxRsp(STaosxRsp* pRsp); typedef struct { diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 8b0f12d4b6..060341bc92 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -132,7 +132,7 @@ typedef struct SMetaTableInfo { } SMetaTableInfo; typedef struct SSnapContext { - SMeta* pMeta; // todo remove it + SMeta* pMeta; int64_t snapVersion; void* pCur; int64_t suid; @@ -143,6 +143,7 @@ typedef struct SSnapContext { int32_t index; int8_t withMeta; int8_t queryMeta; // true-get meta, false-get data + bool hasPrimaryKey; } SSnapContext; typedef struct { @@ -220,6 +221,8 @@ typedef struct SStoreTqReader { int32_t (*tqReaderAddTables)(); int32_t (*tqReaderRemoveTables)(); + void (*tqSetTablePrimaryKey)(); + bool (*tqGetTablePrimaryKey)(); bool (*tqReaderIsQueriedTable)(); bool (*tqReaderCurrentBlockConsumed)(); @@ -231,6 +234,8 @@ typedef struct SStoreTqReader { } SStoreTqReader; typedef struct SStoreSnapshotFn { + bool (*taosXGetTablePrimaryKey)(SSnapContext *ctx); + void (*taosXSetTablePrimaryKey)(SSnapContext *ctx, int64_t uid); int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid); int32_t (*destroySnapshot)(SSnapContext* ctx); SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); diff --git a/include/util/tencode.h b/include/util/tencode.h index d05d4914e3..0b523ddfa2 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -432,6 +432,24 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uin return 0; } +static FORCE_INLINE int32_t tDecodeBinaryAlloc32(SDecoder* pCoder, uint8_t** val, uint32_t* len) { + uint32_t length = 0; + if (tDecodeU32v(pCoder, &length) < 0) return -1; + if (length) { + if (len) *len = length; + + if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1; + *val = taosMemoryMalloc(length); + if (*val == NULL) return -1; + memcpy(*val, TD_CODER_CURRENT(pCoder), length); + + TD_CODER_MOVE_POS(pCoder, length); + } else { + *val = NULL; + } + return 0; +} + static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SDecoder* pCoder, char** val, uint64_t* len) { if (tDecodeBinaryAlloc(pCoder, (void**)val, len) < 0) return -1; (*len) -= 1; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a49d2091ac..9464baea52 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -342,27 +342,17 @@ void taos_free_result(TAOS_RES *res) { destroyRequest(pRequest); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; - taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->rsp.blockDataLen); - taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper); - // taosx - taosArrayDestroy(pRsp->rsp.createTableLen); - taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); - + tDeleteSTaosxRsp(&pRsp->rsp); doFreeReqResultInfo(&pRsp->resInfo); taosMemoryFree(pRsp); } else if (TD_RES_TMQ(res)) { SMqRspObj *pRsp = (SMqRspObj *)res; - taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->rsp.blockDataLen); - taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper); + tDeleteMqDataRsp(&pRsp->rsp); doFreeReqResultInfo(&pRsp->resInfo); taosMemoryFree(pRsp); } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; - taosMemoryFree(pRspObj->metaRsp.metaRsp); + tDeleteMqMetaRsp(&pRspObj->metaRsp); taosMemoryFree(pRspObj); } } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index adc8c361cd..d25a5332d7 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -64,6 +64,8 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch cJSON* cbytes = cJSON_CreateNumber(length); cJSON_AddItemToObject(column, "length", cbytes); } + cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); + cJSON_AddItemToObject(column, "isPrimarykey", isPk); cJSON_AddItemToArray(columns, column); } cJSON_AddItemToObject(json, "columns", columns); @@ -1625,7 +1627,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { rspObj.resType = RES_TYPE__TMQ; tDecoderInit(&decoder, data, dataLen); - code = tDecodeMqDataRsp(&decoder, &rspObj.rsp); + code = tDecodeMqDataRsp(&decoder, &rspObj.rsp, *(int8_t*)data); if (code != 0) { code = TSDB_CODE_INVALID_MSG; goto end; @@ -1754,7 +1756,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) rspObj.resType = RES_TYPE__TMQ_METADATA; tDecoderInit(&decoder, data, dataLen); - code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp); + code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp, *(int8_t*)data); if (code != 0) { code = TSDB_CODE_INVALID_MSG; goto end; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ce149921e3..4cac1febe5 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -602,7 +602,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); - pVg->offsetInfo.committedOffset = *offsetVal; + tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal); end: taosRUnLockLatch(&tmq->lock); @@ -691,7 +691,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); - pVg->offsetInfo.committedOffset = pVg->offsetInfo.endOffset; + tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset); } else { tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); @@ -925,26 +925,15 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); - taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->dataRsp.blockDataLen); - taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper); + tDeleteMqDataRsp(&pRsp->dataRsp); } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); - - taosMemoryFree(pRsp->metaRsp.metaRsp); + tDeleteMqMetaRsp(&pRsp->metaRsp); } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); - - taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree); - taosArrayDestroy(pRsp->taosxRsp.blockDataLen); - taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree); - taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper); - // taosx - taosArrayDestroy(pRsp->taosxRsp.createTableLen); - taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); + tDeleteSTaosxRsp(&pRsp->taosxRsp); } } @@ -1016,10 +1005,17 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { return rsp; } +static void freeClientVg(void* param){ + SMqClientVg* pVg = param; + tOffsetDestroy(&pVg->offsetInfo.endOffset); + tOffsetDestroy(&pVg->offsetInfo.beginOffset); + tOffsetDestroy(&pVg->offsetInfo.committedOffset); + +} static void freeClientVgImpl(void* param) { SMqClientTopic* pTopic = param; taosMemoryFreeClear(pTopic->schema.pSchema); - taosArrayDestroy(pTopic->vgs); + taosArrayDestroyEx(pTopic->vgs, freeClientVg); } void tmqFreeImpl(void* handle) { @@ -1381,7 +1377,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp); + if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1392,13 +1393,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp); + if(tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); + if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); } else { // invalid rspType @@ -1472,26 +1483,22 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .numOfRows = pInfo ? pInfo->numOfRows : 0, }; - clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew; - clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; - clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; - + if(pInfo) { + tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset); + tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset); + tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset); + }else{ + clientVg.offsetInfo.endOffset = offsetNew; + clientVg.offsetInfo.committedOffset = offsetNew; + clientVg.offsetInfo.beginOffset = offsetNew; + } taosArrayPush(pTopic->vgs, &clientVg); } } -static void freeClientVgInfo(void* param) { - SMqClientTopic* pTopic = param; - if (pTopic->schema.nCols) { - taosMemoryFreeClear(pTopic->schema.pSchema); - } - - taosArrayDestroy(pTopic->vgs); -} - static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; @@ -1558,7 +1565,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) // destroy current buffered existed topics info if (tmq->clientTopics) { - taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); + taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); } tmq->clientTopics = newTopics; taosWUnLockLatch(&tmq->lock); @@ -1823,8 +1830,10 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal int64_t consumerId, bool hasData) { if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId); - if (hasData) pVg->offsetInfo.beginOffset = *reqOffset; - pVg->offsetInfo.endOffset = *rspOffset; + if (hasData) { + tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset); + } + tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset); } else { tscDebug("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId); } @@ -1892,6 +1901,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return NULL; } @@ -1960,6 +1971,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return NULL; } @@ -1990,6 +2003,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return NULL; } @@ -2011,14 +2026,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } // build rsp - void* pRsp = NULL; int64_t numOfRows = 0; - if (pollRspWrapper->taosxRsp.createTableNum == 0) { - tscError("consumer:0x%" PRIx64 " createTableNum should > 0 if rsp type is data_meta", tmq->consumerId); - } else { - pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - } - + void* pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; @@ -2676,7 +2685,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { SMqDataRsp rsp; SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqDataRsp(&decoder, &rsp); + tDecodeMqDataRsp(&decoder, &rsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))); tDecoderClear(&decoder); SMqRspHead* pHead = pMsg->pData; @@ -2729,6 +2738,7 @@ static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len); if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) { + tOffsetDestroy(&pParam->vgOffset.offset); code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -2813,6 +2823,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) { code = pParam->vgOffset.offset.val.version; } else { + tOffsetDestroy(&pParam->vgOffset.offset); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; } } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3381d52050..e0c9487e05 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -149,6 +149,7 @@ char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; // tmq int32_t tmqMaxTopicNum = 20; +int32_t tmqRowSize = 4096; // query int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; @@ -720,6 +721,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + return -1; + if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1183,6 +1187,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; + tmqRowSize = cfgGetItem(pCfg, "tmqRowSize")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32; @@ -1514,6 +1519,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { {"queryRspPolicy", &tsQueryRspPolicy}, {"timeseriesThreshold", &tsTimeSeriesThreshold}, {"tmqMaxTopicNum", &tmqMaxTopicNum}, + {"tmqRowSize", &tmqRowSize}, {"transPullupInterval", &tsTransPullupInterval}, {"compactPullupInterval", &tsCompactPullupInterval}, {"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e4c6c9494a..2705c6d5dc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6947,22 +6947,6 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) return 0; } -int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) { - if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; - if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; - if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1; - - return 0; -} - -int32_t tDerializeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffset) { - if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1; - if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1; - if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1; - - return 0; -} - int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -6981,7 +6965,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tEncodeU64(&encoder, pReq->reqId) < 0) return -1; if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; - if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; + if (tEncodeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1; @@ -7002,10 +6986,6 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); - // SMsgHead *pHead = buf; - // pHead->vgId = pReq->head.vgId; - // pHead->contLen = pReq->head.contLen; - SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); @@ -7018,7 +6998,7 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeU64(&decoder, &pReq->reqId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; - if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; + if (tDecodeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; if (!tDecodeIsEnd(&decoder)) { if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1; @@ -7034,6 +7014,9 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { return 0; } +void tDestroySMqPollReq(SMqPollReq *pReq){ + tOffsetDestroy(&pReq->reqOffset); +} int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -8691,10 +8674,18 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) { } int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) { - if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1; + int8_t type = pOffsetVal->type < 0 ? pOffsetVal->type : (TQ_OFFSET_VERSION << 4) | pOffsetVal->type; + if (tEncodeI8(pEncoder, type) < 0) return -1; if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1; if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; + if (tEncodeI8(pEncoder, pOffsetVal->primaryKey.type) < 0) return -1; + if (IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){ + if (tEncodeBinary(pEncoder, pOffsetVal->primaryKey.pData, pOffsetVal->primaryKey.nData) < 0) return -1; + } else { + if (tEncodeI64(pEncoder, pOffsetVal->primaryKey.val) < 0) return -1; + } + } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; } else { @@ -8705,9 +8696,22 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1; + int8_t offsetVersion = 0; + if (pOffsetVal->type > 0){ + offsetVersion = (pOffsetVal->type >> 4); + pOffsetVal->type = pOffsetVal->type & 0x0F; + } if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; + if (offsetVersion >= TQ_OFFSET_VERSION) { + if (tDecodeI8(pDecoder, &pOffsetVal->primaryKey.type) < 0) return -1; + if (IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){ + if (tDecodeBinaryAlloc32(pDecoder, &pOffsetVal->primaryKey.pData, &pOffsetVal->primaryKey.nData) < 0) return -1; + } else { + if (tDecodeI64(pDecoder, &pOffsetVal->primaryKey.val) < 0) return -1; + } + } } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; } else { @@ -8726,7 +8730,15 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } else if (pVal->type == TMQ_OFFSET__LOG) { snprintf(buf, maxLen, "wal:%" PRId64, pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts); + if(IS_VAR_DATA_TYPE(pVal->primaryKey.type)) { + char *tmp = taosMemoryCalloc(1, pVal->primaryKey.nData + 1); + if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY; + memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData); + snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts, pVal->primaryKey.type, tmp); + taosMemoryFree(tmp); + }else{ + snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%" PRId64, pVal->uid, pVal->ts, pVal->primaryKey.type, pVal->primaryKey.val); + } } else { return TSDB_CODE_INVALID_PARA; } @@ -8739,6 +8751,10 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { if (pLeft->type == TMQ_OFFSET__LOG) { return pLeft->version == pRight->version; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (pLeft->primaryKey.type != 0) { + if(pLeft->primaryKey.type != pRight->primaryKey.type) return false; + if(tValueCompare(&pLeft->primaryKey, &pRight->primaryKey) != 0) return false; + } return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { return pLeft->uid == pRight->uid; @@ -8750,6 +8766,21 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { return false; } +void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight){ + tOffsetDestroy(pLeft); + *pLeft = *pRight; + if(IS_VAR_DATA_TYPE(pRight->primaryKey.type)){ + pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData); + memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData); + } +} + +void tOffsetDestroy(void* param){ + STqOffsetVal* pVal = (STqOffsetVal*)param; + if(IS_VAR_DATA_TYPE(pVal->primaryKey.type)){ + taosMemoryFreeClear(pVal->primaryKey.pData); + } +} int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1; if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; @@ -8862,6 +8893,10 @@ int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) { return 0; } +void tDeleteMqMetaRsp(SMqMetaRsp *pRsp) { + taosMemoryFree(pRsp->metaRsp); +} + int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; @@ -8888,6 +8923,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { } int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { + if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1; if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1; return 0; @@ -8939,7 +8975,10 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } -int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { +int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp, int8_t dataVersion) { + if (dataVersion >= MQ_DATA_RSP_VERSION){ + if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1; + } if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1; if (!tDecodeIsEnd(pDecoder)) { if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1; @@ -8956,9 +8995,12 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) { pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); pRsp->blockTbName = NULL; + tOffsetDestroy(&pRsp->reqOffset); + tOffsetDestroy(&pRsp->rspOffset); } int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { + if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1; if (tEncodeMqDataRspCommon(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; @@ -8972,7 +9014,10 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { return 0; } -int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { +int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion) { + if (dataVersion >= MQ_DATA_RSP_VERSION){ + if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1; + } if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; @@ -9004,6 +9049,8 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); pRsp->createTableReq = NULL; + tOffsetDestroy(&pRsp->reqOffset); + tOffsetDestroy(&pRsp->rspOffset); } int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a6d7a24323..c875deb972 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1311,6 +1311,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons for(int i = 0; i < taosArrayGetSize(offsetRows); i++){ OffsetRows *tmp = taosArrayGet(offsetRows, i); if(tmp->vgId != pVgEp->vgId){ + mError("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId); continue; } data = tmp; @@ -1374,7 +1375,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows); } - // do not show for cleared subscription buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows); pBlock->info.rows = numOfRows; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 87a0db1e71..35477b0998 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -212,11 +212,15 @@ typedef struct STqReader { SSchemaWrapper *pSchemaWrapper; SSDataBlock *pResBlock; int64_t lastTs; + bool hasPrimaryKey; } STqReader; STqReader *tqReaderOpen(SVnode *pVnode); void tqReaderClose(STqReader *); +bool tqGetTablePrimaryKey(STqReader* pReader); +void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid); + void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); @@ -251,6 +255,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); +bool taosXGetTablePrimaryKey(SSnapContext *ctx); +void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid); int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta, SSnapContext **ctxRet); int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 5850e794fa..6b57db28cf 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -468,6 +468,20 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { return c; } +void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid){ + bool ret = false; + SSchemaWrapper *schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1); + if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){ + ret = true; + } + tDeleteSchemaWrapper(schema); + ctx->hasPrimaryKey = ret; +} + +bool taosXGetTablePrimaryKey(SSnapContext* ctx){ + return ctx->hasPrimaryKey; +} + int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) { int32_t ret = 0; void* pKey = NULL; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b7882b547c..b2b65b54cb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -187,10 +187,12 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t SMqVgOffset vgOffset = {0}; int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { - return -1; + code = TSDB_CODE_INVALID_MSG; + goto end; } tDecoderClear(&decoder); @@ -205,22 +207,28 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t pOffset->val.version); } else { tqError("invalid commit offset type:%d", pOffset->val.type); - return -1; + code = TSDB_CODE_INVALID_MSG; + goto end; } STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); - return 0; // no need to update the offset value + goto end; // no need to update the offset value } // save the new offset value - if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { - return -1; + code = tqOffsetWrite(pTq->pOffsetStore, pOffset); + if(code != 0) { + code = TSDB_CODE_INVALID_MSG; + goto end; } return 0; +end: + tOffsetDestroy(&vgOffset.offset.val); + return code; } int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { @@ -326,11 +334,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; - int code = 0; - if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + int code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req); + if (code < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); terrno = TSDB_CODE_INVALID_MSG; - return -1; + goto END; } int64_t consumerId = req.consumerId; @@ -354,7 +362,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; taosWUnLockLatch(&pTq->lock); - return -1; + code = -1; + goto END; } while (0); } @@ -365,7 +374,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); - return -1; + code = -1; + goto END; } bool exec = tqIsHandleExec(pHandle); @@ -402,6 +412,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + +END: + tDestroySMqPollReq(&req); return code; } @@ -420,8 +433,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); - STqOffset* pOffset = &vgOffset.offset; - STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); + STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, vgOffset.offset.subKey); if (pSavedOffset == NULL) { terrno = TSDB_CODE_TMQ_NO_COMMITTED; return terrno; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index f8bf61d4e7..7321e73d28 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -60,7 +60,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { return -1; } - STqOffset offset; + STqOffset offset = {0}; SDecoder decoder; tDecoderInit(&decoder, pMemBuf, size); if (tDecodeSTqOffset(&decoder, &offset) < 0) { @@ -108,6 +108,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { return NULL; } + taosHashSetFreeFp(pStore->pHash, tOffsetDestroy); char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { taosMemoryFree(fname); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 40a24f0bd8..0bb82ed50a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -247,6 +247,20 @@ END: return code; } +bool tqGetTablePrimaryKey(STqReader* pReader){ + return pReader->hasPrimaryKey; +} + +void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid){ + bool ret = false; + SSchemaWrapper *schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); + if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){ + ret = true; + } + tDeleteSchemaWrapper(schema); + pReader->hasPrimaryKey = ret; +} + STqReader* tqReaderOpen(SVnode* pVnode) { STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader)); if (pReader == NULL) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index cc9e8c0136..017d5247d8 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -80,8 +80,6 @@ int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, S } int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { - const int32_t MAX_ROWS_TO_RETURN = 4096; - int32_t vgId = TD_VID(pTq->pVnode); int32_t code = 0; int32_t totalRows = 0; @@ -113,9 +111,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* STqOffsetVal offset = {0}; qStreamExtractOffset(task, &offset); pHandle->block = createOneDataBlock(pDataBlock, true); - // pHandle->block = createDataBlock(); - // copyDataBlock(pHandle->block, pDataBlock); pHandle->blockTime = offset.ts; + tOffsetDestroy(&offset); code = getDataBlock(task, pHandle, vgId, &pDataBlock); if (code != 0) { return code; @@ -139,6 +136,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* qStreamExtractOffset(task, &offset); pRsp->sleepTime = offset.ts - pHandle->blockTime; pHandle->blockTime = offset.ts; + tOffsetDestroy(&offset); } break; } else { @@ -153,7 +151,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pRsp->blockNum++; totalRows += pDataBlock->info.rows; - if (totalRows >= MAX_ROWS_TO_RETURN) { + if (totalRows >= tmqRowSize) { break; } } @@ -215,7 +213,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta continue; } else { rowCnt += pDataBlock->info.rows; - if (rowCnt <= 4096) continue; + if (rowCnt <= tmqRowSize) continue; } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d8440e996f..5e5c77265b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -19,8 +19,8 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const const SMqMetaRsp* pRsp, int32_t vgId); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { - pRsp->reqOffset = pOffset; - pRsp->rspOffset = pOffset; + tOffsetCopy(&pRsp->reqOffset, &pOffset); + tOffsetCopy(&pRsp->rspOffset, &pOffset); pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -40,8 +40,8 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { } static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { - pRsp->reqOffset = pOffset; - pRsp->rspOffset = pOffset; + tOffsetCopy(&pRsp->reqOffset, &pOffset); + tOffsetCopy(&pRsp->rspOffset, &pOffset); pRsp->withTbName = 1; pRsp->withSchema = 1; @@ -81,7 +81,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand *pBlockReturned = false; // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. if (pOffset != NULL) { - *pOffsetVal = pOffset->val; + tOffsetCopy(pOffsetVal, &pOffset->val); char formatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); @@ -98,7 +98,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand if (pHandle->fetchMeta) { tqOffsetResetToMeta(pOffsetVal, 0); } else { - tqOffsetResetToData(pOffsetVal, 0, 0); + SValue val = {0}; + tqOffsetResetToData(pOffsetVal, 0, 0, val); } } else { walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); @@ -157,7 +158,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } - dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists + tOffsetCopy(&dataRsp.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { @@ -207,7 +208,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); - taosMemoryFree(metaRsp.metaRsp); + tDeleteMqMetaRsp(&metaRsp); goto end; } @@ -219,7 +220,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { - *offset = taosxRsp.rspOffset; + tOffsetCopy(offset, &taosxRsp.rspOffset); } } @@ -290,7 +291,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } - if (totalRows >= 4096 || (taosGetTimestampMs() - st > 1000)) { + if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp( pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, @@ -309,33 +310,37 @@ end: } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { - STqOffsetVal reqOffset = pRequest->reqOffset; + int32_t code = 0; + STqOffsetVal reqOffset = {0}; + tOffsetCopy(&reqOffset, &pRequest->reqOffset); - // 1. reset the offset if needed + // reset the offset if needed if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) { - // handle the reset offset cases, according to the consumer's choice. bool blockReturned = false; - int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned); + code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { - return code; + goto END; } // empty block returned, quit if (blockReturned) { - return 0; + goto END; } } else if (reqOffset.type == 0) { // use the consumer specified offset uError("req offset type is 0"); - return TSDB_CODE_TMQ_INVALID_MSG; + code = TSDB_CODE_TMQ_INVALID_MSG; + goto END; } - // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); + code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } else { - // for taosx - return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); + code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } + +END: + tOffsetDestroy(&reqOffset); + return code; } static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver, diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 16abe69def..48852dd159 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -115,6 +115,8 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderSeek = tqReaderSeek; pTq->tqRetrieveBlock = tqRetrieveDataBlock; + pTq->tqGetTablePrimaryKey = tqGetTablePrimaryKey; + pTq->tqSetTablePrimaryKey = tqSetTablePrimaryKey; pTq->tqReaderNextBlockInWal = tqNextBlockInWal; pTq->tqNextBlockImpl = tqNextBlockImpl; // todo remove it @@ -258,6 +260,8 @@ void initCacheFn(SStoreCacheReader* pCache) { } void initSnapshotFn(SStoreSnapshotFn* pSnapshot) { + pSnapshot->taosXGetTablePrimaryKey = taosXGetTablePrimaryKey; + pSnapshot->taosXSetTablePrimaryKey = taosXSetTablePrimaryKey; pSnapshot->setForSnapShot = setForSnapShot; pSnapshot->destroySnapshot = destroySnapContext; pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fae28ca32c..b5e049b692 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1583,7 +1583,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in tColDataArrGetRowKey(TARRAY_DATA(pSubmitTbData->aCol), TARRAY_SIZE(pSubmitTbData->aCol), iRow, &key); if (tRowKeyCompare(&lastKey, &key) >= 0) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); + vError("vgId:%d %s failed 1 since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); goto _exit; } } @@ -1594,7 +1594,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in for (int32_t iRow = 0; iRow < nRow; ++iRow) { if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); + vError("vgId:%d %s failed 2 since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); goto _exit; } if (iRow == 0) { @@ -1605,7 +1605,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in if (tRowKeyCompare(&lastRowKey, &rowKey) >= 0) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); + vError("vgId:%d %s failed 3 since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); goto _exit; } lastRowKey = rowKey; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 26a80cc6b5..f3da768eb9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1082,7 +1082,7 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal)); + tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); } int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) { @@ -1109,6 +1109,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s pColInfo->type = pMtInfo->schema->pSchema[i].type; pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes; pColInfo->colId = pMtInfo->schema->pSchema[i].colId; + pColInfo->pk = pMtInfo->schema->pSchema[i].flags & COL_IS_KEY; pCond->pSlotList[i] = i; } @@ -1161,7 +1162,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { - // todo refactor: move away pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; @@ -1196,6 +1196,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT return -1; } } + pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid); qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); @@ -1220,7 +1221,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; // let's start from the next ts that returned to consumer. - pScanBaseInfo->cond.twindows.skey = ts + 1; + if(pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)){ + pScanBaseInfo->cond.twindows.skey = ts; + }else{ + pScanBaseInfo->cond.twindows.skey = ts + 1; + } pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { @@ -1251,7 +1256,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB - if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; @@ -1276,8 +1280,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT goto end; // no data } + pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); - pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + if(pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)){ + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + }else{ + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1; + } tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); @@ -1312,7 +1321,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } end: - pTaskInfo->streamInfo.currentOffset = *pOffset; + tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset); return 0; } diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 0fff5fa649..8b87f3da43 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -209,7 +209,10 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { return pqSw; } -static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); } +static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { + tDeleteSchemaWrapper(pStreamInfo->schema); + tOffsetDestroy(&pStreamInfo->currentOffset); +} static void freeBlock(void* pParam) { SSDataBlock* pBlock = *(SSDataBlock**)pParam; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 55eae7ae60..59247e1cb6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2077,6 +2077,45 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW } } +static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal *offset) { + if(pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0){ + return; + } + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); + + qDebug("doBlockDataWindowFilter primary key, ts:%" PRId64 " %"PRId64, offset->ts, offset->primaryKey.val); + ASSERT(pColPk->info.type == offset->primaryKey.type); + + __compar_fn_t func = getComparFunc(pColPk->info.type, 0); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pColTs, i); + void *data = colDataGetData(pColPk, i); + if(IS_VAR_DATA_TYPE(pColPk->info.type)){ + void *tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); + memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); + varDataLen(tmq) = offset->primaryKey.nData; + p[i] = (*ts > offset->ts) || (func(data, tmq) > 0); + taosMemoryFree(tmq); + }else{ + p[i] = (*ts > offset->ts) || (func(data, &offset->primaryKey.val) > 0); + } + + if (!p[i]) { + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); +} + // re-build the delete block, ONLY according to the split timestamp static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) { int32_t numOfRows = pBlock->info.rows; @@ -2198,6 +2237,25 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock return 0; } +static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal *offset){ + SValue val = {0}; + if (hasPrimaryKey){ + doBlockDataPrimaryKeyFilter(pBlock, offset); + SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); + + void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1); + val.type = pColPk->info.type; + if(IS_VAR_DATA_TYPE(pColPk->info.type)) { + val.pData = taosMemoryMalloc(varDataLen(tmp)); + val.nData = varDataLen(tmp); + memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); + }else{ + memcpy(&val.val, tmp, pColPk->info.bytes); + } + } + tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val); +} + static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -2213,8 +2271,11 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { - tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); + bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); + processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); return pResult; } @@ -2761,7 +2822,6 @@ static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { } static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { - // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -2792,9 +2852,12 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, terrno); } - qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); - tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey); - return pBlock; + if (pBlock && pBlock->info.rows > 0) { + bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext); + processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); + return pBlock; + } } SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext); @@ -2803,7 +2866,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { qDebug("tmqsnap read snapshot done, change to get data from wal"); tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1); } else { - tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN); + SValue val = {0}; + tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN, val); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); } qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); @@ -2823,7 +2887,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { if (!sContext->queryMeta) { // change to get data next poll request STqOffsetVal offset = {0}; - tqOffsetResetToData(&offset, 0, INT64_MIN); + SValue val = {0}; + tqOffsetResetToData(&offset, 0, INT64_MIN, val); qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); } else { tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 89926b1041..9b00075761 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -123,6 +123,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py new file mode 100644 index 0000000000..44de5466a8 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -0,0 +1,480 @@ +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +from util.dnodes import * +import datetime + +sys.path.append("./7-tmq") +from tmqCommon import * + + +class TDTestCase: + clientCfgDict = {'debugFlag': 135} + updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'tmqRowSize':1} + updatecfgDict["clientCfg"] = clientCfgDict + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def primaryKeyTestIntQuery(self): + print("==============Case 1: primary key test int for query") + tdSql.execute(f'create database if not exists db_pk_query vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_query;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') + tdSql.execute(f'insert into pk values(1669092069068, 0, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 6, 1);') + tdSql.execute(f'flush database db_pk_query') + + tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);') + + tdSql.execute(f'create topic topic_pk_query as select * from pk') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_query"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]: + tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdSql.execute(f'use db_pk_query;') + tdSql.execute(f'insert into pk values(1669092069069, 10, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 5, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 12, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 7, 1);') + + tdSql.execute(f'flush database db_pk_query') + + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_query"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + tdLog.exit(f"data error") + index += 1 + print("index:" + str(index)) + finally: + consumer.close() + + def primaryKeyTestIntStable(self): + print("==============Case 2: primary key test int for stable") + tdSql.execute(f'create database if not exists db_pk_stable vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_stable;') + tdSql.execute(f'create table if not exists pks (ts timestamp, c1 int primary key, c2 int) tags (t int);') + tdSql.execute(f'create table if not exists pk using pks tags(1);') + tdSql.execute(f'insert into pk values(1669092069068, 0, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 6, 1);') + tdSql.execute(f'flush database db_pk_stable') + + tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);') + + tdSql.execute(f'create topic topic_pk_stable as stable pks') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_stable"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]: + tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdSql.execute(f'use db_pk_stable;') + tdSql.execute(f'insert into pk values(1669092069069, 10, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 5, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 12, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 7, 1);') + + tdSql.execute(f'flush database db_pk_stable') + + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_stable"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + tdLog.exit(f"data error") + index += 1 + print("index:" + str(index)) + finally: + consumer.close() + + def primaryKeyTestInt(self): + print("==============Case 3: primary key test int for db") + tdSql.execute(f'create database if not exists abc1 vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use abc1;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') + tdSql.execute(f'insert into pk values(1669092069068, 0, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 6, 1);') + tdSql.execute(f'flush database abc1') + + tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);') + + tdSql.execute(f'create topic topic_in with meta as database abc1') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_in"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]: + tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdSql.execute(f'use abc1;') + tdSql.execute(f'insert into pk values(1669092069069, 10, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 5, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 12, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 7, 1);') + + tdSql.execute(f'flush database abc1') + + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_in"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + tdLog.exit(f"data error") + index += 1 + print("index:" + str(index)) + finally: + consumer.close() + + def primaryKeyTestString(self): + print("==============Case 4: primary key test string for db") + tdSql.execute(f'create database if not exists db_pk_string vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_string;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 varchar(64) primary key, c2 int);') + tdSql.execute(f'insert into pk values(1669092069068, "ahello", 1);') + tdSql.execute(f'insert into pk values(1669092069068, "aworld", 1);') + tdSql.execute(f'flush database db_pk_string') + + tdSql.execute(f'insert into pk values(1669092069069, "him", 1) (1669092069069, "value", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "she", 1) (1669092069069, "like", 1);') + tdSql.execute(f'insert into pk values(1669092069068, "from", 1) (1669092069068, "it", 1);') + + tdSql.execute(f'create topic topic_pk_string with meta as database db_pk_string') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_string"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 'ahello', 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 'aworld', 1)]: + tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdDnodes.stop(1) + time.sleep(2) + tdDnodes.start(1) + + tdSql.execute(f'use db_pk_string;') + tdSql.execute(f'insert into pk values(1669092069069, "10", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "5", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "12", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "7", 1);') + + tdSql.execute(f'flush database db_pk_string') + + consumer = Consumer(consumer_dict) + try: + consumer.subscribe(["topic_pk_string"]) + except TmqError: + tdLog.exit(f"subscribe error") + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 'from', 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 'it', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'him', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'like', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'she', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'value', 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "10", 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "5", 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "12", 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "7", 1)]: + tdLog.exit(f"data error") + + index += 1 + print("index:" + str(index)) + finally: + consumer.close() + + def run(self): + self.primaryKeyTestIntQuery() + self.primaryKeyTestIntStable() + self.primaryKeyTestInt() + self.primaryKeyTestString() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) +