Merge branch 'main' into fix/cencVer

This commit is contained in:
dapan1121 2023-03-30 15:36:30 +08:00 committed by GitHub
commit 15578cf980
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 391 additions and 199 deletions

View File

@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
* @param SReadHandle * @param SReadHandle
* @return * @return
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id);
/** /**
* set the task Id, usually used by message queue process * set the task Id, usually used by message queue process
@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
* @param tinfo * @param tinfo

View File

@ -535,10 +535,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
atomic_add_fetch_32(&pParamSet->totalRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%" PRId64 " prev:%" PRId64 char offsetBuf[80] = {0};
", ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->val);
tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId); char commitBuf[80] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->committedOffset);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
totalVgroups, pMsgSendInfo->requestId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);

View File

@ -2453,6 +2453,11 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
} else { } else {
if(ASSERT(varDataTLen(data + offset) <= bytes)){
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes);
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](pColData, (uint8_t *)varDataVal(data + offset), code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](pColData, (uint8_t *)varDataVal(data + offset),
varDataLen(data + offset)); varDataLen(data + offset));
} }

View File

@ -399,8 +399,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal); tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64,
consumerId, pHandle->subKey, vgId, formatBuf); consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
return 0; return 0;
} else { } else {
// no poll occurs in this vnode for this topic, let's seek to the right offset value. // no poll occurs in this vnode for this topic, let's seek to the right offset value.
@ -799,7 +799,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle tqHandle = {0}; STqHandle tqHandle = {0};
pHandle = &tqHandle; pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/
uint64_t oldConsumerId = pHandle->consumerId; uint64_t oldConsumerId = pHandle->consumerId;
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
@ -834,7 +833,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.qmsg = NULL; req.qmsg = NULL;
pHandle->execHandle.task = pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL); qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner); qExtractStreamScanner(pHandle->execHandle.task, &scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
@ -847,7 +846,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext)); (SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
@ -865,7 +864,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext)); (SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
} }
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));

View File

@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task = handle.execHandle.task =
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL); qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
if (handle.execHandle.task == NULL) { if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey); tqError("cannot create exec task for %s", handle.subKey);
code = -1; code = -1;
@ -332,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext)); (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
@ -341,7 +341,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
} }
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
@ -349,9 +349,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext)); handle.fetchMeta, (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
} }

View File

@ -74,14 +74,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
qTaskInfo_t task = pExec->task; qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return code; return code;
} else { } else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return code; return code;
} }
@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId); tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
code = qExecTask(task, &pDataBlock, &ts); code = qExecTask(task, &pDataBlock, &ts);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t
uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSize(const STableListInfo* pTableList);
uint64_t tableListGetSuid(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);

View File

@ -143,9 +143,6 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t fillHistoryVer1; int64_t fillHistoryVer1;
int64_t fillHistoryVer2; int64_t fillHistoryVer2;
// int8_t triggerSaved;
// int64_t deleteMarkSaved;
SStreamState* pState; SStreamState* pState;
} SStreamTaskInfo; } SStreamTaskInfo;
@ -175,7 +172,6 @@ struct SExecTaskInfo {
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
int32_t qbufQuota; // total available buffer (in KB) during execution query int32_t qbufQuota; // total available buffer (in KB) during execution query
int64_t version; // used for stream to record wal version, why not move to sschemainfo int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo; SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo; SSchemaInfo schemaInfo;
@ -188,6 +184,7 @@ struct SExecTaskInfo {
SLocalFetch localFetch; SLocalFetch localFetch;
SArray* pResultBlockList; // result block list SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
SRWLatch lock; // secure the access of STableListInfo
}; };
enum { enum {
@ -486,12 +483,6 @@ typedef struct SStreamScanInfo {
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode; SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader; STsdbReader* dataReader;
@ -694,6 +685,8 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator); int32_t optrDummyOpenFn(SOperatorInfo* pOperator);

View File

@ -571,6 +571,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
memcpy(pStart, data, len); memcpy(pStart, data, len);
pStart += len; pStart += len;
} else if (IS_VAR_DATA_TYPE(pValue->info.type)) { } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
if (varDataTLen(data) > pValue->info.bytes) {
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
goto end;
}
memcpy(pStart, data, varDataTLen(data)); memcpy(pStart, data, varDataTLen(data));
pStart += varDataTLen(data); pStart += varDataTLen(data);
} else { } else {
@ -1800,6 +1804,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index)
return taosArrayGet(pTableList->pTableList, index); return taosArrayGet(pTableList->pTableList, index);
} }
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
if (startIndex >= numOfTables) {
return -1;
}
for (int32_t i = startIndex; i < numOfTables; ++i) {
STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
if (p->uid == uid) {
return i;
}
}
return -1;
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL); ASSERT(pTableList->map != NULL && slot != NULL);

View File

@ -242,29 +242,27 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
return code; return code;
} }
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) {
if (msg == NULL) { if (msg == NULL) { // create raw scan
// create raw scan SExecTaskInfo* pTaskInfo = doCreateExecTaskInfo(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, "");
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (NULL == pTaskInfo) { if (NULL == pTaskInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo); pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
if (NULL == pTaskInfo->pRoot) { if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo); taosMemoryFree(pTaskInfo);
return NULL; return NULL;
} }
qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
return pTaskInfo; return pTaskInfo;
} }
struct SSubplan* pPlan = NULL; struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan); int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
@ -292,9 +290,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
} }
} }
if (pSchema) {
*pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
}
return pTaskInfo; return pTaskInfo;
} }
@ -410,6 +405,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
taosWLockLatch(&pTaskInfo->lock);
for (int32_t i = 0; i < numOfQualifiedTables; ++i) { for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
@ -424,6 +420,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
taosArrayDestroy(qa); taosArrayDestroy(qa);
taosWUnLockLatch(&pTaskInfo->lock);
return code; return code;
} }
} }
@ -445,6 +442,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
} }
taosWUnLockLatch(&pTaskInfo->lock);
if (keyBuf != NULL) { if (keyBuf != NULL) {
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
} }
@ -452,7 +450,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
taosArrayDestroy(qa); taosArrayDestroy(qa);
} else { // remove the table id in current list } else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
taosWLockLatch(&pTaskInfo->lock);
code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList); code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
taosWUnLockLatch(&pTaskInfo->lock);
} }
return code; return code;
@ -1000,6 +1000,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
} }
return 0; return 0;
} }
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished; return pTaskInfo->streamInfo.recoverScanFinished;
@ -1082,6 +1083,9 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* id = GET_TASKID(pTaskInfo);
pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.prepareStatus = *pOffset;
pTaskInfo->streamInfo.returned = 0; pTaskInfo->streamInfo.returned = 0;
@ -1094,21 +1098,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO add more check // TODO add more check
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if(pOperator->numOfDownstream != 1){ if (pOperator->numOfDownstream != 1) {
qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream); qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
return -1; return -1;
} }
pOperator = pOperator->pDownstream[0]; pOperator = pOperator->pDownstream[0];
} }
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pScanBaseInfo->dataReader);
tsdbReaderClose(pTSInfo->base.dataReader); pScanBaseInfo->dataReader = NULL;
pTSInfo->base.dataReader = NULL;
// let's seek to the next version in wal file // let's seek to the next version in wal file
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1); qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id);
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
@ -1116,73 +1123,79 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// those data are from the snapshot in tsdb, besides the data in the wal file. // those data are from the snapshot in tsdb, besides the data in the wal file.
int64_t uid = pOffset->uid; int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts; int64_t ts = pOffset->ts;
int32_t index = 0;
// this value may be changed if new tables are created
taosRLockLatch(&pTaskInfo->lock);
int32_t numOfTables = tableListGetSize(pTableListInfo);
if (uid == 0) { if (uid == 0) {
if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) { if (numOfTables != 0) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid; uid = pTableInfo->uid;
ts = INT64_MIN; ts = INT64_MIN;
pScanInfo->currentTable = 0;
} else { } else {
qError("uid == 0 and tablelist size is 0"); taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
return -1; return -1;
} }
} }
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
bool found = false; // start from current accessed position
for (int32_t i = 0; i < numOfTables; i++) { // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); // position, let's find it from the beginning.
if (pTableInfo->uid == uid) { index = tableListFind(pTableListInfo, uid, 0);
found = true; taosRUnLockLatch(&pTaskInfo->lock);
pTableScanInfo->currentTable = i;
break;
}
}
// TODO after dropping table, table may not found if (index >= 0) {
if(!found){ pScanInfo->currentTable = index;
qError("uid not found in tablelist %" PRId64, uid);
return -1;
}
if (pTableScanInfo->base.dataReader == NULL) {
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL, false) < 0 ||
pTableScanInfo->base.dataReader == NULL) {
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
return -1;
}
}
STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
pTableScanInfo->base.cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, numOfTables);
} else { } else {
qError("invalid pOffset->type:%d", pOffset->type); qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id);
return -1; return -1;
} }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
STableKeyInfo keyInfo = {.uid = uid};
int64_t oldSkey = pScanBaseInfo->cond.twindows.skey;
// let's start from the next ts that returned to consumer.
pScanBaseInfo->cond.twindows.skey = ts + 1;
pScanInfo->scanTimes = 0;
if (pScanBaseInfo->dataReader == NULL) {
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false);
if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code;
return -1;
}
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
} else {
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
}
// restore the key value
pScanBaseInfo->cond.twindows.skey = oldSkey;
} else {
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
return -1;
}
} else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
return -1; return -1;
} }
@ -1191,7 +1204,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
tableListClear(pTaskInfo->pTableInfoList); tableListClear(pTableListInfo);
if (mtInfo.uid == 0) { if (mtInfo.uid == 0) {
return 0; // no data return 0; // no data
@ -1200,14 +1213,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
if (pTaskInfo->pTableInfoList == NULL) { tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
pTaskInfo->pTableInfoList = tableListCreate();
}
tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTableListInfo);
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false);
@ -1216,7 +1225,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64" %s", mtInfo.uid, pOffset->ts, id);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
@ -1224,13 +1233,15 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
return -1; return -1;
} }
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id);
} else if (pOffset->type == TMQ_OFFSET__LOG) { } else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader); tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
qDebug("tmqsnap qStreamPrepareScan snapshot log"); qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
} }
}
return 0; return 0;
} }

View File

@ -1974,7 +1974,7 @@ char* buildTaskId(uint64_t taskId, uint64_t queryId) {
return p; return p;
} }
static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) { SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1982,6 +1982,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
} }
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName); pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
pTaskInfo->execModel = model; pTaskInfo->execModel = model;
@ -1989,6 +1990,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
taosInitRWLatch(&pTaskInfo->lock);
pTaskInfo->id.vgId = vgId; pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->id.str = buildTaskId(taskId, queryId); pTaskInfo->id.str = buildTaskId(taskId, queryId);
@ -2464,7 +2466,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
goto _complete; goto _complete;
} }
(*pTaskInfo)->cost.created = taosGetTimestampUs();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_complete: _complete:

View File

@ -782,13 +782,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if no data, switch to next table and continue scan // if no data, switch to next table and continue scan
pInfo->currentTable++; pInfo->currentTable++;
if (pInfo->currentTable >= numOfTables) { if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
return NULL; return NULL;
} }
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str); pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
@ -1601,19 +1602,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
const char* id = GET_TASKID(pTaskInfo);
qDebug("start to exec queue scan"); qDebug("start to exec queue scan, %s", id);
if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
/*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ if (pInfo->tqReader->msg2.msgStr == NULL) {
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/
SPackedData submit = pTaskInfo->streamInfo.submit; SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr); qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id);
pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->msg2 = (SPackedData){0};
pInfo->tqReader->setMsg = 0; pInfo->tqReader->setMsg = 0;
ASSERT(0); ASSERT(0);
@ -1647,18 +1645,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) { if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id);
pTaskInfo->streamInfo.returned = 1; pTaskInfo->streamInfo.returned = 1;
return pResult; return pResult;
} else { } else {
// no data has return already, try to extract data in the WAL
if (!pTaskInfo->streamInfo.returned) { if (!pTaskInfo->streamInfo.returned) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
qDebug("3");
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL; return NULL;
@ -1675,7 +1675,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (tqNextBlock(pInfo->tqReader, &ret) < 0) { if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
// if the end is reached, terrno is 0 // if the end is reached, terrno is 0
if (terrno != 0) { if (terrno != 0) {
qError("failed to get next log block since %s", terrstr()); qError("failed to get next log block since %s, %s", terrstr(), id);
} }
} }
@ -1690,9 +1690,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
qError("unexpected ret.fetchType:%d", ret.fetchType); qError("unexpected ret.fetchType:%d", ret.fetchType);
continue; continue;
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE || } else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
@ -1704,7 +1701,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
} else { } else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id);
return NULL; return NULL;
} }
} }

View File

@ -128,8 +128,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
if (end >= 0) { if (end >= 0) {
forwardRows = end; forwardRows = end;
if (pData[end + pos] == ekey) { while (pData[end + pos] == ekey) {
forwardRows += 1; forwardRows += 1;
++pos;
} }
} }
} else { } else {
@ -137,8 +138,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
if (end >= 0) { if (end >= 0) {
forwardRows = end; forwardRows = end;
if (pData[end + pos] == ekey) { while (pData[end + pos] == ekey) {
forwardRows += 1; forwardRows += 1;
++pos;
} }
} }
// int32_t end = searchFn((char*)pData, pos + 1, ekey, order); // int32_t end = searchFn((char*)pData, pos + 1, ekey, order);

View File

@ -10,21 +10,21 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
@ -46,6 +46,11 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
@ -868,6 +873,7 @@
,,y,script,./test.sh -f tsim/query/session.sim ,,y,script,./test.sh -f tsim/query/session.sim
,,y,script,./test.sh -f tsim/query/udf.sim ,,y,script,./test.sh -f tsim/query/udf.sim
,,y,script,./test.sh -f tsim/query/udf_with_const.sim ,,y,script,./test.sh -f tsim/query/udf_with_const.sim
,,y,script,./test.sh -f tsim/query/join_interval.sim
,,y,script,./test.sh -f tsim/query/sys_tbname.sim ,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/event.sim ,,y,script,./test.sh -f tsim/query/event.sim

View File

@ -0,0 +1,42 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step create databases
sql create database d1
sql create database d2
sql create table d1.t1(ts timestamp, i int) tags(t int);
sql create table d2.t1(ts timestamp, i int);
sql insert into d1.t11 using d1.t1 tags(1) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d1.t12 using d1.t1 tags(2) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d1.t13 using d1.t1 tags(3) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d2.t1 values(1500000000000,0)(1500000000001,1)(1500000000002,2)
sql select _wstart,_wend,count((a.ts)),count(b.ts) from d1.t1 a, d2.t1 b where a.ts is not null and a.ts = b.ts interval(1a) ;
if $data02 != 3 then
return -1
endi
if $data03 != 3 then
return -1
endi
if $data12 != 3 then
return -1
endi
if $data13 != 3 then
return -1
endi
if $data22 != 3 then
return -1
endi
if $data23 != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -292,7 +292,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare() tdSql.prepare()
# time.sleep(2) # time.sleep(2)
vgroups = "30" vgroups = "8"
sql = "create database db3 vgroups " + vgroups sql = "create database db3 vgroups " + vgroups
tdSql.query(sql) tdSql.query(sql)
sql = "create table db3.stb (ts timestamp, f int) tags (t int)" sql = "create table db3.stb (ts timestamp, f int) tags (t int)"

View File

@ -181,7 +181,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare() tdSql.prepare()
# time.sleep(2) # time.sleep(2)
vgroups = "30" vgroups = "8"
sql = "create database db3 vgroups " + vgroups sql = "create database db3 vgroups " + vgroups
tdSql.query(sql) tdSql.query(sql)

View File

@ -83,7 +83,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), True)
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
@ -147,9 +147,33 @@ class TDTestCase:
tdSql.checkData(1, 1, '55555') tdSql.checkData(1, 1, '55555')
tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))")
keyDict['s'] = "\"alter table db1.tba add column f2 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select * from tba order by ts")
tdSql.query("select * from tba order by ts")
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 2, None)
keyDict['s'] = "\"alter table db1.tba add column f3 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select f3 from tba order by ts")
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.query("create table stb (ts timestamp, f1 int, f2 binary(2)) tags (tg1 binary(2))")
tdSql.query("create table tb1 using stb tags('bb')") tdSql.query("create table tb1 using stb tags('bb')")
tdSql.query("insert into tb1 values (now, 2)") tdSql.query("insert into tb1 values (now, 2,'22')")
tdSql.query("select count(*) from stb group by tg1") tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1) tdSql.checkData(0, 0, 1)
@ -163,13 +187,23 @@ class TDTestCase:
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s fail") tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\"" keyDict['s'] = "\"insert into db1.tb2 values (now, 2,'22')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"alter table db1.stb modify column f2 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 3,'55555')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '') retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s fail") tdLog.exit("taos -s fail")
tdSql.query("select count(*) from stb group by tg1") tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1) tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1) tdSql.checkData(1, 0, 1)

View File

@ -6144,7 +6144,7 @@ class TDTestCase:
startTime = time.time() startTime = time.time()
self.function_before_26() #self.function_before_26()
self.math_nest(['UNIQUE']) self.math_nest(['UNIQUE'])
self.math_nest(['MODE']) self.math_nest(['MODE'])
@ -6157,9 +6157,9 @@ class TDTestCase:
# self.math_nest(['MAVG']) # self.math_nest(['MAVG'])
# self.math_nest(['HYPERLOGLOG']) # self.math_nest(['HYPERLOGLOG'])
# self.math_nest(['TAIL']) # self.math_nest(['TAIL'])
# self.math_nest(['CSUM']) self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration']) self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM']) self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER']) # self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH']) # self.str_nest(['LENGTH','CHAR_LENGTH'])

View File

@ -0,0 +1,76 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.cases import tdCases
from .nestedQuery import *
class TDTestCase(TDTestCase):
def run(self):
tdSql.prepare()
startTime = time.time()
self.function_before_26()
# self.math_nest(['UNIQUE'])
# self.math_nest(['MODE'])
# self.math_nest(['SAMPLE'])
# self.math_nest(['ABS','SQRT'])
# self.math_nest(['SIN','COS','TAN','ASIN','ACOS','ATAN'])
# self.math_nest(['POW','LOG'])
# self.math_nest(['FLOOR','CEIL','ROUND'])
# self.math_nest(['MAVG'])
# self.math_nest(['HYPERLOGLOG'])
# self.math_nest(['TAIL'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])
# self.str_nest(['SUBSTR'])
# self.str_nest(['CONCAT'])
# self.str_nest(['CONCAT_WS'])
# self.time_nest(['CAST']) #放到time里起来弄
# self.time_nest(['CAST_1'])
# self.time_nest(['CAST_2'])
# self.time_nest(['CAST_3'])
# self.time_nest(['CAST_4'])
# self.time_nest(['NOW','TODAY'])
# self.time_nest(['TIMEZONE'])
# self.time_nest(['TIMETRUNCATE'])
# self.time_nest(['TO_ISO8601'])
# self.time_nest(['TO_UNIXTIMESTAMP'])
# self.time_nest(['ELAPSED'])
#self.time_nest(['TIMEDIFF_1'])
#self.time_nest(['TIMEDIFF_2'])
endTime = time.time()
print("total time %ds" % (endTime - startTime))
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -34,9 +34,9 @@ class TDTestCase(TDTestCase):
self.math_nest(['MAVG']) self.math_nest(['MAVG'])
self.math_nest(['HYPERLOGLOG']) self.math_nest(['HYPERLOGLOG'])
self.math_nest(['TAIL']) self.math_nest(['TAIL'])
self.math_nest(['CSUM']) # self.math_nest(['CSUM'])
self.math_nest(['statecount','stateduration']) # self.math_nest(['statecount','stateduration'])
self.math_nest(['HISTOGRAM']) # self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER']) # self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH']) # self.str_nest(['LENGTH','CHAR_LENGTH'])

View File

@ -162,19 +162,18 @@ class TDTestCase:
sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname
tdSql.query(sql) tdSql.query(sql)
num = tdSql.getData(0,0) # 目前不需要了
# num = tdSql.getData(0,0)
# for i in range(0,num):
# sql1 = "select count(*) from %s.d%d" %(dbname,i)
# tdSql.query(sql1)
# sql1_result = tdSql.getData(0,0)
# tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
for i in range(0,num):
sql1 = "select count(*) from %s.d%d" %(dbname,i)
tdSql.query(sql1)
sql1_result = tdSql.getData(0,0)
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
def check_out_of_order(self,dbname,tables,per_table_num,order,replica): def check_out_of_order(self,dbname,tables,per_table_num,order,replica):
self.run_benchmark(dbname,tables,per_table_num,order,replica) self.run_benchmark(dbname,tables,per_table_num,order,replica)
print("sleep 10 seconds")
#time.sleep(10)
print("sleep 10 seconds finish")
self.run_sql(dbname) self.run_sql(dbname)
@ -182,7 +181,7 @@ class TDTestCase:
startTime = time.time() startTime = time.time()
#self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1) #self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1)
self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1) self.check_out_of_order('db1',random.randint(50,100),random.randint(10000,20000),random.randint(1,5),1)
# self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1) # self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1)

View File

@ -13,11 +13,11 @@ from util.dnodes import *
class TDTestCase: class TDTestCase:
hostname = socket.gethostname() hostname = socket.gethostname()
#rpcDebugFlagVal = '143' # rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal # updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict) #print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):

View File

@ -16,6 +16,8 @@ sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
class TDTestCase: class TDTestCase:
updatecfgDict = {"tsdbDebugFlag":135}
def __init__(self): def __init__(self):
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 10 self.ctbNum = 10