Merge pull request #16536 from taosdata/feature/TD-14761
fix: add filter logic for tmq in stable wal
This commit is contained in:
commit
8652074fad
|
@ -2070,6 +2070,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
|
||||||
// TDMT_VND_DROP_TABLE =================
|
// TDMT_VND_DROP_TABLE =================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* name;
|
char* name;
|
||||||
|
uint64_t suid; // for tmq in wal format
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
} SVDropTbReq;
|
} SVDropTbReq;
|
||||||
|
|
||||||
|
|
|
@ -356,6 +356,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRequest->syncQuery = true;
|
||||||
if (!pRequest->pDb) {
|
if (!pRequest->pDb) {
|
||||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||||
goto end;
|
goto end;
|
||||||
|
|
|
@ -5141,6 +5141,7 @@ static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
|
||||||
|
if (tEncodeU64(pCoder, pReq->suid) < 0) return -1;
|
||||||
if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1;
|
if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
|
@ -5151,6 +5152,7 @@ static int32_t tDecodeSVDropTbReq(SDecoder *pCoder, SVDropTbReq *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
||||||
|
if (tDecodeU64(pCoder, &pReq->suid) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1;
|
if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
|
|
|
@ -18,12 +18,25 @@
|
||||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
|
if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
|
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||||
|
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
|
||||||
|
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
|
||||||
|
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
||||||
|
void *pIter = NULL;
|
||||||
|
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
||||||
|
while(pIter){
|
||||||
|
int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL);
|
||||||
|
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
|
||||||
|
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
|
||||||
|
}
|
||||||
|
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
||||||
|
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
|
@ -32,12 +45,25 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
||||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||||
|
}else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
|
||||||
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
int32_t size = 0;
|
||||||
|
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
||||||
|
for(int32_t i = 0; i < size; i++){
|
||||||
|
int64_t tbUid = 0;
|
||||||
|
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
|
||||||
|
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
||||||
|
}
|
||||||
|
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
||||||
|
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -267,14 +293,28 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
ASSERT(scanner);
|
ASSERT(scanner);
|
||||||
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(handle.execHandle.pExecReader);
|
ASSERT(handle.execHandle.pExecReader);
|
||||||
} else {
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
handle.execHandle.execDb.pFilterOutTbUid =
|
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
|
||||||
// handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
|
||||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
|
||||||
|
|
||||||
|
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
||||||
|
handle.execHandle.task =
|
||||||
|
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||||
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
|
vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
|
||||||
|
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
|
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
||||||
|
}
|
||||||
|
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
|
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
|
||||||
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
|
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
||||||
handle.execHandle.task =
|
handle.execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,162 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
|
||||||
|
bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
|
||||||
|
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int16_t msgType = pHead->msgType;
|
||||||
|
char* body = pHead->body;
|
||||||
|
int32_t bodyLen = pHead->bodyLen;
|
||||||
|
|
||||||
|
int64_t tbSuid = pHandle->execHandle.execTb.suid;
|
||||||
|
int64_t realTbSuid = 0;
|
||||||
|
SDecoder coder;
|
||||||
|
void* data = POINTER_SHIFT(body, sizeof(SMsgHead));
|
||||||
|
int32_t len = bodyLen - sizeof(SMsgHead);
|
||||||
|
tDecoderInit(&coder, data, len);
|
||||||
|
|
||||||
|
if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
|
||||||
|
SVCreateStbReq req = {0};
|
||||||
|
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
realTbSuid = req.suid;
|
||||||
|
} else if (msgType == TDMT_VND_DROP_STB) {
|
||||||
|
SVDropStbReq req = {0};
|
||||||
|
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
realTbSuid = req.suid;
|
||||||
|
} else if (msgType == TDMT_VND_CREATE_TABLE) {
|
||||||
|
SVCreateTbBatchReq req = {0};
|
||||||
|
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t needRebuild = 0;
|
||||||
|
SVCreateTbReq* pCreateReq = NULL;
|
||||||
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
|
pCreateReq = req.pReqs + iReq;
|
||||||
|
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
|
||||||
|
needRebuild++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(needRebuild == 0){
|
||||||
|
// do nothing
|
||||||
|
}else if(needRebuild == req.nReqs){
|
||||||
|
realTbSuid = tbSuid;
|
||||||
|
}else{
|
||||||
|
realTbSuid = tbSuid;
|
||||||
|
SVCreateTbBatchReq reqNew = {0};
|
||||||
|
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
|
||||||
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
|
pCreateReq = req.pReqs + iReq;
|
||||||
|
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
|
||||||
|
reqNew.nReqs++;
|
||||||
|
taosArrayPush(reqNew.pArray, pCreateReq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int tlen;
|
||||||
|
int32_t ret = 0;
|
||||||
|
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
|
||||||
|
void* buf = taosMemoryMalloc(tlen);
|
||||||
|
if (NULL == buf) {
|
||||||
|
taosArrayDestroy(reqNew.pArray);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
SEncoder coderNew = {0};
|
||||||
|
tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
|
||||||
|
tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
|
||||||
|
tEncoderClear(&coderNew);
|
||||||
|
memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
|
||||||
|
pHead->bodyLen = tlen + sizeof(SMsgHead);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
taosArrayDestroy(reqNew.pArray);
|
||||||
|
}
|
||||||
|
} else if (msgType == TDMT_VND_ALTER_TABLE) {
|
||||||
|
SVAlterTbReq req = {0};
|
||||||
|
|
||||||
|
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0);
|
||||||
|
|
||||||
|
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
realTbSuid = mr.me.ctbEntry.suid;
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
} else if (msgType == TDMT_VND_DROP_TABLE) {
|
||||||
|
SVDropTbBatchReq req = {0};
|
||||||
|
|
||||||
|
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t needRebuild = 0;
|
||||||
|
SVDropTbReq* pDropReq = NULL;
|
||||||
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
|
pDropReq = req.pReqs + iReq;
|
||||||
|
|
||||||
|
if(pDropReq->suid == tbSuid){
|
||||||
|
needRebuild++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(needRebuild == 0){
|
||||||
|
// do nothing
|
||||||
|
}else if(needRebuild == req.nReqs){
|
||||||
|
realTbSuid = tbSuid;
|
||||||
|
}else{
|
||||||
|
realTbSuid = tbSuid;
|
||||||
|
SVDropTbBatchReq reqNew = {0};
|
||||||
|
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
|
||||||
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
|
pDropReq = req.pReqs + iReq;
|
||||||
|
if(pDropReq->suid == tbSuid){
|
||||||
|
reqNew.nReqs++;
|
||||||
|
taosArrayPush(reqNew.pArray, pDropReq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int tlen;
|
||||||
|
int32_t ret = 0;
|
||||||
|
tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
|
||||||
|
void* buf = taosMemoryMalloc(tlen);
|
||||||
|
if (NULL == buf) {
|
||||||
|
taosArrayDestroy(reqNew.pArray);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
SEncoder coderNew = {0};
|
||||||
|
tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
|
||||||
|
tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
|
||||||
|
tEncoderClear(&coderNew);
|
||||||
|
memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
|
||||||
|
pHead->bodyLen = tlen + sizeof(SMsgHead);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
taosArrayDestroy(reqNew.pArray);
|
||||||
|
}
|
||||||
|
} else if (msgType == TDMT_VND_DELETE) {
|
||||||
|
SDeleteRes req = {0};
|
||||||
|
if (tDecodeDeleteRes(&coder, &req) < 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
realTbSuid = req.suid;
|
||||||
|
} else{
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
tDecoderClear(&coder);
|
||||||
|
return tbSuid == realTbSuid;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
|
@ -53,11 +209,13 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
code = -1;
|
code = -1;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
if(isValValidForTable(pHandle, pHead)){
|
||||||
*fetchOffset = offset;
|
*fetchOffset = offset;
|
||||||
code = 0;
|
code = 0;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
|
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -6408,8 +6408,8 @@ typedef struct SVgroupDropTableBatch {
|
||||||
char dbName[TSDB_DB_NAME_LEN];
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
} SVgroupDropTableBatch;
|
} SVgroupDropTableBatch;
|
||||||
|
|
||||||
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) {
|
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo, uint64_t suid) {
|
||||||
SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists};
|
SVDropTbReq req = {.name = pClause->tableName, .suid = suid, .igNotExists = pClause->ignoreNotExists};
|
||||||
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||||
if (NULL == pTableBatch) {
|
if (NULL == pTableBatch) {
|
||||||
SVgroupDropTableBatch tBatch = {0};
|
SVgroupDropTableBatch tBatch = {0};
|
||||||
|
@ -6450,7 +6450,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
|
||||||
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
|
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info);
|
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info, pTableMeta->suid);
|
||||||
}
|
}
|
||||||
|
|
||||||
over:
|
over:
|
||||||
|
|
|
@ -372,7 +372,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
||||||
int64_t code;
|
int64_t code;
|
||||||
|
|
||||||
ASSERT(pRead->curVersion == pHead->head.version);
|
// ASSERT(pRead->curVersion == pHead->head.version);
|
||||||
|
|
||||||
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ TdFilePtr g_fp = NULL;
|
||||||
typedef struct{
|
typedef struct{
|
||||||
bool snapShot;
|
bool snapShot;
|
||||||
bool dropTable;
|
bool dropTable;
|
||||||
|
bool subTable;
|
||||||
int srcVgroups;
|
int srcVgroups;
|
||||||
int dstVgroups;
|
int dstVgroups;
|
||||||
char dir[64];
|
char dir[64];
|
||||||
|
@ -74,57 +75,7 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t init_env(Config *conf) {
|
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
|
||||||
if (pConn == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
char sql[128] = {0};
|
|
||||||
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", conf->dstVgroups);
|
|
||||||
pRes = taos_query(pConn, sql);
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "drop topic if exists topic_db");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "drop database if exists abc1");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", conf->srcVgroups);
|
|
||||||
pRes = taos_query(pConn, sql);
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "use abc1");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(pConn,
|
||||||
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
||||||
"nchar(8), t4 bool)");
|
"nchar(8), t4 bool)");
|
||||||
|
@ -232,7 +183,7 @@ int32_t init_env(Config *conf) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
if(conf->dropTable){
|
if(g_conf.dropTable){
|
||||||
pRes = taos_query(pConn, "drop table ct3 ct1");
|
pRes = taos_query(pConn, "drop table ct3 ct1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
|
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -297,7 +248,7 @@ int32_t init_env(Config *conf) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
if(conf->dropTable){
|
if(g_conf.dropTable){
|
||||||
pRes = taos_query(pConn, "drop table n1");
|
pRes = taos_query(pConn, "drop table n1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -341,7 +292,7 @@ int32_t init_env(Config *conf) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
if(conf->dropTable){
|
if(g_conf.dropTable){
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(pConn,
|
||||||
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
||||||
"nchar(8), t4 bool)");
|
"nchar(8), t4 bool)");
|
||||||
|
@ -358,6 +309,112 @@ int32_t init_env(Config *conf) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int buildStable(TAOS* pConn, TAOS_RES* pRes){
|
||||||
|
pRes = taos_query(pConn, "CREATE STABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` VARCHAR(16))");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'Beijing')");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create stream meters_summary_s into meters_summary as select _wstart, max(current) as current, groupid, location from meters partition by groupid, location interval(10m)");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t init_env() {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
if (pConn == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
char sql[128] = {0};
|
||||||
|
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", g_conf.dstVgroups);
|
||||||
|
pRes = taos_query(pConn, sql);
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "drop topic if exists topic_db");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "drop topic if exists meters_summary_t1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "drop database if exists abc1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", g_conf.srcVgroups);
|
||||||
|
pRes = taos_query(pConn, sql);
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "use abc1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
if(g_conf.subTable){
|
||||||
|
buildStable(pConn, pRes);
|
||||||
|
}else{
|
||||||
|
buildDatabase(pConn, pRes);
|
||||||
|
}
|
||||||
|
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -377,12 +434,21 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
if(g_conf.subTable){
|
||||||
|
pRes = taos_query(pConn, "create topic meters_summary_t1 with meta as stable meters_summary");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create topic meters_summary_t1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
}else{
|
||||||
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
|
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
}
|
||||||
|
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -392,7 +458,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
printf("commit %d tmq %p param %p\n", code, tmq, param);
|
printf("commit %d tmq %p param %p\n", code, tmq, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_t* build_consumer(Config *config) {
|
tmq_t* build_consumer() {
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
tmq_conf_set(conf, "group.id", "tg2");
|
tmq_conf_set(conf, "group.id", "tg2");
|
||||||
tmq_conf_set(conf, "client.id", "my app 1");
|
tmq_conf_set(conf, "client.id", "my app 1");
|
||||||
|
@ -402,7 +468,7 @@ tmq_t* build_consumer(Config *config) {
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "enable.heartbeat.background", "true");
|
tmq_conf_set(conf, "enable.heartbeat.background", "true");
|
||||||
|
|
||||||
if(config->snapShot){
|
if(g_conf.snapShot){
|
||||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,7 +481,11 @@ tmq_t* build_consumer(Config *config) {
|
||||||
|
|
||||||
tmq_list_t* build_topic_list() {
|
tmq_list_t* build_topic_list() {
|
||||||
tmq_list_t* topic_list = tmq_list_new();
|
tmq_list_t* topic_list = tmq_list_new();
|
||||||
|
if(g_conf.subTable){
|
||||||
|
tmq_list_append(topic_list, "meters_summary_t1");
|
||||||
|
}else{
|
||||||
tmq_list_append(topic_list, "topic_db");
|
tmq_list_append(topic_list, "topic_db");
|
||||||
|
}
|
||||||
return topic_list;
|
return topic_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,16 +516,16 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
fprintf(stderr, "%% Consumer closed\n");
|
fprintf(stderr, "%% Consumer closed\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
void initLogFile(Config *conf) {
|
void initLogFile() {
|
||||||
char f1[256] = {0};
|
char f1[256] = {0};
|
||||||
char f2[256] = {0};
|
char f2[256] = {0};
|
||||||
|
|
||||||
if(conf->snapShot){
|
if(g_conf.snapShot){
|
||||||
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", conf->dir);
|
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", g_conf.dir);
|
||||||
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", conf->dir);
|
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", g_conf.dir);
|
||||||
}else{
|
}else{
|
||||||
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", conf->dir);
|
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", g_conf.dir);
|
||||||
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", conf->dir);
|
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", g_conf.dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
|
@ -471,7 +541,7 @@ void initLogFile(Config *conf) {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(conf->snapShot){
|
if(g_conf.snapShot){
|
||||||
char *result[] = {
|
char *result[] = {
|
||||||
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
|
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
|
||||||
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
|
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
|
||||||
|
@ -531,20 +601,22 @@ int main(int argc, char* argv[]) {
|
||||||
g_conf.srcVgroups = atol(argv[++i]);
|
g_conf.srcVgroups = atol(argv[++i]);
|
||||||
}else if(strcmp(argv[i], "-dv") == 0){
|
}else if(strcmp(argv[i], "-dv") == 0){
|
||||||
g_conf.dstVgroups = atol(argv[++i]);
|
g_conf.dstVgroups = atol(argv[++i]);
|
||||||
|
}else if(strcmp(argv[i], "-t") == 0){
|
||||||
|
g_conf.subTable = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("env init\n");
|
printf("env init\n");
|
||||||
if(strlen(g_conf.dir) != 0){
|
if(strlen(g_conf.dir) != 0){
|
||||||
initLogFile(&g_conf);
|
initLogFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (init_env(&g_conf) < 0) {
|
if (init_env() < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
create_topic();
|
create_topic();
|
||||||
|
|
||||||
tmq_t* tmq = build_consumer(&g_conf);
|
tmq_t* tmq = build_consumer();
|
||||||
tmq_list_t* topic_list = build_topic_list();
|
tmq_list_t* topic_list = build_topic_list();
|
||||||
basic_consume_loop(tmq, topic_list);
|
basic_consume_loop(tmq, topic_list);
|
||||||
taosCloseFile(&g_fp);
|
taosCloseFile(&g_fp);
|
||||||
|
|
Loading…
Reference in New Issue