Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fix/udf
This commit is contained in:
commit
43eefef68c
|
@ -114,21 +114,30 @@ typedef struct SWal {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
// ref
|
// ref
|
||||||
SHashObj *pRefHash; // ref -> SWalRef
|
SHashObj *pRefHash; // refId -> SWalRef
|
||||||
// path
|
// path
|
||||||
char path[WAL_PATH_LEN];
|
char path[WAL_PATH_LEN];
|
||||||
// reusable write head
|
// reusable write head
|
||||||
SWalCkHead writeHead;
|
SWalCkHead writeHead;
|
||||||
} SWal; // WAL HANDLE
|
} SWal;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t refId;
|
||||||
|
int64_t refVer;
|
||||||
|
int64_t refFile;
|
||||||
|
SWal *pWal;
|
||||||
|
} SWalRef;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t scanUncommited;
|
int8_t scanUncommited;
|
||||||
|
int8_t scanNotApplied;
|
||||||
int8_t scanMeta;
|
int8_t scanMeta;
|
||||||
int8_t enableRef;
|
int8_t enableRef;
|
||||||
} SWalFilterCond;
|
} SWalFilterCond;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SWal *pWal;
|
SWal *pWal;
|
||||||
|
int64_t readerId;
|
||||||
TdFilePtr pLogFile;
|
TdFilePtr pLogFile;
|
||||||
TdFilePtr pIdxFile;
|
TdFilePtr pIdxFile;
|
||||||
int64_t curFileFirstVer;
|
int64_t curFileFirstVer;
|
||||||
|
@ -138,7 +147,8 @@ typedef struct {
|
||||||
int8_t curStopped;
|
int8_t curStopped;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
SWalCkHead *pHead;
|
// TODO remove it
|
||||||
|
SWalCkHead *pHead;
|
||||||
} SWalReader;
|
} SWalReader;
|
||||||
|
|
||||||
// module initialization
|
// module initialization
|
||||||
|
@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_
|
||||||
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
|
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
|
||||||
int32_t bodyLen);
|
int32_t bodyLen);
|
||||||
|
|
||||||
// This interface assign version automatically and return to caller.
|
// Assign version automatically and return to caller,
|
||||||
// When using this interface with concurrent writes,
|
|
||||||
// wal will write all logs atomically,
|
|
||||||
// but not sure which one will be actually write first,
|
|
||||||
// and then the unique index of successful writen is returned.
|
|
||||||
// -1 will be returned for failed writes
|
// -1 will be returned for failed writes
|
||||||
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
||||||
|
|
||||||
|
@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
||||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||||
typedef struct {
|
|
||||||
int64_t refId;
|
SWalRef *walRefCommittedVer(SWal *);
|
||||||
int64_t ver;
|
|
||||||
} SWalRef;
|
|
||||||
|
|
||||||
SWalRef *walOpenRef(SWal *);
|
SWalRef *walOpenRef(SWal *);
|
||||||
void walCloseRef(SWalRef *);
|
void walCloseRef(SWal *pWal, int64_t refId);
|
||||||
int32_t walRefVer(SWalRef *, int64_t ver);
|
int32_t walRefVer(SWalRef *, int64_t ver);
|
||||||
int32_t walUnrefVer(SWal *);
|
void walUnrefVer(SWalRef *);
|
||||||
|
|
||||||
// help function for raft
|
// helper function for raft
|
||||||
bool walLogExist(SWal *, int64_t ver);
|
bool walLogExist(SWal *, int64_t ver);
|
||||||
bool walIsEmpty(SWal *);
|
bool walIsEmpty(SWal *);
|
||||||
|
|
||||||
|
|
|
@ -104,6 +104,8 @@ typedef struct {
|
||||||
// TODO remove
|
// TODO remove
|
||||||
SWalReader* pWalReader;
|
SWalReader* pWalReader;
|
||||||
|
|
||||||
|
SWalRef* pRef;
|
||||||
|
|
||||||
// push
|
// push
|
||||||
STqPushHandle pushHandle;
|
STqPushHandle pushHandle;
|
||||||
|
|
||||||
|
|
|
@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
|
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
|
||||||
|
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*}*/
|
/*}*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
|
@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||||
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
code = -1;
|
code = -1;
|
||||||
goto OVER;
|
goto OVER;
|
||||||
|
@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
pHandle->execHandle.subType = req.subType;
|
pHandle->execHandle.subType = req.subType;
|
||||||
pHandle->fetchMeta = req.withMeta;
|
pHandle->fetchMeta = req.withMeta;
|
||||||
|
// TODO version should be assigned and refed during preprocess
|
||||||
|
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
||||||
|
if (pRef == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
int64_t ver = pRef->refVer;
|
||||||
|
pHandle->pRef = pRef;
|
||||||
|
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
|
||||||
|
|
||||||
// TODO version should be assigned in preprocess
|
|
||||||
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
||||||
pHandle->snapshotVer = ver;
|
pHandle->snapshotVer = ver;
|
||||||
|
@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(pHandle->execHandle.pExecReader);
|
ASSERT(pHandle->execHandle.pExecReader);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
||||||
|
|
|
@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
TXN txn;
|
TXN txn = {0};
|
||||||
|
|
||||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
STqHandle handle;
|
STqHandle handle;
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
tDecodeSTqHandle(&decoder, &handle);
|
tDecodeSTqHandle(&decoder, &handle);
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
|
||||||
|
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
||||||
|
if (handle.pRef == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
walRefVer(handle.pRef, handle.snapshotVer);
|
||||||
|
|
||||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
SReadHandle reader = {
|
SReadHandle reader = {
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
|
@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(handle.execHandle.pExecReader);
|
ASSERT(handle.execHandle.pExecReader);
|
||||||
} else {
|
} else {
|
||||||
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
handle.execHandle.execDb.pFilterOutTbUid =
|
handle.execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// init ref
|
// init ref
|
||||||
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||||
if (pWal->pRefHash == NULL) {
|
if (pWal->pRefHash == NULL) {
|
||||||
taosMemoryFree(pWal);
|
taosMemoryFree(pWal);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -21,107 +21,112 @@ static int32_t walFetchBodyNew(SWalReader *pRead);
|
||||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
||||||
|
|
||||||
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader));
|
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
|
||||||
if (pRead == NULL) {
|
if (pReader == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->pWal = pWal;
|
pReader->pWal = pWal;
|
||||||
pRead->pIdxFile = NULL;
|
pReader->readerId = tGenIdPI64();
|
||||||
pRead->pLogFile = NULL;
|
pReader->pIdxFile = NULL;
|
||||||
pRead->curVersion = -1;
|
pReader->pLogFile = NULL;
|
||||||
pRead->curFileFirstVer = -1;
|
pReader->curVersion = -1;
|
||||||
pRead->curInvalid = 1;
|
pReader->curFileFirstVer = -1;
|
||||||
pRead->capacity = 0;
|
pReader->curInvalid = 1;
|
||||||
|
pReader->capacity = 0;
|
||||||
if (cond) {
|
if (cond) {
|
||||||
pRead->cond = *cond;
|
pReader->cond = *cond;
|
||||||
} else {
|
} else {
|
||||||
pRead->cond.scanMeta = 0;
|
pReader->cond.scanUncommited = 0;
|
||||||
pRead->cond.scanUncommited = 0;
|
pReader->cond.scanNotApplied = 0;
|
||||||
pRead->cond.enableRef = 0;
|
pReader->cond.scanMeta = 0;
|
||||||
|
pReader->cond.enableRef = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexInit(&pRead->mutex, NULL);
|
taosThreadMutexInit(&pReader->mutex, NULL);
|
||||||
|
|
||||||
/*if (pRead->cond.enableRef) {*/
|
pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
|
||||||
/*walOpenRef(pWal);*/
|
if (pReader->pHead == NULL) {
|
||||||
/*}*/
|
|
||||||
|
|
||||||
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
|
|
||||||
if (pRead->pHead == NULL) {
|
|
||||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pRead);
|
taosMemoryFree(pReader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pRead;
|
/*if (pReader->cond.enableRef) {*/
|
||||||
|
/* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
|
||||||
|
/*}*/
|
||||||
|
|
||||||
|
return pReader;
|
||||||
}
|
}
|
||||||
|
|
||||||
void walCloseReader(SWalReader *pRead) {
|
void walCloseReader(SWalReader *pReader) {
|
||||||
taosCloseFile(&pRead->pIdxFile);
|
taosCloseFile(&pReader->pIdxFile);
|
||||||
taosCloseFile(&pRead->pLogFile);
|
taosCloseFile(&pReader->pLogFile);
|
||||||
taosMemoryFreeClear(pRead->pHead);
|
/*if (pReader->cond.enableRef) {*/
|
||||||
taosMemoryFree(pRead);
|
/*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/
|
||||||
|
/*}*/
|
||||||
|
taosMemoryFreeClear(pReader->pHead);
|
||||||
|
taosMemoryFree(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walNextValidMsg(SWalReader *pRead) {
|
int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t fetchVer = pRead->curVersion;
|
int64_t fetchVer = pReader->curVersion;
|
||||||
int64_t lastVer = walGetLastVer(pRead->pWal);
|
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||||
int64_t committedVer = walGetCommittedVer(pRead->pWal);
|
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pRead->pWal);
|
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer;
|
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
||||||
endVer = TMIN(appliedVer, endVer);
|
endVer = TMIN(appliedVer, endVer);
|
||||||
|
|
||||||
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
|
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
|
||||||
pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
||||||
pRead->curStopped = 0;
|
pReader->curStopped = 0;
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= endVer) {
|
||||||
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
||||||
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
|
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
|
||||||
if (walFetchBodyNew(pRead) < 0) {
|
if (walFetchBodyNew(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
if (walSkipFetchBodyNew(pRead) < 0) {
|
if (walSkipFetchBodyNew(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
fetchVer++;
|
fetchVer++;
|
||||||
ASSERT(fetchVer == pRead->curVersion);
|
ASSERT(fetchVer == pReader->curVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pRead->curStopped = 1;
|
pReader->curStopped = 1;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
|
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
|
|
||||||
TdFilePtr pIdxTFile = pRead->pIdxFile;
|
TdFilePtr pIdxTFile = pReader->pIdxFile;
|
||||||
TdFilePtr pLogTFile = pRead->pLogFile;
|
TdFilePtr pLogTFile = pReader->pLogFile;
|
||||||
|
|
||||||
// seek position
|
// seek position
|
||||||
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
|
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
|
||||||
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver,
|
wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
|
||||||
offset, terrstr());
|
ver, offset, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SWalIdxEntry entry = {0};
|
SWalIdxEntry entry = {0};
|
||||||
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
|
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, failed to read idx file, since %s", pRead->pWal->cfg.vgId, terrstr());
|
wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64,
|
wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64,
|
||||||
pRead->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
|
pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -130,79 +135,79 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64
|
||||||
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver,
|
wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
|
||||||
entry.offset, terrstr());
|
ver, entry.offset, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
|
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
|
||||||
taosCloseFile(&pRead->pIdxFile);
|
taosCloseFile(&pReader->pIdxFile);
|
||||||
taosCloseFile(&pRead->pLogFile);
|
taosCloseFile(&pReader->pLogFile);
|
||||||
|
|
||||||
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
|
walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
|
||||||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||||
if (pLogTFile == NULL) {
|
if (pLogFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr());
|
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->pLogFile = pLogTFile;
|
pReader->pLogFile = pLogFile;
|
||||||
|
|
||||||
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
|
walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
|
||||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||||
if (pIdxTFile == NULL) {
|
if (pIdxFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr());
|
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->pIdxFile = pIdxTFile;
|
pReader->pIdxFile = pIdxFile;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
|
int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
||||||
SWal *pWal = pRead->pWal;
|
SWal *pWal = pReader->pWal;
|
||||||
|
|
||||||
|
// bsearch in fileSet
|
||||||
SWalFileInfo tmpInfo;
|
SWalFileInfo tmpInfo;
|
||||||
tmpInfo.firstVer = ver;
|
tmpInfo.firstVer = ver;
|
||||||
// bsearch in fileSet
|
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
ASSERT(pRet != NULL);
|
ASSERT(pRet != NULL);
|
||||||
if (pRead->curFileFirstVer != pRet->firstVer) {
|
if (pReader->curFileFirstVer != pRet->firstVer) {
|
||||||
// error code was set inner
|
// error code was set inner
|
||||||
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
|
if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// error code was set inner
|
// error code was set inner
|
||||||
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
|
if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver);
|
wDebug("wal version reset from %ld(invalid: %d) to %ld", pReader->curVersion, pReader->curInvalid, ver);
|
||||||
|
|
||||||
pRead->curVersion = ver;
|
pReader->curVersion = ver;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
|
||||||
SWal *pWal = pRead->pWal;
|
SWal *pWal = pReader->pWal;
|
||||||
if (!pRead->curInvalid && ver == pRead->curVersion) {
|
if (!pReader->curInvalid && ver == pReader->curVersion) {
|
||||||
wDebug("wal version %ld match, no need to reset", ver);
|
wDebug("wal version %ld match, no need to reset", ver);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->curInvalid = 1;
|
pReader->curInvalid = 1;
|
||||||
pRead->curVersion = ver;
|
pReader->curVersion = ver;
|
||||||
|
|
||||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -210,7 +215,7 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||||
if (ver < pWal->vers.snapshotVer) {
|
if (ver < pWal->vers.snapshotVer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walReadSeekVerImpl(pRead, ver) < 0) {
|
if (walReadSeekVerImpl(pReader, ver) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "cJSON.h"
|
||||||
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "walInt.h"
|
||||||
|
|
||||||
|
SWalRef *walOpenRef(SWal *pWal) {
|
||||||
|
SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef));
|
||||||
|
if (pRef == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pRef->refId = tGenIdPI64();
|
||||||
|
pRef->refVer = -1;
|
||||||
|
pRef->refFile = -1;
|
||||||
|
pRef->pWal = pWal;
|
||||||
|
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
|
||||||
|
return pRef;
|
||||||
|
}
|
||||||
|
|
||||||
|
void walCloseRef(SWal *pWal, int64_t refId) {
|
||||||
|
SWalRef *pRef = *(SWalRef **)taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||||
|
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||||
|
taosMemoryFree(pRef);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
|
SWal *pWal = pRef->pWal;
|
||||||
|
if (pRef->refVer != ver) {
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRef->refVer = ver;
|
||||||
|
// bsearch in fileSet
|
||||||
|
SWalFileInfo tmpInfo;
|
||||||
|
tmpInfo.firstVer = ver;
|
||||||
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
|
ASSERT(pRet != NULL);
|
||||||
|
pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void walUnrefVer(SWalRef *pRef) {
|
||||||
|
pRef->refId = -1;
|
||||||
|
pRef->refFile = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWalRef *walRefCommittedVer(SWal *pWal) {
|
||||||
|
SWalRef *pRef = walOpenRef(pWal);
|
||||||
|
if (pRef == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
|
int64_t ver = walGetCommittedVer(pWal);
|
||||||
|
|
||||||
|
pRef->refVer = ver;
|
||||||
|
// bsearch in fileSet
|
||||||
|
SWalFileInfo tmpInfo;
|
||||||
|
tmpInfo.firstVer = ver;
|
||||||
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
|
ASSERT(pRet != NULL);
|
||||||
|
pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return pRef;
|
||||||
|
}
|
|
@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SWalRef *pRef = (SWalRef *)pIter;
|
SWalRef *pRef = (SWalRef *)pIter;
|
||||||
if (pRef->ver != -1) {
|
if (pRef->refVer != -1) {
|
||||||
taosHashCancelIterate(pWal->pRefHash, pIter);
|
taosHashCancelIterate(pWal->pRefHash, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -215,22 +215,23 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
|
|
||||||
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
||||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||||
/*pWal->vers.firstVer = index;*/
|
|
||||||
if (walRollImpl(pWal) < 0) {
|
if (walRollImpl(pWal) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
return 0;
|
||||||
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
}
|
||||||
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
|
|
||||||
if (walRollImpl(pWal) < 0) {
|
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
||||||
return -1;
|
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
|
||||||
}
|
if (walRollImpl(pWal) < 0) {
|
||||||
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
|
return -1;
|
||||||
if (walRollImpl(pWal) < 0) {
|
}
|
||||||
return -1;
|
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
|
||||||
}
|
if (walRollImpl(pWal) < 0) {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,6 +261,16 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
pWal->vers.snapshotVer = ver;
|
pWal->vers.snapshotVer = ver;
|
||||||
int ts = taosGetTimestampSec();
|
int ts = taosGetTimestampSec();
|
||||||
|
|
||||||
|
int64_t minVerToDelete = ver;
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SWalRef *pRef = *(SWalRef **)pIter;
|
||||||
|
if (pRef->refVer == -1) continue;
|
||||||
|
minVerToDelete = TMIN(minVerToDelete, pRef->refVer);
|
||||||
|
}
|
||||||
|
|
||||||
int deleteCnt = 0;
|
int deleteCnt = 0;
|
||||||
int64_t newTotSize = pWal->totSize;
|
int64_t newTotSize = pWal->totSize;
|
||||||
SWalFileInfo tmp;
|
SWalFileInfo tmp;
|
||||||
|
|
|
@ -0,0 +1,198 @@
|
||||||
|
from distutils.log import error
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import subprocess
|
||||||
|
import platform
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def __init__(self):
|
||||||
|
self.snapshot = 0
|
||||||
|
self.replica = 3
|
||||||
|
self.vgroups = 3
|
||||||
|
self.ctbNum = 2
|
||||||
|
self.rowsPerTbl = 2
|
||||||
|
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
|
def checkFileContent(self, consumerId, queryString):
|
||||||
|
buildPath = tdCom.getBuildPath()
|
||||||
|
cfgPath = tdCom.getClientCfgPath()
|
||||||
|
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
|
||||||
|
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
|
||||||
|
tdLog.info(cmdStr)
|
||||||
|
os.system(cmdStr)
|
||||||
|
|
||||||
|
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
|
||||||
|
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
|
||||||
|
|
||||||
|
consumeFile = open(consumeRowsFile, mode='r')
|
||||||
|
queryFile = open(dstFile, mode='r')
|
||||||
|
|
||||||
|
# skip first line for it is schema
|
||||||
|
queryFile.readline()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
dst = queryFile.readline()
|
||||||
|
src = consumeFile.readline()
|
||||||
|
|
||||||
|
if dst:
|
||||||
|
if dst != src:
|
||||||
|
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
return
|
||||||
|
|
||||||
|
def prepareTestEnv(self):
|
||||||
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 2,
|
||||||
|
'rowsPerTbl': 1000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 3,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica)
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||||
|
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
# tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
tdLog.printNoPrefix("11111111111111111111111")
|
||||||
|
tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1)
|
||||||
|
tdLog.printNoPrefix("222222222222222")
|
||||||
|
tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("333333333333333333333")
|
||||||
|
tdSql.query("drop database %s"%paraDict["dbName"])
|
||||||
|
tdLog.printNoPrefix("44444444444444444")
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
|
||||||
|
# create and start thread
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 100,
|
||||||
|
'rowsPerTbl': 1000,
|
||||||
|
'batchNum': 100,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 3,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 1}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb1")
|
||||||
|
topicFromStb1 = 'topic_stb1'
|
||||||
|
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||||
|
topicList = topicFromStb1
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 100
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
|
||||||
|
tdLog.info("start to check consume result")
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
tdSql.query(queryString)
|
||||||
|
totalRowsInserted = tdSql.getRows()
|
||||||
|
|
||||||
|
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
|
||||||
|
|
||||||
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
|
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
|
||||||
|
tdSql.query("drop topic %s"%topicFromStb1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.prepareTestEnv()
|
||||||
|
# self.tmqCase1()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue