feat:[TS-4243] add logic for primark key in tmq

This commit is contained in:
wangmm0220 2024-03-22 17:48:59 +08:00
parent c7f108181d
commit f1853ced26
20 changed files with 659 additions and 430 deletions

View File

@ -3307,6 +3307,8 @@ enum {
ONLY_META = 2,
};
#define TQ_OFFSET_VERSION 1
typedef struct {
int8_t type;
union {
@ -3314,6 +3316,7 @@ typedef struct {
struct {
int64_t uid;
int64_t ts;
SValue primaryKey;
};
// log
struct {
@ -3322,10 +3325,14 @@ typedef struct {
};
} STqOffsetVal;
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts, SValue primaryKey) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid;
pOffsetVal->ts = ts;
if(IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){
taosMemoryFree(pOffsetVal->primaryKey.pData);
}
pOffsetVal->primaryKey = primaryKey;
}
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
@ -3342,6 +3349,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pOffsetVal);
void tOffsetDestroy(void* pVal);
typedef struct {
STqOffsetVal val;
@ -3648,6 +3657,7 @@ typedef struct {
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
void tDestroySMqPollReq(SMqPollReq *pReq);
typedef struct {
int32_t vgId;
@ -3691,7 +3701,9 @@ typedef struct {
int32_t tEncodeMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp);
int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp);
void tDeleteMqMetaRsp(SMqMetaRsp *pRsp);
#define MQ_DATA_RSP_VERSION 100
typedef struct {
SMqRspHead head;
STqOffsetVal reqOffset;
@ -3707,7 +3719,7 @@ typedef struct {
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp, int8_t dataVersion);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
typedef struct {
@ -3728,7 +3740,7 @@ typedef struct {
} STaosxRsp;
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp, int8_t dateVersion);
void tDeleteSTaosxRsp(STaosxRsp* pRsp);
typedef struct {

View File

@ -131,7 +131,7 @@ typedef struct SMetaTableInfo {
} SMetaTableInfo;
typedef struct SSnapContext {
SMeta* pMeta; // todo remove it
SMeta* pMeta;
int64_t snapVersion;
void* pCur;
int64_t suid;
@ -142,6 +142,7 @@ typedef struct SSnapContext {
int32_t index;
int8_t withMeta;
int8_t queryMeta; // true-get meta, false-get data
bool hasPrimaryKey;
} SSnapContext;
typedef struct {
@ -219,6 +220,8 @@ typedef struct SStoreTqReader {
int32_t (*tqReaderAddTables)();
int32_t (*tqReaderRemoveTables)();
void (*tqSetTablePrimaryKey)();
bool (*tqGetTablePrimaryKey)();
bool (*tqReaderIsQueriedTable)();
bool (*tqReaderCurrentBlockConsumed)();
@ -230,6 +233,8 @@ typedef struct SStoreTqReader {
} SStoreTqReader;
typedef struct SStoreSnapshotFn {
bool (*taosXGetTablePrimaryKey)(SSnapContext *ctx);
void (*taosXSetTablePrimaryKey)(SSnapContext *ctx, int64_t uid);
int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid);
int32_t (*destroySnapshot)(SSnapContext* ctx);
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);

View File

@ -432,6 +432,24 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uin
return 0;
}
static FORCE_INLINE int32_t tDecodeBinaryAlloc32(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
uint32_t length = 0;
if (tDecodeU32v(pCoder, &length) < 0) return -1;
if (length) {
if (len) *len = length;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
*val = taosMemoryMalloc(length);
if (*val == NULL) return -1;
memcpy(*val, TD_CODER_CURRENT(pCoder), length);
TD_CODER_MOVE_POS(pCoder, length);
} else {
*val = NULL;
}
return 0;
}
static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SDecoder* pCoder, char** val, uint64_t* len) {
if (tDecodeBinaryAlloc(pCoder, (void**)val, len) < 0) return -1;
(*len) -= 1;

View File

@ -225,6 +225,7 @@ typedef struct {
} SMqRspObj;
typedef struct {
int8_t version;
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
@ -233,6 +234,7 @@ typedef struct {
} SMqMetaRspObj;
typedef struct {
int8_t version;
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];

View File

@ -342,27 +342,17 @@ void taos_free_result(TAOS_RES *res) {
destroyRequest(pRequest);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->rsp.blockDataLen);
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->rsp.createTableLen);
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
tDeleteSTaosxRsp(&pRsp->rsp);
doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->rsp.blockDataLen);
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
tDeleteMqDataRsp(&pRsp->rsp);
doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
taosMemoryFree(pRspObj->metaRsp.metaRsp);
tDeleteMqMetaRsp(&pRspObj->metaRsp);
taosMemoryFree(pRspObj);
}
}

View File

@ -64,6 +64,8 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
}
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
cJSON_AddItemToObject(column, "isPrimarykey", isPk);
cJSON_AddItemToArray(columns, column);
}
cJSON_AddItemToObject(json, "columns", columns);
@ -1625,7 +1627,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
rspObj.resType = RES_TYPE__TMQ;
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp, *(int8_t*)data);
if (code != 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;
@ -1754,7 +1756,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
rspObj.resType = RES_TYPE__TMQ_METADATA;
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp, *(int8_t*)data);
if (code != 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;

View File

@ -925,26 +925,15 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
tDeleteMqDataRsp(&pRsp->dataRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
taosMemoryFree(pRsp->metaRsp.metaRsp);
tDeleteMqMetaRsp(&pRsp->metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
tDeleteSTaosxRsp(&pRsp->taosxRsp);
}
}
@ -1016,10 +1005,17 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
return rsp;
}
static void freeClientVg(void* param){
SMqClientVg* pVg = param;
tOffsetDestroy(&pVg->offsetInfo.endOffset);
tOffsetDestroy(&pVg->offsetInfo.beginOffset);
tOffsetDestroy(&pVg->offsetInfo.committedOffset);
}
static void freeClientVgImpl(void* param) {
SMqClientTopic* pTopic = param;
taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroy(pTopic->vgs);
taosArrayDestroyEx(pTopic->vgs, freeClientVg);
}
void tmqFreeImpl(void* handle) {
@ -1381,7 +1377,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1392,13 +1393,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
if(tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp) < 0){
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
} else { // invalid rspType
@ -1472,26 +1483,22 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.numOfRows = pInfo ? pInfo->numOfRows : 0,
};
clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew;
clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew;
clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false;
if(pInfo) {
tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
}else{
clientVg.offsetInfo.endOffset = offsetNew;
clientVg.offsetInfo.committedOffset = offsetNew;
clientVg.offsetInfo.beginOffset = offsetNew;
}
taosArrayPush(pTopic->vgs, &clientVg);
}
}
static void freeClientVgInfo(void* param) {
SMqClientTopic* pTopic = param;
if (pTopic->schema.nCols) {
taosMemoryFreeClear(pTopic->schema.pSchema);
}
taosArrayDestroy(pTopic->vgs);
}
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false;
@ -1558,7 +1565,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
// destroy current buffered existed topics info
if (tmq->clientTopics) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
}
tmq->clientTopics = newTopics;
taosWUnLockLatch(&tmq->lock);
@ -1813,8 +1820,10 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
int64_t consumerId, bool hasData) {
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
if (hasData) pVg->offsetInfo.beginOffset = *reqOffset;
pVg->offsetInfo.endOffset = *rspOffset;
if (hasData) {
tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
}
tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
} else {
tscDebug("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
}
@ -1882,6 +1891,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
@ -1950,6 +1961,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
@ -1980,6 +1993,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
@ -2001,14 +2016,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
}
// build rsp
void* pRsp = NULL;
int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
tscError("consumer:0x%" PRIx64 " createTableNum should > 0 if rsp type is data_meta", tmq->consumerId);
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
}
void* pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0};
@ -2662,7 +2671,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqDataRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &rsp);
tDecodeMqDataRsp(&decoder, &rsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)));
tDecoderClear(&decoder);
SMqRspHead* pHead = pMsg->pData;
@ -2715,6 +2724,7 @@ static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) {
tOffsetDestroy(&pParam->vgOffset.offset);
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
@ -2799,6 +2809,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
code = pParam->vgOffset.offset.val.version;
} else {
tOffsetDestroy(&pParam->vgOffset.offset);
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
}

View File

@ -6593,22 +6593,6 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
return 0;
}
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1;
return 0;
}
int32_t tDerializeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffset) {
if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1;
return 0;
}
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
@ -6627,7 +6611,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tEncodeU64(&encoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1;
@ -6648,10 +6632,6 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
// SMsgHead *pHead = buf;
// pHead->vgId = pReq->head.vgId;
// pHead->contLen = pReq->head.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
@ -6664,7 +6644,7 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeU64(&decoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1;
@ -6680,6 +6660,9 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
return 0;
}
void tDestroySMqPollReq(SMqPollReq *pReq){
tOffsetDestroy(&pReq->reqOffset);
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
@ -8305,10 +8288,17 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) {
}
int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
if (tEncodeI8(pEncoder, (TQ_OFFSET_VERSION << 4) | pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1;
if (tEncodeI8(pEncoder, pOffsetVal->primaryKey.type) < 0) return -1;
if (IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){
if (tEncodeBinary(pEncoder, pOffsetVal->primaryKey.pData, pOffsetVal->primaryKey.nData) < 0) return -1;
} else {
if (tEncodeI64(pEncoder, pOffsetVal->primaryKey.val) < 0) return -1;
}
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1;
} else {
@ -8322,6 +8312,15 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
if ((pOffsetVal->type >> 4) >= TQ_OFFSET_VERSION) {
pOffsetVal->type = pOffsetVal->type & 0x0F;
if (tDecodeI8(pDecoder, &pOffsetVal->primaryKey.type) < 0) return -1;
if (IS_VAR_DATA_TYPE(pOffsetVal->primaryKey.type)){
if (tDecodeBinaryAlloc32(pDecoder, &pOffsetVal->primaryKey.pData, &pOffsetVal->primaryKey.nData) < 0) return -1;
} else {
if (tDecodeI64(pDecoder, &pOffsetVal->primaryKey.val) < 0) return -1;
}
}
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1;
} else {
@ -8353,6 +8352,10 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
if (pLeft->type == TMQ_OFFSET__LOG) {
return pLeft->version == pRight->version;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pLeft->primaryKey.type != 0) {
if(pLeft->primaryKey.type != pRight->primaryKey.type) return false;
if(tValueCompare(&pLeft->primaryKey, &pRight->primaryKey) != 0) return false;
}
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
return pLeft->uid == pRight->uid;
@ -8364,6 +8367,21 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
return false;
}
void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight){
tOffsetDestroy(pLeft);
*pLeft = *pRight;
if(IS_VAR_DATA_TYPE(pRight->primaryKey.type)){
pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData);
memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData);
}
}
void tOffsetDestroy(void* param){
STqOffsetVal* pVal = (STqOffsetVal*)param;
if(IS_VAR_DATA_TYPE(pVal->primaryKey.type)){
taosMemoryFreeClear(pVal->primaryKey.pData);
}
}
int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) {
if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1;
if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1;
@ -8476,6 +8494,10 @@ int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) {
return 0;
}
void tDeleteMqMetaRsp(SMqMetaRsp *pRsp) {
taosMemoryFree(pRsp->metaRsp);
}
int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
@ -8502,6 +8524,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
}
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1;
if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1;
return 0;
@ -8553,7 +8576,10 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
}
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp, int8_t dataVersion) {
if (dataVersion >= MQ_DATA_RSP_VERSION){
if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1;
}
if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1;
@ -8570,9 +8596,12 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1;
if (tEncodeMqDataRspCommon(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
@ -8586,7 +8615,10 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
return 0;
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion) {
if (dataVersion >= MQ_DATA_RSP_VERSION){
if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1;
}
if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
@ -8618,6 +8650,8 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen);
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
pRsp->createTableReq = NULL;
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
}
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {

View File

@ -209,11 +209,15 @@ typedef struct STqReader {
SSchemaWrapper *pSchemaWrapper;
SSDataBlock *pResBlock;
int64_t lastTs;
bool hasPrimaryKey;
} STqReader;
STqReader *tqReaderOpen(SVnode *pVnode);
void tqReaderClose(STqReader *);
bool tqGetTablePrimaryKey(STqReader* pReader);
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
@ -248,6 +252,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
bool taosXGetTablePrimaryKey(SSnapContext *ctx);
void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid);
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext **ctxRet);
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);

View File

@ -466,6 +466,20 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
return c;
}
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid){
bool ret = false;
SSchemaWrapper *schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){
ret = true;
}
tDeleteSchemaWrapper(schema);
ctx->hasPrimaryKey = ret;
}
bool taosXGetTablePrimaryKey(SSnapContext* ctx){
return ctx->hasPrimaryKey;
}
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
int32_t ret = 0;
void* pKey = NULL;

View File

@ -190,10 +190,12 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
SMqVgOffset vgOffset = {0};
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
return -1;
code = TSDB_CODE_INVALID_MSG;
goto end;
}
tDecoderClear(&decoder);
@ -208,22 +210,28 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
pOffset->val.version);
} else {
tqError("invalid commit offset type:%d", pOffset->val.type);
return -1;
code = TSDB_CODE_INVALID_MSG;
goto end;
}
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) {
tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
return 0; // no need to update the offset value
goto end; // no need to update the offset value
}
// save the new offset value
if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
return -1;
code = tqOffsetWrite(pTq->pOffsetStore, pOffset);
if(code != 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;
}
return 0;
end:
tOffsetDestroy(&vgOffset.offset.val);
return code;
}
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
@ -329,11 +337,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
int code = 0;
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
int code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
if (code < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
goto END;
}
int64_t consumerId = req.consumerId;
@ -357,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock);
return -1;
code = -1;
goto END;
} while (0);
}
@ -368,7 +377,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock);
return -1;
code = -1;
goto END;
}
bool exec = tqIsHandleExec(pHandle);
@ -405,6 +415,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
req.subKey, pHandle);
END:
tDestroySMqPollReq(&req);
return code;
}
@ -423,8 +436,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
STqOffset* pOffset = &vgOffset.offset;
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, vgOffset.offset.subKey);
if (pSavedOffset == NULL) {
terrno = TSDB_CODE_TMQ_NO_COMMITTED;
return terrno;

View File

@ -60,7 +60,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
return -1;
}
STqOffset offset;
STqOffset offset = {0};
SDecoder decoder;
tDecoderInit(&decoder, pMemBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
@ -108,6 +108,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
return NULL;
}
taosHashSetFreeFp(pStore->pHash, tOffsetDestroy);
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
taosMemoryFree(fname);

View File

@ -247,6 +247,20 @@ END:
return code;
}
bool tqGetTablePrimaryKey(STqReader* pReader){
return pReader->hasPrimaryKey;
}
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid){
bool ret = false;
SSchemaWrapper *schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){
ret = true;
}
tDeleteSchemaWrapper(schema);
pReader->hasPrimaryKey = ret;
}
STqReader* tqReaderOpen(SVnode* pVnode) {
STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
if (pReader == NULL) {

View File

@ -113,9 +113,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pHandle->block = createOneDataBlock(pDataBlock, true);
// pHandle->block = createDataBlock();
// copyDataBlock(pHandle->block, pDataBlock);
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
if (code != 0) {
return code;
@ -139,6 +138,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
qStreamExtractOffset(task, &offset);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
}
break;
} else {
@ -215,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
continue;
} else {
rowCnt += pDataBlock->info.rows;
if (rowCnt <= 4096) continue;
if (rowCnt <= 1) continue;
}
}

View File

@ -19,8 +19,8 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const
const SMqMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
@ -40,8 +40,8 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
pRsp->withTbName = 1;
pRsp->withSchema = 1;
@ -81,7 +81,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
*pBlockReturned = false;
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
if (pOffset != NULL) {
*pOffsetVal = pOffset->val;
tOffsetCopy(pOffsetVal, &pOffset->val);
char formatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
@ -98,7 +98,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(pOffsetVal, 0);
} else {
tqOffsetResetToData(pOffsetVal, 0, 0);
SValue val = {0};
tqOffsetResetToData(pOffsetVal, 0, 0, val);
}
} else {
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
@ -157,7 +158,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
tOffsetCopy(&dataRsp.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
@ -207,7 +208,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp);
tDeleteMqMetaRsp(&metaRsp);
goto end;
}
@ -219,7 +220,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
*offset = taosxRsp.rspOffset;
tOffsetCopy(offset, &taosxRsp.rspOffset);
}
}
@ -309,33 +310,37 @@ end:
}
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
STqOffsetVal reqOffset = pRequest->reqOffset;
int32_t code = 0;
STqOffsetVal reqOffset = {0};
tOffsetCopy(&reqOffset, &pRequest->reqOffset);
// 1. reset the offset if needed
// reset the offset if needed
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
// handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
goto END;
}
// empty block returned, quit
if (blockReturned) {
return 0;
goto END;
}
} else if (reqOffset.type == 0) { // use the consumer specified offset
uError("req offset type is 0");
return TSDB_CODE_TMQ_INVALID_MSG;
code = TSDB_CODE_TMQ_INVALID_MSG;
goto END;
}
// this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
} else {
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
}
END:
tOffsetDestroy(&reqOffset);
return code;
}
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,

View File

@ -115,6 +115,8 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqReaderSeek = tqReaderSeek;
pTq->tqRetrieveBlock = tqRetrieveDataBlock;
pTq->tqGetTablePrimaryKey = tqGetTablePrimaryKey;
pTq->tqSetTablePrimaryKey = tqSetTablePrimaryKey;
pTq->tqReaderNextBlockInWal = tqNextBlockInWal;
pTq->tqNextBlockImpl = tqNextBlockImpl; // todo remove it
@ -258,6 +260,8 @@ void initCacheFn(SStoreCacheReader* pCache) {
}
void initSnapshotFn(SStoreSnapshotFn* pSnapshot) {
pSnapshot->taosXGetTablePrimaryKey = taosXGetTablePrimaryKey;
pSnapshot->taosXSetTablePrimaryKey = taosXSetTablePrimaryKey;
pSnapshot->setForSnapShot = setForSnapShot;
pSnapshot->destroySnapshot = destroySnapContext;
pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot;

View File

@ -1082,7 +1082,7 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
}
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
@ -1161,7 +1161,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
if (pOffset->type == TMQ_OFFSET__LOG) {
// todo refactor: move away
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
@ -1196,6 +1195,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
return -1;
}
}
pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);
qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
pInfo->pTableScanOp->resultInfo.totalRows);
@ -1220,7 +1220,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int64_t oldSkey = pScanBaseInfo->cond.twindows.skey;
// let's start from the next ts that returned to consumer.
pScanBaseInfo->cond.twindows.skey = ts + 1;
if(pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)){
pScanBaseInfo->cond.twindows.skey = ts;
}else{
pScanBaseInfo->cond.twindows.skey = ts + 1;
}
pScanInfo->scanTimes = 0;
if (pScanBaseInfo->dataReader == NULL) {
@ -1276,8 +1280,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
goto end; // no data
}
pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
if(pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)){
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
}else{
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
}
tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
@ -1312,7 +1321,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
end:
pTaskInfo->streamInfo.currentOffset = *pOffset;
tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
return 0;
}

View File

@ -209,7 +209,10 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
return pqSw;
}
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); }
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSchemaWrapper(pStreamInfo->schema);
tOffsetDestroy(&pStreamInfo->currentOffset);
}
static void freeBlock(void* pParam) {
SSDataBlock* pBlock = *(SSDataBlock**)pParam;

View File

@ -1910,6 +1910,45 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
}
}
static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal *offset) {
if(pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0){
return;
}
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
qDebug("doBlockDataWindowFilter primary key, ts:%" PRId64 " %"PRId64, offset->ts, offset->primaryKey.val);
ASSERT(pColPk->info.type == offset->primaryKey.type);
__compar_fn_t func = getComparFunc(pColPk->info.type, 0);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pColTs, i);
void *data = colDataGetData(pColPk, i);
if(IS_VAR_DATA_TYPE(pColPk->info.type)){
void *tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE);
memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData);
varDataLen(tmq) = offset->primaryKey.nData;
p[i] = (*ts >= offset->ts) && (func(data, tmq) > 0);
taosMemoryFree(tmq);
}else{
p[i] = (*ts >= offset->ts) && (func(data, &offset->primaryKey.val) > 0);
}
if (!p[i]) {
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
int32_t numOfRows = pBlock->info.rows;
@ -2031,6 +2070,25 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
return 0;
}
static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal *offset){
SValue val = {0};
if (hasPrimaryKey){
doBlockDataPrimaryKeyFilter(pBlock, offset);
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
val.type = pColPk->info.type;
if(IS_VAR_DATA_TYPE(pColPk->info.type)) {
val.pData = taosMemoryMalloc(varDataLen(tmp));
val.nData = varDataLen(tmp);
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
}else{
memcpy(&val.val, tmp, pColPk->info.bytes);
}
}
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
}
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -2046,8 +2104,11 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
return pResult;
}
@ -2585,7 +2646,6 @@ static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
}
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -2616,9 +2676,12 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey);
return pBlock;
if (pBlock && pBlock->info.rows > 0) {
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
return pBlock;
}
}
SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
@ -2627,7 +2690,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap read snapshot done, change to get data from wal");
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1);
} else {
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
SValue val = {0};
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN, val);
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
}
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
@ -2647,7 +2711,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (!sContext->queryMeta) { // change to get data next poll request
STqOffsetVal offset = {0};
tqOffsetResetToData(&offset, 0, INT64_MIN);
SValue val = {0};
tqOffsetResetToData(&offset, 0, INT64_MIN, val);
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
} else {
tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);

View File

@ -80,308 +80,330 @@ static void msg_process(TAOS_RES* msg) {
}
int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
/* test for TD-20612 start*/
pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 int, c2 int)");
/* test for primary key start*/
pRes = taos_query(pConn, "create table if not exists pk (ts timestamp, c1 int primary key, c2 int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tb1 (ts, c1) values(1669092069069, 0)");
pRes = taos_query(pConn, "insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tb1 (ts, c2) values(1669092069069, 1)");
pRes = taos_query(pConn, "insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
/* test for TD-20612 end*/
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(
pConn,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
"'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) "
"(1626006833609, 51, 62, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 select * from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
if (g_conf.dropTable) {
pRes = taos_query(pConn, "drop table ct3, ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 comment 'hello'");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 drop column c1");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
if (g_conf.dropTable) {
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt2 using jt tags('')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into jt1 values(now, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into jt2 values(now, 11)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
if (g_conf.dropTable) {
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
pRes = taos_query(pConn,
"create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(
pConn,
"create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) "
"stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes =
taos_query(pConn,
"insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + "
"1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
"stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb "
"tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// /* test for TD-20612 start*/
// pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 int, c2 int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into tb1 (ts, c1) values(1669092069069, 0)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into tb1 (ts, c2) values(1669092069069, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// /* test for TD-20612 end*/
//
// pRes = taos_query(pConn,
// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(
// pConn,
// "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
// "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn,
// "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) "
// "(1626006833609, 51, 62, 'c333', 940)");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into ct3 select * from ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
// if (taos_errno(pRes) != 0) {
// printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "drop table ct3, ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
// }
//
// pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
// if (taos_errno(pRes) != 0) {
// printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table n1 comment 'hello'");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table n1 drop column c1");
// if (taos_errno(pRes) != 0) {
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)");
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
// }
//
// pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table jt2 using jt tags('')");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into jt1 values(now, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into jt2 values(now, 11)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// if (g_conf.dropTable) {
// pRes = taos_query(pConn,
// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
// }
//
// pRes = taos_query(pConn,
// "create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn,
// "create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(
// pConn,
// "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) "
// "stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes =
// taos_query(pConn,
// "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + "
// "1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
// "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb "
// "tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
return 0;
}
@ -490,12 +512,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop database if exists abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in drop db, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
snprintf(sql, 128, "create database if not exists abc1 vgroups %d wal_retention_period 3600", g_conf.srcVgroups);
pRes = taos_query(pConn, sql);
@ -1034,7 +1056,7 @@ int main(int argc, char* argv[]) {
basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list);
testConsumeExcluded(1);
testConsumeExcluded(2);
// testConsumeExcluded(1);
// testConsumeExcluded(2);
taosCloseFile(&g_fp);
}