Merge branch 'main' into enh/rocksdbSstate
This commit is contained in:
commit
c6e5879f44
|
@ -189,7 +189,7 @@ show table distributed d0\G;
|
||||||
<summary> Show Example </summary>
|
<summary> Show Example </summary>
|
||||||
<pre><code>
|
<pre><code>
|
||||||
*************************** 1.row ***************************
|
*************************** 1.row ***************************
|
||||||
_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
|
_block_dist: Total_Blocks=[5] Total_Size=[93.65 KB] Average_size=[18.73 KB] Compression_Ratio=[23.98 %]
|
||||||
|
|
||||||
Total_Blocks : Table `d0` contains total 5 blocks
|
Total_Blocks : Table `d0` contains total 5 blocks
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
taos.exec_many([
|
taos.exec_many([
|
||||||
format!("DROP TOPIC IF EXISTS tmq_meters"),
|
format!("DROP TOPIC IF EXISTS tmq_meters"),
|
||||||
format!("DROP DATABASE IF EXISTS `{db}`"),
|
format!("DROP DATABASE IF EXISTS `{db}`"),
|
||||||
format!("CREATE DATABASE `{db}`"),
|
format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
|
||||||
format!("USE `{db}`"),
|
format!("USE `{db}`"),
|
||||||
// create super table
|
// create super table
|
||||||
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
|
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
|
||||||
|
|
|
@ -189,7 +189,7 @@ SHOW TABLE DISTRIBUTED table_name;
|
||||||
|
|
||||||
*************************** 1.row ***************************
|
*************************** 1.row ***************************
|
||||||
|
|
||||||
_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
|
_block_dist: Total_Blocks=[5] Total_Size=[93.65 KB] Average_size=[18.73 KB] Compression_Ratio=[23.98 %]
|
||||||
|
|
||||||
Total_Blocks: 表 d0 占用的 block 个数为 5 个
|
Total_Blocks: 表 d0 占用的 block 个数为 5 个
|
||||||
|
|
||||||
|
|
|
@ -132,7 +132,7 @@ typedef struct {
|
||||||
} SWalRef;
|
} SWalRef;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t scanUncommited;
|
// int8_t scanUncommited;
|
||||||
int8_t scanNotApplied;
|
int8_t scanNotApplied;
|
||||||
int8_t scanMeta;
|
int8_t scanMeta;
|
||||||
int8_t enableRef;
|
int8_t enableRef;
|
||||||
|
|
|
@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
||||||
code = smlModifyDBSchemas(info);
|
code = smlModifyDBSchemas(info);
|
||||||
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|
||||||
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|
||||||
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break;
|
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
||||||
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
||||||
|
|
|
@ -932,7 +932,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
|
if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
|
||||||
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1164,7 +1164,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){
|
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){
|
||||||
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1476,7 +1476,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){
|
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols){
|
||||||
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
|
|
||||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg) {
|
const SMqRebOutputVg *pRebVg) {
|
||||||
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
return -1;
|
// return -1;
|
||||||
}
|
// }
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
|
@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
|
||||||
|
for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
|
||||||
|
SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
|
||||||
|
SMqRebOutputVg outputVg = {
|
||||||
|
.oldConsumerId = pConsumerEp->consumerId,
|
||||||
|
.newConsumerId = pConsumerEp->consumerId,
|
||||||
|
.pVgEp = pVgEp,
|
||||||
|
};
|
||||||
|
taosArrayPush(pOutput->rebVgs, &outputVg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
|
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
|
||||||
int32_t imbConsumerNum) {
|
int32_t imbConsumerNum) {
|
||||||
const char *pSubKey = pOutput->pSub->key;
|
const char *pSubKey = pOutput->pSub->key;
|
||||||
|
@ -290,10 +302,6 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
||||||
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
|
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
|
||||||
if (consumerVgNum > minVgCnt) {
|
if (consumerVgNum > minVgCnt) {
|
||||||
if (imbCnt < imbConsumerNum) {
|
if (imbCnt < imbConsumerNum) {
|
||||||
if (consumerVgNum == minVgCnt + 1) {
|
|
||||||
imbCnt++;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
// pop until equal minVg + 1
|
// pop until equal minVg + 1
|
||||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
|
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
|
||||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
||||||
|
@ -307,7 +315,6 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
imbCnt++;
|
imbCnt++;
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
|
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
|
||||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
|
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
|
||||||
|
@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
putNoTransferToOutput(pOutput, pConsumerEp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
|
int ret = 0;
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
|
|
||||||
|
@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
if (req.newConsumerId == -1) {
|
if (req.newConsumerId == -1) {
|
||||||
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||||
taosMemoryFree(req.qmsg);
|
goto end;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STqHandle tqHandle = {0};
|
STqHandle tqHandle = {0};
|
||||||
|
@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
// TODO version should be assigned and refed during preprocess
|
// TODO version should be assigned and refed during preprocess
|
||||||
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
|
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
taosMemoryFree(req.qmsg);
|
ret = -1;
|
||||||
return -1;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ver = pRef->refVer;
|
int64_t ver = pRef->refVer;
|
||||||
|
@ -534,21 +534,19 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
||||||
pHandle->consumerId, oldConsumerId);
|
pHandle->consumerId, oldConsumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
taosMemoryFree(req.qmsg);
|
goto end;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||||
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
||||||
atomic_store_32(&pHandle->epoch, -1);
|
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
taosMemoryFree(req.qmsg);
|
|
||||||
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
req.newConsumerId);
|
req.newConsumerId);
|
||||||
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
|
}
|
||||||
// kill executing task
|
// kill executing task
|
||||||
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
||||||
if (pTaskInfo != NULL) {
|
if (pTaskInfo != NULL) {
|
||||||
|
@ -556,27 +554,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
atomic_store_32(&pHandle->epoch, 0);
|
|
||||||
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
|
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
qStreamCloseTsdbReader(pTaskInfo);
|
qStreamCloseTsdbReader(pTaskInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
taosMemoryFree(req.qmsg);
|
goto end;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
return 0;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
|
@ -120,6 +120,8 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
qTaskInfo_t task = pHandle->execHandle.task;
|
||||||
|
if(qTaskIsExecuting(task)){
|
||||||
|
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4855,7 +4855,11 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
|
||||||
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
||||||
|
|
||||||
if (pReader->flag == READER_STATUS_SUSPEND) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
code = tsdbReaderResume(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
||||||
|
@ -5133,11 +5137,17 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
|
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
|
|
||||||
if (pReader->flag == READER_STATUS_SUSPEND) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
code = tsdbReaderResume(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
||||||
|
@ -5172,8 +5182,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
||||||
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
|
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
if (pStatus->fileIter.numOfFiles == 0) {
|
if (pStatus->fileIter.numOfFiles == 0) {
|
||||||
pStatus->loadFromFile = false;
|
pStatus->loadFromFile = false;
|
||||||
|
@ -5218,7 +5226,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
// find the start data block in file
|
// find the start data block in file
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
if (pReader->flag == READER_STATUS_SUSPEND) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
code = tsdbReaderResume(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
|
@ -5286,12 +5298,17 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int64_t rows = 0;
|
int64_t rows = 0;
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
if (pReader->flag == READER_STATUS_SUSPEND) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
code = tsdbReaderResume(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
|
|
|
@ -5572,7 +5572,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
|
int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
|
||||||
"Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
|
"Total_Blocks=[%d] Total_Size=[%.2f KB] Average_size=[%.2f KB] Compression_Ratio=[%.2f %c]",
|
||||||
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
|
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
|
|
|
@ -6130,17 +6130,50 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) {
|
||||||
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
|
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool hasJsonTypeProjection(SSelectStmt* pSelect) {
|
||||||
|
SNode* pProj = NULL;
|
||||||
|
FOREACH(pProj, pSelect->pProjectionList) {
|
||||||
|
if (TSDB_DATA_TYPE_JSON == ((SExprNode*)pProj)->resType.type) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static EDealRes hasColumnOrPseudoColumn(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
*(bool*)pContext = true;
|
||||||
|
return DEAL_RES_END;
|
||||||
|
}
|
||||||
|
if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)) {
|
||||||
|
*(bool*)pContext = true;
|
||||||
|
return DEAL_RES_END;
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) {
|
||||||
|
bool hasColumn = false;
|
||||||
|
nodesWalkExprPostOrder(pNode, hasColumnOrPseudoColumn, &hasColumn);
|
||||||
|
return hasColumn;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
||||||
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
|
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
}
|
}
|
||||||
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
|
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
"SUBTABLE expression must be of VARCHAR type");
|
"SUBTABLE expression must be of VARCHAR type");
|
||||||
}
|
}
|
||||||
|
if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasColumnOrPseudoColumn(pSelect->pSubtable)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
|
"SUBTABLE expression must not has column when no partition by clause");
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
|
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
"The trigger mode of non window query can only be AT_ONCE");
|
"The trigger mode of non window query can only be AT_ONCE");
|
||||||
|
@ -8269,6 +8302,11 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||||
|
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_MAX_COLUMNS == pTableMeta->tableInfo.numOfColumns) {
|
if (TSDB_MAX_COLUMNS == pTableMeta->tableInfo.numOfColumns) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
|
||||||
|
|
||||||
run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)",
|
run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)",
|
||||||
TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
|
TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
|
||||||
|
run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME",
|
||||||
|
TSDB_CODE_PAR_INVALID_STREAM_QUERY);
|
||||||
|
run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) "
|
||||||
|
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
||||||
#add_subdirectory(filter)
|
add_subdirectory(filter)
|
||||||
add_subdirectory(scalar)
|
add_subdirectory(scalar)
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
|
#include "filterInt.h"
|
||||||
#include "nodes.h"
|
#include "nodes.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
|
@ -344,6 +345,7 @@ TEST(timerangeTest, greater_and_lower_not_strict) {
|
||||||
nodesDestroyNode(logicNode1);
|
nodesDestroyNode(logicNode1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
TEST(columnTest, smallint_column_greater_double_value) {
|
TEST(columnTest, smallint_column_greater_double_value) {
|
||||||
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
|
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
|
||||||
int16_t leftv[5] = {1, 2, 3, 4, 5};
|
int16_t leftv[5] = {1, 2, 3, 4, 5};
|
||||||
|
@ -1337,6 +1339,127 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
|
||||||
nodesDestroyNode(logicNode1);
|
nodesDestroyNode(logicNode1);
|
||||||
blockDataDestroy(src);
|
blockDataDestroy(src);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
template <class SignedT, class UnsignedT>
|
||||||
|
int32_t compareSignedWithUnsigned(SignedT l, UnsignedT r) {
|
||||||
|
if (l < 0) return -1;
|
||||||
|
auto l_uint64 = static_cast<uint64_t>(l);
|
||||||
|
auto r_uint64 = static_cast<uint64_t>(r);
|
||||||
|
if (l_uint64 < r_uint64) return -1;
|
||||||
|
if (l_uint64 > r_uint64) return 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class UnsignedT, class SignedT>
|
||||||
|
int32_t compareUnsignedWithSigned(UnsignedT l, SignedT r) {
|
||||||
|
if (r < 0) return 1;
|
||||||
|
auto l_uint64 = static_cast<uint64_t>(l);
|
||||||
|
auto r_uint64 = static_cast<uint64_t>(r);
|
||||||
|
if (l_uint64 < r_uint64) return -1;
|
||||||
|
if (l_uint64 > r_uint64) return 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class SignedT, class UnsignedT>
|
||||||
|
void doCompareWithValueRange_SignedWithUnsigned(__compar_fn_t fp) {
|
||||||
|
int32_t signedMin = -10, signedMax = 10;
|
||||||
|
int32_t unsignedMin = 0, unsignedMax = 10;
|
||||||
|
for (SignedT l = signedMin; l <= signedMax; ++l) {
|
||||||
|
for (UnsignedT r = unsignedMin; r <= unsignedMax; ++r) {
|
||||||
|
ASSERT_EQ(fp(&l, &r), compareSignedWithUnsigned(l, r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class UnsignedT, class SignedT>
|
||||||
|
void doCompareWithValueRange_UnsignedWithSigned(__compar_fn_t fp) {
|
||||||
|
int32_t signedMin = -10, signedMax = 10;
|
||||||
|
int32_t unsignedMin = 0, unsignedMax = 10;
|
||||||
|
for (UnsignedT l = unsignedMin; l <= unsignedMax; ++l) {
|
||||||
|
for (SignedT r = signedMin; r <= signedMax; ++r) {
|
||||||
|
ASSERT_EQ(fp(&l, &r), compareUnsignedWithSigned(l, r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class LType>
|
||||||
|
void doCompareWithValueRange_OnlyLeftType(__compar_fn_t fp, int32_t rType) {
|
||||||
|
switch (rType) {
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
doCompareWithValueRange_SignedWithUnsigned<LType, uint8_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
doCompareWithValueRange_SignedWithUnsigned<LType, uint16_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
doCompareWithValueRange_SignedWithUnsigned<LType, uint32_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
doCompareWithValueRange_SignedWithUnsigned<LType, uint64_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
doCompareWithValueRange_UnsignedWithSigned<LType, int8_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
doCompareWithValueRange_UnsignedWithSigned<LType, int16_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
doCompareWithValueRange_UnsignedWithSigned<LType, int32_t>(fp);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
doCompareWithValueRange_UnsignedWithSigned<LType, int64_t>(fp);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FAIL();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void doCompare(const std::vector<int32_t> &lTypes, const std::vector<int32_t> &rTypes, int32_t oper) {
|
||||||
|
for (int i = 0; i < lTypes.size(); ++i) {
|
||||||
|
for (int j = 0; j < rTypes.size(); ++j) {
|
||||||
|
auto fp = filterGetCompFuncEx(lTypes[i], rTypes[j], oper);
|
||||||
|
switch (lTypes[i]) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<int8_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<int16_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<int32_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<int64_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<uint8_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<uint16_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<uint32_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
doCompareWithValueRange_OnlyLeftType<uint64_t>(fp, rTypes[j]);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
FAIL();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(dataCompareTest, signed_and_unsigned_int) {
|
||||||
|
std::vector<int32_t> lType = {TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_INT,
|
||||||
|
TSDB_DATA_TYPE_BIGINT};
|
||||||
|
std::vector<int32_t> rType = {TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_UINT,
|
||||||
|
TSDB_DATA_TYPE_UBIGINT};
|
||||||
|
|
||||||
|
doCompare(lType, rType, OP_TYPE_GREATER_THAN);
|
||||||
|
doCompare(rType, lType, OP_TYPE_GREATER_THAN);
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
|
|
|
@ -189,6 +189,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
|
tFreeStreamTask(pTask);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
|
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
|
||||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -171,6 +171,8 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||||
|
if (pObj == NULL) return;
|
||||||
|
|
||||||
SSyncNode *pNode = pObj->data;
|
SSyncNode *pNode = pObj->data;
|
||||||
sTrace("vgId:%d, clean all resp", pNode->vgId);
|
sTrace("vgId:%d, clean all resp", pNode->vgId);
|
||||||
|
|
||||||
|
|
|
@ -587,12 +587,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
|
||||||
|
|
||||||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||||
void* pool = pThrd->pool;
|
void* pool = pThrd->pool;
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||||
STrans* pTranInst = pThrd->pTransInst;
|
STrans* pTranInst = pThrd->pTransInst;
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||||
plist = taosHashGet(pool, key, strlen(key));
|
plist = taosHashGet(pool, key, strlen(key) + 1);
|
||||||
|
|
||||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
QUEUE_INIT(&nList->msgQ);
|
QUEUE_INIT(&nList->msgQ);
|
||||||
|
@ -627,11 +627,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||||
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
void* pool = pThrd->pool;
|
void* pool = pThrd->pool;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||||
plist = taosHashGet(pool, key, strlen(key));
|
plist = taosHashGet(pool, key, strlen(key) + 1);
|
||||||
|
|
||||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
QUEUE_INIT(&nList->msgQ);
|
QUEUE_INIT(&nList->msgQ);
|
||||||
|
@ -717,7 +717,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
cliDestroyConnMsgs(conn, false);
|
cliDestroyConnMsgs(conn, false);
|
||||||
|
|
||||||
if (conn->list == NULL) {
|
if (conn->list == NULL) {
|
||||||
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
|
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnList* pList = conn->list;
|
SConnList* pList = conn->list;
|
||||||
|
@ -822,7 +822,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
|
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
|
||||||
|
T_REF_VAL_GET(conn));
|
||||||
conn->broken = true;
|
conn->broken = true;
|
||||||
cliHandleExcept(conn);
|
cliHandleExcept(conn);
|
||||||
}
|
}
|
||||||
|
@ -875,8 +876,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
connList->list->numOfConn--;
|
connList->list->numOfConn--;
|
||||||
connList->size--;
|
connList->size--;
|
||||||
} else {
|
} else {
|
||||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
|
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1);
|
||||||
connList->list->numOfConn--;
|
if (connList != NULL) connList->list->numOfConn--;
|
||||||
}
|
}
|
||||||
conn->list = NULL;
|
conn->list = NULL;
|
||||||
pThrd->newConnCount--;
|
pThrd->newConnCount--;
|
||||||
|
@ -1269,7 +1270,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
||||||
|
|
||||||
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
||||||
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
|
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1);
|
||||||
int64_t cTimestamp = taosGetTimestampMs();
|
int64_t cTimestamp = taosGetTimestampMs();
|
||||||
if (item != NULL) {
|
if (item != NULL) {
|
||||||
int32_t elapse = cTimestamp - item->timestamp;
|
int32_t elapse = cTimestamp - item->timestamp;
|
||||||
|
@ -1281,7 +1282,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||||
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
|
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1459,7 +1460,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
||||||
}
|
}
|
||||||
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
|
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
|
||||||
uint32_t addr = 0;
|
uint32_t addr = 0;
|
||||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
|
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
|
||||||
if (v == NULL) {
|
if (v == NULL) {
|
||||||
addr = taosGetIpv4FromFqdn(fqdn);
|
addr = taosGetIpv4FromFqdn(fqdn);
|
||||||
if (addr == 0xffffffff) {
|
if (addr == 0xffffffff) {
|
||||||
|
@ -1468,7 +1469,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
|
||||||
return addr;
|
return addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
|
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
|
||||||
} else {
|
} else {
|
||||||
addr = *v;
|
addr = *v;
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,7 +314,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tWarn("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
|
tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
conn->broken = true;
|
conn->broken = true;
|
||||||
if (conn->status == ConnAcquire) {
|
if (conn->status == ConnAcquire) {
|
||||||
|
|
|
@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
if (cond) {
|
if (cond) {
|
||||||
pReader->cond = *cond;
|
pReader->cond = *cond;
|
||||||
} else {
|
} else {
|
||||||
pReader->cond.scanUncommited = 0;
|
// pReader->cond.scanUncommited = 0;
|
||||||
pReader->cond.scanNotApplied = 0;
|
pReader->cond.scanNotApplied = 0;
|
||||||
pReader->cond.scanMeta = 0;
|
pReader->cond.scanMeta = 0;
|
||||||
pReader->cond.enableRef = 0;
|
pReader->cond.enableRef = 0;
|
||||||
|
@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t lastVer = walGetLastVer(pReader->pWal);
|
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||||
endVer = TMIN(appliedVer, endVer);
|
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||||
|
taosMsleep(1);
|
||||||
|
appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
|
}
|
||||||
|
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
||||||
|
// endVer = TMIN(appliedVer, endVer);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||||
", applied index:%" PRId64 ", end index:%" PRId64,
|
", applied index:%" PRId64,
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= committedVer) {
|
||||||
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,17 +308,19 @@ int32_t compareInt8Uint16(const void *pLeft, const void *pRight) {
|
||||||
|
|
||||||
int32_t compareInt8Uint32(const void *pLeft, const void *pRight) {
|
int32_t compareInt8Uint32(const void *pLeft, const void *pRight) {
|
||||||
int8_t left = GET_INT8_VAL(pLeft);
|
int8_t left = GET_INT8_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint32_t right = GET_UINT32_VAL(pRight);
|
uint32_t right = GET_UINT32_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint32_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint32_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareInt8Uint64(const void *pLeft, const void *pRight) {
|
int32_t compareInt8Uint64(const void *pLeft, const void *pRight) {
|
||||||
int8_t left = GET_INT8_VAL(pLeft);
|
int8_t left = GET_INT8_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint64_t right = GET_UINT64_VAL(pRight);
|
uint64_t right = GET_UINT64_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint64_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint64_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,17 +382,19 @@ int32_t compareInt16Uint16(const void *pLeft, const void *pRight) {
|
||||||
|
|
||||||
int32_t compareInt16Uint32(const void *pLeft, const void *pRight) {
|
int32_t compareInt16Uint32(const void *pLeft, const void *pRight) {
|
||||||
int16_t left = GET_INT16_VAL(pLeft);
|
int16_t left = GET_INT16_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint32_t right = GET_UINT32_VAL(pRight);
|
uint32_t right = GET_UINT32_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint32_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint32_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareInt16Uint64(const void *pLeft, const void *pRight) {
|
int32_t compareInt16Uint64(const void *pLeft, const void *pRight) {
|
||||||
int16_t left = GET_INT16_VAL(pLeft);
|
int16_t left = GET_INT16_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint64_t right = GET_UINT64_VAL(pRight);
|
uint64_t right = GET_UINT64_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint64_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint64_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,17 +456,19 @@ int32_t compareInt32Uint16(const void *pLeft, const void *pRight) {
|
||||||
|
|
||||||
int32_t compareInt32Uint32(const void *pLeft, const void *pRight) {
|
int32_t compareInt32Uint32(const void *pLeft, const void *pRight) {
|
||||||
int32_t left = GET_INT32_VAL(pLeft);
|
int32_t left = GET_INT32_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint32_t right = GET_UINT32_VAL(pRight);
|
uint32_t right = GET_UINT32_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint32_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint32_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareInt32Uint64(const void *pLeft, const void *pRight) {
|
int32_t compareInt32Uint64(const void *pLeft, const void *pRight) {
|
||||||
int32_t left = GET_INT32_VAL(pLeft);
|
int32_t left = GET_INT32_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint64_t right = GET_UINT64_VAL(pRight);
|
uint64_t right = GET_UINT64_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint64_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint64_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,9 +538,10 @@ int32_t compareInt64Uint32(const void *pLeft, const void *pRight) {
|
||||||
|
|
||||||
int32_t compareInt64Uint64(const void *pLeft, const void *pRight) {
|
int32_t compareInt64Uint64(const void *pLeft, const void *pRight) {
|
||||||
int64_t left = GET_INT64_VAL(pLeft);
|
int64_t left = GET_INT64_VAL(pLeft);
|
||||||
|
if (left < 0) return -1;
|
||||||
uint64_t right = GET_UINT64_VAL(pRight);
|
uint64_t right = GET_UINT64_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if ((uint64_t)left > right) return 1;
|
||||||
if (left < right) return -1;
|
if ((uint64_t)left < right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -857,24 +864,27 @@ int32_t compareUint16Uint64(const void *pLeft, const void *pRight) {
|
||||||
int32_t compareUint32Int8(const void *pLeft, const void *pRight) {
|
int32_t compareUint32Int8(const void *pLeft, const void *pRight) {
|
||||||
uint32_t left = GET_UINT32_VAL(pLeft);
|
uint32_t left = GET_UINT32_VAL(pLeft);
|
||||||
int8_t right = GET_INT8_VAL(pRight);
|
int8_t right = GET_INT8_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint32_t)right) return 1;
|
||||||
|
if (left < (uint32_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareUint32Int16(const void *pLeft, const void *pRight) {
|
int32_t compareUint32Int16(const void *pLeft, const void *pRight) {
|
||||||
uint32_t left = GET_UINT32_VAL(pLeft);
|
uint32_t left = GET_UINT32_VAL(pLeft);
|
||||||
int16_t right = GET_INT16_VAL(pRight);
|
int16_t right = GET_INT16_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint32_t)right) return 1;
|
||||||
|
if (left < (uint32_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareUint32Int32(const void *pLeft, const void *pRight) {
|
int32_t compareUint32Int32(const void *pLeft, const void *pRight) {
|
||||||
uint32_t left = GET_UINT32_VAL(pLeft);
|
uint32_t left = GET_UINT32_VAL(pLeft);
|
||||||
int32_t right = GET_INT32_VAL(pRight);
|
int32_t right = GET_INT32_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint32_t)right) return 1;
|
||||||
|
if (left < (uint32_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -929,32 +939,36 @@ int32_t compareUint32Uint64(const void *pLeft, const void *pRight) {
|
||||||
int32_t compareUint64Int8(const void *pLeft, const void *pRight) {
|
int32_t compareUint64Int8(const void *pLeft, const void *pRight) {
|
||||||
uint64_t left = GET_UINT64_VAL(pLeft);
|
uint64_t left = GET_UINT64_VAL(pLeft);
|
||||||
int8_t right = GET_INT8_VAL(pRight);
|
int8_t right = GET_INT8_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint64_t)right) return 1;
|
||||||
|
if (left < (uint64_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareUint64Int16(const void *pLeft, const void *pRight) {
|
int32_t compareUint64Int16(const void *pLeft, const void *pRight) {
|
||||||
uint64_t left = GET_UINT64_VAL(pLeft);
|
uint64_t left = GET_UINT64_VAL(pLeft);
|
||||||
int16_t right = GET_INT16_VAL(pRight);
|
int16_t right = GET_INT16_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint64_t)right) return 1;
|
||||||
|
if (left < (uint64_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareUint64Int32(const void *pLeft, const void *pRight) {
|
int32_t compareUint64Int32(const void *pLeft, const void *pRight) {
|
||||||
uint64_t left = GET_UINT64_VAL(pLeft);
|
uint64_t left = GET_UINT64_VAL(pLeft);
|
||||||
int32_t right = GET_INT32_VAL(pRight);
|
int32_t right = GET_INT32_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint64_t)right) return 1;
|
||||||
|
if (left < (uint64_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareUint64Int64(const void *pLeft, const void *pRight) {
|
int32_t compareUint64Int64(const void *pLeft, const void *pRight) {
|
||||||
uint64_t left = GET_UINT64_VAL(pLeft);
|
uint64_t left = GET_UINT64_VAL(pLeft);
|
||||||
int64_t right = GET_INT64_VAL(pRight);
|
int64_t right = GET_INT64_VAL(pRight);
|
||||||
if (left > right) return 1;
|
if (right < 0) return 1;
|
||||||
if (left < right) return -1;
|
if (left > (uint64_t)right) return 1;
|
||||||
|
if (left < (uint64_t)right) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -342,12 +342,29 @@ def main():
|
||||||
print('======== crash_gen run sucess and exit as expected ========')
|
print('======== crash_gen run sucess and exit as expected ========')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
text = f'''exit status: {msg_dict[status]}
|
cmd = crash_cmds.split('&')[2]
|
||||||
git commit : {git_commit}
|
if status == 0:
|
||||||
|
log_dir = "none"
|
||||||
|
else:
|
||||||
|
log_dir= "/root/pxiao/crash_gen_logs"
|
||||||
|
|
||||||
|
if status == 3:
|
||||||
|
core_dir = "/root/pxiao/crash_gen_logs"
|
||||||
|
else:
|
||||||
|
core_dir = "none"
|
||||||
|
|
||||||
|
text = f'''
|
||||||
|
exit status: {msg_dict[status]}
|
||||||
|
test scope: crash_gen
|
||||||
|
owner: pxiao
|
||||||
hostname: {hostname}
|
hostname: {hostname}
|
||||||
start time: {starttime}
|
start time: {starttime}
|
||||||
end time: {endtime}
|
end time: {endtime}
|
||||||
cmd: {crash_cmds}'''
|
git commit : {git_commit}
|
||||||
|
log dir: {log_dir}
|
||||||
|
core dir: {core_dir}
|
||||||
|
cmd: {cmd}'''
|
||||||
|
|
||||||
send_msg(get_msg(text))
|
send_msg(get_msg(text))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("exception:", e)
|
print("exception:", e)
|
||||||
|
|
|
@ -377,12 +377,29 @@ def main():
|
||||||
print('======== crash_gen run sucess and exit as expected ========')
|
print('======== crash_gen run sucess and exit as expected ========')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
text = f'''exit status: {msg_dict[status]}
|
cmd = crash_cmds.split('&')[2]
|
||||||
git commit : {git_commit}
|
if status == 0:
|
||||||
|
log_dir = "none"
|
||||||
|
else:
|
||||||
|
log_dir= "/root/pxiao/crash_gen_logs"
|
||||||
|
|
||||||
|
if status == 3:
|
||||||
|
core_dir = "/root/pxiao/crash_gen_logs"
|
||||||
|
else:
|
||||||
|
core_dir = "none"
|
||||||
|
|
||||||
|
text = f'''
|
||||||
|
exit status: {msg_dict[status]}
|
||||||
|
test scope: crash_gen
|
||||||
|
owner: pxiao
|
||||||
hostname: {hostname}
|
hostname: {hostname}
|
||||||
start time: {starttime}
|
start time: {starttime}
|
||||||
end time: {endtime}
|
end time: {endtime}
|
||||||
cmd: {crash_cmds}'''
|
git commit : {git_commit}
|
||||||
|
log dir: {log_dir}
|
||||||
|
core dir: {core_dir}
|
||||||
|
cmd: {cmd}'''
|
||||||
|
|
||||||
send_msg(get_msg(text))
|
send_msg(get_msg(text))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("exception:", e)
|
print("exception:", e)
|
||||||
|
|
|
@ -377,12 +377,29 @@ def main():
|
||||||
print('======== crash_gen run sucess and exit as expected ========')
|
print('======== crash_gen run sucess and exit as expected ========')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
text = f'''exit status: {msg_dict[status]}
|
cmd = crash_cmds.split('&')[2]
|
||||||
git commit : {git_commit}
|
if status == 0:
|
||||||
|
log_dir = "none"
|
||||||
|
else:
|
||||||
|
log_dir= "/root/pxiao/crash_gen_logs"
|
||||||
|
|
||||||
|
if status == 3:
|
||||||
|
core_dir = "/root/pxiao/crash_gen_logs"
|
||||||
|
else:
|
||||||
|
core_dir = "none"
|
||||||
|
|
||||||
|
text = f'''
|
||||||
|
exit status: {msg_dict[status]}
|
||||||
|
test scope: crash_gen
|
||||||
|
owner: pxiao
|
||||||
hostname: {hostname}
|
hostname: {hostname}
|
||||||
start time: {starttime}
|
start time: {starttime}
|
||||||
end time: {endtime}
|
end time: {endtime}
|
||||||
cmd: {crash_cmds}'''
|
git commit : {git_commit}
|
||||||
|
log dir: {log_dir}
|
||||||
|
core dir: {core_dir}
|
||||||
|
cmd: {cmd}'''
|
||||||
|
|
||||||
send_msg(get_msg(text))
|
send_msg(get_msg(text))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("exception:", e)
|
print("exception:", e)
|
||||||
|
|
|
@ -657,6 +657,17 @@ if $data20 != null then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print =============== error
|
||||||
|
sql create table tb2023(ts timestamp, f int);
|
||||||
|
sql_error alter table tb2023 add column v varchar(16375);
|
||||||
|
sql_error alter table tb2023 add column v varchar(16385);
|
||||||
|
sql_error alter table tb2023 add column v varchar(33100);
|
||||||
|
sql alter table tb2023 add column v varchar(16374);
|
||||||
|
sql desc tb2023
|
||||||
|
sql alter table tb2023 drop column v
|
||||||
|
sql_error alter table tb2023 add column v nchar(4094);
|
||||||
|
sql alter table tb2023 add column v nchar(4093);
|
||||||
|
sql desc tb2023
|
||||||
print ======= over
|
print ======= over
|
||||||
sql drop database d1
|
sql drop database d1
|
||||||
sql select * from information_schema.ins_databases
|
sql select * from information_schema.ins_databases
|
||||||
|
|
|
@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb2.py
|
python3 ./test.py -f 7-tmq/subscribeDb2.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb3.py
|
python3 ./test.py -f 7-tmq/subscribeDb3.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb4.py
|
python3 ./test.py -f 7-tmq/subscribeDb4.py
|
||||||
#python3 ./test.py -f 7-tmq/subscribeStb.py
|
python3 ./test.py -f 7-tmq/subscribeStb.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb2.py
|
python3 ./test.py -f 7-tmq/subscribeStb2.py
|
||||||
|
|
Loading…
Reference in New Issue